Subscriptions

The subscriptions available here are quick solutions to common problems that come up with the async producer/consumer model.

API

class rtcbot.subscriptions.CallbackSubscription(callback, loop=None, runDirect=False)[source]

Bases: object

Sometimes you don’t want to await anything; you just want to run a callback when an event occurs. The CallbackSubscription allows you to do precisely that:

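from rtcbot import CVCamera
from rtcbot.subscriptions import CallbackSubscription

# The decorated coroutine is called with each value the camera produces
@CallbackSubscription
async def mycallback(value):
    print(value)

cam = CVCamera()
cam.subscribe(mycallback)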

Note

This is no longer necessary: you can just pass a function to subscribe, and it will automatically be wrapped in a CallbackSubscription.
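
To make the wrapping concrete, the following is a minimal sketch of how a callback-style subscription could behave. SimpleCallbackSubscription is a hypothetical stand-in written for illustration, not rtcbot's implementation; the only assumption taken from this page is that a producer delivers values by calling put_nowait.

```python
import asyncio


class SimpleCallbackSubscription:
    """Hypothetical stand-in: runs a callback for every value put into it."""

    def __init__(self, callback):
        self._callback = callback

    def put_nowait(self, element):
        result = self._callback(element)
        # A coroutine function returns a coroutine; schedule it on the running loop.
        if asyncio.iscoroutine(result):
            asyncio.ensure_future(result)


async def demo():
    received = []

    async def on_value(value):
        received.append(value)

    sub = SimpleCallbackSubscription(on_value)
    for i in range(3):
        sub.put_nowait(i)       # returns immediately; the callback runs later
    await asyncio.sleep(0.01)   # give the scheduled callbacks time to run
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The producer never awaits anything: it only calls put_nowait, and the event loop runs the callback afterwards.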

put_nowait(element)[source]
class rtcbot.subscriptions.DelayedSubscription(SubscriptionWriter, subscription=None)[source]

Bases: object

In some instances, you want to subscribe to something, but don’t actually want to start gathering the data until the data is needed.

This is especially common in something like audio streaming: if you were to subscribe to an audio stream right now, and get() the data only after a certain time, then there would be a large audio delay, because by default the audio subscription queues data.

This is common in the audio of an RTCConnection, where get is called only once the connection is established:

s = Microphone().subscribe()
conn = RTCConnection()
conn.audio.putSubscription(s) # Big audio delay!

Instead, what you want to do is delay subscribing until get is called the first time, which would wait until the connection is ready to start sending data:

s = DelayedSubscription(Microphone())
conn = RTCConnection()
conn.audio.putSubscription(s) # Calls Microphone.subscribe() on first get()

One caveat is that passing the DelayedSubscription to the writer's unsubscribe will not work; you must call the unsubscribe method of the DelayedSubscription itself. That means:

m = Microphone()
s = DelayedSubscription(m)
m.unsubscribe(s) # ERROR!

s.unsubscribe() # correct!
Parameters
  • SubscriptionWriter (BaseSubscriptionWriter) – An object with a subscribe method

  • subscription (optional) – The subscription to subscribe. If given, calls SubscriptionWriter.subscribe(subscription)
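
The delayed-subscribe idea can be sketched without any audio hardware. ToyWriter and LazySubscription below are hypothetical names invented for this illustration, and the sketch assumes the writer hands out one asyncio queue per subscriber. It is not rtcbot's code, only a model of why stale data never reaches a subscription created on the first get(), and of why unsubscribing has to go through the wrapper.

```python
import asyncio


class ToyWriter:
    """Hypothetical producer: fans each value out to every subscribed queue."""

    def __init__(self):
        self.subscriptions = set()

    def subscribe(self):
        q = asyncio.Queue()
        self.subscriptions.add(q)
        return q

    def unsubscribe(self, q):
        self.subscriptions.discard(q)

    def write(self, value):
        for q in self.subscriptions:
            q.put_nowait(value)


class LazySubscription:
    """Hypothetical stand-in: subscribes to the writer only on the first get()."""

    def __init__(self, writer):
        self._writer = writer
        self._queue = None

    async def get(self):
        if self._queue is None:
            self._queue = self._writer.subscribe()
        return await self._queue.get()

    def unsubscribe(self):
        # The writer only knows the inner queue, so unsubscribe through it.
        if self._queue is not None:
            self._writer.unsubscribe(self._queue)
            self._queue = None


async def demo():
    writer = ToyWriter()
    eager = writer.subscribe()         # starts queueing immediately
    lazy = LazySubscription(writer)    # nothing subscribed yet

    writer.write("stale")              # only the eager queue receives this

    pending = asyncio.ensure_future(lazy.get())  # first get() subscribes
    await asyncio.sleep(0.01)                    # let the subscription happen
    writer.write("fresh")

    result = (await eager.get(), await pending)
    lazy.unsubscribe()
    return result, len(writer.subscriptions)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the eager queue still holds the stale value, while the lazy subscription sees only data written after its first get().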

async get()[source]
unsubscribe()[source]
class rtcbot.subscriptions.EventSubscription[source]

Bases: object

This is a subscription that is fired once - upon the first insert.
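
A one-shot subscription of this kind can be modelled with an asyncio.Event. OneShotSubscription is a hypothetical stand-in, and ignoring inserts after the first one is an assumption of this sketch; this page does not say how later inserts are handled.

```python
import asyncio


class OneShotSubscription:
    """Hypothetical stand-in: get() resolves once the first value is put."""

    def __init__(self):
        self._event = asyncio.Event()
        self._value = None

    def put_nowait(self, value):
        # Assumption of this sketch: inserts after the first are ignored.
        if not self._event.is_set():
            self._value = value
            self._event.set()

    async def get(self):
        await self._event.wait()
        return self._value


async def demo():
    sub = OneShotSubscription()
    waiter = asyncio.ensure_future(sub.get())  # waits for the first insert
    sub.put_nowait("connected")
    sub.put_nowait("ignored")
    # Both the pending waiter and a later get() see the first value.
    return await waiter, await sub.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```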

async get()[source]
put_nowait(value)[source]
class rtcbot.subscriptions.GetterSubscription(callback)[source]

Bases: object

You might have a function which behaves like a get(), but it is just a function. The GetterSubscription is a wrapper that calls your function on get():

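import asyncio
from rtcbot.subscriptions import GetterSubscription

@GetterSubscription
async def myfunction():
    await asyncio.sleep(1)  # the sleep must be awaited to actually pause
    return "hello!"

await myfunction.get()
# returns "hello!"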
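
For readers curious how such a wrapper can work, here is a minimal sketch that runs without rtcbot installed. SimpleGetter and next_reading are hypothetical names, and the sketch assumes the wrapped function is a coroutine function, as in the example above.

```python
import asyncio


class SimpleGetter:
    """Hypothetical stand-in: get() just awaits the wrapped coroutine function."""

    def __init__(self, callback):
        self._callback = callback

    async def get(self):
        return await self._callback()


counter = {"calls": 0}


@SimpleGetter
async def next_reading():
    # Each get() call runs the function again and returns its fresh result.
    counter["calls"] += 1
    await asyncio.sleep(0)
    return counter["calls"]


async def demo():
    return [await next_reading.get() for _ in range(3)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```
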
async get()[source]
class rtcbot.subscriptions.MostRecentSubscription[source]

Bases: object

The MostRecentSubscription always returns the most recently added element. If you get an element and immediately call get again, it will wait until the next element is received; it will not return elements that were already processed.

It is not threadsafe.
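
The behaviour described above can be sketched with a single slot and an asyncio.Event. LatestValue is a hypothetical stand-in, not rtcbot's implementation. It overwrites unread elements and makes a second get() wait for new data.

```python
import asyncio


class LatestValue:
    """Hypothetical stand-in: keeps only the newest element; not thread-safe."""

    def __init__(self):
        self._event = asyncio.Event()
        self._element = None

    def put_nowait(self, element):
        self._element = element    # overwrite anything not yet consumed
        self._event.set()

    async def get(self):
        await self._event.wait()   # waits until something new arrives
        self._event.clear()        # mark the element as consumed
        return self._element


async def demo():
    sub = LatestValue()
    for frame in ("frame1", "frame2", "frame3"):
        sub.put_nowait(frame)
    first = await sub.get()        # older frames were overwritten
    try:
        await asyncio.wait_for(sub.get(), timeout=0.05)
        second = "unexpected"
    except asyncio.TimeoutError:
        second = "timed out"       # nothing new since the last get()
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

This pattern suits consumers such as video display, where processing an old frame is worse than skipping it.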

async get()[source]

Gets the most recently added element

put_nowait(element)[source]

Adds the given element to the subscription.

class rtcbot.subscriptions.RebatchSubscription(samples, axis=0, subscription=None)[source]

Bases: object

In certain cases, data comes with a suboptimal batch size. For example, audio coming from an RTCConnection is always of shape (960,2), with 2 channels, and 960 samples per batch. This subscription allows you to change the frame size by mixing and matching batches. For example:

import asyncio
import numpy as np
from rtcbot.subscriptions import RebatchSubscription

s = RebatchSubscription(samples=1024, axis=0)
s.put_nowait(np.zeros((960, 2)))

# Raises asyncio.TimeoutError: the RebatchSubscription does
# not have enough data to create a batch of size 1024
rebatched = await asyncio.wait_for(s.get(), timeout=5)

# After adding another batch of 960, get returns a frame of the goal shape
s.put_nowait(np.zeros((960, 2)))
rebatched = await s.get()
print(rebatched.shape)  # (1024, 2)

To build a batch of the correct size, the RebatchSubscription takes the missing samples (here 1024 - 960 = 64) from the second data frame along the given axis.
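
The arithmetic of rebatching can be checked with a small pure-Python sketch that uses lists of rows in place of NumPy arrays. ListRebatcher is a hypothetical stand-in, and keeping the unused remainder for the next batch is an assumption of this sketch rather than documented rtcbot behaviour.

```python
class ListRebatcher:
    """Hypothetical stand-in that rebatches lists of rows along axis 0."""

    def __init__(self, samples):
        self.samples = samples
        self._buffer = []

    def put(self, batch):
        self._buffer.extend(batch)

    def ready(self):
        return len(self._buffer) >= self.samples

    def get(self):
        if not self.ready():
            raise LookupError("not enough samples buffered yet")
        out = self._buffer[:self.samples]
        # Assumption: the remainder is kept for the next batch.
        self._buffer = self._buffer[self.samples:]
        return out


rebatcher = ListRebatcher(samples=1024)
rebatcher.put([[0.0, 0.0]] * 960)        # one batch of shape (960, 2)
first_ready = rebatcher.ready()          # only 960 of 1024 samples so far
rebatcher.put([[1.0, 1.0]] * 960)        # second batch, marked with ones
frame = rebatcher.get()
shape = (len(frame), len(frame[0]))
from_second = sum(1 for row in frame if row == [1.0, 1.0])
leftover = len(rebatcher._buffer)

print(first_ready, shape, from_second, leftover)
```

The output frame consists of all 960 rows of the first batch followed by the first 64 rows of the second, leaving 896 rows buffered in this model.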

async get()[source]
put_nowait(data)[source]