Source code for rtcbot.base.thread

import asyncio
import concurrent.futures
import logging
import threading

from .base import BaseSubscriptionConsumer, BaseSubscriptionProducer, SubscriptionClosed
from .events import threadedEventHandler


class ThreadedSubscriptionProducer(BaseSubscriptionProducer, threadedEventHandler):
    """
    Producer whose data is generated in a background thread.

    Constructing the object immediately starts a thread that runs
    :meth:`_producer`. Data created in that thread is handed to the event
    loop stored in ``self._loop`` through :meth:`_put_nowait`.
    """

    def __init__(
        self,
        defaultSubscriptionType=asyncio.Queue,
        logger=None,
        loop=None,
        daemonThread=True,
    ):
        """
        Initialize both base classes, then launch the producer thread.

        Args:
            defaultSubscriptionType: forwarded to ``BaseSubscriptionProducer.__init__``.
            logger: forwarded to both base-class initializers.
            loop: forwarded to ``threadedEventHandler.__init__``.
            daemonThread: value assigned to the worker thread's ``daemon`` flag.
        """
        threadedEventHandler.__init__(self, logger, loop)
        BaseSubscriptionProducer.__init__(self, defaultSubscriptionType, logger=logger)

        worker = threading.Thread(target=self._producer)
        worker.daemon = daemonThread
        # Publish the thread on the instance before it starts running.
        self._producerThread = worker
        worker.start()

    def _put_nowait(self, data):
        """
        Insert data from the producer thread.

        The base-class ``_put_nowait`` is not invoked directly; it is
        scheduled on ``self._loop`` with ``call_soon_threadsafe``.
        """
        schedule = self._loop.call_soon_threadsafe
        schedule(super()._put_nowait, data)

    def _producer(self):
        """
        Body of the background thread. Override it with your own logic.

        This default implementation exists for testing: it forwards every
        item placed on ``self.testQueue`` to :meth:`_put_nowait`, and puts
        ``"<<END>>"`` on ``self.testResultQueue`` once the loop exits.
        """
        import queue

        self.testQueue = queue.Queue()
        self.testResultQueue = queue.Queue()

        # Signal readiness only after the test queues exist.
        self._setReady(True)

        while not self._shouldClose:
            # The wait is bounded so that _shouldClose is re-checked at least
            # every 3 seconds even when no test data arrives.
            # NOTE(review): the original author remarked that the timeout did
            # not seem to take effect here - unverified, confirm if it matters.
            try:
                item = self.testQueue.get(timeout=3)
                self._put_nowait(item)
            except queue.Empty:
                continue

        self.testResultQueue.put("<<END>>")
        self._setReady(False)

    def _close(self):
        """
        Threadsafe close, callable from the external (worker) thread.

        Schedules the base-class ``_close`` on ``self._loop``.
        """
        schedule = self._loop.call_soon_threadsafe
        schedule(super()._close)

    def close(self):
        """
        Shuts down data gathering, and closes all subscriptions.

        Calling this from an async function is not recommended, because it
        blocks until the background thread has joined. The object is meant
        to be used as a singleton: created when your program starts and
        closed when it exits.
        """
        super().close()
        self._producerThread.join()
class ThreadedSubscriptionConsumer(BaseSubscriptionConsumer, threadedEventHandler):
    """
    Consumer whose incoming data is processed in a background thread.

    Constructing the object immediately starts a thread that runs
    :meth:`_consumer`. That thread pulls datapoints from the current
    subscription by calling :meth:`_get`.
    """

    def __init__(
        self,
        directPutSubscriptionType=asyncio.Queue,
        logger=None,
        loop=None,
        daemonThread=True,
    ):
        """
        Initialize both base classes, then launch the consumer thread.

        Args:
            directPutSubscriptionType: forwarded to ``BaseSubscriptionConsumer.__init__``.
            logger: forwarded to both base-class initializers; also the parent
                of this class's own child logger. When ``None``, a logger
                named after the concrete class is used as the parent.
            loop: forwarded to ``threadedEventHandler.__init__``.
            daemonThread: value assigned to the worker thread's ``daemon`` flag.
        """
        threadedEventHandler.__init__(self, logger, loop)
        BaseSubscriptionConsumer.__init__(self, directPutSubscriptionType, logger=logger)

        parentLog = logging.getLogger(self.__class__.__name__) if logger is None else logger
        self.__sclog = parentLog.getChild("ThreadedSubscriptionConsumer")

        # Held in _get(), putSubscription() and close() around task/subscription changes.
        self._taskLock = threading.Lock()

        worker = threading.Thread(target=self._consumer)
        worker.daemon = daemonThread
        # Publish the thread on the instance before it starts running.
        self._consumerThread = worker
        worker.start()

    def _get(self):
        """
        Return the next datapoint from the subscription.

        This is not a coroutine - it is meant to be called in the worker
        thread. It waits in 1-second slices so that ``self._shouldClose`` is
        re-checked regularly.

        Raises:
            SubscriptionClosed: once ``self._shouldClose`` is set, i.e. the
                worker thread is to be shut down.
        """
        cancelledErrors = (asyncio.CancelledError, concurrent.futures.CancelledError)
        timeoutErrors = (asyncio.TimeoutError, concurrent.futures.TimeoutError)

        # True when the previous wait merely timed out, so the task is still usable.
        awaitingOldTask = False
        while not self._shouldClose:
            with self._taskLock:
                # A fresh task is created only when the previous one finished
                # rather than timed out.
                if not awaitingOldTask:
                    pending = self._subscription.get()
                    self._getTask = asyncio.run_coroutine_threadsafe(pending, self._loop)
                awaitingOldTask = False

            try:
                datapoint = self._getTask.result(1)
            except cancelledErrors:
                self.__sclog.debug("Subscription cancelled - checking for new tasks")
            except timeoutErrors:
                self.__sclog.debug("No incoming data for 1 second...")
                awaitingOldTask = True
            except SubscriptionClosed:
                self.__sclog.debug("Incoming stream closed... Checking for new subscription")
            else:
                return datapoint

        self.__sclog.debug("close() was called on the aio thread. raising SubscriptionClosed.")
        raise SubscriptionClosed("ThreadedSubscriptionConsumer has been closed")

    def _consumer(self):
        """
        Body of the background thread. Subclasses override it to read data,
        calling ``self._get()`` for each datapoint coming from a subscription.

        This default implementation exists for testing: every datapoint is
        copied onto ``self.testQueue``, and ``"<<END>>"`` is put there once
        :meth:`_get` raises ``SubscriptionClosed``.
        """
        import queue

        self.testQueue = queue.Queue()

        # Signal readiness only after the test queue exists.
        self._setReady(True)

        try:
            # Leaves the loop only through an exception raised by _get().
            while True:
                self.testQueue.put(self._get())
        except SubscriptionClosed:
            self.testQueue.put("<<END>>")

        self._setReady(False)

    def putSubscription(self, subscription):
        """
        Delegate to the base-class ``putSubscription`` while holding
        ``self._taskLock``, the same lock :meth:`_get` takes before it
        creates a task.
        """
        with self._taskLock:
            super().putSubscription(subscription)

    def _close(self):
        """
        Threadsafe close, callable from the external (worker) thread.

        Schedules the base-class ``_close`` on ``self._loop``.
        """
        schedule = self._loop.call_soon_threadsafe
        schedule(super()._close)

    def close(self):
        """
        Close the consumer and wait for the worker thread to finish.

        The object is meant to be used as a singleton: created when your
        program starts and closed when it exits. Make sure to run close on
        exit, since sometimes Python has trouble exiting from multiple
        threads without having them closed explicitly.
        """
        with self._taskLock:
            super().close()
        # The lock is released before joining: the worker acquires
        # _taskLock inside _get(), so it must be free for the thread to exit.
        self._consumerThread.join()