Source code for rtcbot.websocket

import asyncio
import logging
import json

import aiohttp
from aiohttp import web

from .base import SubscriptionProducerConsumer, SubscriptionClosed


[docs]class Websocket(SubscriptionProducerConsumer): """ Wraps an aiohttp websocket to have an API matching RTCBot. The websocket can be given either a URL to connect to:: ws = Websocket("http://localhost:8080/ws") msg = await ws.get() It can also be used in a server context to complete the connection:: @routes.get("/ws") async def websocketHandler(request): ws = Websocket(request) msg = await ws.get() Naturally, just like all other parts of rtcbot, you can also `subscribe` and `putSubscription` instead of manually calling `get` and `put_nowait`. """ _log = logging.getLogger("rtcbot.Websocket") def __init__(self, url_or_request, json=True, loop=None): super().__init__(asyncio.Queue, asyncio.Queue, logger=self._log) self._loop = loop if self._loop is None: self._loop = asyncio.get_event_loop() self._json = json self.ws = None self._client = None asyncio.ensure_future(self._wsSender(url_or_request)) async def _wsSender(self, url_or_request): try: if isinstance(url_or_request, str): # Connect to a remote websocket self._client = aiohttp.ClientSession(loop=self._loop) self.ws = await self._client.ws_connect(url_or_request) else: # Handle an incoming websocket as server software. self.ws = web.WebSocketResponse(autoclose=False) await self.ws.prepare(url_or_request) self._log.debug("Connection established") # Create a separate coroutine for sending messages asyncio.ensure_future(self._wsReceiver()) # And send out messages while not self._shouldClose: try: msg = await self._get() if self._json: msg = json.dumps(msg) self._log.debug("Sending message %s", msg) await self.ws.send_str(msg) except SubscriptionClosed: break self._log.debug("Stopping websocket sender") except Exception as e: self._setError(e) self._setReady(False) async def _wsReceiver(self): # Once this coroutine starts, fire the ready event self._setReady(True) async for msg in self.ws: self._log.debug(msg) if msg.type == aiohttp.WSMsgType.TEXT: data = msg.data self._log.debug("Received '%s'", data) if self._json: try: data = json.loads(data) except: self._log.debug("Failed to read json data %s", data) try: self._put_nowait(data) except: self._log.exception("Error in message handler") elif msg.type == aiohttp.WSMsgType.ERROR: self._setError(self.ws.exception()) break elif msg.type == aiohttp.WSMsgType.CLOSE: break if self._shouldClose: break self._log.debug("Stopping websocket receiver") await self._clientClose() async def _clientClose(self): if self.ws is not None and not self.ws.closed: await self.ws.close() if self._client is not None and not self._client.closed: await self._client.close() super().close() self._log.debug("Finished closing websocket")
[docs] def close(self): if self.ws is not None and not self.closed: if self._loop.is_running(): self._log.debug("Loop is running - close will return a future!") return asyncio.ensure_future(self._clientClose()) else: self._loop.run_until_complete(self._clientClose()) return None