Base

RTCBot is heavily based upon the concept of data producers, and data consumers. To that end, all classes that produce data, such as cameras, microphones, and incoming data streams are considered producers, and all classes that consume data, such as speakers, video displays or outgoing data streams are considered consumers.

This section of the documentation is built to describe the backend base classes upon which all of the data streams are based, and to help you create your own producers and consumers with an API compatible with the rest of RTCBot.

There are 3 main base classes types

  1. BaseSubscriptionProducer and BaseSubscriptionConsumer

  2. ThreadedSubscriptionProducer and ThreadedSubscriptionConsumer

  3. MultiprocessSubscriptionProducer

The three types allow setting up your own data acquisition and processing code loops without needing to worry about the asyncio loop (Threaded) or even the GIL (Multiprocess), but also come with the downside of increasing complexity and communication overhead.

API

Note

Unlike elsewhere in RTCBot’s documentation, inherited members are not shown here, so some functions available from a class might be hidden if they were defined in a parent.

class rtcbot.base.events.baseEventHandler(logger)[source]

Bases: object

This class handles base events

_setError(value)[source]

Sets the error state of the class to an error that was caught while processing data.

After the error is set, the class is assumed to be in a closed state, meaning that any background processes either crashed or were shut down.

Warning

Only call this if you are subclassing baseEventHandler.

_setReady(value=True)[source]

Sets the ready to a given value, and fires all subscriptions created with onReady(). Call this when your producer/consumer is fully initialized.

Warning

Only call this if you are subclassing baseEventHandler.

close()[source]

Fires the onClose event

property closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

property error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

onClose(subscription=None)[source]

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!)

Be aware that this is equivalent to explicitly awaiting the object:

await myobj
onError(subscription=None)[source]

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()
onReady(subscription=None)[source]

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!)

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

property ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notifed when ready to process data, you will want to use the onReady() function, which will allow you to set up a callback/coroutine to wait until initialized.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

class rtcbot.base.events.threadedEventHandler(logger, loop=None)[source]

Bases: baseEventHandler

A threadsafe version of baseEventHandler.

_setError(err)[source]

Threadsafe version of baseEventHandler._setError().

_setReady(value)[source]

Threadsafe version of baseEventHandler._setReady().

class rtcbot.base.base.BaseSubscriptionConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None)[source]

Bases: baseEventHandler

A base class upon which consumers of subscriptions can be built.

The BaseSubscriptionConsumer class handles the logic of switching incoming subscriptions mid-stream and all the other annoying stuff.

async _get()[source]

Warning

Only call this if you are subclassing BaseSubscriptionConsumer.

This function is to be awaited by a subclass to get the next datapoint from the active subscription. It internally handles the subscription for you, and transparently manages the user switching a subscription during runtime:

myobj.putSubscription(x)
#  await self._get() waits on next datapoint from x
myobj.putSubscription(y)
# _get transparently switched to waiting on y
Raises

SubscriptionClosed – If close() was called, this error is raised, signalling your data processing function to clean up and exit.

Returns

The next datapoint that was put or subscribed to from the currently active subscription.

close()[source]

Cleans up and closes the object.

putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
put_nowait(data)[source]

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None
stopSubscription()[source]

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

property subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None
class rtcbot.base.base.BaseSubscriptionProducer(defaultSubscriptionClass=<class 'asyncio.queues.Queue'>, defaultAutosubscribe=False, logger=None)[source]

Bases: baseEventHandler

This is a base class upon which all things that emit data in RTCBot are built.

This class offers all the machinery necessary to keep track of subscriptions to the incoming data. The most important methods from a user’s perspective are the subscribe(), get() and close() functions, which manage subscriptions to the data, and finally close everything.

From an subclass’s perspective, the most important pieces are the _put_nowait() method, and the _shouldClose and _ready attributes.

Once the subclass is ready, it should set _ready to True, and when receiving data, it should call _put_nowait() to insert it. Finally, it should either listen to _shouldClose or override the close method to stop producing data.

Example

A sample basic class that builds on the BaseSubscriptionProvider:

class MyProvider(BaseSubscriptionProvider):
    def __init__(self):
        super().__init__()

        # Add data in the background
        asyncio.ensure_future(self._dataProducer)

    async def _dataProducer(self):
        self._ready = True
        while not self._shouldClose:
            data = await get_data_here()
            self._put_nowait(data)
        self._ready = False
    def close():
        super().close()
        stop_gathering_data()

# you can now subscribe to the data
s = MyProvider().subscribe()
Parameters
  • defaultSubscriptionClass (optional) –

    The subscription type to return by default if subscribe() is called without arguments. By default, it uses asyncio.Queue:

    sp = SubscriptionProducer(defaultSubscriptionClass=asyncio.Queue)
    q = sp.subscribe()
    
    q is asyncio.Queue # True
    

  • defaultAutosubscribe (bool,optional) – Calling get() creates a default subscription on first time it is called. Sometimes the data is very critical, and you want the default subscription to be created right away, so it never misses data. Be aware, though, if your defaultSubscriptionClass is asyncio.Queue, if get() is never called, such as when someone just uses subscribe(), it will just keep piling up queued data! To avoid this, it is False by default.

  • logger (optional) – Your class logger - it gets a child of this logger for debug messages. If nothing is passed, creates a root logger for your class, and uses a child for that.

  • ready (bool,optional) – Your producer probably doesn’t need setup time, so this is set to True automatically, which automatically sets _ready. If you need to do background tasks, set this to False.

_close()[source]

This function allows closing from the handler itself. Don’t call close() directly when implementing producers or consumers. call _close instead.

_put_nowait(element)[source]

Used by subclasses to add data to all subscriptions. This method internally calls all registered callbacks for you, so you only need to worry about the single function call.

Warning

Only call this if you are subclassing BaseSubscriptionProducer.

_shouldClose

Whether or not close() was called, and the user wants the class to stop gathering data. Should only be accessed from a subclass.

close()[source]

Shuts down the data gathering, and removes all subscriptions.

async get()[source]

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
subscribe(subscription=None)[source]

Allows subscribing to new data as it comes in, returning a subscription (see Subscriptions):

s = myobj.subscribe()
while True:
    data = await s.get()
    print(data)

There can be multiple subscriptions active at the same time, each of which get identical data. Each call to subscribe() returns a new, independent subscription:

s1 = myobj.subscribe()
s2 = myobj.subscribe()
while True:
    assert await s1.get()== await s2.get()

This function can also be used as a callback:

@myobj.subscribe
def newData(data):
    print("Got data:",data)

If passed an argument, it attempts to use the given callback/coroutine/subscription to notify of incoming data.

Parameters

subscription (optional) –

An optional existing subscription to subscribe to. This can be one of 3 things:
  1. An object which has the method put_nowait (see Subscriptions):

    q = asyncio.Queue()
    myobj.subscribe(q)
    while True:
        data = await q.get()
        print(data)
    
  2. A callback function - this will be called the moment new data is inserted:

    @myobj.subscribe
    def myfunction(data):
        print(data)
    
  3. An coroutine callback - A future of this coroutine is created on each insert:

    @myobj.subscribe
    async def myfunction(data):
        await asyncio.sleep(5)
        print(data)
    

Returns

A subscription. If one was passed in, returns the passed in subscription:

q = asyncio.Queue()
ret = thing.subscribe(q)
assert ret==q

unsubscribe(subscription=None)[source]

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters

subscription (optional) – Anything that was passed into/returned from subscribe().

unsubscribeAll()[source]

Removes all currently active subscriptions, including the default one if it was intialized.

class rtcbot.base.base.NoClosedSubscription(awaitable)[source]

Bases: object

NoClosedSubscription wraps a callback, and doesn’t pass forward SubscriptionClosed errors - it converts them to asyncio.CancelledError. This allows exiting the application in a clean way.

exception rtcbot.base.base.SubscriptionClosed[source]

Bases: Exception

This error is returned internally by _get() in all subclasses of BaseSubscriptionConsumer when close() is called, and signals the consumer to shut down. For more detail, see BaseSubscriptionConsumer._get().

class rtcbot.base.base.SubscriptionConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None)[source]

Bases: BaseSubscriptionConsumer

class rtcbot.base.base.SubscriptionProducer(defaultSubscriptionClass=<class 'asyncio.queues.Queue'>, defaultAutosubscribe=False, logger=None)[source]

Bases: BaseSubscriptionProducer

class rtcbot.base.base.SubscriptionProducerConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, defaultSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, defaultAutosubscribe=False)[source]

Bases: BaseSubscriptionConsumer, BaseSubscriptionProducer

This base class represents an object which is both a producer and consumer. This is common with two-way connections.

Here, you call _get() to consume the incoming data, and _put_nowait() to produce outgoing data.

_close()[source]

This function allows closing from the handler itself. Don’t call close() directly when implementing producers or consumers. call _close instead.

close()[source]

Cleans up and closes the object.

class rtcbot.base.thread.ThreadedSubscriptionConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, loop=None, daemonThread=True)[source]

Bases: BaseSubscriptionConsumer, threadedEventHandler

close()[source]

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when exiting the program.

Make sure to run close on exit, since sometimes Python has trouble exiting from multiple threads without having them closed explicitly.

putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
class rtcbot.base.thread.ThreadedSubscriptionProducer(defaultSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, loop=None, daemonThread=True)[source]

Bases: BaseSubscriptionProducer, threadedEventHandler

close()[source]

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when exiting the program.

class rtcbot.base.multiprocess.ProcessSubscriptionConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, loop=None, daemonProcess=True, joinTimeout=1)[source]

Bases: BaseSubscriptionConsumer

close()[source]

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when shutting down.

putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
class rtcbot.base.multiprocess.ProcessSubscriptionProducer(defaultSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, loop=None, daemonProcess=True, joinTimeout=1)[source]

Bases: BaseSubscriptionProducer

close()[source]

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when shutting down.

class rtcbot.base.multiprocess.ProcessSubscriptionProducerConsumer(directPutSubscriptionType=<class 'asyncio.queues.Queue'>, defaultSubscriptionType=<class 'asyncio.queues.Queue'>, logger=None, defaultAutosubscribe=False, loop=None, daemonProcess=True, joinTimeout=1)[source]

Bases: BaseSubscriptionConsumer, BaseSubscriptionProducer

This base class represents an object which is both a producer and consumer, run as a separate process. This is common with two-way connections. Here, you call _get() to consume the incoming data, and _put_nowait() to produce outgoing data.

close()[source]

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when shutting down.

putSubscription(subscription)[source]

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2
class rtcbot.base.multiprocess.internalSubscriptionMessage(type, value)[source]

Bases: object