""" \author Tristan Israël """
import time
import threading
import os
import base64
import zlib
import unicodedata
import subprocess
from pathlib import Path
from queue import Queue
from . import Constants, MqttClient, Topics, MqttHelper
from . import FileHelper, ResponseFactory, ComponentState
try:
from . import DiskMonitor
except ImportError:
print("SysUsbController cannot use DiskMonitor")
from . import Logger, NotificationFactory, DiskState
try:
from . import InputsDaemon
NO_INPUTS_MONITORING = False
except ImportError:
NO_INPUTS_MONITORING = True
print("Importing ControleurVmSysUsb without inputs monitoring nor benchmarking capacity")
[docs]
class SysUsbController():
""" This class handles messages exchanged between sys-usb and the Dom0 or the other Domains"""
# All the names below start with two underscores, so Python name-mangles
# them to _SysUsbController__<name>; they are private to this class.
# NOTE(review): they are class attributes, so the three queues are shared
# by every instance of the controller - confirm a single instance is intended.
# Not referenced by any method of this class visible in this file.
__nb_mqtt_conn = 0
# Loop flag polled by the worker loops; start() sets it, stop() and __del__ clear it.
__can_run = True
# Requests waiting for the message worker, stored as (base_topic, payload) tuples.
__messages_queue = Queue()
# Jobs (dicts) queued by __handle_read_file and consumed by __read_files_worker.
__read_files_queue = Queue()
# Jobs (dicts) queued by __handle_copy_file and consumed by __copy_files_worker.
__copy_files_queue = Queue()
# Base topics that __on_mqtt_message handles at once instead of queueing them.
__high_priority_messages = [ Topics.LIST_DISKS, Topics.LIST_FILES, Topics.DISCOVER_COMPONENTS, Topics.PING, Topics.SYS_USB_CLEAR_QUEUES, Topics.MOUNT_FILE ]
def __init__(self, mqtt_client:MqttClient):
    """ Keeps the MQTT client and prepares the three worker threads.

    The threads are only created here, they are started by the MQTT
    connection callback.

    :param mqtt_client: Client used to receive the requests and publish the answers.
    """
    self.mqtt_client = mqtt_client

    # Created by the connection callback
    self.__disk_monitor = None

    # One dedicated thread per queue
    self.__thread_read_files = threading.Thread(
        name="read_file_worker",
        target=self.__read_files_worker,
    )
    self.__thread_copy_files = threading.Thread(
        name="copy_file_worker",
        target=self.__copy_files_worker,
    )
    self.__thread_messages = threading.Thread(
        name="message_worker",
        target=self.__message_worker,
    )
def __del__(self):
self.__can_run = False
[docs]
def start(self):
    """ Starts the controller.

    Registers the connection and message callbacks on the MQTT client,
    raises the run flag and then starts the client.
    """
    self.mqtt_client.on_connected = self.__on_mqtt_connected
    self.mqtt_client.on_message = self.__on_mqtt_message

    # The flag is raised BEFORE the client is started: the connection
    # callback starts the worker threads, which leave their loop at once
    # if they still see a flag lowered by a previous stop().
    self.__can_run = True
    self.mqtt_client.start()
[docs]
def stop(self):
    """ Stops the controller.

    Lowers the run flag polled by the worker loops and stops the inputs
    daemon when it is available.
    """
    self.__can_run = False

    # InputsDaemon is an optional import: when it failed the name does not
    # exist and calling it would raise a NameError.
    if not NO_INPUTS_MONITORING:
        InputsDaemon().stop()
def __on_mqtt_connected(self):
    """ Callback invoked once the MQTT client is connected.

    Configures the logger, subscribes to every request topic handled by
    the controller, starts the inputs monitoring (when available), the
    worker threads and the disk monitor, then announces the components.

    The function can safely be called several times (for instance if the
    client reports a new connection): threads and disk monitor are only
    started once.
    """
    Logger().setup("USB controller", self.mqtt_client)
    Logger().debug("Starting Safecor disk controller")

    # Topics.BENCHMARK is deliberately not subscribed
    request_topics = (
        Topics.LIST_DISKS,
        Topics.LIST_FILES,
        Topics.COPY_FILE,
        Topics.READ_FILE,
        Topics.DELETE_FILE,
        Topics.MOUNT_FILE,
        Topics.UNMOUNT,
        Topics.FILE_FINGERPRINT,
        Topics.CREATE_FILE,
        Topics.DISCOVER_COMPONENTS,
        Topics.PING,
        Topics.SYS_USB_CLEAR_QUEUES,
    )
    for base_topic in request_topics:
        self.mqtt_client.subscribe(f"{base_topic}/request")

    # Start the inputs monitoring
    if not NO_INPUTS_MONITORING:
        InputsDaemon().start(self.mqtt_client)

    # Start threads. A Thread can only be started once (a second start()
    # raises RuntimeError); ident is None as long as it was never started.
    workers = (
        self.__thread_read_files,
        self.__thread_copy_files,
        self.__thread_messages,
    )
    for worker in workers:
        if worker.ident is None:
            worker.start()

    if self.__disk_monitor is None:
        try:
            # DiskMonitor is an optional import, the name may not exist
            monitor_class = DiskMonitor
        except NameError:
            Logger().error("DiskMonitor is not available, the disks will not be monitored")
        else:
            self.__disk_monitor = monitor_class(Constants.USB_MOUNT_POINT, self.mqtt_client)
            threading.Thread(target=self.__disk_monitor.start).start()

    self.__handle_discover_components()
def __on_mqtt_message(self, topic:str, payload:dict):
#Logger().debug("Message received : topic={}, payload={}".format(topic, payload))
base_topic, _ = topic.rsplit("/", 1)
# Some messages have a high priority, we don't put them into the queue
# These tasks should only be small tasks, otherwise they go in the queue
if base_topic in self.__high_priority_messages:
threading.Thread(target=self.__handle_message(base_topic, payload)).start()
return
self.__messages_queue.put((base_topic, payload))
def __message_worker(self):
#Logger().debug("Handle message {}".format(topic))
while self.__can_run:
if not self.__messages_queue.empty():
message = self.__messages_queue.get() # We get a tuple
topic = message[0]
payload = message[1]
self.__handle_message(topic, payload)
time.sleep(0.1)
####
# Command handling
#
def __handle_message(self, topic:str, payload:dict):
    """ Calls the handler associated with a base topic.

    The topics are compared one after the other, the first match wins and
    a topic with no match is silently ignored.

    :param topic: Base topic of the request (without the ``/request`` suffix).
    :param payload: Payload of the request.
    """
    if topic == Topics.LIST_DISKS:
        self.__handle_list_disks(topic)
        return
    if topic == Topics.LIST_FILES:
        self.__handle_list_files(topic, payload)
        return
    if topic == Topics.COPY_FILE:
        self.__handle_copy_file(topic, payload)
        return
    if topic == Topics.READ_FILE:
        self.__handle_read_file(topic, payload)
        return
    if topic == Topics.BENCHMARK:
        self.__handle_benchmark(topic, payload)
        return
    if topic == Topics.FILE_FINGERPRINT:
        self.__handle_file_fingerprint(topic, payload)
        return
    if topic == Topics.CREATE_FILE:
        self.__handle_create_file(topic, payload)
        return
    if topic == Topics.DISCOVER_COMPONENTS:
        self.__handle_discover_components()
        return
    if topic == Topics.DELETE_FILE:
        self.__handle_delete_file(payload)
        return
    if topic == Topics.PING:
        self.__handle_ping(payload)
        return
    if topic == Topics.SYS_USB_CLEAR_QUEUES:
        self.__handle_clear_queue()
        return
    if topic == Topics.MOUNT_FILE:
        self.__handle_mount_file(payload)
        return
    if topic == Topics.UNMOUNT:
        self.__handle_unmount(payload)
def __handle_list_disks(self, topic:str) -> None:
    """ Publishes the disks list provided by FileHelper on ``<topic>/response``.

    :param topic: Base topic of the request.
    """
    Logger().debug("Disks list requested")

    known_disks = FileHelper.get_disks_list()
    answer = ResponseFactory.create_response_disks_list(known_disks)
    self.mqtt_client.publish(f"{topic}/response", answer)
def __handle_list_files(self, topic:str, payload: dict) -> None:
    """ Publishes the files list of a disk on ``<topic>/response``.

    Nothing is published when an argument is missing or when the request
    targets the repository.

    :param topic: Base topic of the request.
    :param payload: Request arguments: ``disk``, ``recursive`` and ``from_dir``.
    """
    expected_keys = ["disk", "recursive", "from_dir"]
    if not MqttHelper.check_payload(payload, expected_keys):
        Logger().error(f"Missing arguments for {topic}")
        return

    disk = payload.get("disk", "")
    if disk == Constants.STR_REPOSITORY:
        # Requests on the repository are not answered by this controller
        return

    files = FileHelper.get_files_list(
        disk,
        payload.get("recursive", False),
        payload.get("from_dir", ""),
    )
    answer = ResponseFactory.create_response_list_files(disk, files)
    self.mqtt_client.publish(f"{topic}/response", answer)
def __handle_read_file(self, topic:str, payload: dict):
    """ Queues the copy of a file from an external disk into the repository.

    The fingerprint of the source file is calculated here and the parent
    directory is created in the repository. The copy itself is done later
    by the read files worker, which publishes the result.

    When the disk has no mount point an error notification is published
    and nothing is queued.

    :param topic: Base topic of the request.
    :param payload: Request arguments: ``disk`` and ``filepath``.
    """
    if not MqttHelper.check_payload(payload, ["disk", "filepath"]):
        Logger().error(f"Missing argument(s) for the command {topic}")
        return

    source_disk = payload.get("disk", "")
    filepath = payload.get("filepath", "")
    repository_path:str = Constants.DOMU_REPOSITORY_PATH

    mount_point = FileHelper.get_mount_point(source_disk)
    if not mount_point:
        # Without this guard the source path would be built from an empty
        # mount point and the failure would only show up during the copy.
        Logger().error(f"The disk {source_disk} is not mounted")
        notif = NotificationFactory.create_notification_error(source_disk, filepath, "The disk is not mounted")
        self.mqtt_client.publish(Topics.ERROR, notif)
        return

    source_fingerprint = FileHelper.calculate_fingerprint(f"{mount_point}/{filepath}")

    # exist_ok makes a prior existence test useless
    dest_parent_path = Path(f"{repository_path}/{filepath}").parent
    os.makedirs(dest_parent_path.as_posix(), exist_ok=True)

    self.__read_files_queue.put(
        {
            "source_location": mount_point,
            "source_disk": source_disk,
            "filepath": filepath,
            "repository_path": repository_path,
            "source_fingerprint": source_fingerprint
        }
    )
#def __handle_remove_file(self, topic:str, payload: dict):
#Logger().error("The command remove file is not implemented")
# Not implemented on sys-usb
# pass
def __handle_copy_file(self, topic:str, payload:dict):
    """ Queues the copy of a file from a disk to another disk.

    The copy is done later by the copy files worker.

    :param topic: Base topic of the request.
    :param payload: Request arguments: ``disk``, ``filepath`` and ``destination``.
    """
    if not MqttHelper.check_payload(payload, ["disk", "filepath", "destination"]):
        Logger().error(f"Missing arguments for the request {topic}")
        return

    source_disk, filepath, target_disk = (
        payload.get(key, "") for key in ("disk", "filepath", "destination")
    )

    # Both locations are sub-directories of the USB mount point
    usb_root = Constants.USB_MOUNT_POINT
    job = {
        "source_location": f"{usb_root}/{source_disk}",
        "source_disk": source_disk,
        "filepath": filepath,
        "target_disk": target_disk,
        "destination_location": f"{usb_root}/{target_disk}"
    }

    try:
        self.__copy_files_queue.put(job)
    except Exception:
        Logger().error(f"An error occured while copying the file {filepath} on {target_disk}")
def __handle_benchmark(self, topic:str, payload:dict):
module = payload.get("module")
if module is None:
Logger().error(f"Argument missing: module. Topic is {topic}")
return
def __handle_file_fingerprint(self, topic:str, payload:dict):
    """ Calculates the fingerprint of a file and publishes it on ``<topic>/response``.

    Requests on the repository are ignored. Nothing is published when an
    argument is missing.

    :param topic: Base topic of the request.
    :param payload: Request arguments: ``disk`` and ``filepath``.
    """
    disk = payload.get("disk")
    filepath = payload.get("filepath")

    if disk is not None and disk == Constants.STR_REPOSITORY:
        # Ignored
        return

    if disk is None:
        # The message used to start with a stray "f"
        Logger().error(f"Argument missing: disk. Topic is {topic}")
        return

    if filepath is None:
        Logger().error(f"Argument missing: filepath. Topic is {topic}")
        return

    Logger().debug(f"Calculate fingerprint of the file {filepath} on the disk {disk}")
    mount_point = Constants.USB_MOUNT_POINT
    fingerprint = FileHelper.calculate_fingerprint(f"{mount_point}/{disk}/{filepath}")
    # The file and the disk used to be swapped in this message
    Logger().info(f"The fingerprint of the file {filepath} on the disk {disk} is {fingerprint}")

    response = ResponseFactory.create_response_file_fingerprint(filepath, disk, fingerprint)
    self.mqtt_client.publish(f"{topic}/response", response)
def __handle_create_file(self, topic:str, payload:dict):
    """ Creates a file on a disk from base64 data and publishes the result.

    The data are decoded from base64 and decompressed with zlib when the
    payload carries a true ``compressed`` value. A response is published
    on ``<topic>/response`` with the fingerprint of the new file, or with
    a failure status if the data cannot be decoded or written.

    Requests on the repository are ignored.

    :param topic: Base topic of the request.
    :param payload: Request arguments: ``disk``, ``filepath``, ``data`` and
        optionally ``compressed``.
    """
    if not MqttHelper.check_payload(payload, ["disk", "filepath", "data"]):
        Logger().error("Missing argument in the create_file command")
        return

    disk = payload.get("disk", "")
    filepath = payload.get("filepath", "")

    if disk == Constants.STR_REPOSITORY:
        # Ignored
        return

    # Invalid data must produce a failure response instead of an exception
    # raised in the thread handling the request.
    try:
        data = base64.b64decode(payload.get("data", ""))
        # Check if data were compressed, and uncompress if needed
        if payload.get("compressed", False):
            data = zlib.decompress(data)
    except (ValueError, TypeError, zlib.error) as e:
        Logger().error(f"The data of the file {filepath} could not be decoded")
        Logger().error(str(e))
        response = ResponseFactory.create_response_create_file(filepath, disk, "", False)
        self.mqtt_client.publish(f"{topic}/response", response)
        return

    mount_point = Constants.USB_MOUNT_POINT
    complete_filepath = f"{mount_point}/{disk}/{filepath}"
    Logger().debug(f"Create a file {filepath} of size {len(data)} octets on disk {disk}")

    try:
        # The context manager closes the file, no explicit close is needed
        with open(complete_filepath, 'wb') as f:
            f.write(data)
    except (OSError, ValueError) as e:
        Logger().error(f"An error occured while writing to file {complete_filepath}")
        Logger().error(str(e))
        response = ResponseFactory.create_response_create_file(filepath, disk, "", False)
        self.mqtt_client.publish(f"{topic}/response", response)
        return

    # Send the success response
    fingerprint = FileHelper.calculate_fingerprint(complete_filepath)
    # NOTE(review): the success response carries the complete path whereas
    # the failure responses carry the path relative to the disk - confirm
    # which one the requesters expect before aligning them.
    response = ResponseFactory.create_response_create_file(complete_filepath, disk, fingerprint, True)
    self.mqtt_client.publish(f"{topic}/response", response)
def __handle_discover_components(self) -> None:
    """ Publishes the state of the components hosted by sys-usb.

    The disk controller and the input controller are both announced as
    READY on the discover components response topic.
    """
    announced = (
        (Constants.SAFECOR_DISK_CONTROLLER, "System disk controller"),
        (Constants.SAFECOR_INPUT_CONTROLLER, "Input controller"),
    )
    entries = [
        ResponseFactory.create_entry_component_state(component_id, label, "sys-usb", ComponentState.READY, "core")
        for component_id, label in announced
    ]

    self.mqtt_client.publish(
        f"{Topics.DISCOVER_COMPONENTS}/response",
        ResponseFactory.create_response_component_state(entries),
    )
def __handle_delete_file(self, payload):
    """ Removes a file from a disk and notifies the deletion.

    Requests on the repository are ignored. A notification is published
    on the deleted file topic only when the removal succeeded.

    :param payload: Request arguments: ``disk`` and ``filepath``.
    """
    if not MqttHelper.check_payload(payload, ["disk", "filepath"]):
        Logger().error(f"The command {Topics.DELETE_FILE} misses argument(s)", "sys-usb")
        return

    disk = payload["disk"]
    if disk == Constants.STR_REPOSITORY:
        # This file is the repository so we ignore it
        return

    filepath = payload["filepath"]
    storage_filepath = f"{FileHelper.get_mount_point(disk)}/{filepath}"

    removed = FileHelper().remove_file(storage_filepath)
    if not removed:
        Logger().error(f"Removal of file {filepath} from the disk {disk} failed")
        return

    self.mqtt_client.publish(
        Topics.DELETED_FILE,
        NotificationFactory.create_notification_deleted_file(disk, filepath),
    )
    Logger().info(f"Removed file {filepath} from the disk {disk}")
def __handle_ping(self, payload):
    """ Answers a ping request.

    The ``id``, ``data`` and ``sent_at`` values of the request are sent
    back on the ping response topic, an empty string replaces a missing value.

    :param payload: Payload of the ping request.
    """
    echoed = [payload.get(key, "") for key in ("id", "data", "sent_at")]
    ping_id, data, sent_at = echoed

    answer = ResponseFactory.create_response_ping(ping_id, "sys-usb", data, sent_at)
    self.mqtt_client.publish(f"{Topics.PING}/response", answer)
def __read_files_worker(self):
while self.__can_run:
if not self.__read_files_queue.empty():
next_file = self.__read_files_queue.get()
source_location = next_file.get("source_location", "")
source_disk = next_file.get("source_disk", "")
filepath = next_file.get("filepath", "")
repository_path = next_file.get("repository_path", "")
source_fingerprint = next_file.get("source_fingerprint", "")
self.__do_read_file(source_location, source_disk, filepath, repository_path, source_fingerprint)
time.sleep(0.1)
def __copy_files_worker(self):
while self.__can_run:
if not self.__copy_files_queue.empty():
next_file = self.__copy_files_queue.get()
source_location = next_file.get("source_location", "")
filepath = next_file.get("filepath", "")
target_disk = next_file.get("target_disk", "")
destination_location = next_file.get("destination_location", "")
self.__do_copy_file(source_location, filepath, target_disk, destination_location)
time.sleep(0.1)
def __do_read_file(self, source_location:str, source_disk:str, filepath:str, repository_path:str, source_fingerprint:str):
    """ Copies a file into the repository and notifies the result.

    A new file notification is published when the copy returns a
    fingerprint, an error notification is published when it returns an
    empty string.

    :param source_location: Directory the file is read from.
    :param source_disk: Name of the source disk, used in the error notification.
    :param filepath: Path of the file, relative to the source location.
    :param repository_path: Directory of the repository.
    :param source_fingerprint: Fingerprint of the source file.
    """
    dest_fingerprint = FileHelper.copy_file(source_location, filepath, repository_path, source_fingerprint)

    if dest_fingerprint == "":
        failure = NotificationFactory.create_notification_error(source_disk, filepath, "The file could not be copied")
        self.mqtt_client.publish(Topics.ERROR, failure)
        return

    new_file = NotificationFactory.create_notification_new_file(Constants.STR_REPOSITORY, filepath, source_fingerprint, dest_fingerprint)
    self.mqtt_client.publish(Topics.NEW_FILE, new_file)
def __do_copy_file(self, source_location:str, filepath:str, target_disk: str, target_location:str):
    """ Copies a file to another disk and publishes the result.

    The parent directory of the file is created in the destination when
    needed. A copy file response is always published; a new file
    notification is also published when the copy succeeded.

    :param source_location: Directory the file is read from.
    :param filepath: Path of the file, relative to the source location.
    :param target_disk: Name of the destination disk.
    :param target_location: Directory the file is copied to.
    """
    source_fingerprint = FileHelper.calculate_fingerprint(f"{source_location}/{filepath}")

    # Create the destination directory if needed. The existence is tested
    # on the directory INSIDE the destination: testing the relative parent
    # path alone would look it up in the current working directory.
    parent_path = Path(filepath).parent
    target_directory = f"{target_location}/{parent_path}"
    if not os.path.isdir(target_directory):
        print(f"Création du répertoire {parent_path} dans le dépôt")
        try:
            os.makedirs(target_directory, exist_ok= True)
        except OSError as e:
            # The requester gets a failure response instead of no answer at all
            Logger().error(f"Could not create the directory {target_directory}: {e}")
            response = ResponseFactory.create_response_copy_file(filepath, target_disk, False, "")
            self.mqtt_client.publish(f"{Topics.COPY_FILE}/response", response)
            return

    dest_fingerprint = FileHelper.copy_file(source_location, filepath, target_location, source_fingerprint)

    if dest_fingerprint != "":
        Logger().debug(f"The file {filepath} has been copied to {target_location}. The fingerprint is {source_fingerprint}")

        # Notify that a new file is available
        # NOTE(review): this topic has a "/notification" suffix whereas
        # __do_read_file publishes on Topics.NEW_FILE itself - confirm
        # which one the subscribers listen to.
        notif = NotificationFactory.create_notification_new_file(disk= target_disk, filepath= filepath, source_fingerprint= source_fingerprint, dest_fingerprint= dest_fingerprint)
        self.mqtt_client.publish(f"{Topics.NEW_FILE}/notification", notif)

        response = ResponseFactory.create_response_copy_file(filepath, target_disk, True, dest_fingerprint)
    else:
        response = ResponseFactory.create_response_copy_file(filepath, target_disk, False, dest_fingerprint)
        Logger().error(f"La copie du fichier {filepath} dans le dépôt a échoué.")

    self.mqtt_client.publish(f"{Topics.COPY_FILE}/response", response)
def __handle_clear_queue(self):
while not self.__copy_files_queue.empty():
self.__copy_files_queue.get()
while not self.__read_files_queue.empty():
self.__read_files_queue.get()
def __handle_mount_file(self, payload):
    """ Mounts an archive file or a guest file (ISO, VMDK, etc)

    The guests files formats are not handled yet.

    The archives files formats list is provided by :data:`Constants.ARCHIVE_EXTENSIONS_HANDLED`.

    The archive is mounted read-only with fuse-archive on
    ``/media/loop/<sanitized file name>`` and a disk state notification is
    published when the mount succeeded.

    :param payload: Request arguments: ``disk`` and ``filepath``.
    """
    if not MqttHelper.check_payload(payload, ["disk", "filepath"]):
        Logger().error(f"The command {Topics.MOUNT_FILE} misses argument(s)", "sys-usb")
        return

    disk = payload["disk"]
    filepath = payload["filepath"]
    filename = self.sanitize_filename(os.path.basename(filepath))
    file_ext = "".join(Path(filename).suffixes)
    source_mount_point = Constants.USB_MOUNT_POINT
    file_mount_point = f"/media/loop/{filename}"

    if not FileHelper.is_archive_file(filename):
        Logger().error(f"The file type {file_ext} is not handled for mounting")
        return

    try:
        if os.path.exists(file_mount_point):
            os.rmdir(file_mount_point)
        os.mkdir(file_mount_point, mode=0o770)
    except OSError as e:
        Logger().error(f"An exception occured while preparing the mount point {file_mount_point}: {e}")
        # Mounting on a mount point which could not be prepared cannot succeed
        return

    # A separator is always inserted between the disk and the file path,
    # whether the path received starts with "/" or not.
    source_file = f"{source_mount_point}/{disk}/{filepath.lstrip('/')}"

    # The mount from python can not be done unless we are root... strange...
    cmd = [
        "/usr/bin/fuse-archive",
        "-o", "nospecials",
        "-o", "noxattrs",
        "-o", "ro,nodev,nosuid,noexec",
        "-o", "uid=1000,gid=1000,umask=007",
        "-o", "allow_other",
        source_file,
        f"{file_mount_point}"
    ]
    try:
        res = subprocess.run(cmd, check=False)
    except OSError as e:
        # Raised for instance when the binary is not installed
        Logger().error(f"The command {cmd[0]} could not be run: {e}")
        return

    if res.returncode == 0:
        notif = NotificationFactory.create_notification_disk_state(filename, DiskState.MOUNTED)
        self.mqtt_client.publish(f"{Topics.DISK_STATE}", notif)
    else:
        Logger().warn(f"The file {filepath} could not be mounted (exitcode={res.returncode})")
def __handle_unmount(self, payload):
    """ Unmounts a disk mounted with FUSE and removes its mount point.

    A disk state notification is published once the disk is unmounted.
    Nothing is published when the disk is not mounted or when the unmount
    failed.

    :param payload: Request arguments: ``disk``.
    """
    if not MqttHelper.check_payload(payload, ["disk"]):
        # The message used to name the mount topic
        Logger().error(f"The command {Topics.UNMOUNT} misses argument(s)", "sys-usb")
        return

    disk = payload["disk"]
    file_mount_point = FileHelper.get_mount_point(disk)
    if not file_mount_point:
        Logger().error(f"The disk {disk} is not mounted")
        return

    cmd = [ "fusermount3", "-u", f"{file_mount_point}"]
    try:
        res = subprocess.run(cmd, check=False)
    except OSError as e:
        # Raised for instance when the binary is not installed
        Logger().error(f"The command {cmd[0]} could not be run: {e}")
        return

    if res.returncode != 0:
        Logger().error(f"The disk {disk} could not be unmounted (exitcode={res.returncode})")
        return

    # Unmount reported a success, double check. The directory still exists
    # after a successful unmount, so the test is whether it is still a
    # mount point, not whether the path exists.
    if os.path.ismount(file_mount_point):
        Logger().error(f"The disk {disk} has not been unmounted")
        return

    notif = NotificationFactory.create_notification_disk_state(disk, DiskState.UNMOUNTED)
    self.mqtt_client.publish(f"{Topics.DISK_STATE}", notif)

    try:
        os.rmdir(file_mount_point)
    except OSError as e:
        Logger().error(f"Could not remove directory of mount point {file_mount_point}: {e}")
[docs]
def sanitize_filename(self, s: str) -> str:
    """ Returns a copy of a filename safe to use as a directory name.

    Every character whose Unicode category starts with ``C`` (control,
    format, unassigned...) is replaced with ``_``, as well as every space.

    :param s: Filename to sanitize.
    :return: The sanitized filename.
    """
    without_controls = "".join(
        "_" if unicodedata.category(char).startswith("C") else char
        for char in s
    )
    return without_controls.replace(" ", "_")
########
## Special functions
##
def __debug_threads(self):
print(f"Active threads: {threading.active_count()}")
for thread in threading.enumerate():
print(f" -> {thread.name}")