Source code for safecor._dom0_controller

""" \author Tristan Israël """

import threading
import subprocess
import time
import psutil
from . import Constants, __version__, Settings
from . import Logger, FileHelper
from . import ResponseFactory
from . import MqttClient, Topics, MqttHelper, NotificationFactory
from . import System, ComponentState, topology
LIBVIRT_UNAVAILABLE = False
try:
    from . import LibvirtHelper
except ImportError:
    LIBVIRT_UNAVAILABLE = True
    print("Not using libvirt")

[docs] class Dom0Controller(): """ This class handles some of the commands sent by Domains that involve the repository and the system in general (supervision, configuration, etc) The capabilities of the Dom0 controller are: - List the files of the repository (:attr:`Topics.LIST_FILES`). - Delete a file in the repository (:attr:`Topics.DELETE_FILE`). - Calculate a file footprint (:attr:`Topics.FILE_FOOTPRINT`). - Shut the system down (:attr:`Topics.SHUTDOWN`). - Restart a Domain (:attr:`Topics.RESTART_DOMAIN`). ** The Dom0 cannot be restarted **. - Get the system information (:attr:`Topics.SYSTEM_INFO`). - Get the energy state (:attr:`Topics.ENERGY_STATE`). - Discover the components of the system (:attr:`Topics.DISCOVER_COMPONENTS`). - Ping a Domain (:attr:`Topics.PING`). - Notify that the GUI is ready (:attr:`Topics.GUI_READY`). All the function can be called using the API or the protocol. .. seealso:: :class:`Api` - The API class """ __mqtt_lock = threading.Event() __is_shutting_down = False
[docs] def __init__(self, mqtt_client: MqttClient): """ Instanciates the Dom0 controller. Args: mqtt_client (MqttClient): The instance of the MqttClient class which handles the connexion to the MQTT broker. """ self.mqtt_client = mqtt_client # Handle Mqtt messages self.mqtt_client.on_connected = self.__on_mqtt_connected self.mqtt_client.on_message = self.__on_mqtt_message Logger().setup("System controller", mqtt_client)
[docs] def start(self): """ Starts the Dom0 controller. When the Dom0 controlelr starts it automatically subscribes to the following topics: - :attr:`Topics.LIST_FILES` - :attr:`Topics.FILE_FOOTPRINT` - :attr:`Topics.SHUTDOWN` - :attr:`Topics.RESTART_DOMAIN` - :attr:`Topics.GUI_READY` - :attr:`Topics.SYSTEM_INFO` - :attr:`Topics.ENERGY_STATE` - :attr:`Topics.DELETE_FILE` - :attr:`Topics.DISCOVER_COMPONENTS` - :attr:`Topics.PING` After the Dom0 controller is started it is able to answer requests on these topics. """ self.mqtt_client.start() self.__mqtt_lock.wait()
def __on_mqtt_connected(self): self.mqtt_client.subscribe(f"{Topics.LIST_FILES}/request") self.mqtt_client.subscribe(f"{Topics.FILE_FINGERPRINT}/request") self.mqtt_client.subscribe(f"{Topics.SHUTDOWN}/request") self.mqtt_client.subscribe(f"{Topics.RESTART_DOMAIN}/request") self.mqtt_client.subscribe(Topics.GUI_READY) self.mqtt_client.subscribe(f"{Topics.SYSTEM_INFO}/request") self.mqtt_client.subscribe(f"{Topics.ENERGY_STATE}/request") self.mqtt_client.subscribe(f"{Topics.DELETE_FILE}/request") self.mqtt_client.subscribe(f"{Topics.DISCOVER_COMPONENTS}/request") self.mqtt_client.subscribe(f"{Topics.PING}/request") self.mqtt_client.subscribe(f"{Topics.SETTINGS}/#/request") # Handle the kernel's or configuration command line settings self.__handle_settings() Logger().debug("Safecor Dom0 controller is ready") def __on_mqtt_message(self, topic:str, payload:dict): # The message will be handled in a thread threading.Thread(target=self.__message_worker, args=(topic, payload, )).start() def __message_worker(self, topic:str, payload:dict): """ Cette fonction traite uniquement les messages destinés au Dom0 """ try: if topic == f"{Topics.LIST_FILES}/request": self.__handle_list_files(payload) elif topic == f"{Topics.FILE_FINGERPRINT}/request": self.__handle_file_fingerprint(payload) elif topic == f"{Topics.SHUTDOWN}/request": self.__handle_shutdown(payload) elif topic == f"{Topics.RESTART_DOMAIN}/request": self.__handle_restart_domain(payload) elif topic == Topics.GUI_READY: self.__handle_gui_ready(payload) elif topic == f"{Topics.ENERGY_STATE}/request": self.__handle_energy_state() elif topic == f"{Topics.SYSTEM_INFO}/request": self.__handle_system_info() elif topic == f"{Topics.DELETE_FILE}/request": self.__handle_delete_file(payload) elif topic == f"{Topics.DISCOVER_COMPONENTS}/request": self.__handle_discover_components() elif topic == f"{Topics.PING}/request": self.__handle_ping(payload) elif topic.startswith(Topics.SETTINGS): self.__handle_settings(topic, payload) except Exception: Logger.print("An exception occured while handling the message") def __handle_list_files(self, payload:dict) -> None: if not self.__is_storage_request(payload): return # Récupère la liste des fichiers fichiers = FileHelper.get_files_list(Constants.STR_REPOSITORY, True) # Génère la réponse response = ResponseFactory.create_response_list_files(Constants.STR_REPOSITORY, fichiers) self.mqtt_client.publish(f"{Topics.LIST_FILES}/response", response) def __handle_file_fingerprint(self, payload:dict) -> None: if not self.__is_storage_request(payload): return filepath = payload.get("filepath") disk = payload.get("disk") if filepath is None or disk is None: # S'il manque un argument on envoie une erreur Logger().error("La commande est incomplète : il manque le nom du disque et/ou le chemin du fichier") return # Calcule l'empreinte repository_path = Constants.DOM0_REPOSITORY_PATH fingerprint = FileHelper.calculate_fingerprint(f"{repository_path}/{filepath}") Logger().info(f"Fingerprint = {fingerprint}") # Génère la réponse response = ResponseFactory.create_response_file_fingerprint(filepath, disk, fingerprint) self.mqtt_client.publish(f"{Topics.FILE_FINGERPRINT}/response", response) def __handle_shutdown(self, payload:dict): if self.__is_shutting_down: return Logger().warn("System shutdown requested!") self.__is_shutting_down = True # There is currently no rule for the shutdown, so we accept it response = ResponseFactory.create_response_shutdown(True) self.mqtt_client.publish(f"{Topics.SHUTDOWN}/response", response) # We wait 5 seconds to let the GUIs and clients acknowledge the information time.sleep(5) # Then we shut the system down cmd = ["/usr/bin/doas", "/sbin/poweroff"] subprocess.run(cmd, check=False) def __handle_restart_domain(self, payload:dict): if not MqttHelper.check_payload(payload, ["domain_name"]): Logger().error(f"Missing argument domain_name in the topic {Topics.RESTART_DOMAIN}") return domain_name = payload.get("domain_name", "") self.__reboot_domain(domain_name) def __handle_gui_ready(self, payload:dict): # When GUI is ready we hide the splash screen cmd = ["/usr/bin/doas", "/usr/bin/killall", "feh"] subprocess.run(cmd, check=False) def __is_storage_request(self, payload:dict) -> bool: if payload.get("disk") is not None: return payload.get("disk") == Constants.STR_REPOSITORY else: return False def __reboot_domain(self, domain_name:str): # The reboot vie libvirt requires that the Domains are created # by libvirt. This is a coming feature... #if LIBVIRT_UNAVAILABLE: # Logger().error("Libvirt is unavailable. Cannot reboot domain") # return # #if LibvirtHelper.reboot_domain(domain_name): # Logger().info(f"Rebooting domain {domain_name}") # response = ResponseFactory.create_response_restart_domain(domain_name, True) # self.mqtt_client.publish(f"{Topics.RESTART_DOMAIN}/response", response) #else: # Logger().error(f"The domain {domain_name} won't reboot") # response = ResponseFactory.create_response_restart_domain(domain_name, False) # self.mqtt_client.publish(f"{Topics.RESTART_DOMAIN}/response", response) cmd = ["doas", "/usr/lib/safecor/bin/reboot-domain.sh", domain_name] res = subprocess.run(cmd, check=False) if res.returncode == 0: Logger().info(f"Rebooting domain {domain_name}") response = ResponseFactory.create_response_restart_domain(domain_name, True, "") self.mqtt_client.publish(f"{Topics.RESTART_DOMAIN}/response", response) else: Logger().error(f"The domain {domain_name} won't reboot") response = ResponseFactory.create_response_restart_domain(domain_name, False, f"{res.stdout}\n{res.stderr}") self.mqtt_client.publish(f"{Topics.RESTART_DOMAIN}/response", response) def __handle_energy_state(self): battery = psutil.sensors_battery() if battery: payload = NotificationFactory.create_notification_energy_state(battery) self.mqtt_client.publish(f"{Topics.ENERGY_STATE}/response", payload) def __handle_system_info(self): payload = System.get_system_information() self.mqtt_client.publish(f"{Topics.SYSTEM_INFO}/response", payload) def __handle_delete_file(self, payload:dict): if not MqttHelper.check_payload(payload, ["disk", "filepath"]): Logger().error(f"The command {Topics.DELETE_FILE} misses argument(s)", "Dom0") return disk = payload["disk"] if disk != Constants.STR_REPOSITORY: # This file is not stored in the repository so we ignore it return filepath = payload["filepath"] repository_path = Constants.DOM0_REPOSITORY_PATH storage_filepath = f"{repository_path}/{filepath}" if not FileHelper().remove_file(storage_filepath): Logger().error(f"Removal of file {filepath} from repository failed") else: notif = NotificationFactory.create_notification_deleted_file(disk, filepath) self.mqtt_client.publish(Topics.DELETED_FILE, notif) Logger().info(f"Removed file {filepath} from repository") def __handle_discover_components(self): comp = ResponseFactory.create_entry_component_state(Constants.SAFECOR_SYSTEM_CONTROLLER, "Core controller", "Dom0", ComponentState.READY ) payload = ResponseFactory.create_response_component_state([comp]) self.mqtt_client.publish(f"{Topics.DISCOVER_COMPONENTS}/response", payload) def __handle_ping(self, payload): ping_id = payload.get("id", "") data = payload.get("data", "") sent_at = payload.get("sent_at", "") payload = ResponseFactory.create_response_ping(ping_id, "Dom0", data, sent_at) self.mqtt_client.publish(f"{Topics.PING}/response", payload) def __handle_settings(self, topic:str, payload:dict): """ Handles the settings requests """ if topic.startswith(Topics.LANGUAGES): self.__handle_settings_languages(topic, payload) def __handle_settings_languages(self, topic:str, payload:dict): """ Handles the languages settings """ if topic.startswith(Topics.LIST_LANGUAGES): payload = ResponseFactory.create_response_languages_list(topology.languages) self.mqtt_client.publish(f"{Topics.LIST_LANGUAGES}/response", payload) elif topic.startswith(Topics.DEFAULT_LANGUAGE): payload = ResponseFactory.create_response_language_default(topology.default_language) self.mqtt_client.publish(f"{Topics.DEFAULT_LANGUAGE}/response", payload) elif topic.startswith(Topics.CURRENT_LANGUAGE): key = payload.get("key", None) if key is None: Logger().warn(f"There is no setting with the name {key}") return payload = ResponseFactory.create_response_get_setting(key, System().get_setting(key)) self.mqtt_client.publish(f"{Topics.CURRENT_LANGUAGE}/response", payload) elif topic.startswith(Topics.SET_LANGUAGE): # Send the response key = payload.get("key", None) if key is None: Logger().warn(f"There is no setting with the name {key}") return value = payload.get("value", "") # Define the setting System().set_setting(key, value) new_value = System().get_setting(key) # Verify the setting if value == new_value: Logger().warn(f"The setting value for {key} has not been changed") return # Send the response payload = ResponseFactory.create_response_set_setting(key, new_value) self.mqtt_client.publish(f"{Topics.CURRENT_LANGUAGE}/response", payload) # Send a notification that the value changed notif = NotificationFactory.create_notification_setting_changed(key, value) self.mqtt_client.publish(f"{Topics.SETTING_CHANGED}", notif) def __handle_settings(self): """ Handle the settings defined in the configuration or kernel's command line The current settings are: - The language that becomes the current language. If no language setting is defined, the default language becomes the current language """ # Look in the kernel command line kernel_settings = System().parse_kernel_command_line_settings() if "language" in kernel_settings: lang = kernel_settings["language"] Logger().info(f"Default language defined on the kernel's command line is {lang}") System().set_setting(Settings.CURRENT_LANGUAGE, lang) else: Logger().info(f"Default language defined on the kernel's command line is {topology.default_language}") System().set_setting(Settings.CURRENT_LANGUAGE, topology.default_language)