Source code for radical.edge.plugin_sysinfo


__author__    = 'Radical Development Team'
__email__     = 'radical@radical-project.org'
__copyright__ = 'Copyright 2024, RADICAL@Rutgers'
__license__   = 'MIT'


import glob
import os
import re
import time
import threading
import psutil
import socket
import logging
import platform
import subprocess
import json

from typing import Dict, List, Any

from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import JSONResponse

from .plugin_base import Plugin
from .client import PluginClient

log = logging.getLogger("radical.edge")


[docs] class SysInfoProvider: """ Helper class to gather system information using psutil and standard tools. """ def __init__(self): self._cpu_model = None self._gpu_info = None
[docs] def start_prefetch(self): """ Start a background thread to prefetch hardware detection. This lazily fills the detection cache so later queries are faster. """ def _prefetch(): try: self._ensure_detected() except Exception: pass # Silently ignore prefetch failures thread = threading.Thread(target=_prefetch, daemon=True) thread.start()
def _ensure_detected(self): """Run hardware detection once on first use.""" if self._cpu_model is None: self._cpu_model = self._detect_cpu_model() if self._gpu_info is None: self._gpu_info = self._detect_gpus() def _detect_cpu_model(self) -> str: """Parse /proc/cpuinfo or platform info for CPU model name.""" try: if platform.system() == "Linux": with open("/proc/cpuinfo", "r") as f: for line in f: if "model name" in line: return line.split(":")[1].strip() return platform.processor() except Exception: return "Unknown" def _detect_disk_type(self, device: str) -> str: """Detect storage type (SSD/HDD) via /sys/block on Linux.""" try: dev_name = os.path.basename(device) # Handle different device naming patterns # NVMe: nvme0n1p1 → nvme0n1 # MMC: mmcblk0p1 → mmcblk0 # Standard: sda1 → sda if dev_name.startswith('nvme'): # NVMe devices: nvme0n1p1 → nvme0n1 base_dev = re.sub(r'p\d+$', '', dev_name) elif dev_name.startswith('mmcblk'): # MMC devices: mmcblk0p1 → mmcblk0 base_dev = re.sub(r'p\d+$', '', dev_name) else: # Standard devices: sda1 → sda base_dev = re.sub(r'\d+$', '', dev_name) rot_path = f"/sys/block/{base_dev}/queue/rotational" if os.path.exists(rot_path): with open(rot_path, "r") as f: rot = f.read().strip() if rot == "0": return "ssd" elif rot == "1": return "hdd" except Exception: pass return "unknown" def _detect_gpus(self) -> List[Dict[str, Any]]: """ Detect available GPUs (NVIDIA, AMD, Intel) in parallel. Each detector runs in its own thread with a 5 s timeout so a slow or absent tool does not block the others. """ from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout gpus: List[Dict[str, Any]] = [] detectors = [ self._detect_nvidia_gpus, self._detect_amd_gpus, self._detect_intel_gpus, ] with ThreadPoolExecutor(max_workers=3) as ex: futures = [ex.submit(fn) for fn in detectors] for future in futures: try: gpus.extend(future.result(timeout=5)) except (FuturesTimeout, Exception): pass return gpus def _detect_nvidia_gpus(self) -> List[Dict[str, Any]]: """Query nvidia-smi for NVIDIA GPUs.""" nvidia_smi = os.environ.get("NVIDIA_SMI_PATH", "nvidia-smi") try: # Check availability subprocess.run([nvidia_smi, "-L"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, timeout=5) # Query static info cmd = [ nvidia_smi, "--query-gpu=index,name,driver_version,uuid", "--format=csv,noheader,nounits" ] ret = subprocess.check_output(cmd, text=True, timeout=5) gpus = [] for line in ret.strip().splitlines(): if not line.strip(): continue parts = [x.strip() for x in line.split(',')] if len(parts) >= 4: idx, name, driver, uuid = parts[:4] gpus.append({ "id": idx, # specific ID for internal mapping "index": int(idx), "name": name, "driver_version": driver, "uuid": uuid, "vendor": "NVIDIA" }) return gpus except (FileNotFoundError, subprocess.CalledProcessError): return [] def _detect_amd_gpus(self) -> List[Dict[str, Any]]: """Query rocm-smi for AMD GPUs.""" rocm_smi = os.environ.get("ROCM_SMI_PATH", "rocm-smi") try: # Check availability: rocm-smi --json returns full info # Note: --json flag support varies by version, but modern rocm-smi has it. cmd = [rocm_smi, "--showproductname", "--showdriverversion", "--showuniqueid", "--json"] ret = subprocess.check_output(cmd, text=True, timeout=5) data = json.loads(ret) gpus = [] # rocm-smi json keys are typically "card0", "card1", etc. for card_key, info in data.items(): if not card_key.startswith("card"): continue idx = int(card_key.replace("card", "")) gpus.append({ "id": card_key, # Use card0 as ID "index": idx, "name": info.get("Card Series", "Unknown AMD GPU"), "driver_version": info.get("Driver version", "Unknown"), "uuid": info.get("Unique ID", ""), "vendor": "AMD" }) return gpus except (FileNotFoundError, subprocess.CalledProcessError, ImportError, Exception): return [] def _detect_intel_gpus(self) -> List[Dict[str, Any]]: """Detect Intel GPUs via sysfs.""" try: gpus = [] # Intel PCI vendor ID INTEL_VENDOR_ID = "0x8086" # Scan /sys/class/drm/card* for Intel GPUs for card_path in glob.glob("/sys/class/drm/card[0-9]"): try: # Read vendor ID vendor_path = os.path.join(card_path, "device/vendor") if not os.path.exists(vendor_path): continue with open(vendor_path, 'r') as f: vendor_id = f.read().strip() if vendor_id != INTEL_VENDOR_ID: continue # This is an Intel GPU card_name = os.path.basename(card_path) card_idx = int(card_name.replace("card", "")) # Try to get device name from various sources device_name = "Intel GPU" # Try device/label label_path = os.path.join(card_path, "device/label") if os.path.exists(label_path): with open(label_path, 'r') as f: device_name = f.read().strip() else: # Try to parse from modalias or device ID device_path = os.path.join(card_path, "device/device") if os.path.exists(device_path): with open(device_path, 'r') as f: device_id = f.read().strip() # Basic mapping for common Intel GPUs (incomplete) if "0x46" in device_id: device_name = "Intel Iris Xe Graphics" elif "0x9a" in device_id: device_name = "Intel Iris Xe Graphics" elif "0x19" in device_id: device_name = "Intel UHD Graphics" # Try to get driver version driver_version = "Unknown" try: driver_path = os.path.join(card_path, "device/driver/module/version") if os.path.exists(driver_path): with open(driver_path, 'r') as f: driver_version = f.read().strip() except Exception: pass gpus.append({ "id": card_name, "index": card_idx, "name": device_name, "driver_version": driver_version, "uuid": "", # Intel doesn't provide UUID via sysfs easily "vendor": "Intel" }) except Exception: continue return gpus except Exception: return [] def _get_gpu_metrics(self, static_gpus: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Gather dynamic metrics for detected GPUs.""" nvidia_gpus = [g for g in static_gpus if g['vendor'] == 'NVIDIA'] amd_gpus = [g for g in static_gpus if g['vendor'] == 'AMD'] intel_gpus = [g for g in static_gpus if g['vendor'] == 'Intel'] metrics = [] # NVIDIA Metrics if nvidia_gpus: try: # Add memory.free to query cmd = [ "nvidia-smi", "--query-gpu=index,utilization.gpu,utilization.memory,memory.total,memory.used,memory.free", "--format=csv,noheader,nounits" ] ret = subprocess.check_output(cmd, text=True) # Parse n_metrics = {} for line in ret.strip().splitlines(): if not line.strip(): continue parts = [x.strip() for x in line.split(',')] idx = int(parts[0]) n_metrics[idx] = { "util_gpu": float(parts[1]), "util_mem": float(parts[2]), "mem_total": int(parts[3]), "mem_used": int(parts[4]), "mem_free": int(parts[5]) } # Merge for g in nvidia_gpus: idx = g['index'] m = n_metrics.get(idx, {}) metrics.append({**g, **m}) except Exception: metrics.extend(nvidia_gpus) # AMD Metrics if amd_gpus: try: # rocm-smi --showuse --showmeminfo vram --json cmd = ["rocm-smi", "--showuse", "--showmeminfo", "vram", "--json"] ret = subprocess.check_output(cmd, text=True) data = json.loads(ret) for g in amd_gpus: card = g['id'] # card0 info = data.get(card, {}) # Parse utilization try: util_gpu = float(info.get("GPU use (%)", 0)) except Exception: util_gpu = 0.0 # Parse memory (Assuming Bytes usually, check output carefully in prod) # rocm-smi JSON output format varies. try: mem_tot = int(info.get("VRAM Total Memory (B)", 0)) mem_used = int(info.get("VRAM Total Used Memory (B)", 0)) metrics.append({ **g, "util_gpu": util_gpu, "util_mem": 0.0, "mem_total": mem_tot // (1024 * 1024), "mem_used": mem_used // (1024 * 1024), "mem_free": (mem_tot - mem_used) // (1024 * 1024) }) except Exception: metrics.append(g) except Exception: metrics.extend(amd_gpus) # Intel Metrics # Intel GPUs don't have standard CLI tool like nvidia-smi # Return static info only for now if intel_gpus: metrics.extend(intel_gpus) return metrics
[docs] def get_metrics(self) -> Dict[str, Any]: """Collect current system metrics.""" self._ensure_detected() # --- System --- boot_time = psutil.boot_time() uptime = time.time() - boot_time unet = platform.uname() metrics = { "system": { "hostname": socket.gethostname(), "uptime": uptime, "kernel": f"{unet.system} {unet.release}", "arch": unet.machine } } # --- CPU --- # psutil.cpu_freq() might fail on some container envs freq = None try: freq_struct = psutil.cpu_freq() freq = freq_struct.current if freq_struct else None except Exception: pass metrics["cpu"] = { "model": self._cpu_model, "vendor": platform.processor(), # Simplistic "cores_physical": psutil.cpu_count(logical=False) or 0, "cores_logical": psutil.cpu_count(logical=True) or 0, "percent": psutil.cpu_percent(interval=None), # Non-blocking (requires previous call for accuracy) "load_avg": list(os.getloadavg()) if hasattr(os, "getloadavg") else [], "freq_mhz": freq } # --- Memory --- mem = psutil.virtual_memory() metrics["memory"] = { "total": mem.total, "available": mem.available, "percent": mem.percent, "used": mem.used } # --- Disk --- # Shared/network filesystem types common on HPC systems NETWORK_FSTYPES = { # Standard network filesystems "nfs", "nfs4", "cifs", "smb", "smbfs", # Parallel/HPC filesystems "lustre", "gpfs", "beegfs", "pvfs2", "orangefs", "glusterfs", "cephfs", "afs", # Cray-specific "dvs", } disks = [] for part in psutil.disk_partitions(all=False): # Skip pseudo filesystems if possible (handled by all=False usually) if "sqsh" in part.opts or "snap" in part.mountpoint: continue try: usage = psutil.disk_usage(part.mountpoint) # Heuristic for filesystem type vs disk type fstype = part.fstype disk_type = "unknown" if fstype in NETWORK_FSTYPES: disk_type = "shared" elif part.device.startswith("/dev/"): sub_type = self._detect_disk_type(part.device) disk_type = f"local {sub_type}" disks.append({ "mount": part.mountpoint, "device": part.device, "fstype": fstype, "type": disk_type, "total": usage.total, "used": usage.used, "percent": usage.percent }) except PermissionError: continue metrics["disks"] = disks # --- Network --- net_io = psutil.net_io_counters(pernic=True) net_addrs = psutil.net_if_addrs() net_stats = psutil.net_if_stats() networks = [] for iface, addrs in net_addrs.items(): # Skip loopback if iface == "lo": continue # Verify UP state if iface in net_stats and not net_stats[iface].isup: continue ip_addr = None mac_addr = None for addr in addrs: if addr.family == socket.AF_INET: ip_addr = addr.address elif addr.family == psutil.AF_LINK: mac_addr = addr.address if not ip_addr: continue io = net_io.get(iface) speed = net_stats[iface].speed if iface in net_stats else 0 networks.append({ "interface": iface, "ip": ip_addr, "mac": mac_addr, "rx_bytes": io.bytes_recv if io else 0, "tx_bytes": io.bytes_sent if io else 0, "speed_mbps": speed }) metrics["network"] = networks # --- GPU --- metrics["gpus"] = self._get_gpu_metrics(self._gpu_info) return metrics
from .plugin_session_base import PluginSession
[docs] class SysInfoSession(PluginSession): """ SysInfo session (Service-side). Provides methods to gather system metrics. """ def __init__(self, sid: str, provider: SysInfoProvider): super().__init__(sid) self._provider = provider
[docs] async def get_metrics(self) -> dict: """ Return current system metrics. """ self._check_active() return self._provider.get_metrics()
[docs] class SysInfoClient(PluginClient): """ Client-side interface for the SysInfo plugin. """
[docs] def homedir(self) -> str: """Return the home directory of the edge-side process. No session is required. """ resp = self._http.get(self._url('homedir')) self._raise(resp, 'homedir') return resp.json()['homedir']
[docs] def get_metrics(self) -> dict: """ Return current system metrics. """ self._require_session() url = self._url(f"metrics/{self.sid}") resp = self._http.get(url) self._raise(resp) return resp.json()
[docs] class PluginSysInfo(Plugin): """ SysInfo plugin for Radical Edge. Provides system hardware configuration and resource utilization metrics. """ plugin_name = "sysinfo" session_class = SysInfoSession client_class = SysInfoClient version = '0.0.1' ui_config = { "icon": "🖥️", "title": "System Info", "description": "Live CPU, memory, disk, network and GPU metrics.", "refresh_button": True, "monitors": [{ "id": "metrics", "title": "System Metrics", "type": "metrics", "css_class": "sysinfo-content", "auto_load": "metrics/{sid}" }] } def __init__(self, app: FastAPI): """ Initialize the SysInfo plugin. """ super().__init__(app, 'sysinfo') self._provider = SysInfoProvider() # Start background prefetch for hardware detection self._provider.start_prefetch() # Register routes self.add_route_get('homedir', self.homedir_endpoint) self.add_route_get('metrics/{sid}', self.get_metrics_endpoint) def _create_session(self, sid: str, **kwargs) -> SysInfoSession: """ Custom session creation to pass the provider. """ return SysInfoSession(sid, self._provider)
[docs] async def homedir_endpoint(self, request: Request) -> JSONResponse: """Return the home directory of the edge-side process.""" return JSONResponse({'homedir': os.path.expanduser('~')})
[docs] async def get_metrics_endpoint(self, request: Request) -> JSONResponse: """ Return current system metrics for the specified session. """ sid = request.path_params['sid'] return await self._forward(sid, SysInfoSession.get_metrics)