Source code for safecor._disk_monitor

""" \author Tristan Israël """

import pyudev
from . import Logger, MqttClient, Topics, NotificationFactory, DiskState

[docs] class DiskMonitor(): """ This class monitors a folder in order to detect the addition and removal of files and folders. It works in polling mode because it needs to be compatible with XEN's 9pfs protocol which does not handle filesystem notifications. This class must be instanciated with the mount points directory that must be monitored (for instance ``/mnt``). Use case: Storage connection ============================ When an external storage is connected, a new mount point is automatically created by the system (see ``udev`` or ``mdev``) in the mount points directory. For instance, if the storage is named ``NO NAME``, the mount point will be ``/mnt/NO NAME``. In such a case, the notification ``DISK_STATE`` will be sent. .. seealso:: - :class:`Topics` Use case: Storage disconnection =============================== When an external storage is removed, the mount point is removed by the system in the mount points directory (``/mnt``). In such a case, the notification ``DISK_STATE`` will be sent. """ def __init__(self, folder:str, mqtt_client:MqttClient): Logger().setup("Disk monitor", mqtt_client) self.__disks = {} self.__folder = folder self.__mqtt_client = mqtt_client self.__is_started = False
[docs] def device_event(self, action, device): if action == 'add': if device.device_type == "partition": if device.get('ID_FS_LABEL'): mount_point = device.device_node disk_name = device.get('ID_FS_LABEL') Logger().info(f"USB Device connected. Partition detected: {mount_point} with name {disk_name}", "Disk monitor") self.__disks[mount_point] = disk_name payload = NotificationFactory.create_notification_disk_state(disk_name, DiskState.CONNECTED) self.__mqtt_client.publish(Topics.DISK_STATE, payload) elif action == 'remove': if device.device_type == "partition": mount_point = device.device_node if mount_point in self.__disks: Logger().info(f"USB Device disconnected on {device.device_node}", "Disk monitor") disk_name = self.__disks.pop(mount_point) payload = NotificationFactory.create_notification_disk_state(disk_name, DiskState.DISCONNECTED) self.__mqtt_client.publish(Topics.DISK_STATE, payload)
[docs] def start(self): Logger().debug(f"Starting disks monitoring on mount point {self.__folder}", "Disk monitor") context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) monitor.filter_by(subsystem='block') self.__is_started = True for device in iter(monitor.poll, None): if not self.__is_started: break self.device_event(device.action, device)
[docs] def stop(self): self.__is_started = False