Camera

The Camera API allows you to subscribe to video frames coming in from a webcam. To use this API, you need either OpenCV installed (for CVCamera and CVDisplay), picamera installed (for PiCamera), or picamera2 installed (for PiCamera2).

To install OpenCV on Ubuntu 18.04 or Raspbian Buster, use the following command:

sudo apt-get install python3-opencv

On Raspbian Stretch or older Ubuntu, you can install it with:

sudo apt-get install python-opencv

If using Windows or Mac, it is recommended that you use Anaconda, and install OpenCV from there.

If on a Raspberry Pi, you don’t need OpenCV at all to use the official Pi Camera.

CVCamera

The CVCamera uses a webcam connected to your computer, and gathers video frames using OpenCV:

import asyncio
from rtcbot import CVCamera, CVDisplay

camera = CVCamera()
display = CVDisplay()

display.putSubscription(camera)

try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()

The frames are gathered as BGR numpy arrays, so you can perform any OpenCV functions you’d like on them. For example, the following code shows the video in black and white:

import asyncio
from rtcbot import CVCamera, CVDisplay
import cv2

camera = CVCamera()
display = CVDisplay()


@camera.subscribe
def onFrame(frame):
    bwframe = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    display.put_nowait(bwframe)


try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()

Warning

There is currently an issue with threading in OpenCV that makes CVDisplay not work on Mac.

PiCamera

PiCamera allows you to use the official Raspberry Pi camera. You can use it in exactly the same way as the OpenCV camera above, and it returns exactly the same data for the frame:

import asyncio
from rtcbot import PiCamera, CVDisplay

camera = PiCamera()
display = CVDisplay()

display.putSubscription(camera)

try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()

This means that if you are not using CVDisplay, you don’t even need OpenCV installed to stream from your Raspberry Pi.

PiCamera2

PiCamera2 allows you to use the official Raspberry Pi camera with the libcamera stack (legacy camera interface disabled), which has been the default since Raspberry Pi OS Bullseye. PiCamera2 also works on a 64-bit OS. You can pass hflip=True to flip the image horizontally, vflip=True to flip it vertically, or both to rotate it 180 degrees. You can use it in exactly the same way as the OpenCV camera above, and it returns exactly the same data for the frame:

import asyncio
from rtcbot import PiCamera2, CVDisplay

camera = PiCamera2()
display = CVDisplay()

display.putSubscription(camera)

try:
    asyncio.get_event_loop().run_forever()
finally:
    camera.close()
    display.close()

This means that if you are not using CVDisplay, you don’t even need OpenCV installed to stream from your Raspberry Pi.

API

class rtcbot.camera.CVCamera(width=320, height=240, cameranumber=0, fps=30, preprocessframe=<function CVCamera.<lambda>>, loop=None)

Bases: ThreadedSubscriptionProducer

Uses a camera supported by OpenCV.

When initializing, you can give an optional function which preprocesses frames as they are read and returns the modified versions. Note that the preprocessing happens synchronously in the camera capture thread, so any processing should be relatively fast and should avoid pure Python code because of the GIL. NumPy and OpenCV functions should be OK.

close()

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when exiting the program.
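
Because close() blocks until the background thread joins, one common asyncio pattern is to hand the blocking call to a worker thread so that the event loop keeps running. The sketch below illustrates that general pattern only. BlockingResource is a hypothetical stand-in for a camera object, and whether close() on an rtcbot object may safely be called from another thread is an assumption that should be verified before relying on it.

```python
import asyncio
import threading
import time


class BlockingResource:
    """Hypothetical stand-in for an object whose close() joins a thread."""

    def __init__(self):
        self.closed = False
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._work, daemon=True)
        self._thread.start()

    def _work(self):
        # Pretend to gather frames until asked to stop.
        while not self._stop.is_set():
            time.sleep(0.01)

    def close(self):
        # Blocks the caller until the background thread has exited.
        self._stop.set()
        self._thread.join()
        self.closed = True


async def shutdown(resource):
    # Run the blocking close() in the default thread pool so that
    # other coroutines keep running while the thread joins.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, resource.close)


resource = BlockingResource()
asyncio.run(shutdown(resource))
```

In the examples at the top of this page the problem does not arise, since close() is called from a finally block after the event loop has stopped.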

property closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

property error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

async get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!")

Be aware that this is equivalent to explicitly awaiting the object:

await myobj

onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()

onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!")

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

property ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine to wait until initialization is complete.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

subscribe(subscription=None)

Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.
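
To make the "most recent frame, skipping missed frames" behavior concrete, here is a minimal sketch of a subscription with those semantics. It is an illustration only and not rtcbot's MostRecentSubscription implementation: LatestValue is a hypothetical class, and the strings stand in for frames.

```python
import asyncio


class LatestValue:
    """Illustrative only: keeps just the newest item, dropping older ones."""

    def __init__(self):
        self._value = None
        self._fresh = asyncio.Event()

    def put_nowait(self, value):
        # Overwrite whatever was stored; unread older values are lost.
        self._value = value
        self._fresh.set()

    async def get(self):
        # Wait until something new arrives, then hand it out once.
        await self._fresh.wait()
        self._fresh.clear()
        return self._value


async def demo():
    sub = LatestValue()
    # Three "frames" arrive before the consumer gets a chance to read.
    for frame in ("frame1", "frame2", "frame3"):
        sub.put_nowait(frame)
    return await sub.get()


latest = asyncio.run(demo())
```

A slow consumer of such a subscription always works on the newest frame instead of falling further and further behind, which is usually what you want for live video.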

Note that all subscribers receive the same NumPy array of frame data, so if you are going to modify the values of the array itself, do so on a copy. To subscribe and wait for a frame:

# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()

async def mytask():

    # Wait for the next frame
    myframe = await subs.get()

    # Do stuff with the frame

If you want to have a different subscription type, you can pass anything which has a put_nowait method, which is called each time a frame comes in:

subs = cam.subscribe(asyncio.Queue()) # asyncio queue has a put_nowait method
await subs.get()

unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters

subscription (optional) – Anything that was passed into/returned from subscribe().

unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was initialized.

class rtcbot.camera.CVDisplay(name=None, loop=None)

Bases: BaseSubscriptionConsumer

Displays the frames in an OpenCV imshow window.

Warning

Due to an issue with threading in OpenCV on Mac, CVDisplay does not work on Mac.

close()

Cleans up and closes the object.

property closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

property error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!")

Be aware that this is equivalent to explicitly awaiting the object:

await myobj

onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()

onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!")

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

putSubscription(subscription)

Given a subscription, such that await subscription.get() returns successive pieces of data, keeps reading the subscription forever:

q = asyncio.Queue() # an asyncio.Queue has a get() coroutine
myobj.putSubscription(q)

q.put_nowait(data)

Equivalent to doing the following in the background:

while True:
    myobj.put_nowait(await q.get())
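
The background loop can be sketched with a plain asyncio task. The class below is a hypothetical stand-in, not rtcbot's BaseSubscriptionConsumer: SketchConsumer and its private _handle method are invented for illustration, and the sketch uses _handle where the real object would use put_nowait(), so it does not model the rule that put_nowait() stops an active subscription.

```python
import asyncio


class SketchConsumer:
    """Hypothetical consumer that records everything it reads."""

    def __init__(self):
        self.received = []
        self.subscription = None
        self._task = None

    def _handle(self, data):
        # Stand-in for whatever the real object does with each item.
        self.received.append(data)

    def putSubscription(self, subscription):
        # Replace any reader that is already running.
        self.stopSubscription()
        self.subscription = subscription
        self._task = asyncio.create_task(self._reader(subscription))

    def stopSubscription(self):
        if self._task is not None:
            self._task.cancel()
        self._task = None
        self.subscription = None

    async def _reader(self, subscription):
        # The background loop: keep reading the subscription forever.
        while True:
            self._handle(await subscription.get())


async def demo():
    consumer = SketchConsumer()
    q1, q2 = asyncio.Queue(), asyncio.Queue()

    consumer.putSubscription(q1)
    q1.put_nowait("a")
    await asyncio.sleep(0.01)  # give the reader task time to run

    consumer.putSubscription(q2)  # q1 is no longer being read
    q1.put_nowait("ignored")
    q2.put_nowait("b")
    await asyncio.sleep(0.01)

    consumer.stopSubscription()
    return consumer.received


received = asyncio.run(demo())
```

Only one reader task exists at a time in this sketch, which is why data placed on the replaced queue is never consumed.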

You can replace a currently running subscription with a new one at any point in time:

q1 = asyncio.Queue()
myobj.putSubscription(q1)

assert myobj.subscription == q1

q2 = asyncio.Queue()
myobj.putSubscription(q2)

assert myobj.subscription == q2

put_nowait(data)

This function allows you to directly send data to the object, without needing to go through a subscription:

while True:
    data = get_data()
    myobj.put_nowait(data)

The put_nowait() method is the simplest way to process a new chunk of data.

Note

If there is currently an active subscription initialized through putSubscription(), it is immediately stopped, and the object waits only for put_nowait():

myobj.putSubscription(s)
myobj.put_nowait(mydata) # unsubscribes from s

assert myobj.subscription is None

property ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine to wait until initialization is complete.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

stopSubscription()

Stops reading the current subscription:

q = asyncio.Queue()
myobj.putSubscription(q)

assert myobj.subscription == q

myobj.stopSubscription()

assert myobj.subscription is None

# You can then subscribe again (or put_nowait)
myobj.putSubscription(q)
assert myobj.subscription == q

The object is not affected, other than no longer listening to the subscription, and not processing new data until something is inserted.

property subscription

Returns the currently active subscription:

q = asyncio.Queue()
myobj.putSubscription(q)
assert myobj.subscription == q

myobj.stopSubscription()
assert myobj.subscription is None

myobj.put_nowait(data)
assert myobj.subscription is None

class rtcbot.camera.PiCamera(rotation=0, **kwargs)

Bases: CVCamera

Instead of using OpenCV camera support, uses the picamera library for direct access to the Raspberry Pi’s CSI camera.

The interface is identical to CVCamera. When testing code on a desktop computer, it can be useful to have the code automatically choose the correct camera:

try:
    import picamera # picamera import will fail if not on pi
    cam = PiCamera()
except ImportError:
    cam = CVCamera()

This enables simple drop-in replacement between the two.

close()

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when exiting the program.

property closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

property error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

async get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!")

Be aware that this is equivalent to explicitly awaiting the object:

await myobj

onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()

onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!")

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

property ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine to wait until initialization is complete.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

subscribe(subscription=None)

Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.

Note that all subscribers receive the same NumPy array of frame data, so if you are going to modify the values of the array itself, do so on a copy. To subscribe and wait for a frame:

# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()

async def mytask():

    # Wait for the next frame
    myframe = await subs.get()

    # Do stuff with the frame

If you want to have a different subscription type, you can pass anything which has a put_nowait method, which is called each time a frame comes in:

subs = cam.subscribe(asyncio.Queue()) # asyncio queue has a put_nowait method
await subs.get()

unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters

subscription (optional) – Anything that was passed into/returned from subscribe().

unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was initialized.

class rtcbot.camera.PiCamera2(hflip=False, vflip=False, **kwargs)

Bases: CVCamera

Instead of using OpenCV camera support, uses the picamera2 library for direct access to the Raspberry Pi’s CSI camera.

The interface is identical to CVCamera. When testing code on a desktop computer, it can be useful to have the code automatically choose the correct camera:

try:
    import picamera2 # picamera2 import will fail if not on pi
    cam = PiCamera2()
except ImportError:
    cam = CVCamera()

This enables simple drop-in replacement between the two.
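
When a script has to run on machines with any of the three camera backends, the same idea can be extended to pick among all of them. The following is a sketch under assumptions: pick_camera_class_name is a hypothetical helper that is not part of rtcbot, and it only checks whether the picamera2 or picamera module can be located, not whether a camera is actually attached or working.

```python
import importlib.util


def pick_camera_class_name():
    """Return the name of the rtcbot camera class to try first."""
    # find_spec only checks whether a module can be located; it does not
    # import it, so this works on machines without any camera library.
    if importlib.util.find_spec("picamera2") is not None:
        return "PiCamera2"
    if importlib.util.find_spec("picamera") is not None:
        return "PiCamera"
    return "CVCamera"


name = pick_camera_class_name()
print(name)
```

With rtcbot installed, the chosen class could then be created with getattr(rtcbot, name)(), since all three classes are imported from the top-level rtcbot package in the examples above.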

You can use the parameter hflip=True to flip the camera horizontally, vflip=True to flip vertically, or both to rotate 180 degrees.

close()

Shuts down data gathering, and closes all subscriptions. Note that it is not recommended to call this in an async function, since it waits until the background thread joins.

The object is meant to be used as a singleton, which is initialized at the start of your code, and is closed when exiting the program.

property closed

Returns whether the object was closed. This includes both thrown exceptions, and clean exits.

property error

If there is an error that causes the underlying process to crash, this property will hold the actual Exception that was thrown:

if myobject.error is not None:
    print("Oh no! There was an error:",myobject.error)

This property is offered for convenience, but usually, you will want to subscribe to the error by using onError(), which will notify your app when the issue happens.

Note

If the error is not None, the object is considered crashed, and no longer processing data.

async get()

Behaves similarly to subscribe().get(). On the first call, creates a default subscription, and all subsequent calls to get() use that subscription.

If unsubscribe() is called, the subscription is deleted, so a subsequent call to get() will create a new one:

data = await myobj.get() # Creates subscription on first call
data = await myobj.get() # Same subscription
myobj.unsubscribe()
data2 = await myobj.get() # A new subscription

The above code is equivalent to the following:

defaultSubscription = myobj.subscribe()
data = await defaultSubscription.get()
data = await defaultSubscription.get()
myobj.unsubscribe(defaultSubscription)
newDefaultSubscription = myobj.subscribe()
data = await newDefaultSubscription.get()

onClose(subscription=None)

This is mainly useful for connections - they can be closed remotely. This allows handling the close event.

@myobj.onClose
def closeCallback():
    print("Closed!")

Be aware that this is equivalent to explicitly awaiting the object:

await myobj

onError(subscription=None)

Since most data processing happens in the background, the object might encounter an error, and the data processing might crash. If there is a crash, the object is considered dead, and no longer gathering data.

To catch these errors, when an unhandled exception happens, the error event is fired, with the associated Exception. This function allows you to subscribe to these events:

@myobj.onError
def error_happened(err):
    print("Crap, stuff just crashed: ",err)

The onError() function behaves in the same way as a subscribe(), which means that you can pass it a coroutine, or even directly await it:

err = await myobj.onError()

onReady(subscription=None)

Creating the class does not mean that the object is ready to process data. When created, the object starts an initialization procedure in the background, and once this procedure is complete, and any spawned background workers are ready to process data, it fires a ready event.

This function allows you to listen for this event:

@myobj.onReady
def readyCallback():
    print("Ready!")

The function works in exactly the same way as a subscribe(), meaning that you can pass it a coroutine, or even await it directly:

await myobj.onReady()

Note

The object will automatically handle any subscriptions or inserts that happen while it is initializing, so you generally don’t need to worry about the ready event, unless you need exact control.

property ready

This is True when the class has been fully initialized, and is ready to process data:

if not myobject.ready:
    print("Not ready to process data")

This property is offered for convenience, but if you want to be notified when the object is ready to process data, use the onReady() function, which lets you set up a callback/coroutine to wait until initialization is complete.

Note

You usually don’t need to check the ready state, since all functions for getting/putting data will work even if the class is still starting up in the background.

subscribe(subscription=None)

Subscribe to new frames as they come in. By default returns a MostRecentSubscription object, which can be awaited to get the most recent frame, and skips missed frames.

Note that all subscribers receive the same NumPy array of frame data, so if you are going to modify the values of the array itself, do so on a copy. To subscribe and wait for a frame:

# Set up a camera and subscribe to new frames
cam = CVCamera()
subs = cam.subscribe()

async def mytask():

    # Wait for the next frame
    myframe = await subs.get()

    # Do stuff with the frame

If you want to have a different subscription type, you can pass anything which has a put_nowait method, which is called each time a frame comes in:

subs = cam.subscribe(asyncio.Queue()) # asyncio queue has a put_nowait method
await subs.get()

unsubscribe(subscription=None)

Removes the given subscription, so that it no longer gets updated:

subs = myobj.subscribe()
myobj.unsubscribe(subs)

If no argument is given, removes the default subscription created by get(). If none exists, then does nothing.

Parameters

subscription (optional) – Anything that was passed into/returned from subscribe().

unsubscribeAll()

Removes all currently active subscriptions, including the default one if it was initialized.