Running Blocking Code¶
RTCBot uses python’s asyncio event loop. This means that Python runs in a loop, handling events as they come in, all in a single thread. Any long-running operation must be specially coded to be async, so that it does not block operation of the event loop.
A Common Issue¶
Suppose that you have a sensor that you want to use with RTCBot. Your goal is to retrieve values from the sensor, and then send the results to the browser.
We will use the function get_sensor_data
to represent a sensor which takes half a second to retrieve data:
import time
import random
def get_sensor_data():
time.sleep(0.5) # Represents an operation that takes half a second to complete
return random.random()
We will base this code on the original single-connection video-streaming tutorial for simplicity. We will send the sensor reading once a second:
from aiohttp import web
routes = web.RouteTableDef()
from rtcbot import RTCConnection, getRTCBotJS, CVCamera
camera = CVCamera()
# For this example, we use just one global connection
conn = RTCConnection()
conn.video.putSubscription(camera)
+import time
+import random
+import asyncio
+
+
+def get_sensor_data():
+ time.sleep(0.5) # Represents an operation that takes half a second to complete
+ return random.random()
+
+
+async def send_sensor_data():
+ while True:
+ await asyncio.sleep(1)
+ data = get_sensor_data()
+ conn.put_nowait(data) # Send data to browser
+
+
+asyncio.ensure_future(send_sensor_data())
# Serve the RTCBot javascript library at /rtcbot.js
@routes.get("/rtcbot.js")
async def rtcbotjs(request):
return web.Response(content_type="application/javascript", text=getRTCBotJS())
# This sets up the connection
@routes.post("/connect")
async def connect(request):
clientOffer = await request.json()
serverResponse = await conn.getLocalDescription(clientOffer)
return web.json_response(serverResponse)
@routes.get("/")
async def index(request):
return web.Response(
content_type="text/html",
text="""
<html>
<head>
<title>RTCBot: Video</title>
<script src="/rtcbot.js"></script>
</head>
<body style="text-align: center;padding-top: 30px;">
<video autoplay playsinline muted controls></video>
<p>
Open the browser's developer tools to see console messages (CTRL+SHIFT+C)
</p>
<script>
var conn = new rtcbot.RTCConnection();
conn.video.subscribe(function(stream) {
document.querySelector("video").srcObject = stream;
});
+ conn.subscribe(m => console.log("Received from python:", m));
+
async function connect() {
let offer = await conn.getLocalDescription();
// POST the information to /connect
let response = await fetch("/connect", {
method: "POST",
cache: "no-cache",
body: JSON.stringify(offer)
});
await conn.setRemoteDescription(await response.json());
console.log("Ready!");
}
connect();
</script>
</body>
</html>
""",
)
async def cleanup(app=None):
await conn.close()
camera.close()
conn.onClose(cleanup)
app = web.Application()
app.add_routes(routes)
app.on_shutdown.append(cleanup)
web.run_app(app)
If you try this code, the video will freeze for half a second each second, while the sensor is being queried (i.e. while time.sleep(0.5)
is being run).
This is because all of RTCBot’s tasks happen in the same thread, and while reading the sensor, RTCBot is not sending video frames!
To fix this issue, the sensor needs to be read in a different thread, so that the event loop is not blocked. The sensor data then needs to be moved to the main thread, where it can be used by rtcbot.
Producing Data in Another Thread¶
Thankfully, RTCBot has built-in helper classes that set everything up for you here. The ThreadedSubscriptionProducer
runs in a system thread, allowing arbitrary blocking code, and has built-in mechanisms that let you queue up data for use from the asyncio event loop.
The code that blocks the connection:
import time
import random
import asyncio
def get_sensor_data():
time.sleep(0.5) # Represents an operation that takes half a second to complete
return random.random()
async def send_sensor_data():
while True:
await asyncio.sleep(1)
data = get_sensor_data()
conn.put_nowait(data) # Send data to browser
asyncio.ensure_future(send_sensor_data())
can be fixed by moving the sensor-querying code into a ThreadedSubscriptionProducer
:
import time
import random
import asyncio
from rtcbot.base import ThreadedSubscriptionProducer
def get_sensor_data():
time.sleep(0.5) # Represents an operation that takes half a second to complete
return random.random()
class MySensor(ThreadedSubscriptionProducer):
def _producer(self):
self._setReady(True) # Notify that ready to start gathering data
while not self._shouldClose: # Keep gathering until close is requested
time.sleep(1)
data = get_sensor_data()
# Send the data to the asyncio thread,
# so it can be retrieved with await mysensor.get()
self._put_nowait(data)
self._setReady(False) # Notify that sensor is no longer operational
mysensor = MySensor()
async def send_sensor_data():
while True:
data = await mysensor.get() # we await the output of MySensor in a loop
conn.put_nowait(data)
asyncio.ensure_future(send_sensor_data())
...
async def cleanup(app=None):
await conn.close()
camera.close()
mysensor.close()
Consuming Data in Another Thread¶
RTCBot has an equivalent mechanism for ingesting data - you can retrieve data, and then use it to control things with blocking code.
import time
def set_output_value(value):
time.sleep(0.5) # Represents an operation that takes half a second to complete
print(value)
from rtcbot.base import ThreadedSubscriptionConsumer, SubscriptionClosed
class MyOutput(ThreadedSubscriptionConsumer):
def _consumer(self):
self._setReady(True)
while not self._shouldClose:
try:
data = self._get()
set_output_value(data)
except SubscriptionClosed:
break
self._setReady(False)
myoutput = MyOutput()
You can now use myoutput.put_nowait
in rtcbot to queue up data, which will be retrieved from the consumer thread.
Summary¶
This tutorial introduced the ThreadedSubscriptionProducer
and ThreadedSubscriptionConsumer
classes, which allow you to use blocking code with the asyncio event loop. These functions allow handling the connection in the main thread, and doing all actions that might take a while in separate threads.