Subscriptions
The subscriptions available here are quick solutions to common problems that come up with the async producer/consumer model.
API
- class rtcbot.subscriptions.CallbackSubscription(callback, loop=None, runDirect=False)
Bases: object
Sometimes you don’t want to await anything; you just want to run a callback when an event occurs. The CallbackSubscription allows you to do precisely that:
    @CallbackSubscription
    async def mycallback(value):
        print(value)

    cam = CVCamera()
    cam.subscribe(mycallback)
Note
This is no longer necessary: you can just pass a function to subscribe, and it will automatically be wrapped in a CallbackSubscription.
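To make the idea concrete without needing a camera, here is a minimal standalone sketch of a callback-style subscription. It does not use rtcbot: the class name CallbackSink, its drain helper, and the assumption that a producer delivers values through put_nowait are all illustrative, not the library's implementation.

```python
import asyncio


class CallbackSink:
    """Hypothetical stand-in: runs a coroutine callback for every inserted value."""

    def __init__(self, callback):
        self._callback = callback
        self._tasks = set()

    def put_nowait(self, value):
        # Schedule the callback on the running loop instead of awaiting it here,
        # so the producer is never blocked by a slow callback.
        task = asyncio.get_running_loop().create_task(self._callback(value))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def drain(self):
        # Demo helper only: wait for all scheduled callbacks to finish.
        await asyncio.gather(*self._tasks)


async def demo():
    seen = []

    async def mycallback(value):
        seen.append(value)

    sink = CallbackSink(mycallback)
    for frame in ("frame0", "frame1"):
        sink.put_nowait(frame)  # what a producer would do for each new value
    await sink.drain()
    return seen


print(asyncio.run(demo()))  # ['frame0', 'frame1']
```

Keeping references to the scheduled tasks prevents them from being garbage collected before they run.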
- class rtcbot.subscriptions.DelayedSubscription(SubscriptionWriter, subscription=None)
Bases: object
In some instances, you want to subscribe to something, but don’t actually want to start gathering the data until the data is needed.
This is especially common in something like audio streaming: if you were to subscribe to an audio stream right now, and get() the data only after a certain time, then there would be a large audio delay, because by default the audio subscription queues data.
This is common in the audio of an RTCConnection, where get is called only once the connection is established:
    s = Microphone().subscribe()
    conn = RTCConnection()
    conn.audio.putSubscription(s)  # Big audio delay!
Instead, what you want to do is delay subscribing until get is called the first time, which would wait until the connection is ready to start sending data:
    s = DelayedSubscription(Microphone())
    conn = RTCConnection()
    conn.audio.putSubscription(s)  # Calls Microphone.subscribe() on first get()
One caveat: passing the DelayedSubscription to the writer's unsubscribe will not work. You must call unsubscribe on the DelayedSubscription itself. That means:
    m = Microphone()
    s = DelayedSubscription(m)
    m.unsubscribe(s)  # ERROR!
    s.unsubscribe()   # correct!
- Parameters
SubscriptionWriter (BaseSubscriptionWriter) – An object with a subscribe method
subscription (optional) – The subscription object to register with the writer. If given, calls SubscriptionWriter.subscribe(subscription)
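The difference between subscribing eagerly and subscribing on the first get() can be sketched without any audio hardware. The example below does not use rtcbot: FakeWriter and LazySubscription are hypothetical names, and asyncio.Queue stands in for a queueing subscription.

```python
import asyncio


class FakeWriter:
    """Hypothetical producer: hands out asyncio.Queue objects as subscriptions."""

    def __init__(self):
        self.subscriptions = []

    def subscribe(self):
        queue = asyncio.Queue()
        self.subscriptions.append(queue)
        return queue

    def publish(self, value):
        for queue in self.subscriptions:
            queue.put_nowait(value)


class LazySubscription:
    """Hypothetical wrapper: subscribes to the writer only on the first get()."""

    def __init__(self, writer):
        self._writer = writer
        self._subscription = None

    async def get(self):
        if self._subscription is None:
            # Nothing was queued before this point, so no backlog builds up.
            self._subscription = self._writer.subscribe()
        return await self._subscription.get()


async def demo():
    writer = FakeWriter()
    eager = writer.subscribe()       # starts queueing immediately
    lazy = LazySubscription(writer)  # does not subscribe yet

    writer.publish("old")            # produced before anyone calls get()
    pending = asyncio.ensure_future(lazy.get())
    await asyncio.sleep(0)           # let lazy.get() run and subscribe
    writer.publish("new")

    return await eager.get(), await pending


print(asyncio.run(demo()))  # ('old', 'new')
```

The eager subscription returns the stale value first, which is the source of the delay described above, while the lazy one only ever sees data produced after its first get().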
- class rtcbot.subscriptions.EventSubscription
Bases: object
A subscription that fires only once, on the first insert.
- class rtcbot.subscriptions.GetterSubscription(callback)
Bases: object
You might have a function which behaves like a get(), but it is just a function. The GetterSubscription is a wrapper that calls your function on get():
    @GetterSubscription
    async def myfunction():
        await asyncio.sleep(1)  # the sleep coroutine must be awaited
        return "hello!"

    await myfunction.get()  # returns "hello!"
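The wrapping pattern itself is small enough to sketch in a few lines. The following standalone example does not use rtcbot; GetterWrapper is a hypothetical name, and the sketch assumes the wrapped function is a coroutine function.

```python
import asyncio


class GetterWrapper:
    """Hypothetical stand-in: exposes a coroutine function through get()."""

    def __init__(self, callback):
        self._callback = callback

    async def get(self):
        # Each get() call invokes the wrapped function once and returns its result.
        return await self._callback()


@GetterWrapper
async def myfunction():
    await asyncio.sleep(0.01)
    return "hello!"


async def demo():
    return await myfunction.get()


print(asyncio.run(demo()))  # hello!
```

Because the decorator replaces myfunction with the wrapper object, the function is afterwards reachable only through get().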
- class rtcbot.subscriptions.MostRecentSubscription
Bases: object
The MostRecentSubscription always returns the most recently added element. If you get an element and immediately call get again, it waits until the next element is received; it does not return elements that were already processed.
It is not threadsafe.
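The "newest value only, each value once" behaviour can be sketched with an asyncio.Event. This standalone example does not use rtcbot; LatestValue is a hypothetical name, and the sketch only mirrors the behaviour described above rather than the library's code.

```python
import asyncio


class LatestValue:
    """Hypothetical stand-in: keeps only the newest value and hands each value out once."""

    def __init__(self):
        self._value = None
        self._fresh = asyncio.Event()

    def put_nowait(self, value):
        # Overwrite whatever was stored; older unread values are dropped.
        self._value = value
        self._fresh.set()

    async def get(self):
        # Block until something newer than the last returned value arrives.
        await self._fresh.wait()
        self._fresh.clear()
        return self._value


async def demo():
    latest = LatestValue()
    for i in range(3):
        latest.put_nowait(i)
    first = await latest.get()  # only the newest value survives

    try:
        # No new value has been inserted, so a second get() keeps waiting.
        await asyncio.wait_for(latest.get(), timeout=0.05)
        waited = False
    except asyncio.TimeoutError:
        waited = True
    return first, waited


print(asyncio.run(demo()))  # (2, True)
```

Like the class it imitates, this sketch is meant to be used from a single event loop and is not threadsafe.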
- class rtcbot.subscriptions.RebatchSubscription(samples, axis=0, subscription=None)
Bases: object
In certain cases, data comes with a suboptimal batch size. For example, audio coming from an RTCConnection is always of shape (960,2), with 2 channels, and 960 samples per batch. This subscription allows you to change the frame size by mixing and matching batches. For example:
    s = RebatchSubscription(samples=1024, axis=0)
    s.put_nowait(np.zeros((960, 2)))

    # asyncio.TimeoutError - the RebatchSubscription does
    # not have enough data to create a batch of size 1024
    rebatched = await asyncio.wait_for(s.get(), timeout=5)

    # After adding another batch of 960, get returns a frame of goal shape
    s.put_nowait(np.zeros((960, 2)))
    rebatched = await s.get()
    print(rebatched.shape)  # (1024,2)
The RebatchSubscription takes the 64 samples it is still missing (1024 - 960) from the second data frame, along the chosen axis, to create a new batch of the correct size.
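The bookkeeping behind rebatching can be sketched with plain Python lists, where each row is one sample with two channels. This example does not use rtcbot or NumPy; ListRebatcher and its methods are hypothetical names, and it only handles regrouping along the first axis.

```python
class ListRebatcher:
    """Hypothetical stand-in: regroups incoming batches of rows into fixed-size batches."""

    def __init__(self, samples):
        self._samples = samples
        self._buffer = []

    def put(self, batch):
        # Append the rows of the incoming batch to the leftover rows.
        self._buffer.extend(batch)

    def ready(self):
        return len(self._buffer) >= self._samples

    def get(self):
        if not self.ready():
            raise ValueError("not enough buffered rows for a full batch")
        out = self._buffer[: self._samples]
        self._buffer = self._buffer[self._samples :]
        return out


rebatcher = ListRebatcher(samples=1024)
rebatcher.put([[0.0, 0.0]] * 960)
has_batch_after_one = rebatcher.ready()  # 960 rows are not enough for 1024
rebatcher.put([[0.0, 0.0]] * 960)
batch = rebatcher.get()
leftover = len(rebatcher._buffer)

print(has_batch_after_one, len(batch), len(batch[0]), leftover)  # False 1024 2 896
```

The 896 leftover rows stay buffered and become the start of the next batch, so no samples are dropped or duplicated.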