RTC Connection

API

class rtcbot.connection.ConnectionAudioHandler(rtc)[source]

Bases: rtcbot.base.base.SubscriptionProducerConsumer

Allows usage of RTCConnection as follows:

r = RTCConnection()
audioSubscription = r.audio.subscribe()

r.audio.putSubscription(audioSubscription)

It uses the first incoming audio stream for subscribe(), and creates a single outgoing audio stream.

Subscribing to the tracks can be done

addTrack(subscription=None, sampleRate=48000, canSkip=True)[source]

Allows to send multiple audio tracks in a single connection. Each call to putTrack adds the track to the connection. For simple usage, where you only have a single audio stream, just use putSubscription - it automatically calls putTrack for you.

close()[source]

Cleans up and closes the object.

closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
offerToReceive(num=1)[source]

Set the number of tracks that you can receive

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!)

Be aware that this is equivalent to explicitly awaiting the object:

await myobj
onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()
onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!)

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

onTrack(callback=None)[source]

Callback that gets called each time a audio track is received:

@r.audio.onTrack
def onTrack(track):
    print(track)

The callback actually works exactly as a subscribe(), so you can do:

subscription = r.audio.onTrack()
await subscription.get()

Note that if you have more than one track, you will need to tell rtcbot how many tracks to prepare to receive:

r.audio.offerToReceive(2)
putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
put_nowait(data)

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None
ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notifed when ready to process data, you will want to use the onReady() function, which will allow you to set up a callback/coroutine to wait until initialized.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

stopSubscription()

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

subscribe(subscription=None)[source]

Allows subscribing to new data as it comes in, returning a subscription (see Subscriptions):

s = myobj.subscribe()
while True:
    data = await s.get()
    print(data)

There can be multiple subscriptions active at the same time, each of which get identical data. Each call to subscribe() returns a new, independent subscription:

s1 = myobj.subscribe()
s2 = myobj.subscribe()
while True:
    assert await s1.get()== await s2.get()

This function can also be used as a callback:

@myobj.subscribe
def newData(data):
    print("Got data:",data)

If passed an argument, it attempts to use the given callback/coroutine/subscription to notify of incoming data.

Parameters:subscription (optional) –
An optional existing subscription to subscribe to. This can be one of 3 things:
  1. An object which has the method put_nowait (see Subscriptions):
    q = asyncio.Queue()
    myobj.subscribe(q)
    while True:
        data = await q.get()
        print(data)
    
  2. A callback function - this will be called the moment new data is inserted:
    @myobj.subscribe
    def myfunction(data):
        print(data)
    
  3. An coroutine callback - A future of this coroutine is created on each insert:
    @myobj.subscribe
    async def myfunction(data):
        await asyncio.sleep(5)
        print(data)
    
Returns:A subscription. If one was passed in, returns the passed in subscription:
q = asyncio.Queue()
ret = thing.subscribe(q)
assert ret==q
subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None
unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters:subscription (optional) – Anything that was passed into/returned from subscribe().
unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was intialized.

class rtcbot.connection.ConnectionVideoHandler(rtc)[source]

Bases: rtcbot.base.base.SubscriptionProducerConsumer

Example:

Allows usage of RTCConnection as follows:

r = RTCConnection()
frameSubscription = r.video.subscribe()

r.video.putSubscription(frameSubscription)

It uses the first incoming video stream for subscribe(), and creates a single outgoing video stream.

Subscribing to the tracks can be done

addTrack(frameSubscription=None, fps=None, canSkip=True)[source]

Allows to send multiple video tracks in a single connection. Each call to putTrack adds the track to the connection. For simple usage, where you only have a single video stream, just use putSubscription - it automatically calls putTrack for you.

close()[source]

Cleans up and closes the object.

closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
offerToReceive(num=1)[source]

Set the number of tracks that you can receive

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!)

Be aware that this is equivalent to explicitly awaiting the object:

await myobj
onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()
onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!)

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

onTrack(callback=None)[source]

Callback that gets called each time a video track is received:

@r.video.onTrack
def onTrack(track):
    print(track)

The callback actually works exactly as a subscribe(), so you can do:

subscription = r.video.onTrack()
await subscription.get()

Note that if you have more than one track, you will need to tell rtcbot how many tracks to prepare to receive:

r.video.offerToReceive(2)
putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
put_nowait(data)

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None
ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notifed when ready to process data, you will want to use the onReady() function, which will allow you to set up a callback/coroutine to wait until initialized.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

stopSubscription()

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

subscribe(subscription=None)[source]

Allows subscribing to new data as it comes in, returning a subscription (see Subscriptions):

s = myobj.subscribe()
while True:
    data = await s.get()
    print(data)

There can be multiple subscriptions active at the same time, each of which get identical data. Each call to subscribe() returns a new, independent subscription:

s1 = myobj.subscribe()
s2 = myobj.subscribe()
while True:
    assert await s1.get()== await s2.get()

This function can also be used as a callback:

@myobj.subscribe
def newData(data):
    print("Got data:",data)

If passed an argument, it attempts to use the given callback/coroutine/subscription to notify of incoming data.

Parameters:subscription (optional) –
An optional existing subscription to subscribe to. This can be one of 3 things:
  1. An object which has the method put_nowait (see Subscriptions):
    q = asyncio.Queue()
    myobj.subscribe(q)
    while True:
        data = await q.get()
        print(data)
    
  2. A callback function - this will be called the moment new data is inserted:
    @myobj.subscribe
    def myfunction(data):
        print(data)
    
  3. An coroutine callback - A future of this coroutine is created on each insert:
    @myobj.subscribe
    async def myfunction(data):
        await asyncio.sleep(5)
        print(data)
    
Returns:A subscription. If one was passed in, returns the passed in subscription:
q = asyncio.Queue()
ret = thing.subscribe(q)
assert ret==q
subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None
unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters:subscription (optional) – Anything that was passed into/returned from subscribe().
unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was intialized.

class rtcbot.connection.DataChannel(rtcDataChannel, json=True)[source]

Bases: rtcbot.base.base.SubscriptionProducerConsumer

Represents a data channel. You can put_nowait messages into it, and subscribe to messages coming from it.

close()[source]

Cleans up and closes the object.

closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
name
onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!)

Be aware that this is equivalent to explicitly awaiting the object:

await myobj
onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()
onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!)

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

putSubscription(subscription)

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
put_nowait(data)

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None
ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notifed when ready to process data, you will want to use the onReady() function, which will allow you to set up a callback/coroutine to wait until initialized.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

stopSubscription()

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

subscribe(subscription=None)

Allows subscribing to new data as it comes in, returning a subscription (see Subscriptions):

s = myobj.subscribe()
while True:
    data = await s.get()
    print(data)

There can be multiple subscriptions active at the same time, each of which get identical data. Each call to subscribe() returns a new, independent subscription:

s1 = myobj.subscribe()
s2 = myobj.subscribe()
while True:
    assert await s1.get()== await s2.get()

This function can also be used as a callback:

@myobj.subscribe
def newData(data):
    print("Got data:",data)

If passed an argument, it attempts to use the given callback/coroutine/subscription to notify of incoming data.

Parameters:subscription (optional) –
An optional existing subscription to subscribe to. This can be one of 3 things:
  1. An object which has the method put_nowait (see Subscriptions):
    q = asyncio.Queue()
    myobj.subscribe(q)
    while True:
        data = await q.get()
        print(data)
    
  2. A callback function - this will be called the moment new data is inserted:
    @myobj.subscribe
    def myfunction(data):
        print(data)
    
  3. An coroutine callback - A future of this coroutine is created on each insert:
    @myobj.subscribe
    async def myfunction(data):
        await asyncio.sleep(5)
        print(data)
    
Returns:A subscription. If one was passed in, returns the passed in subscription:
q = asyncio.Queue()
ret = thing.subscribe(q)
assert ret==q
subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None
unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters:subscription (optional) – Anything that was passed into/returned from subscribe().
unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was intialized.

class rtcbot.connection.RTCConnection(defaultChannelOrdered=True, loop=None, rtcConfiguration=<sphinx.ext.autodoc.importer._MockObject object>)[source]

Bases: rtcbot.base.base.SubscriptionProducerConsumer

addDataChannel(name, ordered=True)[source]

Adds a data channel to the connection. Note that the RTCConnection adds a “default” channel automatically, which you can subscribe to directly.

audio

Convenience function - you can subscribe to it to get audio once a stream is received

close()[source]

If the loop is running, returns a future that will close the connection. Otherwise, runs the loop temporarily to complete closing.

closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
getDataChannel(name)[source]

Returns the data channel with the given name. Please note that the “default” channel is considered special, and is not returned.

getLocalDescription(description=None)[source]

Gets the description to send on. Creates an initial description if no remote description was passed, and creates a response if a remote was given,

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!)

Be aware that this is equivalent to explicitly awaiting the object:

await myobj
onDataChannel(callback=None)[source]

Acts as a subscriber…

onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()
onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!)

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

putSubscription(subscription)

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
put_nowait(data)

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None
ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notifed when ready to process data, you will want to use the onReady() function, which will allow you to set up a callback/coroutine to wait until initialized.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

send(msg)[source]

Send is an alias for put_nowait - makes it easier for people new to rtcbot to understand what is going on

setRemoteDescription(description)[source]
stopSubscription()

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

subscribe(subscription=None)

Allows subscribing to new data as it comes in, returning a subscription (see Subscriptions):

s = myobj.subscribe()
while True:
    data = await s.get()
    print(data)

There can be multiple subscriptions active at the same time, each of which get identical data. Each call to subscribe() returns a new, independent subscription:

s1 = myobj.subscribe()
s2 = myobj.subscribe()
while True:
    assert await s1.get()== await s2.get()

This function can also be used as a callback:

@myobj.subscribe
def newData(data):
    print("Got data:",data)

If passed an argument, it attempts to use the given callback/coroutine/subscription to notify of incoming data.

Parameters:subscription (optional) –
An optional existing subscription to subscribe to. This can be one of 3 things:
  1. An object which has the method put_nowait (see Subscriptions):
    q = asyncio.Queue()
    myobj.subscribe(q)
    while True:
        data = await q.get()
        print(data)
    
  2. A callback function - this will be called the moment new data is inserted:
    @myobj.subscribe
    def myfunction(data):
        print(data)
    
  3. An coroutine callback - A future of this coroutine is created on each insert:
    @myobj.subscribe
    async def myfunction(data):
        await asyncio.sleep(5)
        print(data)
    
Returns:A subscription. If one was passed in, returns the passed in subscription:
q = asyncio.Queue()
ret = thing.subscribe(q)
assert ret==q
subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None
unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters:subscription (optional) – Anything that was passed into/returned from subscribe().
unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was intialized.

video

Convenience function - you can subscribe to it to get video frames once they show up