# Source code for safecor._inputs_daemon
r""" \author Tristan Israël """
import glob
import threading
import serial
import time
import struct
import msgpack
# evdev is an optional dependency: when it cannot be imported the module still
# loads, but a placeholder InputDevice is defined so that the annotations
# referencing it below remain valid. `ecodes` and `InputEvent` are NOT defined
# in that case, so the input monitoring features cannot work.
try:
    from evdev import InputDevice, ecodes, InputEvent
except ImportError:
    print("The package evdev is not available. Functionalities will be missing.")

    class InputDevice:
        """ Placeholder for ``evdev.InputDevice`` used when the evdev dependency is missing. """
from . import SysLogger, Mouse, SingletonMeta, InputType
from . import MqttClient, Constants
# struct layout of one input event record:
#   H = event type  (unsigned 16-bit)
#   H = event code  (unsigned 16-bit)
#   i = event value (signed 32-bit: values such as relative motions can be
#       negative, which an unsigned "I" field refuses to pack)
INPUT_EVENT_FORMAT = "HHi"
# Size in bytes of one packed record (identical to the size of the former "HHI" layout)
INPUT_EVENT_SIZE = struct.calcsize(INPUT_EVENT_FORMAT)
# [docs]
class InputsDaemon(metaclass=SingletonMeta):
    """ Monitors mouse, touch device and keyboard inputs and serializes filtered events through the XenBus.

    The communication channel (pvchan) *inputs* is used between sys-usb and the Dom0.

    **This class is not intended to be used in a project, it is directly managed by the core**.

    Mouse
    =====
    Each mouse event of type ``EV_KEY``, ``EV_REL``, ``EV_ABS`` or ``EV_SYN`` is transmitted as soon
    as it is read, as one serialized record holding the input type and the event type, code and value.

    Touchscreen
    ===========
    Every event read from a touch device is transmitted, without filtering.

    Keyboard
    ========
    Only ``EV_KEY`` events whose key code belongs to the forwarded key list, and ``EV_MSC`` /
    ``MSC_SCAN`` events, are transmitted.
    """

    # Path of the serial device used as I/O channel; set by start()
    __xenbus_socket_path = None
    # serial.Serial handle opened by __connect_xenbus(), None until then
    __xenbus_socket = None
    # Paths of the /dev/input/event* nodes currently owned by a monitor thread
    __monitored_inputs = []
    # Set to False by stop(); polled by the finder and monitor loops
    __can_run = True

    # Names (without the "KEY_" prefix) of the evdev key codes forwarded by the
    # keyboard monitor. Kept as strings so that the class can still be defined
    # when evdev (and therefore `ecodes`) is not importable.
    __FORWARDED_KEY_NAMES = (
        "ESC 1 2 3 4 5 6 7 8 9 0 MINUS EQUAL BACKSPACE TAB "
        "Q W E R T Y U I O P LEFTBRACE RIGHTBRACE ENTER LEFTCTRL "
        "A S D F G H J K L SEMICOLON APOSTROPHE GRAVE LEFTSHIFT BACKSLASH "
        "Z X C V B N M COMMA DOT SLASH RIGHTSHIFT KPASTERISK LEFTALT SPACE CAPSLOCK "
        "F1 F2 F3 F4 F5 F6 F7 F8 F9 F10 NUMLOCK SCROLLLOCK "
        "KP7 KP8 KP9 KPMINUS KP4 KP5 KP6 KPPLUS KP1 KP2 KP3 KP0 KPDOT "
        "ZENKAKUHANKAKU 102ND F11 F12 RO KATAKANA HIRAGANA HENKAN KATAKANAHIRAGANA MUHENKAN KPJPCOMMA "
        "KPENTER RIGHTCTRL KPSLASH SYSRQ RIGHTALT HOME UP PAGEUP LEFT RIGHT END DOWN PAGEDOWN INSERT DELETE "
        "MUTE VOLUMEDOWN VOLUMEUP POWER KPEQUAL PAUSE KPCOMMA HANGUEL HANJA YEN LEFTMETA RIGHTMETA COMPOSE "
        "STOP AGAIN PROPS UNDO FRONT COPY OPEN PASTE FIND CUT HELP CALC SLEEP WWW SCREENLOCK BACK FORWARD "
        "EJECTCD NEXTSONG PLAYPAUSE PREVIOUSSONG STOPCD REFRESH EDIT SCROLLUP SCROLLDOWN KPLEFTPAREN KPRIGHTPAREN "
        "F13 F14 F15 F16 F17 F18 F19 F20 F21 F22 F23 F24"
    ).split()

    def start(self, mqtt_client: MqttClient):
        """ Starts the inputs daemon

        As soon as it starts, the daemon looks for mouses, touchscreens and keyboards.
        When it finds a device it starts monitoring it in a thread and serializes all
        input events on the pv channel ``inputs``.

        Nothing is started when the I/O channel cannot be opened.

        :param mqtt_client: Not used by the daemon at the moment; kept in the signature for the callers.
        """
        SysLogger("Input Daemon").info("Starting input daemon")
        self.__can_run = True

        # Start XenBus messaging
        self.__xenbus_socket_path = Constants.DOMU_INPUT_SOCKET_FILEPATH
        if not self.__connect_xenbus():
            return

        # Start inputs listeners, one finder thread per kind of device
        for finder in (self.__find_mouse, self.__find_touchscreen, self.__find_keyboard):
            threading.Thread(target=finder).start()

    def stop(self):
        """ Stops the inputs daemon

        The finder threads end at their next polling cycle. A monitor thread ends
        when it reads its next event, because reading blocks until then.
        """
        self.__can_run = False
        self.__disconnect_xenbus()

    def __find_mouse(self):
        """ Polls the input devices and starts a monitor for each new mouse (runs in its own thread). """
        SysLogger("Input Daemon").info("Looking for a mouse...")
        self.__scan_inputs(InputType.MOUSE, self.__monitor_mouse)

    def __find_touchscreen(self):
        """ Polls the input devices and starts a monitor for each new multitouch device (runs in its own thread). """
        SysLogger("Input Daemon").info("Looking for a touchscreen...")
        self.__scan_inputs(
            InputType.TOUCH,
            self.__monitor_touchscreen,
            accept=self.__is_multitouch,
            error_context="touchscreen",
        )

    def __find_keyboard(self):
        """ Polls the input devices and starts a monitor for each new keyboard (runs in its own thread). """
        SysLogger("Input Daemon").info("Looking for a keyboard...")
        self.__scan_inputs(InputType.KEYBOARD, self.__monitor_keyboard)

    def __scan_inputs(self, wanted_type, monitor, accept=None, error_context=None):
        """ Looks every 0.5 second for unmonitored devices of a given type until the daemon is stopped.

        :param wanted_type: The InputType a device must have to be monitored.
        :param monitor: The method run in a new thread with the opened device as only argument.
        :param accept: Optional predicate receiving the opened device; the device is skipped when it returns False.
        :param error_context: When set, errors are logged as warnings mentioning this name;
            when None, errors are ignored silently.
        """
        while self.__can_run:
            for path in glob.glob("/dev/input/event*"):
                if path in self.__monitored_inputs:
                    continue  # Already owned by a monitor thread

                dev = None
                try:
                    dev = InputDevice(path)
                    if self.__input_type(dev) == wanted_type and (accept is None or accept(dev)):
                        # Register the path BEFORE starting the thread so that the
                        # monitor's own cleanup always finds the entry it has to remove
                        self.__monitored_inputs.append(path)
                        try:
                            threading.Thread(target=monitor, args=(dev,)).start()
                        except Exception:
                            self.__forget_input(path)
                            raise
                        dev = None  # The monitor thread owns (and closes) the device now
                except Exception as e:
                    # Deliberate best effort: the scan is repeated every 0.5 second
                    if error_context is not None:
                        SysLogger("Input Daemon").warn(f"Error during {error_context} search. {e}")
                finally:
                    # Do not keep a descriptor open on the devices that are not monitored
                    if dev is not None:
                        self.__close_device(dev)

            # Wait a little and start over
            time.sleep(0.5)

    @staticmethod
    def __is_multitouch(dev):
        """ Tells whether the device exposes the absolute axis ``ABS_MT_POSITION_X``. """
        # A device without any absolute axis has no EV_ABS entry at all
        abs_caps = dev.capabilities().get(ecodes.EV_ABS, [])
        return any(entry[0] == ecodes.ABS_MT_POSITION_X for entry in abs_caps)

    @staticmethod
    def __close_device(dev):
        """ Closes an input device, ignoring any error (cleanup is best effort). """
        try:
            dev.close()
        except Exception:
            pass

    def __forget_input(self, path):
        """ Removes a device path from the monitored list so that it can be detected again. """
        try:
            self.__monitored_inputs.remove(path)
        except ValueError:
            pass  # The path was not registered (anymore)

    def __serialize_event(self, type_input, event):
        """ Serializes an event as packed bytes in order to transmit.

        :param type_input: The InputType of the device which produced the event.
        :param event: The event; its ``type``, ``code`` and ``value`` attributes are serialized.
        :return: The msgpack encoding of ``[type_input, type, code, value]`` followed by a line feed.
        """
        # event.type : unsigned 16bit
        # event.code : unsigned 16bit
        # event.value : signed 32bit
        payload = msgpack.packb([type_input, event.type, event.code, event.value])
        return payload + b'\n'

    def __forward_events(self, dev, type_input, label, keep=None):
        """ Reads the events of a device and writes the serialized ones on the I/O channel.

        The loop ends when the daemon is stopped (checked each time an event is read)
        or when reading or writing fails. In every case the device is closed and its
        path is released so that the finder threads can detect it again.

        :param dev: The opened input device.
        :param type_input: The InputType transmitted with each event.
        :param label: The device kind used in the log messages.
        :param keep: Optional predicate selecting the events to transmit; all events are sent when None.
        """
        SysLogger("Input Daemon").info(f"Monitor the {label} {dev.name}")
        # NOTE(review): all the monitor threads write on the same serial object without
        # any lock - confirm that concurrent writes cannot interleave records.
        try:
            for event in dev.read_loop():
                # Checked before writing: stop() closes the channel
                if not self.__can_run:
                    return
                if keep is None or keep(event):
                    self.__xenbus_socket.write(self.__serialize_event(type_input, event))
        except Exception as e:
            SysLogger("Input Daemon").warn(f"The {label} {dev.name} is not available anymore")
            SysLogger("Input Daemon").debug(str(e))
        finally:
            self.__forget_input(dev.path)
            self.__close_device(dev)

    def __monitor_mouse(self, mouse):
        """ Transmits the key, relative, absolute and synchronization events of a mouse. """
        wanted_types = (ecodes.EV_KEY, ecodes.EV_REL, ecodes.EV_ABS, ecodes.EV_SYN)
        self.__forward_events(mouse, InputType.MOUSE, "mouse", lambda event: event.type in wanted_types)

    def __monitor_touchscreen(self, touch):
        """ Transmits every event of a touchscreen, without filtering. """
        self.__forward_events(touch, InputType.TOUCH, "touchscreen")

    def __forwarded_key_codes(self):
        """ Returns the set of evdev key codes named by the forwarded key list. """
        return frozenset(getattr(ecodes, f"KEY_{name}") for name in self.__FORWARDED_KEY_NAMES)

    def __monitor_keyboard(self, keyboard):
        """ Transmits the events of the forwarded keys and the scan code events of a keyboard. """
        # Resolved once per monitor instead of rebuilding the whole list for every event
        allowed_codes = self.__forwarded_key_codes()

        def keep(event):
            if event.type == ecodes.EV_KEY:
                return event.code in allowed_codes
            return event.type == ecodes.EV_MSC and event.code == ecodes.MSC_SCAN

        self.__forward_events(keyboard, InputType.KEYBOARD, "keyboard", keep)

    def __input_type(self, input_: InputDevice):
        """ Determines the kind of a device from the keys and buttons it declares.

        The tests are made in this order: ``KEY_ESC`` means keyboard, ``BTN_LEFT``
        means mouse and ``BTN_TOUCH`` means touch device.

        :param input_: The opened input device.
        :return: The matching InputType, or ``InputType.UNKNOWN`` when nothing matches.
        """
        keys = input_.capabilities().get(ecodes.EV_KEY)
        if keys is None:
            return InputType.UNKNOWN

        if ecodes.KEY_ESC in keys:
            return InputType.KEYBOARD
        if ecodes.BTN_LEFT in keys:
            return InputType.MOUSE
        if ecodes.BTN_TOUCH in keys:
            return InputType.TOUCH
        # The device has keys but none of those identifying a supported kind
        return InputType.UNKNOWN

    def __connect_xenbus(self) -> bool:
        """ Opens the serial port used as I/O channel.

        :return: True when the port is open, False when it could not be opened.
        """
        # Open the stream on the socket
        SysLogger("Input Daemon").debug(f"Open Xenbus I/O channel {self.__xenbus_socket_path}")
        try:
            self.__xenbus_socket = serial.Serial(port=self.__xenbus_socket_path)
        except serial.SerialException as e:
            self.__xenbus_socket = None
            SysLogger("Input Daemon").error(f"Impossible to open the serial port {self.__xenbus_socket_path}")
            SysLogger("Input Daemon").error(str(e))
            return False

        SysLogger("Input Daemon").info("I/O channel is open")
        return True

    def __disconnect_xenbus(self):
        """ Closes the I/O channel if it has been opened. """
        if self.__xenbus_socket is not None:
            SysLogger("Input Daemon").info("Close Xenbus I/O socket")
            self.__xenbus_socket.close()