Running Blocking Code¶
RTCBot uses python’s asyncio event loop. This means that Python runs in a loop, handling events as they come in, all in a single thread. Any long-running operation must be specially coded to be async, so that it does not block operation of the event loop.
A Common Issue¶
Suppose that you have a sensor that you want to use with RTCBot. Your goal is to retrieve values from the sensor, and then send the results to the browser.
We will use the function
get_sensor_data to represent a sensor which takes half a second to retrieve data:
import time import random def get_sensor_data(): time.sleep(0.5) # Represents an operation that takes half a second to complete return random.random()
Suppose we were to add this function to the code used for the video streaming tutorial. We will send the sensor reading once a second:
If you try this code, the video will freeze for half a second each second, while the sensor is being queried (i.e. while
time.sleep(0.5) is being run).
This is because all of RTCBot’s tasks happen in the same thread, and while reading the sensor, RTCBot is not sending video frames!
To fix this issue, the sensor needs to be read in a different thread, so that the event loop is not blocked. The sensor data then needs to be moved to the main thread, where it can be used by rtcbot.
Producing Data in Another Thread¶
Thankfully, RTCBot has built-in helper classes that set everything up for you here. The
ThreadedSubscriptionProducer runs in a system thread, allowing arbitrary blocking code, and has built-in mechanisms that let you queue up data for use from the asyncio event loop.
The “bad” code:
import time import random import asyncio def get_sensor_data(): time.sleep(0.5) # Represents an operation that takes half a second to complete return random.random() async def send_sensor_data(): while True: await asyncio.sleep(1) data = get_sensor_data() conn.put_nowait(data) # Send data to browser asyncio.ensure_future(send_sensor_data())
can be fixed by moving the sensor-querying code into a
import time import random import asyncio from rtcbot.base import ThreadedSubscriptionProducer def get_sensor_data(): time.sleep(0.5) # Represents an operation that takes half a second to complete return random.random() class MySensor(ThreadedSubscriptionProducer): def _producer(self): self._setReady(True) # Notify that ready to start gathering data while not self._shouldClose: # Keep gathering until close is requested time.sleep(1) data = get_sensor_data() # Send the data to the asyncio thread, # so it can be retrieved with await mysensor.get() self._put_nowait(data) self._setReady(False) # Notify that sensor is no longer operational mysensor = MySensor() async def send_sensor_data(): while True: data = await mysensor.get() # we await the output of MySensor in a loop conn.put_nowait(data) asyncio.ensure_future(send_sensor_data()) ... async def cleanup(app=None): await conn.close() camera.close() mysensor.close()
Consuming Data in Another Thread¶
RTCBot has an equivalent mechanism for ingesting data - you can retrieve data, and then use it to control things with blocking code.
import time def set_output_value(value): time.sleep(0.5) # Represents an operation that takes half a second to complete print(value) from rtcbot.base import ThreadedSubscriptionConsumer, SubscriptionClosed class MyOutput(ThreadedSubscriptionConsumer): def _consumer(self): self._setReady(True) while not self._shouldClose: try: data = self._get() set_output_value(data) except SubscriptionClosed: break self._setReady(False) myoutput = MyOutput()
You can now use
myoutput.put_nowait in rtcbot to queue up data, which will be retrieved from the consumer thread.
This tutorial introduced the
ThreadedSubscriptionConsumer classes, which allow you to use blocking code with the asyncio event loop. These functions allow handling the connection in the main thread, and doing all actions that might take a while in separate threads.