from aiortc import (
RTCPeerConnection,
RTCSessionDescription,
RTCConfiguration,
RTCIceServer,
exceptions,
)
import asyncio
import logging
import json
from .base import (
SubscriptionProducerConsumer,
SubscriptionProducer,
SubscriptionClosed,
NoClosedSubscription,
)
from .tracks import VideoSender, AudioSender, AudioReceiver, VideoReceiver
from .subscriptions import MostRecentSubscription
[docs]class DataChannel(SubscriptionProducerConsumer):
"""
Represents a data channel. You can put_nowait messages into it,
and subscribe to messages coming from it.
"""
_log = logging.getLogger("rtcbot.RTCConnection.DataChannel")
def __init__(self, rtcDataChannel, json=True):
super().__init__(asyncio.Queue, asyncio.Queue, logger=self._log)
self._rtcDataChannel = rtcDataChannel
# Directly put messages
self._rtcDataChannel.on("message", self._put_preprocess)
self._json = json
self._log.debug("Ready State %s", self._rtcDataChannel.readyState)
if self._rtcDataChannel.readyState == "open":
# Make sure we pass messages forward
asyncio.ensure_future(self._messageSender())
else:
self._log.debug("Waiting for channel to open")
self._rtcDataChannel.on(
"open", lambda: asyncio.ensure_future(self._messageSender())
)
@self._rtcDataChannel.transport.transport.on("statechange")
async def stateChange():
state = self._rtcDataChannel.transport.transport.state
self._log.debug("Channel State: %s", state)
if state == "closed" or state == "failed":
self.close()
async def _messageSender(self):
self._log.debug("Channel open, ready to send")
self._setReady(True)
while not self._shouldClose:
try:
msg = await self._get()
if self._json:
msg = json.dumps(msg)
self._log.debug("Sending message %s", msg)
self._rtcDataChannel.send(msg)
except SubscriptionClosed:
pass
except exceptions.InvalidStateError:
self._close()
break
# The while loop should exit here
self._setReady(False)
self._log.debug("Stopping message sender")
def _put_preprocess(self, data):
self._log.debug("Received '%s'", data)
if self._json:
try:
data = json.loads(data)
except:
self._log.debug("Failed to read json data %s", data)
try:
self._put_nowait(data)
except:
self._log.exception("Error in message handler")
@property
def name(self):
return self._rtcDataChannel.label
[docs] def close(self):
# self._rtcDataChannel.close()
super().close()
[docs]class ConnectionVideoHandler(SubscriptionProducerConsumer):
"""
:Example:
Allows usage of RTCConnection as follows::
r = RTCConnection()
frameSubscription = r.video.subscribe()
r.video.putSubscription(frameSubscription)
It uses the first incoming video stream for subscribe(),
and creates a single outgoing video stream.
Subscribing to the tracks can be done
"""
_log = logging.getLogger("rtcbot.RTCConnection.ConnectionVideoHandler")
def __init__(self, rtc):
super().__init__(
directPutSubscriptionType=MostRecentSubscription,
defaultSubscriptionType=MostRecentSubscription,
logger=self._log,
)
self._senders = set()
self._receivers = set()
self._defaultSender = None
self._defaultReceiver = None
# The defaultSender subscribes to this
self._defaultSenderSubscription = NoClosedSubscription(self._get)
self._trackSubscriber = SubscriptionProducer(
logger=self._log.getChild("trackSubscriber")
)
self._rtc = rtc
self._offerToReceive = 0
[docs] def onTrack(self, callback=None):
"""
Callback that gets called each time a video track is received::
@r.video.onTrack
def onTrack(track):
print(track)
The callback actually works exactly as a subscribe(), so you can do::
subscription = r.video.onTrack()
await subscription.get()
Note that if you have more than one track, you will need to tell rtcbot how
many tracks to prepare to receive::
r.video.offerToReceive(2)
"""
self.offerToReceive()
return self._trackSubscriber.subscribe(callback)
[docs] def addTrack(self, frameSubscription=None, fps=None, canSkip=True):
"""
Allows to send multiple video tracks in a single connection.
Each call to putTrack *adds* the track to the connection.
For simple usage, where you only have a single video stream,
just use `putSubscription` - it automatically calls putTrack for you.
"""
self._log.debug("Adding video track to connection")
s = VideoSender(fps=fps, canSkip=True)
if frameSubscription is not None:
s.putSubscription(frameSubscription)
elif self._defaultSender is None:
s.putSubscription(self._defaultSenderSubscription)
if self._defaultSender is None:
self._defaultSender = s
self._defaultSender.onClose(self.close)
self._rtc.addTrack(s.videoStreamTrack)
self._senders.add(s)
return s
[docs] def putSubscription(self, subscription):
# We need to make sure that when we put:
# 1) there is an actual video track to put to!
# 2) the track is subscribed to the VideoHandler
super().putSubscription(subscription)
if self._defaultSender is None:
self.addTrack()
# Make sure that this subscription is active on the default track
self._defaultSender.putSubscription(self._defaultSenderSubscription)
def _onTrack(self, track):
"""
Internal raw track receiver
"""
self._log.debug("Received video track from connection")
track = VideoReceiver(track)
if self._defaultReceiver is None: # The default receiver track is the first one
self._defaultReceiver = track
self._defaultReceiver.subscribe(self._put_nowait)
self._defaultReceiver.onClose(self.close)
self._receivers.add(track)
self._trackSubscriber._put_nowait(track)
[docs] def close(self):
if not self.closed:
for t in self._senders:
t.close()
for t in self._receivers:
t.close()
self._trackSubscriber.close()
super().close()
[docs] def offerToReceive(self, num=1):
"""
Set the number of tracks that you can receive
"""
if self._offerToReceive < num:
self._offerToReceive = num
[docs] def subscribe(self, subscription=None):
self.offerToReceive()
return super().subscribe(subscription)
[docs]class ConnectionAudioHandler(SubscriptionProducerConsumer):
"""
Allows usage of RTCConnection as follows::
r = RTCConnection()
audioSubscription = r.audio.subscribe()
r.audio.putSubscription(audioSubscription)
It uses the first incoming audio stream for subscribe(),
and creates a single outgoing audio stream.
Subscribing to the tracks can be done
"""
_log = logging.getLogger("rtcbot.RTCConnection.ConnectionAudioHandler")
def __init__(self, rtc):
super().__init__(
directPutSubscriptionType=asyncio.Queue,
defaultSubscriptionType=asyncio.Queue,
logger=self._log,
)
self._senders = set()
self._receivers = set()
self._defaultSender = None
self._defaultReceiver = None
# The defaultSender subscribes to this
self._defaultSenderSubscription = NoClosedSubscription(self._get)
self._trackSubscriber = SubscriptionProducer(
logger=self._log.getChild("trackSubscriber")
)
self._rtc = rtc
self._offerToReceive = 0
[docs] def onTrack(self, callback=None):
"""
Callback that gets called each time a audio track is received::
@r.audio.onTrack
def onTrack(track):
print(track)
The callback actually works exactly as a subscribe(), so you can do::
subscription = r.audio.onTrack()
await subscription.get()
Note that if you have more than one track, you will need to tell rtcbot how
many tracks to prepare to receive::
r.audio.offerToReceive(2)
"""
self.offerToReceive()
return self._trackSubscriber.subscribe(callback)
[docs] def addTrack(self, subscription=None, sampleRate=48000, canSkip=True):
"""
Allows to send multiple audio tracks in a single connection.
Each call to putTrack *adds* the track to the connection.
For simple usage, where you only have a single audio stream,
just use `putSubscription` - it automatically calls putTrack for you.
"""
self._log.debug("Adding audio track to connection")
s = AudioSender(sampleRate=sampleRate, canSkip=True)
if subscription is not None:
s.putSubscription(subscription)
elif self._defaultSender is None:
s.putSubscription(self._defaultSenderSubscription)
if self._defaultSender is None:
self._defaultSender = s
self._defaultSender.onClose(self.close)
self._rtc.addTrack(s.audioStreamTrack)
self._senders.add(s)
return s
[docs] def putSubscription(self, subscription):
# We need to make sure that when we put:
# 1) there is an actual track to put to!
# 2) the track is subscribed to the handler
super().putSubscription(subscription)
if self._defaultSender is None:
self.addTrack()
# Make sure that this subscription is active on the default track
self._defaultSender.putSubscription(self._defaultSenderSubscription)
def _onTrack(self, track):
"""
Internal raw track receiver
"""
self._log.debug("Received audio track from connection")
track = AudioReceiver(track)
if self._defaultReceiver is None: # The default receiver track is the first one
self._defaultReceiver = track
self._defaultReceiver.subscribe(self._put_nowait)
self._defaultReceiver.onClose(self.close)
self._receivers.add(track)
self._trackSubscriber._put_nowait(track)
[docs] def close(self):
if not self.closed:
for t in self._senders:
t.close()
for t in self._receivers:
t.close()
self._trackSubscriber.close()
super().close()
[docs] def offerToReceive(self, num=1):
"""
Set the number of tracks that you can receive
"""
if self._offerToReceive < num:
self._offerToReceive = num
[docs] def subscribe(self, subscription=None):
self.offerToReceive()
return super().subscribe(subscription)
[docs]class RTCConnection(SubscriptionProducerConsumer):
_log = logging.getLogger("rtcbot.RTCConnection")
def __init__(
self,
defaultChannelOrdered=True,
loop=None,
rtcConfiguration=RTCConfiguration(
[RTCIceServer(urls="stun:stun.l.google.com:19302")]
),
):
super().__init__(
directPutSubscriptionType=asyncio.Queue,
defaultSubscriptionType=asyncio.Queue,
logger=self._log,
)
self._loop = loop
if self._loop is None:
self._loop = asyncio.get_event_loop()
self._dataChannels = {}
# These allow us to easily signal when the given events happen
self._dataChannelSubscriber = SubscriptionProducer(
logger=self._log.getChild("dataChannelSubscriber")
)
self._rtc = RTCPeerConnection(configuration=rtcConfiguration)
self._rtc.on("datachannel", self._onDatachannel)
self._rtc.on("iceconnectionstatechange", self._onIceConnectionStateChange)
self._rtc.on("track", self._onTrack)
self._hasRemoteDescription = False
self._defaultChannelOrdered = defaultChannelOrdered
self._videoHandler = ConnectionVideoHandler(self._rtc)
self._audioHandler = ConnectionAudioHandler(self._rtc)
# When the video/audio handlers get closed, close the entire connection
self._videoHandler.onClose(self.close)
self._audioHandler.onClose(self.close)
[docs] async def getLocalDescription(self, description=None):
"""
Gets the description to send on. Creates an initial description
if no remote description was passed, and creates a response if
a remote was given,
"""
if self._hasRemoteDescription or description is not None:
# This means that we received an offer - either the remote description
# was already set, or we passed in a description. In either case,
# instead of initializing a new connection, we prepare a response
if not self._hasRemoteDescription:
await self.setRemoteDescription(description)
self._log.debug("Creating response to connection offer")
try:
answer = await self._rtc.createAnswer()
except AttributeError:
self._log.exception(
"\n>>> Looks like the offer didn't include the necessary info to set up audio/video. See RTCConnection.video.offerToReceive(). <<<\n\n"
)
raise
await self._rtc.setLocalDescription(answer)
return {
"sdp": self._rtc.localDescription.sdp,
"type": self._rtc.localDescription.type,
}
# There was no remote description, which means that we are initializing the
# connection.
# Before starting init, we create a default data channel for the connection
self._log.debug("Setting up default data channel")
channel = DataChannel(
self._rtc.createDataChannel("default", ordered=self._defaultChannelOrdered)
)
# Subscribe the default channel directly to our own inputs and outputs.
# We have it listen to our own self._get, and write to our self._put_nowait
channel.putSubscription(NoClosedSubscription(self._get))
channel.subscribe(self._put_nowait)
channel.onReady(lambda: self._setReady(channel.ready))
channel.onClose(self.close)
self._dataChannels[channel.name] = channel
# Make sure we offer to receive video and audio if if isn't set up yet with
# all the receiving transceivers
if len(self.video._senders) < self.video._offerToReceive:
self._log.debug("Offering to receive video")
for i in range(self.video._offerToReceive - len(self.video._senders)):
self._rtc.addTransceiver("video", "recvonly")
if len(self.audio._senders) < self.audio._offerToReceive:
self._log.debug("Offering to receive audio")
for i in range(self.audio._offerToReceive - len(self.audio._senders)):
self._rtc.addTransceiver("audio", "recvonly")
self._log.debug("Creating new connection offer")
offer = await self._rtc.createOffer()
await self._rtc.setLocalDescription(offer)
return {
"sdp": self._rtc.localDescription.sdp,
"type": self._rtc.localDescription.type,
}
[docs] async def setRemoteDescription(self, description):
self._log.debug("Setting remote connection description")
await self._rtc.setRemoteDescription(RTCSessionDescription(**description))
self._hasRemoteDescription = True
async def _onIceConnectionStateChange(self):
self._log.debug("ICE state: %s", self._rtc.iceConnectionState)
if self._rtc.iceConnectionState == "failed":
self._setError(self._rtc.iceConnectionState)
await self.close()
def _onDatachannel(self, channel):
"""
When a data channel comes in, adds it to the data channels, and sets up its messaging and stuff.
"""
channel = DataChannel(channel)
self._log.debug("Got channel: %s", channel.name)
if channel.name == "default":
# Subscribe the default channel directly to our own inputs and outputs.
# We have it listen to our own self._get, and write to our self._put_nowait
channel.putSubscription(NoClosedSubscription(self._get))
channel.subscribe(self._put_nowait)
channel.onReady(lambda: self._setReady(channel.ready))
channel.onClose(self.close)
else:
self._dataChannelSubscriber.put_nowait(channel)
self._dataChannels[channel.name] = channel
def _onTrack(self, track):
self._log.debug("Received %s track from connection", track.kind)
if track.kind == "audio":
self._audioHandler._onTrack(track)
elif track.kind == "video":
self._videoHandler._onTrack(track)
[docs] def onDataChannel(self, callback=None):
"""
Acts as a subscriber...
"""
return self._dataChannelSubscriber.subscribe(callback)
[docs] def addDataChannel(self, name, ordered=True):
"""
Adds a data channel to the connection. Note that the RTCConnection adds a "default" channel
automatically, which you can subscribe to directly.
"""
self._log.debug("Adding data channel to connection")
if name in self._dataChannels or name == "default":
raise KeyError("Data channel %s already exists", name)
dc = DataChannel(self._rtc.createDataChannel(name, ordered=ordered))
self._dataChannels[name] = dc
return dc
[docs] def getDataChannel(self, name):
"""
Returns the data channel with the given name. Please note that the "default" channel is considered special,
and is not returned.
"""
if name == "default":
raise KeyError(
"Default channel not available for 'get'. Use the RTCConnection's subscribe and put_nowait methods for access to it."
)
return self._dataChannels[name]
@property
def video(self):
"""
Convenience function - you can subscribe to it to get video frames once they show up
"""
return self._videoHandler
@property
def audio(self):
"""
Convenience function - you can subscribe to it to get audio once a stream is received
"""
return self._audioHandler
[docs] def close(self):
"""
If the loop is running, returns a future that will close the connection. Otherwise, runs
the loop temporarily to complete closing.
"""
if self.closed:
if self._loop.is_running():
async def donothing():
pass
return asyncio.ensure_future(donothing())
return None
self._log.debug("Closing connection")
super().close()
# And closes all tracks
self.video.close()
self.audio.close()
for dc in self._dataChannels:
self._dataChannels[dc].close()
self._dataChannelSubscriber.close()
if self._loop.is_running():
self._log.debug("Loop is running - close will return a future!")
return asyncio.ensure_future(self._rtc.close())
else:
self._loop.run_until_complete(self._rtc.close())
return None
[docs] def send(self, msg):
"""
Send is an alias for put_nowait - makes it easier for people new to rtcbot to understand
what is going on
"""
self.put_nowait(msg)