""" \author Tristan Israël """
from enum import StrEnum
import json
import threading
import select
import traceback
from typing import Literal, Callable, Optional
try:
import serial
except ImportError:
pass
import paho.mqtt.client as mqtt
from paho.mqtt.reasoncodes import ReasonCode
from paho.mqtt.properties import Properties
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
from . import SysLogger
DEBUG = False
TYPE_CHECKING = True
[docs]
class ConnectionType(StrEnum):
''' @brief Cette énumération permet d'identifier le type de connexion utilisée.
'''
UNIX_SOCKET = "unix_socket"
SERIAL_PORT = "serial_port"
TCP_DEBUG = "tcp"
class SerialSocket():
''' @brief Cette classe implémente une socket série.
La socket série est utilisée pour communiquer sur un port série dans un DomU
ou sur une socket de domaine UNIX sur le Dom0.
'''
def __init__(self, path:str, baudrate:int):
self.serial = serial.Serial(port=path, baudrate=baudrate, timeout=1.0, write_timeout=0)
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
def recv(self, buffer_size: int) -> bytes:
#print(f"Wants to read {buffer_size} bytes")
if self.serial is not None and self.serial.is_open:
data = self.serial.read(buffer_size)
#print(f"Read {len(data)} bytes")
return data
return b''
def in_waiting(self) -> int:
return self.serial.in_waiting
def pending(self) -> int:
if self.serial is not None:
return self.serial.out_waiting
return 0
def send(self, buffer: bytes) -> int:
if self.serial is not None and self.serial.is_open:
#print("send:{}".format(buffer))
sent = self.serial.write(buffer)
return sent if sent is not None else 0
return 0
def close(self) -> None:
traceback.print_stack()
if self.serial is not None and self.serial.is_open:
self.serial.reset_input_buffer()
self.serial.reset_output_buffer()
self.serial.close()
def fileno(self) -> int:
return self.serial.fileno()
def setblocking(self, flag: bool) -> None:
pass
[docs]
class SerialMQTTClient(mqtt.Client):
''' @brief Cette classe permet au client MQTT de communiquer sur un port série.
'''
on_connection_lost: Optional[Callable[[], None]] = None
def __init__(self, path:str, baudrate:int, *args, **kwargs):
super().__init__(callback_api_version=CallbackAPIVersion.VERSION2, *args, **kwargs)
self.path = path
self.baudrate = baudrate
self._sock = None
[docs]
def disconnect(self, reasoncode: ReasonCode | None = None, properties: Properties | None = None):
self._send_disconnect(reasoncode, properties)
self.__do_loop()
[docs]
def close(self):
self.disconnect()
self._sock_close()
#if self.sock_ is not None:
# self.sock_.close()
[docs]
def loop_start(self) -> MQTTErrorCode:
self._thread_terminate = False
self._thread = threading.Thread(target=self.__do_loop)
self._thread.daemon = True
self._thread.start()
return MQTTErrorCode.MQTT_ERR_SUCCESS
def __do_loop(self):
rc = MQTTErrorCode.MQTT_ERR_SUCCESS
timeout = 1.0
while not self._thread_terminate:
if self._sock is None:
SysLogger("MQTT client").warn("No socket found, exiting loop")
if self.on_connection_lost is not None:
self.on_connection_lost()
return
# if bytes are pending do not wait in select
pending_bytes = self._sock.pending()
if pending_bytes > 0:
timeout = 0.0
rlist, _, _ = select.select([self._sock], [], [], timeout)
if rlist and self._sock.in_waiting() > 0:
rc = self.loop_read()
if rc != MQTTErrorCode.MQTT_ERR_SUCCESS:
SysLogger("MQTT client").warn(f"Read error {rc}")
break
if self.want_write():
rc = self.loop_write()
if rc != MQTTErrorCode.MQTT_ERR_SUCCESS:
SysLogger("MQTT client").warn(f"Write error {rc}")
break
rc = self.loop_misc()
if rc != MQTTErrorCode.MQTT_ERR_SUCCESS:
SysLogger("MQTT client").warn(f"Misc error {rc}")
if rc == MQTTErrorCode.MQTT_ERR_CONN_LOST:
SysLogger("MQTT client").warn("Connection lost.")
if self.on_connection_lost is not None:
self.on_connection_lost()
break
#time.sleep(0.2)
[docs]
def loop_stop(self) -> MQTTErrorCode:
self._thread_terminate = True
if self._thread is not None:
self._thread.join()
return MQTTErrorCode.MQTT_ERR_SUCCESS
return MQTTErrorCode.MQTT_ERR_UNKNOWN
def _create_socket(self):
try:
self._sock = SerialSocket(self.path, self.baudrate)
self._sockpairR = self._sock
return self._sock
except Exception as e:
SysLogger("MQTT client").warn("An error occured while opening the serial port")
SysLogger("MQTT client").warn(str(e))
return None
[docs]
class MqttClient():
''' @brief Cette classe permet d'échanger des messages MQTT au sein du système.
'''
connection_type:ConnectionType = ConnectionType.UNIX_SOCKET
connection_string:str = ""
identifier:str = "unknown"
on_connected: Optional[Callable[[], None]] = None
on_message: Optional[Callable[[str, dict], None]] = None
on_subscribed: Optional[Callable[[], None]] = None
on_log: Optional[Callable[[int, str], None]] = None
__message_callbacks = []
__connected_callbacks = []
connected = False
is_starting = False
__debugging = False
def __init__(self, identifier:str, connection_type:ConnectionType = ConnectionType.UNIX_SOCKET, connection_string:str = "", debugging = False):
self.mqtt_client = None
self.identifier = identifier
self.connection_type = connection_type
self.connection_string = connection_string
self.__debugging = debugging
self.__subscriptions = []
def __del__(self):
self.stop()
[docs]
def start(self):
if self.is_starting or self.connected:
return
self.is_starting = True
SysLogger("MQTT client").info(f"Starting MQTT client {self.identifier}")
if self.connection_type != ConnectionType.SERIAL_PORT:
self.mqtt_client = mqtt.Client(
callback_api_version=CallbackAPIVersion.VERSION2,
client_id=self.identifier,
transport=self.__get_transport_type(),
reconnect_on_failure=True
)
self.mqtt_client.on_connect = self.__on_connected
self.mqtt_client.on_message = self.__on_message
self.mqtt_client.on_subscribe = self.__on_subscribe
self.mqtt_client.on_disconnect = self.__on_disconnected
if self.__debugging:
self.mqtt_client.on_log = self.__on_log
mqtt_host = "undefined"
try:
if self.connection_type == ConnectionType.TCP_DEBUG:
mqtt_host = "localhost"
self.mqtt_client.connect(host=mqtt_host, keepalive=30)
elif self.connection_type == ConnectionType.UNIX_SOCKET:
mqtt_host = self.connection_string
self.mqtt_client.connect(host=mqtt_host, port=1, keepalive=30)
else:
SysLogger("MQTT client").error(f"The connection type {self.connection_type} is not handled")
return
except Exception as e:
SysLogger("MQTT client").warn(f"Could not connect to the MQTT broker on {mqtt_host}")
SysLogger("MQTT client").warn(str(e))
return
self.mqtt_client.loop_start()
elif self.connection_type == ConnectionType.SERIAL_PORT:
self.mqtt_client = SerialMQTTClient(
client_id=self.identifier,
path=self.connection_string,
baudrate=115200,
reconnect_on_failure=True
)
self.mqtt_client.on_connect = self.__on_connected
self.mqtt_client.on_message = self.__on_message
self.mqtt_client.on_disconnect = self.__on_disconnected
self.mqtt_client.on_subscribe = self.__on_subscribe
self.mqtt_client.on_connection_lost = self.__on_connection_lost
if self.__debugging:
self.mqtt_client.on_log = self.__on_log
if DEBUG:
self.mqtt_client.on_log = self.__on_log
self.mqtt_client.connect(host="localhost", port=1, keepalive=5)
self.mqtt_client.loop_start()
else:
SysLogger("MQTT client").error(f"The connection type {self.connection_type} is not handled")
return
[docs]
def add_connected_callback(self, callback):
self.__connected_callbacks.append(callback)
[docs]
def add_message_callback(self, callback):
self.__message_callbacks.append(callback)
[docs]
def del_message_callback(self, callback):
self.__message_callbacks.remove(callback)
[docs]
def reset_message_callbacks(self):
self.__message_callbacks.clear()
[docs]
def stop(self):
if self.mqtt_client is not None:
#print("Quit Mqtt client")
try:
self.mqtt_client.disconnect()
self.mqtt_client.loop_stop()
if self.connection_type == ConnectionType.SERIAL_PORT:
self.mqtt_client.close()
except:
#Ignore exceptions when closing
pass
finally:
self.connected = False
self.is_starting = False
[docs]
def subscribe(self, topic:str) -> tuple[MQTTErrorCode, int | None]:
#print(f"Subscribed to {topic}")
self.__subscriptions.append(topic)
return self.mqtt_client.subscribe(topic)
[docs]
def unsubscribe(self, topic:str):
""" Unsubscribes a client from a specific topic """
#print(f"Unsubscribed from {topic}")
self.__subscriptions.remove(topic)
return self.mqtt_client.unsubscribe(topic)
[docs]
def unsubscribe_all(self):
""" Unsubscribes a client from all topics """
for topic in self.__subscriptions:
self.unsubscribe(topic)
[docs]
def publish(self, topic:str, payload:dict):
data = json.dumps(payload, ensure_ascii=True)
#print(data)
#print(len(data))
self.mqtt_client.publish(topic=topic, payload=data)
def __get_transport_type(self) -> Literal['tcp', 'unix']:
if self.connection_type == ConnectionType.TCP_DEBUG:
return "tcp"
else:
return "unix"
def __on_log(self, client, userdata, level, buf):
if self.on_log is not None:
self.on_log(level, buf)
def __on_message(self, client:mqtt.Client, userdata, msg:mqtt.MQTTMessage):
try:
payload = json.loads(msg.payload.decode("utf-8"))
if self.on_message is not None:
self.on_message(msg.topic, payload)
for cb in self.__message_callbacks:
cb(msg.topic, payload)
except Exception as e:
SysLogger("MQTT client").warn("[MQTT Client] Uncaught Exception when handling message:")
SysLogger("MQTT client").warn(f"Topic : {msg.topic}")
SysLogger("MQTT client").warn(f"Payload : {msg.payload}")
SysLogger("MQTT client").warn(f"Exception : {e}")
SysLogger("MQTT client").warn("** Notice that the error may come from the client callback **")
def __on_connected(self, client:mqtt.Client, userdata, connect_flags, reason_code, properties):
SysLogger("MQTT client").info("Connected to the MQTT broker")
self.connected = True
self.is_starting = False
if self.on_connected is not None:
self.on_connected()
for cb in self.__connected_callbacks:
cb()
def __on_subscribe(self, client, userdata, mid, reason_code_list, properties):
#print("Subscribed to a topic")
if self.on_subscribed is not None:
self.on_subscribed(mid)
def __on_connection_lost(self):
SysLogger("MQTT client").warn("Connection lost, trying to reconnect.")
self.is_starting = False
self.connected = False
self.start()
def __on_disconnected(self, *args):
SysLogger("MQTT client").info("Disconnected from the broker")
#print("Arguments:")
#for arg in args:
# print(arg)
#print("Disconnect from the broker")
#self.mqtt_client.close()