Camera
The Camera API allows you to subscribe to video frames coming in from a webcam.
To use this API, you need either OpenCV installed (for CVCamera and CVDisplay), picamera installed (for PiCamera), or picamera2 installed (for PiCamera2).
To install OpenCV on Ubuntu 18.04 or Raspbian Buster, use the following command:
sudo apt-get install python3-opencv
On Raspbian Stretch or older Ubuntu, you can install it with:
sudo apt-get install python-opencv
If using Windows or Mac, it is recommended that you use Anaconda, and install OpenCV from there.
If on a Raspberry Pi, you don’t need OpenCV at all to use the official Pi Camera.
CVCamera
The CVCamera uses a webcam connected to your computer, and gathers video frames using OpenCV:
import asyncio
from rtcbot import CVCamera, CVDisplay
camera = CVCamera()
display = CVDisplay()
display.putSubscription(camera)
try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()
The frames are gathered as BGR numpy arrays, so you can apply any OpenCV function to them. For example, the following code shows the video in black and white:
import asyncio
from rtcbot import CVCamera, CVDisplay
import cv2
camera = CVCamera()
display = CVDisplay()
@camera.subscribe
def onFrame(frame):
    # Convert the BGR frame to grayscale before handing it to the display
    bwframe = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    display.put_nowait(bwframe)
try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()
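To make the BGR channel order concrete, here is a minimal sketch of the grayscale conversion for a single pixel. It assumes the standard luma weights (0.299 for red, 0.587 for green, 0.114 for blue) that OpenCV documents for its color-to-gray conversion. The helper bgr_to_gray is hypothetical and not part of rtcbot or OpenCV, and real frames are numpy arrays rather than tuples:

```python
def bgr_to_gray(pixel):
    """Weighted sum for one pixel given in (blue, green, red) order."""
    blue, green, red = pixel
    # Assumed weights: the standard luma coefficients used for color-to-gray conversion
    return round(0.299 * red + 0.587 * green + 0.114 * blue)

pure_blue = (255, 0, 0)  # BGR order: the first channel is blue
pure_red = (0, 0, 255)   # the last channel is red

print(bgr_to_gray(pure_blue))  # 29
print(bgr_to_gray(pure_red))   # 76
```

The same tuple read as RGB would give the opposite results, which is why it matters that the frames are BGR when you pass them to other libraries.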
Warning
There is currently an issue with threading in OpenCV that makes CVDisplay not work on Mac.
PiCamera
PiCamera lets you use the official Raspberry Pi camera. You use it in exactly the same way as CVCamera above, and it returns frames in exactly the same format:
import asyncio
from rtcbot import PiCamera, CVDisplay
camera = PiCamera()
display = CVDisplay()
display.putSubscription(camera)
try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()
This means that, unless you use CVDisplay, you don’t even need OpenCV installed to stream from your Raspberry Pi.
PiCamera2
PiCamera2 lets you use the official Raspberry Pi camera with the libcamera stack (legacy camera interface disabled). This stack is the default since Raspberry Pi OS Bullseye, and PiCamera2 also works on a 64-bit OS. You can pass hflip=True to flip the image horizontally, vflip=True to flip it vertically, or both to rotate it 180 degrees. You use it in exactly the same way as CVCamera above, and it returns frames in exactly the same format:
import asyncio
from rtcbot import PiCamera2, CVDisplay
camera = PiCamera2()
display = CVDisplay()
display.putSubscription(camera)
try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()
This means that, unless you use CVDisplay, you don’t even need OpenCV installed to stream from your Raspberry Pi.
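The statement that combining hflip and vflip amounts to a 180-degree rotation can be checked on a tiny grid. This is only an illustrative sketch in plain Python: the functions hflip, vflip and rotate180 below are hypothetical helpers operating on nested lists, not the PiCamera2 parameters themselves, and a real frame would be a numpy array:

```python
# Toy 2x3 "image" as nested lists; each number stands in for a pixel.
image = [
    [1, 2, 3],
    [4, 5, 6],
]

def hflip(img):
    """Mirror left-to-right: reverse every row."""
    return [row[::-1] for row in img]

def vflip(img):
    """Mirror top-to-bottom: reverse the order of the rows."""
    return img[::-1]

def rotate180(img):
    """Rotate by 180 degrees: pixel (r, c) moves to (h-1-r, w-1-c)."""
    h, w = len(img), len(img[0])
    out = [[None] * w for _ in range(h)]
    for r in range(h):
        for c in range(w):
            out[h - 1 - r][w - 1 - c] = img[r][c]
    return out

both = vflip(hflip(image))
print(both)                      # [[6, 5, 4], [3, 2, 1]]
print(both == rotate180(image))  # True
```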
API
- class rtcbot.camera.CVCamera(width=320, height=240, cameranumber=0, fps=30, preprocessframe=<function CVCamera.<lambda>>, loop=None)
Bases: ThreadedSubscriptionProducer
Uses a camera supported by OpenCV.
When initializing, you can pass an optional function (preprocessframe) that preprocesses frames as they are read and returns the modified versions. Please note that the preprocessing happens synchronously in the camera capture thread, so any processing should be relatively fast and should avoid pure Python code due to the GIL. Numpy and OpenCV functions should be OK.
- close()
Shuts down data gathering and closes all subscriptions. Calling this from an async function is not recommended, since it blocks until the background thread joins.
The object is meant to be used as a singleton that is initialized at the start of your code and closed when exiting the program.
- property closed
Returns whether the object was closed. This covers both crashes from thrown exceptions and clean exits.
- property error
If an error causes the underlying process to crash, this property holds the actual Exception that was thrown:
if myobject.error is not None:
    print("Oh no! There was an error:", myobject.error)
This property is offered for convenience, but usually you will want to subscribe to the error using onError(), which notifies your app when the issue happens.
Note
If the error is not None, the object is considered crashed and is no longer processing data.
- async get()
Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.
If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:
data = await myobj.get()  # Creates subscription on first call
data = await myobj.get()  # Same subscription
myobj.unsubscribe()
data2 = await myobj.get()  # A new subscription
The above code is equivalent to the following:
defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
- onClose(subscription=None)
This is mainly useful for connections, which can be closed remotely. It allows you to handle the close event:
@myobj.onClose
def closeCallback():
    print("Closed!")
Be aware that this is equivalent to explicitly awaiting the object:
await myobj
- onError(subscription=None)
Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead and is no longer gathering data.
To let you catch these errors, the error event is fired with the associated Exception whenever an unhandled exception happens. This function allows you to subscribe to these events:
@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ", err)
The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:
err = await myobj.onError()
- onReady(subscription=None)
Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background. Once this procedure is complete and any spawned background workers are ready to process data, it fires a ready event.
This function allows you to listen for this event:
@myobj.onReady
def readyCallback():
    print("Ready!")
The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:
await myobj.onReady()
Note
The object automatically handles any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event unless you need exact control.
- property ready
This is True when the class has been fully initialized and is ready to process data:
if not myobject.ready:
    print("Not ready to process data")
This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine that waits until initialization is complete.
Note
You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.
- subscribe(subscription=None)
Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.
Note that all subscribers get the same frame data numpy array, so if you are going to modify the values of the array itself, please do so in a copy:
# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()
async def mytask():
    # Wait for the next frame
    myframe = await subs.get()
    # Do stuff with the frame
If you want a different subscription type, you can pass anything that has a put_nowait method, which is called each time a frame comes in:
subs = cam.subscribe(asyncio.Queue())  # asyncio queue has a put_nowait method
await subs.get()
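The difference between a subscription that "skips missed frames" and an asyncio.Queue can be sketched with the standard library alone. The LatestOnly class below is a hypothetical stand-in written for illustration; it is not rtcbot's MostRecentSubscription and makes no claim about how that class is implemented. It only shows the put_nowait/get shape described above:

```python
import asyncio

class LatestOnly:
    """Hypothetical stand-in: keeps only the newest item, dropping unread older ones."""

    def __init__(self):
        self._event = asyncio.Event()
        self._value = None

    def put_nowait(self, value):
        # Overwrite whatever was waiting; unread items are lost.
        self._value = value
        self._event.set()

    async def get(self):
        # Wait until something has arrived since the last get().
        await self._event.wait()
        self._event.clear()
        return self._value

async def demo():
    latest = LatestOnly()
    queue = asyncio.Queue()
    # Three "frames" arrive before anyone reads.
    for frame_number in (1, 2, 3):
        latest.put_nowait(frame_number)
        queue.put_nowait(frame_number)
    return await latest.get(), await queue.get()

result = asyncio.run(demo())
print(result)  # (3, 1)
```

A latest-only subscription suits live video, where a slow consumer should see the current frame. A queue suits cases where every frame must be processed in order, at the cost of falling behind if the consumer is slow.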
- unsubscribe(subscription=None)
Removes the given subscription, so that it no longer gets updated:
subs = myobj.subscribe()
myobj.unsubscribe(subs)
If no argument is given, removes the default subscription created by get(). If none exists, does nothing.
- Parameters
subscription (optional) – Anything that was passed into/returned from subscribe().
- unsubscribeAll()
Removes all currently active subscriptions, including the default one if it was initialized.
- class rtcbot.camera.CVDisplay(name=None, loop=None)
Bases: BaseSubscriptionConsumer
Displays the frames in an OpenCV imshow window.
Warning
Due to an issue with threading in OpenCV on Mac, CVDisplay does not work on Mac.
- close()
Cleans up and closes the object.
- property closed
Returns whether the object was closed. This covers both crashes from thrown exceptions and clean exits.
- property error
If an error causes the underlying process to crash, this property holds the actual Exception that was thrown:
if myobject.error is not None:
    print("Oh no! There was an error:", myobject.error)
This property is offered for convenience, but usually you will want to subscribe to the error using onError(), which notifies your app when the issue happens.
Note
If the error is not None, the object is considered crashed and is no longer processing data.
- onClose(subscription=None)
This is mainly useful for connections, which can be closed remotely. It allows you to handle the close event:
@myobj.onClose
def closeCallback():
    print("Closed!")
Be aware that this is equivalent to explicitly awaiting the object:
await myobj
- onError(subscription=None)
Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead and is no longer gathering data.
To let you catch these errors, the error event is fired with the associated Exception whenever an unhandled exception happens. This function allows you to subscribe to these events:
@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ", err)
The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:
err = await myobj.onError()
- onReady(subscription=None)
Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background. Once this procedure is complete and any spawned background workers are ready to process data, it fires a ready event.
This function allows you to listen for this event:
@myobj.onReady
def readyCallback():
    print("Ready!")
The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:
await myobj.onReady()
Note
The object automatically handles any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event unless you need exact control.
- putSubscription(subscription)
Given a subscription such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:
q = asyncio.Queue()  # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)
q.put_nowait(data)
Equivalent to doing the following in the background:
while True:
    myobj.put_nowait(await q.get())
You can replace a currently running subscription with a new one at any point in time:
q1 = asyncio.Queue()
myobj.putSubscription(q1)
assert myobj.subscription == q1
q2 = asyncio.Queue()
myobj.putSubscription(q2)
assert myobj.subscription == q2
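The background loop that putSubscription is described as being equivalent to can be tried out without a camera or display. The sketch below uses only the standard library; Collector and pump are hypothetical names standing in for a consumer object and for the background reader, and nothing here reflects how rtcbot schedules its own task internally:

```python
import asyncio

class Collector:
    """Hypothetical consumer stand-in exposing the same put_nowait() entry point."""

    def __init__(self):
        self.items = []

    def put_nowait(self, data):
        self.items.append(data)

async def pump(source, consumer):
    # The loop from the description: read forever, forward each item.
    while True:
        consumer.put_nowait(await source.get())

async def demo():
    q = asyncio.Queue()  # anything with a get() coroutine would do
    consumer = Collector()
    task = asyncio.ensure_future(pump(q, consumer))  # runs in the background
    for item in ("a", "b", "c"):
        q.put_nowait(item)
    await asyncio.sleep(0.01)  # let the background task drain the queue
    task.cancel()  # stop reading, loosely analogous to stopping a subscription
    try:
        await task
    except asyncio.CancelledError:
        pass
    return consumer.items

items = asyncio.run(demo())
print(items)  # ['a', 'b', 'c']
```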
- put_nowait(data)
This function allows you to send data directly to the object, without needing to go through a subscription:
while True:
    data = get_data()
    myobj.put_nowait(data)
The put_nowait() method is the simplest way to process a new chunk of data.
Note
If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():
myobj.putSubscription(s)
myobj.put_nowait(mydata)  # unsubscribes from s
assert myobj.subscription is None
- property ready
This is True when the class has been fully initialized and is ready to process data:
if not myobject.ready:
    print("Not ready to process data")
This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine that waits until initialization is complete.
Note
You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.
- stopSubscription()
Stops reading the current subscription:
q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q
myobj.stopSubscription()
assert myobj.subscription is None
# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q
The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.
- property subscription
Returns the currently active subscription:
q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q
myobj.stopSubscription()
assert myobj.subscription is None
myobj.put_nowait(data)
assert myobj.subscription is None
- class rtcbot.camera.PiCamera(rotation=0, **kwargs)
Bases: CVCamera
Instead of using OpenCV camera support, uses the picamera library for direct access to the Raspberry Pi’s CSI camera.
The interface is identical to CVCamera. When testing code on a desktop computer, it can be useful to have the code automatically choose the correct camera:
try:
    import picamera  # picamera import will fail if not on pi
    cam = PiCamera()
except ImportError:
    cam = CVCamera()
This enables simple drop-in replacement between the two.
- close()
Shuts down data gathering and closes all subscriptions. Calling this from an async function is not recommended, since it blocks until the background thread joins.
The object is meant to be used as a singleton that is initialized at the start of your code and closed when exiting the program.
- property closed
Returns whether the object was closed. This covers both crashes from thrown exceptions and clean exits.
- property error
If an error causes the underlying process to crash, this property holds the actual Exception that was thrown:
if myobject.error is not None:
    print("Oh no! There was an error:", myobject.error)
This property is offered for convenience, but usually you will want to subscribe to the error using onError(), which notifies your app when the issue happens.
Note
If the error is not None, the object is considered crashed and is no longer processing data.
- async get()
Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.
If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:
data = await myobj.get()  # Creates subscription on first call
data = await myobj.get()  # Same subscription
myobj.unsubscribe()
data2 = await myobj.get()  # A new subscription
The above code is equivalent to the following:
defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
- onClose(subscription=None)
This is mainly useful for connections, which can be closed remotely. It allows you to handle the close event:
@myobj.onClose
def closeCallback():
    print("Closed!")
Be aware that this is equivalent to explicitly awaiting the object:
await myobj
- onError(subscription=None)
Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead and is no longer gathering data.
To let you catch these errors, the error event is fired with the associated Exception whenever an unhandled exception happens. This function allows you to subscribe to these events:
@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ", err)
The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:
err = await myobj.onError()
- onReady(subscription=None)
Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background. Once this procedure is complete and any spawned background workers are ready to process data, it fires a ready event.
This function allows you to listen for this event:
@myobj.onReady
def readyCallback():
    print("Ready!")
The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:
await myobj.onReady()
Note
The object automatically handles any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event unless you need exact control.
- property ready
This is True when the class has been fully initialized and is ready to process data:
if not myobject.ready:
    print("Not ready to process data")
This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine that waits until initialization is complete.
Note
You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.
- subscribe(subscription=None)
Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.
Note that all subscribers get the same frame data numpy array, so if you are going to modify the values of the array itself, please do so in a copy:
# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()
async def mytask():
    # Wait for the next frame
    myframe = await subs.get()
    # Do stuff with the frame
If you want a different subscription type, you can pass anything that has a put_nowait method, which is called each time a frame comes in:
subs = cam.subscribe(asyncio.Queue())  # asyncio queue has a put_nowait method
await subs.get()
- unsubscribe(subscription=None)
Removes the given subscription, so that it no longer gets updated:
subs = myobj.subscribe()
myobj.unsubscribe(subs)
If no argument is given, removes the default subscription created by get(). If none exists, does nothing.
- Parameters
subscription (optional) – Anything that was passed into/returned from subscribe().
- unsubscribeAll()
Removes all currently active subscriptions, including the default one if it was initialized.
- class rtcbot.camera.PiCamera2(hflip=False, vflip=False, **kwargs)
Bases: CVCamera
Instead of using OpenCV camera support, uses the picamera2 library for direct access to the Raspberry Pi’s CSI camera.
The interface is identical to CVCamera. When testing code on a desktop computer, it can be useful to have the code automatically choose the correct camera:
try:
    import picamera2  # picamera2 import will fail if not on pi
    cam = PiCamera2()
except ImportError:
    cam = CVCamera()
This enables simple drop-in replacement between the two.
You can use the parameter hflip=True to flip the camera horizontally, vflip=True to flip vertically, or both to rotate 180 degrees.
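The try/except pattern above can be extended to cover all three camera classes. The following is a minimal sketch, not part of rtcbot: pick_camera_class_name is a hypothetical helper, the preference order (picamera2 first, then picamera, then OpenCV) is an assumption, and it only checks whether a module can be found rather than whether a camera is actually attached:

```python
import importlib.util

def pick_camera_class_name():
    """Return the name of the rtcbot camera class to try first.

    Only checks whether a module can be located; it does not open any camera.
    """
    # Assumed preference order: the newer libcamera-based library first.
    candidates = (
        ("picamera2", "PiCamera2"),
        ("picamera", "PiCamera"),
    )
    for module_name, class_name in candidates:
        if importlib.util.find_spec(module_name) is not None:
            return class_name
    # Neither Pi library is installed, so fall back to the OpenCV camera.
    return "CVCamera"

choice = pick_camera_class_name()
print(choice)
```

With rtcbot installed, the chosen class could then be looked up by name, for example with getattr(rtcbot, choice)(), since all three classes are documented as sharing the CVCamera interface.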
- close()
Shuts down data gathering and closes all subscriptions. Calling this from an async function is not recommended, since it blocks until the background thread joins.
The object is meant to be used as a singleton that is initialized at the start of your code and closed when exiting the program.
- property closed
Returns whether the object was closed. This covers both crashes from thrown exceptions and clean exits.
- property error
If an error causes the underlying process to crash, this property holds the actual Exception that was thrown:
if myobject.error is not None:
    print("Oh no! There was an error:", myobject.error)
This property is offered for convenience, but usually you will want to subscribe to the error using onError(), which notifies your app when the issue happens.
Note
If the error is not None, the object is considered crashed and is no longer processing data.
- async get()
Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.
If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:
data = await myobj.get()  # Creates subscription on first call
data = await myobj.get()  # Same subscription
myobj.unsubscribe()
data2 = await myobj.get()  # A new subscription
The above code is equivalent to the following:
defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()
- onClose(subscription=None)
This is mainly useful for connections, which can be closed remotely. It allows you to handle the close event:
@myobj.onClose
def closeCallback():
    print("Closed!")
Be aware that this is equivalent to explicitly awaiting the object:
await myobj
- onError(subscription=None)
Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead and is no longer gathering data.
To let you catch these errors, the error event is fired with the associated Exception whenever an unhandled exception happens. This function allows you to subscribe to these events:
@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ", err)
The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:
err = await myobj.onError()
- onReady(subscription=None)
Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background. Once this procedure is complete and any spawned background workers are ready to process data, it fires a ready event.
This function allows you to listen for this event:
@myobj.onReady
def readyCallback():
    print("Ready!")
The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:
await myobj.onReady()
Note
The object automatically handles any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event unless you need exact control.
- property ready
This is True when the class has been fully initialized and is ready to process data:
if not myobject.ready:
    print("Not ready to process data")
This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine that waits until initialization is complete.
Note
You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.
- subscribe(subscription=None)
Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.
Note that all subscribers get the same frame data numpy array, so if you are going to modify the values of the array itself, please do so in a copy:
# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()
async def mytask():
    # Wait for the next frame
    myframe = await subs.get()
    # Do stuff with the frame
If you want a different subscription type, you can pass anything that has a put_nowait method, which is called each time a frame comes in:
subs = cam.subscribe(asyncio.Queue())  # asyncio queue has a put_nowait method
await subs.get()
- unsubscribe(subscription=None)
Removes the given subscription, so that it no longer gets updated:
subs = myobj.subscribe()
myobj.unsubscribe(subs)
If no argument is given, removes the default subscription created by get(). If none exists, does nothing.
- Parameters
subscription (optional) – Anything that was passed into/returned from subscribe().
- unsubscribeAll()
Removes all currently active subscriptions, including the default one if it was initialized.