""" \author Tristan Israƫl """
import subprocess
import platform
import os
import json
import threading
try:
import psutil
except ImportError:
pass
import shutil
from . import SingletonMeta, __version__, Constants, Topology, DomainType, ConfigurationHelper, Domain
try:
from . import LibvirtHelper
except ImportError:
print("Not using Libvirt")
try:
from inotify_simple import INotify, flags
except Exception:
print("The inotify library is not available on this system")
# Module-level Topology object shared by System.get_topology() and System.reset_topology()
topology = Topology()
[docs]
class System(metaclass=SingletonMeta):
    """ The System class provides functions for querying or modifying the system's state.

    Some of the functions belong to the main system (Dom0) and some others belong to the
    virtual machines (DomU).
    """

    # Screen size ("<first>,<second>") returned by _get_screen_size() when xenstore cannot be read
    __DEFAULT_SCREEN_SIZE = "1100,750"
    # NOTE(review): presumably the CPU count used when detection fails — confirm against its users
    __FALLBACK_CPU_COUNT = 1
    # Cached screen width, height and rotation; -1 means "nothing cached yet"
    __width = -1
    __height = -1
    __rotation = -1
    # Cached system UUID; an empty string means "not read yet"
    __system_uuid = ""
    # NOTE(review): presumably a cache for the platform CPU count — confirm against its users
    __cpu_count = None
    # Cache of the per-group CPU lists built by compute_cpus_for_group()
    __cpu_assignments = None
    # Key/value store accessed through get_settings(), get_setting() and set_setting()
    __settings = {}
[docs]
def get_screen_width(self) -> int:
""" Returns the system main screen's resolution width
This function must be ran in a DomU.
"""
if self.__width > -1:
return self.__width
screen_size = self._get_screen_size()
if "," in screen_size:
rotation = self.get_screen_rotation()
if rotation == 0 or rotation == 180:
self.__width = int(screen_size.split(',')[0])
else:
self.__width = int(screen_size.split(',')[1])
return self.__width
return 1024
[docs]
def get_screen_height(self):
""" Returns the system main screen's resolution height
This function must be ran in a DomU.
"""
if self.__height > -1:
return self.__height
screen_size = self._get_screen_size()
if "," in screen_size:
rotation = self.get_screen_rotation()
if rotation == 0 or rotation == 180:
self.__height = int(screen_size.split(',')[1])
else:
self.__height = int(screen_size.split(',')[0])
return self.__height
return 768
[docs]
def get_screen_rotation(self) -> int:
""" Returns the Domain's screen rotation angle
The rotation angle is defined in the topology file in the setting ``gui.screen.rotation``.
This function must be ran in a DomU.
Possible values are: 0, 90, 180, 270.
"""
if self.__rotation > -1:
return self.__rotation
try:
res = subprocess.run(["xenstore-read", "/local/domain/system/screen_rotation"], capture_output=True, text=True, check=False)
if res.returncode == 0:
return int(res.stdout)
except Exception:
return 0
return 0
def _get_screen_size(self) -> str:
""" Returns the Domain's screen size as a string.
This function must be ran in a DomU.
A typical value is: "1280x1024"
"""
try:
res = subprocess.run(["xenstore-read", "/local/domain/system/screen_size"], capture_output=True, text=True, check=False)
if res.returncode == 0:
return res.stdout
except Exception:
return self.__DEFAULT_SCREEN_SIZE
return self.__DEFAULT_SCREEN_SIZE
[docs]
def get_system_uuid(self):
""" Returns the system's UUID
The UUID is queried from the Linux kernel's ``/sys/class/dmi/id/product_uuid``
"""
system = platform.system().lower()
# If we have it in cache...
if self.__system_uuid != "":
return self.__system_uuid
if system == 'linux':
try:
with open('/sys/class/dmi/id/product_uuid', 'r') as f:
self.__system_uuid = f.read().strip()
except FileNotFoundError:
return "" # UUID is not available on this machine
except PermissionError:
return "" # UUID reading is not autorized
elif system == 'windows':
try:
output = subprocess.check_output('wmic csproduct get uuid', shell=True)
lines = output.decode().split('\n')
uuid = [line.strip() for line in lines if line.strip() and "UUID" not in line]
self.__system_uuid = uuid[0] if uuid else ""
except subprocess.CalledProcessError:
return "" # Erreur lors de l'exƩcution de wmic
elif system == 'darwin': # macOS
try:
output = subprocess.check_output('ioreg -rd1 -c IOPlatformExpertDevice', shell=True)
for line in output.decode().split('\n'):
if 'IOPlatformUUID' in line:
self.__system_uuid = line.split('"')[-2]
return ""
except subprocess.CalledProcessError:
return "" # Erreur lors de l'exƩcution de ioreg
return self.__system_uuid
[docs]
@staticmethod
def debug_activated():
""" Returns whether the debugging has been activated
This function must be ran in the Dom0.
"""
try:
fd = os.open("/proc/cmdline", os.O_RDONLY)
data = os.read(fd, 4096)
return b'debug=on' in data.lower()
except Exception:
return False
[docs]
@staticmethod
def domain_name():
""" Returns the Domain's name
This function must be ran in a DomU.
"""
return platform.node()
[docs]
@staticmethod
def reset_topology():
    """ Resets the current topology

    The module-level ``topology`` object is replaced by a new ``Topology()`` instance.
    """
    global topology
    topology = Topology()
[docs]
@staticmethod
def get_topology(override_topology_file:str = "") -> Topology:
    """ Returns a ``Topology`` object initialized with the contents of the topology file

    The module-level topology is built only when it is not initialized yet: the
    structure returned by :func:`get_topology_struct` is parsed, the result goes through
    ``ConfigurationHelper.apply_configuration()`` and is then flagged as initialized.
    Otherwise the existing object is returned unchanged.

    This function must be run in the Dom0.

    :param override_topology_file: Path of a topology file to use instead of the default one
    :type override_topology_file: str
    """
    global topology

    if topology.initialized():
        return topology

    # Build the topology from the abstract structure, not from the raw JSON
    struct = System.get_topology_struct(override_topology_file)
    parsed = System.__parse_topology(struct)

    topology = ConfigurationHelper.apply_configuration(parsed)
    topology.set_initialized(True)
    return topology
@staticmethod
def __parse_topology(topo:dict) -> Topology:
    """ Builds a ``Topology`` object from the structure returned by :func:`get_topology_struct`

    Missing entries are replaced by default values. The screen dimensions come from
    :func:`get_framebuffer_dimension` and the UUID from :func:`get_system_uuid`.

    :param topo: The topology structure (keys ``product``, ``system``, ``domains``, ``configurations``)
    :type topo: dict
    :return: A new ``Topology`` object
    """
    _topology = Topology()

    product = topo.get("product", {})
    system = topo.get("system", {})

    # Product information
    _topology.product_name = product.get("name", "Unknown")
    _topology.add_color("splash_bgcolor", product.get("splash_bgcolor", "#000000"))
    _topology.languages = product.get("languages", [])
    _topology.default_language = product.get("default_language", "en")

    # System information
    _topology.use_usb = system.get("use_usb", False)
    _topology.use_gui = system.get("use_gui", False)
    _topology.screen.rotation = system.get("screen_rotation", 0)
    screen = System.get_framebuffer_dimension()
    _topology.screen.width = screen[0] if screen is not None else 0
    _topology.screen.height = screen[1] if screen is not None else 0
    _topology.uuid = System().get_system_uuid()
    _topology.gui.app_package = system.get("gui_app_package", "error")
    # The structure built by __analyse_topology_struct() stores the GUI memory under
    # "memory"; accept both keys so the configured value is not silently ignored
    _topology.gui.memory = system.get("gui_memory", system.get("memory", 2000))
    _topology.gui.use = _topology.use_gui
    _topology.pci.blacklist = system.get("pci", {}).get("blacklist", [])

    # Domains information
    for domain_name, domain_desc in topo.get("domains", {}).items():
        domain = Domain(domain_name, domain_desc.get("type", DomainType.UNKNOWN))
        domain.vcpu_group = domain_desc.get("vcpu_group", "")
        domain.memory = domain_desc.get("memory", 0)
        domain.vcpus = domain_desc.get("vcpus", "")
        domain.cpu_affinity = domain_desc.get("cpus", [])
        domain.package = domain_desc.get("package", "")
        # The structure provides "temp_disk_size"; the misspelled key that was read
        # before is still honoured as a fallback
        domain.temp_disk_size = domain_desc.get(
            "temp_disk_size", domain_desc.get("temp_size_size", 0)
        )
        _topology.add_domain(domain)

    # Configurations
    _topology.configurations = topo.get("configurations", [])

    return _topology
[docs]
@staticmethod
def parse_range(value: str) -> tuple[int, ...]:
""" Converts an int range represented as a string into a tuple of all values between min and max
Examples:
>>> parse_range("1-2")
(1,2)
>>> parse_range("3-8")
(3,4,5,6,7,8)
The left value must be lower that the right value.
"""
if value == "":
return ()
if "-" in value:
start, end = map(int, value.split("-", 1))
if start > end:
raise ValueError("Invalid range")
return tuple(range(start, end + 1))
return (int(value),)
[docs]
@staticmethod
def get_topology_struct(override_topology_file:str = "") -> dict:
    """ Returns the topology of the current system as an internal structure

    The topology is defined in the file `topology.json`. The returned structure is
    deliberately different from the JSON file: the file format may evolve, so the
    internal structure is dissociated from it to avoid future compatibility problems.

    Instead of returning the JSON data as :func:`get_topology_data` does, this function
    returns a dict with four entries:

    - ``domains``: a dict indexed by domain name (``sys-usb``, ``sys-gui`` and the
      business domains), each value describing the domain (``name``, ``type``,
      ``memory``, ``vcpus``, ``cpus``, ...)
    - ``system``: the system settings (``use_usb``, ``use_gui``, ``screen_rotation``,
      ``gui_app_package``, ``memory``, ``pci``)
    - ``product``: the product description (``name``, ``splash_bgcolor``, ``languages``,
      ``default_language``)
    - ``configurations``: a dict indexed by configuration name, each value holding
      ``identifier`` and ``settings``

    Entries missing from the file get a default value. An empty dict is returned
    when the topology data is ``None``.

    :param override_topology_file: Path of a topology file to use instead of the default one
    :type override_topology_file: str
    """
    data = System.get_topology_data(override_topology_file)

    if data is None:
        print("No topology data available. Aborting.")
        return {}

    return System.__analyse_topology_struct(data)
@staticmethod
def __analyse_topology_struct(topo_data:dict) -> dict:
    """ Converts the decoded content of the topology file into the internal structure

    The returned dict has the entries ``system``, ``domains``, ``product`` and
    ``configurations``. Entries missing from ``topo_data`` get a default value.

    The domains ``sys-usb`` and ``sys-gui`` are always present; one more domain is
    added for each entry of ``business.domains``.

    :param topo_data: The decoded content of the topology file
    :type topo_data: dict
    :return: The internal topology structure
    """
    usb = topo_data.get("usb", {})
    gui = topo_data.get("gui", {})
    screen = gui.get("screen", {})
    vcpu_groups = topo_data.get("vcpu", {}).get("groups", {})
    business_domains = topo_data.get("business", {}).get("domains", [])
    pci_blacklist = topo_data.get("pci", {}).get("blacklist", [])
    gui_memory = gui.get("memory", 128)

    topo_struct = {}

    topo_struct["system"] = {
        "use_usb": usb.get("use", False),
        "use_gui": gui.get("use", False),
        "screen_rotation": screen.get("rotation", 0),
        "gui_app_package": gui.get("app-package", ""),
        # "gui_memory" is the key read by __parse_topology(); "memory" is kept for
        # code which already relies on it
        "gui_memory": gui_memory,
        "memory": gui_memory,
        "pci": {
            "blacklist": pci_blacklist
        }
    }

    topo_domains = {}

    # sys-usb domain
    topo_domains["sys-usb"] = {
        "name": "sys-usb",
        "type": DomainType.CORE,
        "memory": 350,
        "vcpus": System.compute_vcpus_for_group("sys-usb", vcpu_groups),
        "cpus": System().compute_cpus_for_group("sys-usb", vcpu_groups),
        "vcpu_groups": vcpu_groups
    }

    # sys-gui domain
    topo_domains["sys-gui"] = {
        "name": "sys-gui",
        "type": DomainType.CORE,
        # Same default as the system entry, so the memory is never None
        "memory": gui_memory,
        "vcpus": System.compute_vcpus_for_group("sys-gui", vcpu_groups),
        "cpus": System().compute_cpus_for_group("sys-gui", vcpu_groups),
        "vcpu_groups": vcpu_groups
    }

    # Business domains
    for domain in business_domains:
        domain_name = domain.get("name", "unknown")
        # Without an explicit group the domain name is used as the group name, which
        # selects the default configuration when no such group exists
        group_name = domain.get("vcpu_group", domain_name)

        topo_domains[domain_name] = {
            "name": domain_name,
            "type": DomainType.BUSINESS,
            "memory": domain.get("memory", 0),
            "package": domain.get("app-package", ""),
            "vcpus": System.compute_vcpus_for_group(group_name, vcpu_groups),
            "cpus": System().compute_cpus_for_group(group_name, vcpu_groups),
            "vcpu_groups": vcpu_groups,
            "temp_disk_size": domain.get("temp_disk_size", 0)
        }

    topo_struct["domains"] = topo_domains

    # Get product information (copy)
    json_product = topo_data.get("product", {})
    topo_struct["product"] = {
        "name": json_product.get("name", "No Name"),
        "splash_bgcolor": json_product.get("splash_bgcolor", "#1ca9f7"),
        "languages": json_product.get("languages", []),
        "default_language": json_product.get("default_language", "en")
    }

    # Get the configurations settings, indexed by configuration name
    topo_struct["configurations"] = {
        data_conf.get("name", "noname"): {
            "identifier": data_conf.get("identifier", {}),
            "settings": data_conf.get("settings", {})
        }
        for data_conf in topo_data.get("configurations", [])
    }

    return topo_struct
[docs]
@staticmethod
def read_topology_file(override_topology_file:str = "") -> str:
""" Reads the topology file describing the product
This function must be ran in the Dom0.
"""
filepath = '/etc/safecor/topology.json' if override_topology_file == "" else override_topology_file
try:
with open(filepath, 'r') as f:
topo = f.read()
f.close()
return topo
except Exception as e:
print(f"An error occured while reading the topology file at {filepath})")
print(str(e))
return ""
[docs]
@staticmethod
def get_topology_data(override_topology_file:str = "") -> dict:
""" Interprets the topology file data as a JSON object
This function must be ran in the Dom0.
"""
try:
topo_data = System.read_topology_file(override_topology_file)
data = json.loads(topo_data)
return data
except Exception as e:
print("An error occured while decoding JSON file")
print(str(e))
return {}
[docs]
@staticmethod
def compute_vcpus_for_group(group_name:str, groups:dict) -> int:
    """ Computes the number of vCPUs which will be pinned to each Domain of a group.

    The number of vCPUs depends on the value of the parameter ``vcpu.groups`` defined in the file ``topology.json``.

    Dom0 gets 2 vCPUs when the platform has more than 4 CPUs, otherwise 1; ``sys-usb``
    shares them. For the other groups:

    - if the group is not listed in ``groups`` (or ``groups`` is empty), all the
      remaining CPUs are used;
    - if the group is listed, its value is used as a rate applied to the remaining
      CPUs (a missing rate defaults to 0.2 for ``sys-gui``).

    The result is always at least 1.

    This function must be run in the Dom0.

    :param group_name: The name of the group
    :type group_name: str
    :param groups: The vCPU groups, associating a group name with a rate
    :type groups: dict
    """
    platform_cpus = System().get_platform_cpu_count()
    dom0_vcpus = 2 if platform_cpus > 4 else 1

    if group_name == "sys-usb":
        # Dom0 and sys-usb share the same vCPUs
        return dom0_vcpus

    # sys-usb reserves nothing more than Dom0 because they share their vCPUs
    available_cpus = platform_cpus - dom0_vcpus

    if group_name not in groups:
        # No group defined for this name: use 100% of the remaining cores, but never
        # less than one vCPU (a single-core platform has no remaining core)
        return max(available_cpus, 1)

    vcpu_rate = groups.get(group_name, None)
    if vcpu_rate is None and group_name == "sys-gui":
        # Override cpu rate for sys-gui if not provided
        vcpu_rate = 0.2

    if vcpu_rate is None:
        return 1

    return max(int(round(vcpu_rate * available_cpus, 0)), 1)
[docs]
def compute_cpus_for_group(self, group_name:str, groups:dict) -> list[int]:
""" Computes the CPUs (or cores) which will be pinned to the Domains of the group.
The first CPU is assigned to Dom0 and sys-usb Domain.
If there are more than 4 CPUs the second CPU is also assigned to Dom0 and sys-usb.
The other CPUs are assigned to sys-gui and the other groups by trying to avoid overlapping.
The assignments are computed once and cached; the following calls read the cache.
If the group is neither listed in ``groups`` nor one of ``sys-usb``/``Dom0``, the default
configuration is returned: all the CPUs except those kept for Dom0.
This function must be run in the Dom0.
:param group_name: The name of the group
:type group_name: str
:param groups: The vCPU groups defined in the topology
:type groups: dict
:return: The list of CPU indexes for the group, ``[ 0 ]`` if the group has no assignment
"""
cpu_count = System().get_platform_cpu_count()
# Unknown groups skip the CPUs kept for Dom0: CPU 0, plus CPU 1 when there are more than 4 CPUs
if group_name not in groups and group_name not in [ "sys-usb", "Dom0" ]:
# If the group does not exist we return the default configuration
return list(range(1 if cpu_count <= 4 else 2, cpu_count))
# NOTE(review): the cache is used whatever ``groups`` contains on later calls — confirm this is intended
if self.__cpu_assignments is not None:
# If the groups are in cache we take them
cpu_assignments = self.__cpu_assignments
else:
# Otherwise we calculate them before
cpu_assignments = {
"Dom0": [ 0 ],
"sys-usb": [ 0 ],
"sys-gui": [ 1 ]
}
# Index of the next CPU to hand out
next_pin = 1
if cpu_count > 4:
# If there are more than 4 CPUs/cores we add one to Dom0 and sys-usb
cpu_assignments["Dom0"].append(next_pin)
cpu_assignments["sys-usb"].append(next_pin)
next_pin += 1
# Then we assign sys-gui
if cpu_count > 4:
vcpus = System.compute_vcpus_for_group("sys-gui", groups)
cpu_assignments["sys-gui"] = list(range(2, next_pin + vcpus))
# NOTE(review): next_pin advances by 2 whatever the number of CPUs given to sys-gui — confirm
next_pin += 2
# Then the other groups
# NOTE(review): next_pin is not modified in this loop, so every group starts at the same CPU — confirm whether overlapping is intended
for gname in groups.keys():
if gname in [ "sys-gui" ]:
continue # We ignore sys-gui because we already worked on it
vcpus = System.compute_vcpus_for_group(gname, groups)
cpu_assignments[gname] = list(range(next_pin, next_pin + vcpus))
# Keep the result for the following calls
self.__cpu_assignments = cpu_assignments
# Finally we return the value
return cpu_assignments.get(group_name, [ 0 ])
@staticmethod
def __get_storage_info() -> dict:
    """ Returns information about the storage located at ``Constants.DOM0_REPOSITORY_PATH``

    This function must be run in the Dom0.

    The fields are:
    - total - The total size of the storage in bytes
    - used - The used space of the storage in bytes
    - free - The free space of the storage in bytes
    - files - The number of files in the storage

    Every field is 0 when the storage path is ``None``. The sizes stay at 0 when the
    disk usage cannot be queried.
    """
    info = dict.fromkeys(("total", "used", "free", "files"), 0)

    storage_path = Constants.DOM0_REPOSITORY_PATH
    print(f"Looking for storage information into {storage_path}")

    if storage_path is None:
        return info

    # Get information about the disk
    try:
        usage = shutil.disk_usage(storage_path)
        info.update(total=usage.total, used=usage.used, free=usage.free)
    except Exception as e:
        print(f"Could not get storage information : {e}")

    # Get information about the files: count them in the whole tree
    info["files"] += sum(len(files) for _, _, files in os.walk(storage_path))

    return info
[docs]
def get_cpu_allocation(self) -> dict:
    """ Provides information about CPU allocation for all the Domains

    The Domains are queried with ``LibvirtHelper.get_domains()``.

    This function must be run in the Dom0.

    :return: A dict associating each Domain name with its CPU affinity
    """
    known_domains = LibvirtHelper.get_domains()
    return {dom.name: dom.cpu_affinity for dom in known_domains.values()}
[docs]
@staticmethod
def cpu_affinity_to_string(cpu_affinity:list) -> str:
""" Converts the cpu affinity of the Domain definition to a string
Example:
>>>
cpu_affinity_to_string([1,2,3,4])
'1-4'
"""
if len(cpu_affinity) == 0:
return ""
elif len(cpu_affinity) == 1:
return str(cpu_affinity[0])
return f"{cpu_affinity[0]}-{cpu_affinity[-1]}"
[docs]
@staticmethod
def get_framebuffer_dimension(fbdev:int = 0) -> tuple[int, int]:
""" Returns the framebuffer dimensions as a tuple (width, height)
By default, the framebuffer fb0 is queried. This can be overriden by setting the
parameter fbdev
"""
try:
with open(f"/sys/class/graphics/fb{fbdev}/virtual_size") as f:
data = f.read().strip()
width, height = map(int, data.split(","))
return width, height
except FileNotFoundError:
return (1100, 750)
#@staticmethod
#def get_screen_rotation_from_topology(override_topology_file:str = "") -> int:
# """ Returns the screen rotation defined in the topology file
#
# This function is necessary for treatments that occur before libvirt and xenstore
# are loaded. The function :func:`get_topology_struct` is one of those.
# """
#
# topo_data = System.get_topology_data(override_topology_file)
# gui = topo_data.get("gui", {})
# screen = gui.get("screen", {})
# rotation = screen.get("rotation", 0)
#
# return rotation
[docs]
@staticmethod
def get_splash_bgcolor_from_topology(override_topology_file:str = "") -> int:
""" Returns the background color for the plash defined in the topology file
This function is necessary for treatments that occur before libvirt and xenstore
are loaded. The function :func:`get_topology_struct` is one of those.
"""
topo_data = System.get_topology_data(override_topology_file)
product = topo_data.get("product", {})
color = product.get("splash_bgcolor", "#000000")
return color
[docs]
def monitor_file(self, path:str, filename:str, fn_callback):
    """
    Starts the monitoring of a single file or a whole directory.

    The monitoring runs in a daemon thread. When a file is created or removed, the
    callback function is called with the following parameters:

    - path:str - The directory path
    - filename:str - The file name
    - exists:bool - True if the file has been created, else False

    Nothing is started when no callback is provided.

    This function is uninterruptible.

    :param path: The directory to monitor
    :type path: str
    :param filename: A file name to monitor, set to None if the whole directory should be monitored
    :type filename: str
    :param fn_callback: The callback function
    """
    if fn_callback is None:
        print("Monitoring of file called without callback. Aborted.")
        return

    worker = threading.Thread(
        target=self.__monitor_file_worker,
        args=(path, filename, fn_callback,),
        daemon=True
    )
    worker.start()
def __monitor_file_worker(self, path:str, filename:str, fn_callback):
    """ Watches the creations and deletions in ``path`` and notifies the callback

    This function never returns. Each matching event starts a new thread which calls
    ``fn_callback(path, name, created)``.

    :param path: The directory to monitor
    :type path: str
    :param filename: The only file name to report, or None to report every file
    :type filename: str
    :param fn_callback: The callback function
    """
    watcher = INotify()
    watcher.add_watch(path, flags.CREATE | flags.DELETE)

    while True:
        for _wd, mask, _cookie, name in watcher.read():
            # Skip the events about other files when a single file is monitored
            if filename is not None and filename != name:
                continue

            created = flags.CREATE in flags.from_mask(mask)
            threading.Thread(target=fn_callback, args=(path, name, created,)).start()
[docs]
def get_settings(self) -> dict:
return self.__settings
[docs]
def get_setting(self, key:str):
return self.__settings.get(key, "")
[docs]
def set_setting(self, key:str, value):
self.__settings[key] = value
[docs]
def parse_kernel_command_line_settings(self):
    """ Returns the settings defined on the kernel command line

    The file ``/proc/cmdline`` is split on whitespace and every ``key=value`` item
    whose key is ``DEBUG``, ``debug``, ``DEFAULT_LANGUAGE`` or ``default_language``
    is kept. All the keys are turned to lowercase.

    :return: A dict of the recognised settings, empty if ``/proc/cmdline`` cannot be read
    """
    valid_settings = ( "DEBUG", "debug", "DEFAULT_LANGUAGE", "default_language" )

    try:
        with open("/proc/cmdline", "r") as f:
            content = f.read().strip()
    except OSError:
        # No kernel command line available (e.g. not running on Linux)
        return {}

    result = {}
    for item in content.split():
        # Only the first "=" separates the key from the value
        key, separator, value = item.partition("=")
        if separator and key in valid_settings:
            result[key.lower()] = value

    return result