Source code for rtcbot.inputs
from inputs import devices
import logging
import asyncio
from .base import ProcessSubscriptionProducer, SubscriptionClosed
# The messages to filter by default
defaultFilter = lambda x: x["code"] != "SYN_REPORT"
[docs]class InputDevice(ProcessSubscriptionProducer):
"""
A thin wrapper over :mod:`inputs`, which permits getting events in an asynchronous manner.
"""
_log = logging.getLogger("rtcbot.InputDevice")
def __init__(self, device, eventFilter=defaultFilter, loop=None):
self._device = device
self._eventFilter = eventFilter
# We use a joinTimeout of 0.1, because it is very likely to be blocked
super().__init__(asyncio.Queue, logger=self._log, loop=loop, joinTimeout=0.1)
def _producer(self):
"""
I really wish there were a non-blocking way to do this... As it stands,
this code relies on the daemon property of the thread to kill a blocked read call...
Nevertheless, it does work, on linux. It looks like inputs uses a process on macs,
which might not exit cleanly...
"""
self._setReady(True)
self._log.debug("Started listening to input device events")
while not self._shouldClose:
events = self._device.read()
for event in events:
e = {
"timestamp": event.timestamp,
"code": event.code,
"state": event.state,
"event": event.ev_type,
}
if self._eventFilter(e):
self._put_nowait(e)
self._setReady(False)
[docs]class Gamepad(InputDevice):
def __init__(self, eventFilter=defaultFilter, loop=None):
super().__init__(devices.gamepads[0], eventFilter=eventFilter, loop=loop)
[docs]class Mouse(InputDevice):
def __init__(self, eventFilter=defaultFilter, loop=None):
super().__init__(devices.mice[0], eventFilter=eventFilter, loop=loop)
[docs]class Keyboard(InputDevice):
def __init__(self, eventFilter=defaultFilter, loop=None):
super().__init__(devices.keyboards[0], eventFilter=eventFilter, loop=loop)