# Source code for radical.edge.plugin_xgfabric

'''
XGFabric Plugin for Radical Edge.

Orchestrates CFDaAI workflows across multiple HPC clusters. Provides:
- Configuration management (load/save workflow configs)
- Workflow execution (start/stop/status)
- Real-time progress notifications via SSE

The plugin runs on a local edge and communicates with remote edges
(UCSB, Perlmutter) via the bridge.
'''

import asyncio
import csv
import dataclasses
import json
import logging
import os
import re
import shutil
import subprocess
import urllib.parse
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

from fastapi import FastAPI, HTTPException, Request
from starlette.responses import JSONResponse

from .plugin_session_base import PluginSession
from .plugin_base import Plugin
from .client import PluginClient

log = logging.getLogger("radical.edge")


# -----------------------------------------------------------------------------
# Configuration Dataclasses
# -----------------------------------------------------------------------------

@dataclass
class ResourceConfig:
    """Bridge connection details and per-cluster scheduler settings.

    Every field has a default, so ``ResourceConfig()`` is a valid
    configuration pointing at a bridge on localhost.
    """

    # Label of this resource configuration.
    name: str = "default"

    # Base URL of the bridge.
    bridge_url: str = "https://localhost:8000"

    # Optional certificate for the bridge connection; None when not provided.
    bridge_cert: Optional[str] = None

    # Per-edge overrides used when submitting pilots via psij.
    # Keys are edge names; values are dicts with any of:
    #   queue, account, duration, nodes, executor, workflow_path,
    #   custom_attributes
    cluster_configs: Dict[str, Dict] = field(default_factory=dict)
def dict_to_resource_config(d: Dict) -> 'ResourceConfig':
    """Build a ResourceConfig from *d*, silently dropping unknown keys.

    Only keys that match a ResourceConfig field name are forwarded to the
    constructor; fields missing from *d* keep their dataclass defaults.
    """
    accepted = frozenset(spec.name for spec in dataclasses.fields(ResourceConfig))
    kwargs = {key: d[key] for key in d if key in accepted}
    return ResourceConfig(**kwargs)
@dataclass
class WorkflowConfig:
    """Workflow configuration: task templates and execution parameters.

    Every field has a default, so ``WorkflowConfig()`` is valid as-is.
    """

    name: str = "default"
    description: str = ""

    # --- Paths ---------------------------------------------------------------
    local_workspace: str = "/tmp/xgfabric_workspace"

    # --- CSPOT ---------------------------------------------------------------
    cspot_woof_url: str = "woof://128.111.45.61/davisstations/daviscupsout"
    cspot_limit: int = 10

    # --- Workflow ------------------------------------------------------------
    num_simulations: int = 16
    batch_size: int = 4
    train_models: List[str] = field(default_factory=lambda: ["pcr", "pinn", "fno"])

    # --- Task templates ------------------------------------------------------
    # Simulation task; placeholders: {workflow_path}, {sim_output_dir},
    # {wind_speed}, {sim_id}, {wind_dir}
    simulation_task: Optional[Dict] = None

    # Training tasks: model-name -> task spec; placeholders: {workflow_path},
    # {sensor_dir}, {sim_dir}, {output_dir}, {model}
    training_tasks: Dict[str, Dict] = field(default_factory=dict)

    # Evaluation task; placeholders: {workflow_path}, {sensor_file},
    # {eval_output}
    evaluation_task: Optional[Dict] = None

    # Test/debug: skip CSPOT and generate synthetic sensor data
    mock_sensor_data: bool = False
def config_to_dict(cfg: WorkflowConfig) -> Dict:
    """Return *cfg* as a plain dict (one key per dataclass field).

    The conversion is delegated to ``dataclasses.asdict``; the result is what
    gets written to disk as JSON when a config is saved.
    """
    return dataclasses.asdict(cfg)
def dict_to_config(d: Dict) -> WorkflowConfig:
    """Build a WorkflowConfig from *d*, dropping keys that are not fields.

    Integer fields that arrive as strings are converted with ``int()``;
    a non-numeric string therefore raises ``ValueError``.
    """
    int_fields = ('cspot_limit', 'num_simulations', 'batch_size')

    # Keep only the keys the dataclass actually declares.
    known = {spec.name for spec in dataclasses.fields(WorkflowConfig)}
    kwargs = {key: d[key] for key in d if key in known}

    # Coerce string-typed numbers for the integer fields.
    for key in int_fields:
        raw = kwargs.get(key)
        if isinstance(raw, str):
            kwargs[key] = int(raw)

    return WorkflowConfig(**kwargs)
# ----------------------------------------------------------------------------- # Workflow State # -----------------------------------------------------------------------------
@dataclass
class ClusterStatus:
    """Status record for a single cluster."""

    # Required identification fields (no defaults).
    name: str
    edge_name: str
    cluster_type: str  # 'immediate' or 'allocate'

    # Capability / liveness flags.
    has_gpu: bool = False
    online: bool = False

    # Number of tasks currently running on the cluster.
    tasks_running: int = 0

    # Pilot job bookkeeping; both stay None until a pilot exists.
    pilot_job_id: Optional[str] = None
    pilot_status: Optional[str] = None  # 'pending', 'running', 'completed', 'failed'
@dataclass
class WorkflowState:
    """Runtime state of one workflow execution.

    A fresh instance describes an idle workflow with no progress.
    """

    # Lifecycle
    status: str = 'idle'  # idle, running, completed, failed
    phase: str = ''
    progress: int = 0
    message: str = ''
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    error: Optional[str] = None
    active_cluster: Optional[str] = None

    # Simulation / batch counters
    completed_simulations: int = 0
    total_simulations: int = 0
    current_batch: int = 0
    total_batches: int = 0

    # Pilot job tracking
    pilot_jobs: Dict[str, str] = field(default_factory=dict)

    # Cluster status
    immediate_clusters: List[Dict] = field(default_factory=list)
    allocate_clusters: List[Dict] = field(default_factory=list)

    # Execution log (most recent first)
    log: List[Dict] = field(default_factory=list)

    # Config info
    config_name: Optional[str] = None
    config_dir: Optional[str] = None
# ----------------------------------------------------------------------------- # Session # -----------------------------------------------------------------------------
class XGFabricSession(PluginSession):
    """
    XGFabric session - manages workflow configuration and execution.
    """

    def __init__(self, sid: str, workdir: Optional[str] = None,
                 edge_name: Optional[str] = None,
                 bridge_url: Optional[str] = None,
                 bridge_cert: Optional[str] = None):
        """Create the session and make sure its working directories exist.

        Args:
            sid: Session id, forwarded to ``PluginSession.__init__``.
            workdir: Working directory. Falls back to the
                ``XGFABRIC_WORKDIR`` environment variable, then to the
                current working directory.
            edge_name: Name of the edge this session runs on ('local' when
                not given).
            bridge_url: Base URL of the bridge; may be None.
            bridge_cert: Certificate passed to httpx as ``verify``; may be
                None.
        """
        super().__init__(sid)

        default_workdir = os.environ.get('XGFABRIC_WORKDIR') or os.getcwd()
        self._workdir = Path(workdir or default_workdir)
        self._workdir.mkdir(parents=True, exist_ok=True)

        self._config_dir = self._workdir / 'configs'
        self._config_dir.mkdir(exist_ok=True)

        self._edge_name = edge_name or 'local'
        self._bridge_url = bridge_url
        self._bridge_cert = bridge_cert
        self._connected_edges: Dict[str, Any] = {}  # Cached connected edges

        self._current_config: Optional[WorkflowConfig] = None
        self._current_resource_config: Optional[ResourceConfig] = None
        self._state = WorkflowState()
        self._workflow_task: Optional[asyncio.Task] = None
        self._cancel_requested = False

        # Bridge client for communicating with other edges
        self._bc = None

        # Cache of resolved home dirs per edge (populated on first use)
        self._homedir_cache: Dict[str, str] = {}

        # Active rhapsody client + pending task UIDs for the current batch
        # (for cleanup)
        self._pending_tasks: Optional[tuple] = None  # (rhapsody_client, set[uid])

    def _verify(self) -> Any:
        """Return SSL verification argument for httpx calls.

        NOTE(review): without a bridge certificate this returns False, which
        makes httpx skip TLS certificate verification entirely.
        """
        return self._bridge_cert if self._bridge_cert else False

    async def _http_get(self, url: str, **kwargs) -> Any:
        """Run httpx.get in a thread with appropriate SSL verification."""
        verify = self._verify()
        return await asyncio.to_thread(lambda: httpx.get(url, verify=verify, **kwargs))

    async def _http_post(self, url: str, **kwargs) -> Any:
        """Run httpx.post in a thread with appropriate SSL verification."""
        verify = self._verify()
        return await asyncio.to_thread(lambda: httpx.post(url, verify=verify, **kwargs))

    async def _resolve_path(self, edge_name: str, path: str) -> str:
        """Expand a leading '~' to the home directory on the remote edge.

        The home directory is fetched once per edge from the bridge
        (``<bridge>/<edge>/sysinfo/homedir``) and cached. This is best
        effort: when no bridge URL is configured, the request fails, or the
        response carries no usable home directory, *path* is returned
        unchanged and nothing is cached, so a later call can retry.

        Args:
            edge_name: Edge whose home directory should be used.
            path: Path that may start with '~'.

        Returns:
            The path with its first '~' replaced, or *path* itself.
        """
        # NOTE(review): '~user/...' also matches this check and would be
        # expanded with this edge's own home directory.
        if not path.startswith('~'):
            return path

        homedir = self._homedir_cache.get(edge_name)
        if homedir is None:
            # _bridge_url is Optional; without it there is nothing to query.
            if not self._bridge_url:
                log.warning("[XGFabric] _resolve_path(%s): no bridge_url — "
                            "leaving path unexpanded", edge_name)
                return path

            url = f"{self._bridge_url.rstrip('/')}/{edge_name}/sysinfo/homedir"
            try:
                resp = await self._http_get(url, timeout=5)
                homedir = resp.json().get('homedir')
            except Exception as e:
                log.warning("[XGFabric] _resolve_path(%s): failed — %s", edge_name, e)
                return path

            # A missing / null / empty homedir must not poison the cache.
            if not homedir or not isinstance(homedir, str):
                log.warning("[XGFabric] _resolve_path(%s): no homedir in response",
                            edge_name)
                return path

            self._homedir_cache[edge_name] = homedir

        return path.replace('~', homedir, 1)
[docs] def update_connected_edges(self, edges: Dict[str, Any]): """Update the cached list of connected edges.""" log.debug("[XGFabric] Session %s: topology update — %d edges: %s", self._sid, len(edges), list(edges.keys())) self._connected_edges = edges
# ------------------------------------------------------------------------- # Config Directory Management # -------------------------------------------------------------------------
[docs] async def get_config_dir(self) -> Dict: """Get current config directory.""" return {'path': str(self._config_dir.parent)}
[docs] async def set_config_dir(self, path: str) -> Dict: """Set config directory.""" new_dir = Path(path) if not new_dir.exists(): raise HTTPException(status_code=400, detail=f"Directory not found: {path}") self._workdir = new_dir self._config_dir = new_dir / 'configs' self._config_dir.mkdir(exist_ok=True) return {'path': str(self._workdir), 'status': 'ok'}
# ------------------------------------------------------------------------- # Config Management # -------------------------------------------------------------------------
[docs] async def list_configs(self) -> List[Dict]: """List all saved configurations.""" configs = [] for f in self._config_dir.glob('*.json'): try: with open(f) as fp: data = json.load(fp) configs.append({ 'name': f.stem, 'description': data.get('description', ''), 'modified': datetime.fromtimestamp(f.stat().st_mtime).isoformat() }) except Exception as e: log.warning("Failed to read config %s: %s", f, e) return sorted(configs, key=lambda x: x['name'])
[docs] async def load_config(self, name: str) -> Dict: """Load a workflow config by name, path, or builtin alias ('default', 'test').""" _builtins = { 'default': 'xgfabric_workflow_default.json', 'test': 'xgfabric_workflow_test.json', } if name in _builtins: return self._load_builtin_config(_builtins[name]) p = Path(name) if p.is_absolute() or p.exists(): config_file = p if p.suffix else p.with_suffix('.json') else: config_file = self._config_dir / (name if name.endswith('.json') else f'{name}.json') if not config_file.exists(): raise HTTPException(status_code=404, detail=f"Config '{name}' not found") with open(config_file) as f: data = json.load(f) self._current_config = dict_to_config(data) return data
[docs] async def save_config(self, data: Dict) -> Dict: """Save a configuration.""" name = data.get('name', 'default') if not name: raise HTTPException(status_code=400, detail="Config name is required") # Convert to WorkflowConfig (filters out UI-only fields) and back to dict self._current_config = dict_to_config(data) clean_data = config_to_dict(self._current_config) config_file = self._config_dir / f'{name}.json' with open(config_file, 'w') as f: json.dump(clean_data, f, indent=2) return {'status': 'saved', 'name': name}
[docs] async def delete_config(self, name: str) -> Dict: """Delete a configuration.""" config_file = self._config_dir / f'{name}.json' if not config_file.exists(): raise HTTPException(status_code=404, detail=f"Config '{name}' not found") config_file.unlink() return {'status': 'deleted', 'name': name}
@staticmethod def _load_builtin_config(filename: str) -> Dict: """Load a built-in config JSON from the package data directory.""" data_dir = os.path.join(os.path.dirname(__file__), 'data') with open(os.path.join(data_dir, filename)) as f: return json.load(f) # ------------------------------------------------------------------------- # Workflow Control # -------------------------------------------------------------------------
[docs] async def get_status(self) -> Dict: """Get current workflow status including cluster info.""" # Always update config_dir self._state.config_dir = str(self._workdir) # Update config name if loaded if self._current_config: self._state.config_name = self._current_config.name # Query connected edges from bridge (if not running a workflow) if self._state.status != 'running': immediate, allocate = await self._get_connected_edges() log.info("[XGFabric] get_status: immediate=%s allocate=%s", [c['name'] for c in immediate], [c['name'] for c in allocate]) self._state.immediate_clusters = immediate self._state.allocate_clusters = allocate else: log.info("[XGFabric] get_status: workflow running — skipping cluster refresh " "(immediate=%s allocate=%s)", [c['name'] for c in self._state.immediate_clusters], [c['name'] for c in self._state.allocate_clusters]) return asdict(self._state)
async def _is_enabled(self, edge_name: str) -> bool: """Check whether an edge's queue_info plugin reports a working scheduler.""" if not self._bridge_url: log.info("[XGFabric] _is_enabled(%s): no bridge_url — returning False", edge_name) return False url = self._bridge_url.rstrip('/') + f'/{edge_name}/queue_info/is_enabled' try: resp = await self._http_get(url, timeout=5) result = resp.json().get('available', False) log.info("[XGFabric] _is_enabled(%s): available=%s", edge_name, result) return result except Exception as e: log.info("[XGFabric] _is_enabled(%s): request failed — %s", edge_name, e) return False async def _get_connected_edges(self) -> tuple[List[Dict], List[Dict]]: """Return (immediate, allocate) cluster lists from cache or bridge query. Edges with the queue_info plugin AND a working scheduler go into allocate_clusters; all others go into immediate_clusters. """ rc = self._current_resource_config def _cluster(edge_name: str) -> Dict: base = {'name': edge_name, 'edge_name': edge_name, 'has_gpu': False, 'online': True, 'tasks_running': 0} if rc: base.update(rc.cluster_configs.get(edge_name, {})) return base async def _classify(edges_info: Dict) -> tuple[List[Dict], List[Dict]]: """Classify edges_info dict into (immediate, allocate).""" immediate, allocate = [], [] for edge_name, edge_info in edges_info.items(): plugins = edge_info.get('plugins', []) # ucsb edges are always immediate (no batch scheduler available) if 'ucsb' in edge_name: log.info("[XGFabric] %s -> immediate (ucsb)", edge_name) immediate.append(_cluster(edge_name)) elif 'queue_info' in plugins and await self._is_enabled(edge_name): log.info("[XGFabric] %s -> allocate (queue_info enabled)", edge_name) allocate.append(_cluster(edge_name)) else: log.info("[XGFabric] %s -> immediate (no scheduler)", edge_name) immediate.append(_cluster(edge_name)) return immediate, allocate # Use cached topology if available (populated by on_topology_change) if self._connected_edges: log.info("[XGFabric] 
_get_connected_edges: using cached topology (%d edges)", len(self._connected_edges)) return await _classify(self._connected_edges) # Fallback: query bridge for full plugin info and classify the same way log.info("[XGFabric] _get_connected_edges: no cached topology — querying bridge " "(bridge_url=%s)", self._bridge_url) if not self._bridge_url: return [], [] try: resp = await self._http_post( f"{self._bridge_url.rstrip('/')}/edge/list", timeout=5) data = resp.json().get('data', {}) edges_info = {name: {'plugins': list(info.get('plugins', {}).keys())} for name, info in data.get('edges', {}).items()} log.info("[XGFabric] _get_connected_edges: bridge returned %d edges: %s", len(edges_info), list(edges_info.keys())) return await _classify(edges_info) except Exception as e: log.info("[XGFabric] _get_connected_edges: bridge query failed — %s", e) return [], []
[docs] async def start_workflow(self, workflow: str = '__default__', resource: str = '__default__') -> Dict: """Start workflow execution.""" log.info("[XGFabric] start_workflow: workflow=%s resource=%s status=%s", workflow, resource, self._state.status) if self._state.status == 'running': raise HTTPException(status_code=409, detail="Workflow already running") # Load workflow config (handles 'default', 'test', and user configs) self._current_config = dict_to_config(await self.load_config(workflow)) # Load resource config self._current_resource_config = dict_to_resource_config( await self._load_resource_config(resource)) cfg = self._current_config self._state = WorkflowState( status='running', phase='initializing', start_time=datetime.now(timezone.utc).isoformat(), config_name=cfg.name, config_dir=str(self._workdir), total_simulations=cfg.num_simulations, total_batches=(cfg.num_simulations + cfg.batch_size - 1) // cfg.batch_size, immediate_clusters=self._state.immediate_clusters, allocate_clusters=self._state.allocate_clusters, ) self._cancel_requested = False # Start workflow in background self._workflow_task = asyncio.create_task(self._run_workflow()) return {'status': 'started', 'config': cfg.name}
async def _load_resource_config(self, name: str) -> Dict:
    """Load a resource config by name, path, or builtin alias.

    Resolution order:

    1. Builtin aliases ``default``/``__default__`` and ``test``/``__test__``
       are loaded through ``_load_builtin_config``.
    2. An absolute path, or a relative path that exists, is used directly
       (``.json`` is appended when the path has no suffix).
    3. Anything else is looked up inside the session's config directory,
       again with ``.json`` appended when missing.

    Args:
        name: Alias, file path, or config name.

    Returns:
        The parsed JSON content of the resource config.

    Raises:
        HTTPException: 404 if the resolved path is not an existing file.
    """
    _builtins = {
        'default': 'xgfabric_resource_default.json',
        '__default__': 'xgfabric_resource_default.json',
        'test': 'xgfabric_resource_test.json',
        '__test__': 'xgfabric_resource_test.json',
    }
    if name in _builtins:
        return self._load_builtin_config(_builtins[name])

    p = Path(name)
    if p.is_absolute() or p.exists():
        config_file = p if p.suffix else p.with_suffix('.json')
    else:
        file_name = name if name.endswith('.json') else f'{name}.json'
        config_file = self._config_dir / file_name

    # is_file() rather than exists(): a directory with a matching name must
    # be reported as "not found" instead of failing inside open().
    if not config_file.is_file():
        raise HTTPException(status_code=404,
                            detail=f"Resource config '{name}' not found")

    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(config_file, encoding='utf-8') as f:
        return json.load(f)
async def stop_workflow(self) -> Dict:
    """Cancel the running workflow and wait for its task to finish.

    Returns:
        ``{'status': 'stopped'}`` once the workflow task has wound down.

    Raises:
        HTTPException: 409 if no workflow is currently running.
    """
    is_running = self._state.status == 'running'
    if not is_running:
        raise HTTPException(status_code=409, detail="No workflow running")

    # Flag first so cooperative checks inside the workflow see the request,
    # then cancel the task itself to interrupt any pending await.
    self._cancel_requested = True
    self._state.message = "Cancellation requested..."

    task = self._workflow_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome when the task did not absorb the cancellation.
            pass

    return {'status': 'stopped'}
# -------------------------------------------------------------------------
# Workflow Execution
# -------------------------------------------------------------------------

async def _run_workflow(self):
    """Run the workflow and record its final outcome in the session state.

    On success the state becomes ``completed``. Cancellation and any other
    exception both end in ``failed``; in those cases remote resources are
    cleaned up via ``_cleanup_on_failure`` before clients are notified.
    """
    try:
        await self._execute_workflow()
        self._state.status = 'completed'
        self._state.phase = 'done'
        self._state.message = 'Workflow completed successfully'
        self._state.end_time = datetime.now(timezone.utc).isoformat()
        self._notify_state()
    except asyncio.CancelledError:
        self._state.status = 'failed'
        self._state.error = 'Workflow cancelled by user'
        self._state.end_time = datetime.now(timezone.utc).isoformat()
        await self._cleanup_on_failure()
        self._notify_state()
    except Exception as e:
        log.exception("Workflow failed: %s", e)
        self._state.status = 'failed'
        self._state.error = str(e)
        self._state.end_time = datetime.now(timezone.utc).isoformat()
        await self._cleanup_on_failure()
        self._notify_state()


async def _execute_workflow(self):
    """Main workflow execution logic.

    Phases: connect to the bridge, discover connected clusters, acquire
    sensor data, submit a pilot to the first "allocate" cluster (if any),
    stage data and run simulations on the first "immediate" cluster,
    optionally migrate results to the allocate cluster, then run training
    and evaluation on whichever cluster is active.

    Raises:
        RuntimeError: If no workflow config is active or no immediate
            cluster is connected.
    """
    if not self._current_config:
        raise RuntimeError("No active workflow configuration")
    cfg = self._current_config
    rc = self._current_resource_config

    log.info("[XGFabric] _execute_workflow: starting — session.bridge_url=%s",
             self._bridge_url)

    # Always prefer the live bridge URL from the session over the resource config —
    # the saved URL may be stale (e.g. localhost vs public IP).
    bridge_url = self._bridge_url or (rc.bridge_url if rc else 'https://localhost:8000')
    bridge_cert = self._bridge_cert or (rc.bridge_cert if rc else None)
    log.info("[XGFabric] _execute_workflow: effective bridge_url=%s cert=%s",
             bridge_url, bridge_cert)

    self._update_state('connecting', 'Connecting to bridge...')
    from .client import BridgeClient

    # A client left over from a previous run would otherwise be dropped
    # without being closed; release it before creating the new one.
    if self._bc:
        try:
            self._bc.close()
        except Exception as e:
            log.warning("[XGFabric] Error closing previous bridge client: %s", e)
    self._bc = BridgeClient(url=bridge_url, cert=bridge_cert)

    # Discover which clusters are connected right now (always live, ignores config)
    self._update_state('verifying', 'Verifying edges...')
    immediate_list, allocate_list = await self._get_connected_edges()
    log.info("[XGFabric] _execute_workflow: immediate=%s allocate=%s",
             [c['name'] for c in immediate_list],
             [c['name'] for c in allocate_list])

    immediate = immediate_list[0] if immediate_list else None
    allocate = allocate_list[0] if allocate_list else None

    if not immediate:
        raise RuntimeError(
            f"No immediate cluster is connected "
            f"(online immediate: {[c['edge_name'] for c in immediate_list]}, "
            f"online allocate: {[c['edge_name'] for c in allocate_list]})"
        )

    # Phase 1: Data acquisition
    self._update_state('data_acquisition', 'Fetching sensor data from CSPOT...')
    workspace = Path(cfg.local_workspace)
    workspace.mkdir(parents=True, exist_ok=True)
    sensor_csv = await self._acquire_sensor_data(workspace)

    # Phase 2: Submit pilot (async)
    if allocate:
        self._update_state('pilot_submit',
                           f"Submitting pilot job to {allocate['name']}...")
        pilot_id = await self._submit_pilot(allocate, bridge_url)
        self._state.pilot_jobs[allocate['name']] = pilot_id

    # Phase 3: Stage data and run simulations
    self._update_state('staging', f"Staging data to {immediate['name']}...")
    await self._stage_sensor_data(immediate, sensor_csv)

    self._update_state('simulations',
                       f"Running simulations on {immediate['name']}...")
    self._state.total_simulations = cfg.num_simulations
    sim_results = await self._run_simulations(immediate, sensor_csv, allocate)

    # Phase 4: Migration decision
    self._update_state('migration_check', 'Checking for GPU cluster...')
    active_cluster = immediate
    if allocate and await self._is_edge_online(allocate):
        self._update_state('migration', f"Migrating to {allocate['name']}...")
        await self._migrate_data(immediate, allocate, sim_results)
        active_cluster = allocate
    self._state.active_cluster = active_cluster['name']

    # Phase 5: Training
    self._update_state('training',
                       f"Running ML training on {active_cluster['name']}...")
    await self._run_training(active_cluster, sim_results)

    # Phase 6: Evaluation
    self._update_state('evaluation',
                       f"Running evaluation on {active_cluster['name']}...")
    await self._run_evaluation(active_cluster)

    # Done
    self._state.progress = 100


def _update_state(self, phase: str, message: str, progress: Optional[int] = None):
    """Update workflow state, add log entry, and send notification.

    Args:
        phase: New workflow phase name.
        message: Human-readable status message (also appended to the log).
        progress: New progress value; left unchanged when ``None``.
    """
    self._state.phase = phase
    self._state.message = message
    if progress is not None:
        self._state.progress = progress
    self._add_log(message)
    self._notify_state()


def _add_log(self, message: str, level: str = 'info'):
    """Add entry to execution log (most recent first, max 50 entries)."""
    entry = {
        'time': datetime.now(timezone.utc).strftime('%H:%M:%S'),
        'level': level,
        'message': message,
    }
    self._state.log.insert(0, entry)
    if len(self._state.log) > 50:
        self._state.log = self._state.log[:50]


def _log_task_error(self, label: str, t: dict, task_spec: Optional[Dict] = None):
    """Log a task failure immediately and notify clients.

    Builds a one-line summary from the task record (state, exit code,
    truncated exception/stderr/stdout) and, when given, the task's command
    line; stores it as the state's error and pushes a notification.

    Args:
        label: Short identifier of the failed task for the message.
        t: Task result record as returned by the task plugin.
        task_spec: Optional rendered task (``executable``/``arguments``).
    """
    state = t.get('state', '?')
    exit_code = t.get('exit_code')
    exception = (t.get('exception') or '').strip()
    stdout = (t.get('stdout') or '').strip()
    stderr = (t.get('stderr') or '').strip()

    msg = f"FAILED {label}: state={state} exit={exit_code}"
    if task_spec:
        cmd = task_spec.get('executable', '')
        args = ' '.join(str(a) for a in task_spec.get('arguments', []))
        # The slice applies to the appended fragment only, capping it at 120 chars.
        msg += f" | cmd: {cmd} {args}"[:120]
    if exception:
        msg += f" | exception: {exception[:200]}"
    if stderr:
        msg += f" | stderr: {stderr[:200]}"
    # stdout is only worth showing when stderr gave no hint.
    if stdout and not stderr:
        msg += f" | stdout: {stdout[:200]}"

    log.warning("[XGFabric] %s", msg)
    self._add_log(msg, level='error')
    self._state.error = msg
    self._notify_state()


def _notify_state(self):
    """Send state notification via SSE."""
    log.info("[XGFabric] _notify_state: phase=%s status=%s immediate=%s allocate=%s",
             self._state.phase, self._state.status,
             [c['name'] for c in self._state.immediate_clusters],
             [c['name'] for c in self._state.allocate_clusters])
    if self._plugin:
        self._plugin._dispatch_notify('workflow_status', asdict(self._state))


async def _is_edge_online(self, cluster: Dict) -> bool:
    """Check if cluster's child edge is online.

    Uses ``child_edge_name`` when the cluster dict provides one, otherwise
    ``edge_name``, and looks it up in the bridge's current edge list.

    Raises:
        RuntimeError: If there is no active bridge connection.
    """
    if not self._bc:
        raise RuntimeError("No active bridge connection")
    edge_name = cluster.get('child_edge_name') or cluster['edge_name']
    edges = await asyncio.to_thread(self._bc.list_edges)
    return edge_name in edges


def _get_plugin(self, cluster: Dict, plugin_name: str) -> Any:
    """Get plugin client for a cluster.

    Raises:
        RuntimeError: If there is no active bridge connection.
    """
    if not self._bc:
        raise RuntimeError("No active bridge connection")
    edge_name = cluster.get('child_edge_name') or cluster['edge_name']
    ec = self._bc.get_edge_client(edge_name)
    return ec.get_plugin(plugin_name)


# -------------------------------------------------------------------------
# Task rendering
# -------------------------------------------------------------------------

def _render_task(self, template: Dict, **subs) -> Dict:
    """Substitute {placeholder} values in a task template dict.

    Only ``executable`` and ``arguments`` are carried over. Arguments are
    converted to ``str`` before substitution.

    Raises:
        KeyError: If the template has no ``executable`` or references a
            placeholder not present in ``subs``.
    """
    return {
        "executable": template["executable"].format_map(subs),
        "arguments": [str(a).format_map(subs)
                      for a in template.get("arguments", [])],
    }


# -------------------------------------------------------------------------
# Data Acquisition
# -------------------------------------------------------------------------

async def _acquire_sensor_data(self, workspace: Path) -> Path:
    """Fetch sensor data from CSPOT (or generate mock data for testing).

    In mock mode a synthetic CSV is written. Otherwise ``senspot-get`` is
    queried for the latest sequence number and records are collected
    walking backwards from it until ``cspot_limit`` records were parsed
    or sequence 0 was passed. Records that cannot be fetched or parsed
    are skipped.

    Args:
        workspace: Local workspace; the CSV is written to
            ``<workspace>/data/sensor_out.csv``.

    Returns:
        Path of the written CSV file.

    Raises:
        RuntimeError: No active config, ``senspot-get`` failure on the
            initial query, unparsable sequence number, or no records.
        ValueError: ``cspot_woof_url`` has an unsupported scheme.
        asyncio.CancelledError: Cancellation was requested mid-fetch.
    """
    if not self._current_config:
        raise RuntimeError("No active workflow configuration")
    cfg = self._current_config

    output_dir = workspace / "data"
    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / "sensor_out.csv"

    if cfg.mock_sensor_data:
        log.info("[XGFabric] _acquire_sensor_data: mock mode — writing synthetic CSV")
        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(
                f, fieldnames=['dt', 'windspeed', 'windavg', 'winddir'])
            writer.writeheader()
            for i in range(max(cfg.num_simulations, 8)):
                writer.writerow({'dt': f'2024-01-01T{i:02d}:00:00+00:00',
                                 'windspeed': 2.0 + i * 0.5,
                                 'windavg': 2.0 + i * 0.5,
                                 'winddir': 90 + i * 5})
        self._update_state('data_acquisition',
                           'Mock sensor data ready (test mode)', 10)
        return output_file

    # Find senspot-get
    senspot_path = self._find_senspot_get()

    # Validate CSPOT URL
    parsed = urllib.parse.urlparse(cfg.cspot_woof_url)
    if parsed.scheme not in ('http', 'https', 'woof', ''):
        raise ValueError(f"Invalid cspot_woof_url scheme: {cfg.cspot_woof_url}")

    # Fetch latest sequence number
    cmd = [senspot_path, '-W', cfg.cspot_woof_url]
    result = await asyncio.to_thread(
        subprocess.run, cmd, capture_output=True, text=True, timeout=30
    )
    if result.returncode != 0:
        raise RuntimeError(f"senspot-get failed: {result.stderr}")

    match = re.search(r'seq_no:\s+(\d+)', result.stdout)
    if not match:
        raise RuntimeError("Could not parse sequence number from CSPOT")
    latest_seq = int(match.group(1))

    # Collect data backwards
    records = []
    current_seq = latest_seq
    limit = cfg.cspot_limit
    skipped = 0

    log.info("[XGFabric] _acquire_sensor_data: latest_seq=%d limit=%d woof=%s",
             latest_seq, limit, cfg.cspot_woof_url)

    while len(records) < limit and current_seq >= 0:
        if self._cancel_requested:
            raise asyncio.CancelledError()

        cmd = [senspot_path, '-W', cfg.cspot_woof_url, '-S', str(current_seq)]
        result = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=30
        )

        if result.returncode == 0:
            output = result.stdout.strip()
            match = re.search(r'time:\s+([\d.]+)', output)
            if match:
                parts = output.split()
                # First whitespace-delimited token holds the ':'-separated values.
                data = parts[0].split(':') if parts else []
                if len(data) >= 3:
                    try:
                        timestamp = float(match.group(1))
                        dt = datetime.fromtimestamp(timestamp, timezone.utc)
                        ws = float(data[0])
                        wa = float(data[1])
                        wd = float(data[2])
                    except (ValueError, OverflowError, OSError) as e:
                        # One malformed record must not abort the whole
                        # acquisition; treat it like any other bad record.
                        skipped += 1
                        log.info("[XGFabric] seq %d: unparsable record (%s): %s",
                                 current_seq, e, output[:80])
                    else:
                        if ws > 50:  # mph to m/s
                            ws *= 0.44704
                            wa *= 0.44704
                        records.append({
                            'dt': dt.isoformat(),
                            'windspeed': ws,
                            'windavg': wa,
                            'winddir': wd
                        })
                else:
                    skipped += 1
                    log.info("[XGFabric] seq %d: not enough fields (%d): %s",
                             current_seq, len(data), output[:80])
            else:
                skipped += 1
                log.info("[XGFabric] seq %d: no 'time:' in output: %s",
                         current_seq, output[:80])
        else:
            skipped += 1
            log.info("[XGFabric] seq %d: returncode=%d stderr=%s",
                     current_seq, result.returncode, result.stderr.strip()[:80])

        current_seq -= 1

        # Log progress every 10 iterations
        if (latest_seq - current_seq) % 10 == 0:
            log.info("[XGFabric] _acquire_sensor_data: seq=%d records=%d/%d skipped=%d",
                     current_seq, len(records), limit, skipped)

        # Update progress
        progress = int(len(records) / limit * 10)  # 0-10% for data acquisition
        self._update_state('data_acquisition',
                           f'Fetched {len(records)}/{limit} sensor records',
                           progress)

    if not records:
        raise RuntimeError("No records fetched from CSPOT")

    # Write CSV
    with open(output_file, 'w') as f:
        f.write("dt,windspeed,windavg,winddir\n")
        for r in records:
            f.write(f"{r['dt']},{r['windspeed']},{r['windavg']},{r['winddir']}\n")

    return output_file


def _find_senspot_get(self) -> str:
    """Find senspot-get binary.

    Search order: the ``SENSPOT_PATH`` environment variable (when it names
    an executable file), ``PATH``, then a fixed list of candidate
    locations.

    Raises:
        FileNotFoundError: If no executable could be located.
    """
    if os.environ.get('SENSPOT_PATH'):
        path = os.environ['SENSPOT_PATH']
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path

    which_path = shutil.which('senspot-get')
    if which_path:
        return which_path

    home = os.path.expanduser('~')
    candidates = [
        f"{home}/bin/senspot-get",
        f"{home}/common/cspot/build/bin/senspot-get",
        "/global/common/software/m5290/cspot/build/bin/senspot-get",
    ]
    for path in candidates:
        if os.path.isfile(path) and os.access(path, os.X_OK):
            return path

    raise FileNotFoundError("senspot-get not found")
# -------------------------------------------------------------------------
# Pilot Job
# -------------------------------------------------------------------------

async def _submit_pilot(self, cluster: Dict, bridge_url: str) -> str:
    """Submit pilot job to spawn child edge.

    The pilot runs ``radical-edge-service.sh`` pointed at ``bridge_url``,
    named ``<edge_name>.1``, with the same plugin filter as the local edge
    service. Scheduler attributes are read from the cluster dict with
    defaults.

    Returns:
        The job id reported by the psij plugin.

    Raises:
        RuntimeError: If there is no active bridge connection.
    """
    if not self._bc:
        raise RuntimeError("No active bridge connection")
    ec = self._bc.get_edge_client(cluster['edge_name'])
    psij: Any = await asyncio.to_thread(ec.get_plugin, 'psij')

    args = ["--url", bridge_url, "--name", cluster['edge_name'] + ".1"]
    edge_svc = self._app.state.edge_service
    plugin_filter = edge_svc._plugin_filter
    args += ["-p", ",".join(plugin_filter)]

    pilot_spec = {
        "executable": "radical-edge-service.sh",
        "arguments": args,
        "attributes": {
            "queue_name": cluster.get('queue', 'regular'),
            "account": cluster.get('account', ''),
            "duration": str(cluster.get('duration', 3600)),
            "node_count": cluster.get('nodes', 1),
        }
    }

    result = await asyncio.to_thread(psij.submit_job, pilot_spec,
                                     cluster.get('executor', 'slurm'))
    return result['job_id']


# -------------------------------------------------------------------------
# Data Staging
# -------------------------------------------------------------------------

async def _stage_sensor_data(self, cluster: Dict, sensor_csv: Path):
    """Stage sensor data to cluster.

    Uploads the CSV to ``<workflow_path>/data/sensor_out.csv`` on the
    cluster, overwriting an existing file.
    """
    staging = await asyncio.to_thread(self._get_plugin, cluster, 'staging')
    workflow_path = await self._resolve_path(cluster['edge_name'],
                                             cluster['workflow_path'])
    remote_path = f"{workflow_path}/data/sensor_out.csv"
    await asyncio.to_thread(staging.put, str(sensor_csv), remote_path,
                            overwrite=True)


async def _migrate_data(self, source: Dict, dest: Dict, sim_results: List[str]):
    """Migrate simulation results between clusters.

    Each result file is downloaded from ``source`` into a local staging
    directory and uploaded to ``<dest workflow_path>/simulations/``.
    Does nothing when ``sim_results`` is empty.

    Raises:
        RuntimeError: If no workflow config is active.
        asyncio.CancelledError: Cancellation was requested mid-transfer.
    """
    if not sim_results:
        return

    if not self._current_config:
        raise RuntimeError("No active workflow configuration")

    source_staging = await asyncio.to_thread(self._get_plugin, source, 'staging')
    dest_staging = await asyncio.to_thread(self._get_plugin, dest, 'staging')

    staging_dir = Path(self._current_config.local_workspace) / "staging"
    staging_dir.mkdir(parents=True, exist_ok=True)

    dest_workflow = await self._resolve_path(dest['edge_name'],
                                             dest['workflow_path'])

    for i, remote_path in enumerate(sim_results):
        if self._cancel_requested:
            raise asyncio.CancelledError()

        filename = Path(remote_path).name
        local_path = staging_dir / filename

        await asyncio.to_thread(source_staging.get, remote_path, str(local_path))
        await asyncio.to_thread(dest_staging.put, str(local_path),
                                f"{dest_workflow}/simulations/{filename}")

        # Migration covers the 50-60% progress band.
        progress = 50 + int((i + 1) / len(sim_results) * 10)
        self._update_state('migration',
                           f'Migrated {i+1}/{len(sim_results)} files', progress)


# -------------------------------------------------------------------------
# Simulations
# -------------------------------------------------------------------------

async def _run_simulations(self, cluster: Dict, sensor_csv: Path,
                           allocate: Optional[Dict]) -> List[str]:
    """Run CFD simulations on cluster.

    Polls Rhapsody wait_tasks (timeout=5s) so the event loop stays free.
    If the allocate cluster comes online mid-batch the current batch is
    aborted (its unfinished tasks are cancelled, best effort) and we
    return early so the caller can migrate.

    Args:
        cluster: Cluster on which the simulations are submitted.
        sensor_csv: Local sensor CSV used to derive simulation parameters.
        allocate: Optional cluster whose coming online triggers the abort.

    Returns:
        Remote result paths of the simulations that completed successfully.

    Raises:
        RuntimeError: Missing config or ``simulation_task``, or no
            simulation produced a result.
        asyncio.CancelledError: Cancellation was requested.
    """
    cfg = self._current_config
    if not cfg:
        raise RuntimeError("No active workflow configuration")
    params = self._generate_sim_params(sensor_csv, cfg.num_simulations)
    workflow_path = await self._resolve_path(cluster['edge_name'],
                                             cluster['workflow_path'])
    sim_output_dir = f"{workflow_path}/simulations"
    rhapsody = await asyncio.to_thread(self._get_plugin, cluster, 'rhapsody')

    if not cfg.simulation_task:
        raise RuntimeError("Config missing 'simulation_task' — cannot run simulations")

    # Build tasks, track (ws, sim_id, wd) per index for result-path assembly
    tasks = []
    task_params = []  # parallel to tasks: (wind_speed_str, sim_id_str, wind_dir_str)
    for wind_speed, wind_dir, sim_id in params:
        task = self._render_task(cfg.simulation_task,
                                 workflow_path=workflow_path,
                                 sim_output_dir=sim_output_dir,
                                 wind_speed=str(wind_speed),
                                 sim_id=str(sim_id),
                                 wind_dir=str(wind_dir))
        tasks.append(task)
        task_params.append((str(wind_speed), str(sim_id), str(wind_dir)))

    completed_results = []
    total_batches = (len(tasks) + cfg.batch_size - 1) // cfg.batch_size
    self._state.total_batches = total_batches

    TERMINAL = {'COMPLETED', 'FAILED', 'CANCELED', 'CANCELLED', 'ERROR'}
    abort_for_pilot = False

    for batch_num, i in enumerate(range(0, len(tasks), cfg.batch_size)):
        if self._cancel_requested:
            raise asyncio.CancelledError()

        batch = tasks[i:i + cfg.batch_size]
        batch_params = task_params[i:i + cfg.batch_size]
        self._state.current_batch = batch_num + 1
        # Simulations cover the 15-45% progress band.
        self._update_state('simulations',
                           f'Running batch {batch_num+1}/{total_batches}...',
                           15 + int(batch_num / total_batches * 30))

        submitted = await asyncio.to_thread(rhapsody.submit_tasks, batch)

        # uid → expected result path / task spec (known at submission time)
        uid_to_result: dict = {}
        uid_to_spec: dict = {}
        for sub, spec, (ws, sim_idx, wd) in zip(submitted, batch, batch_params):
            uid = sub['uid']
            uid_to_result[uid] = f"{sim_output_dir}/sim_{sim_idx}_ws_{ws}_wd_{wd}.csv"
            uid_to_spec[uid] = spec

        pending = set(uid_to_result)
        # Registered so _cleanup_on_failure can cancel them on error/cancel.
        self._pending_tasks = (rhapsody, pending)

        while pending:
            if self._cancel_requested:
                raise asyncio.CancelledError()

            # Pilot came online — abort remaining tasks and migrate.
            if allocate and await self._is_edge_online(allocate):
                log.info("[XGFabric] Pilot online mid-batch (%d pending) "
                         "— aborting to migrate", len(pending))
                abort_for_pilot = True
                break

            # Poll for up to 5s; returns current state of all requested tasks.
            try:
                results = await asyncio.to_thread(
                    rhapsody.wait_tasks, list(pending), 5.0)
            except Exception as e:
                log.warning("[XGFabric] wait_tasks error: %s — retrying", e)
                await asyncio.sleep(1)
                continue

            for t in results:
                uid = t.get('uid')
                state = str(t.get('state') or '').upper()
                if uid not in pending:
                    continue
                if not any(s in state for s in TERMINAL):
                    continue  # still running — will be included in next poll

                pending.discard(uid)
                exit_code = t.get('exit_code')
                task_ok = (state == 'COMPLETED') and exit_code in (None, 0)
                if task_ok:
                    completed_results.append(uid_to_result[uid])
                else:
                    self._log_task_error(f"sim {uid}", t, uid_to_spec.get(uid))
                # Counts every finished task, successful or not.
                self._state.completed_simulations += 1
                # Failures were already notified by _log_task_error.
                if task_ok:
                    self._notify_state()

        if abort_for_pilot:
            # Actually stop what is still running; their results would not
            # be collected anyway. Kept registered in _pending_tasks until
            # done so a cancellation arriving here still cleans them up.
            for uid in list(pending):
                try:
                    await asyncio.to_thread(rhapsody.cancel_task, uid)
                    log.info("[XGFabric] Cancelled task %s", uid)
                except Exception as e:
                    log.warning("[XGFabric] Failed to cancel task %s: %s", uid, e)

        self._pending_tasks = None

        if abort_for_pilot:
            break

    if not completed_results:
        if abort_for_pilot:
            # Nothing failed here; the batch was cut short before any
            # simulation finished.
            raise RuntimeError(
                "No simulations completed before the pilot came online "
                "— cannot proceed to training")
        raise RuntimeError(
            f"All {len(tasks)} simulations failed — cannot proceed to training")
    return completed_results


def _generate_sim_params(self, sensor_csv: Path, num_sims: int) -> List:
    """Generate simulation parameters from sensor data.

    Wind speeds strictly between 0.5 and 30 are taken from the CSV's
    ``windspeed`` column; ``num_sims`` values are spread evenly between
    their minimum and maximum.

    Returns:
        List of ``(wind_speed, wind_dir, sim_id)`` tuples, where
        ``wind_speed`` is rounded to 2 decimals, ``wind_dir`` is always 0
        and ``sim_id`` is the running index.

    Raises:
        RuntimeError: If the CSV holds no wind speed in the accepted range.
    """
    wind_speeds = []
    with open(sensor_csv, 'r') as f:
        for row in csv.DictReader(f):
            ws = float(row['windspeed'])
            if 0.5 < ws < 30:
                wind_speeds.append(ws)

    if not wind_speeds:
        raise RuntimeError("No valid wind speeds in sensor data")

    ws_min, ws_max = min(wind_speeds), max(wind_speeds)
    # max(..., 1) avoids dividing by zero for a single simulation.
    step = (ws_max - ws_min) / max(num_sims - 1, 1)

    return [(round(ws_min + i * step, 2), 0, i) for i in range(num_sims)]
# -------------------------------------------------------------------------
# Training
# -------------------------------------------------------------------------

async def _submit_and_wait(self, rhapsody: Any, task: Dict) -> Dict:
    """Submit a single task, wait for it, and return its result record.

    While waiting, the task is registered in ``_pending_tasks`` so that
    ``_cleanup_on_failure`` cancels it remotely if the workflow is
    cancelled or the wait itself fails.

    Returns:
        The first record returned by ``wait_tasks``, or ``{}`` if none.
    """
    submitted = await asyncio.to_thread(rhapsody.submit_tasks, [task])
    uid = submitted[0]['uid']

    self._pending_tasks = (rhapsody, {uid})
    results = await asyncio.to_thread(rhapsody.wait_tasks, [uid])
    # Only reached when the wait returned normally: the task has finished.
    self._pending_tasks = None

    return results[0] if results else {}


def _task_succeeded(self, t: Dict) -> bool:
    """Return True if the record shows state COMPLETED and exit code None/0."""
    return (t.get('exit_code') in (None, 0)
            and str(t.get('state', '')).upper() == 'COMPLETED')


async def _run_training(self, cluster: Dict, sim_results: List[str]):
    """Run ML training on cluster.

    Trains each model listed in ``train_models`` sequentially; models
    without an entry in ``training_tasks`` are skipped with a warning.

    Args:
        cluster: Cluster on which the training tasks are submitted.
        sim_results: Simulation result paths (not used by this method).

    Raises:
        RuntimeError: No active config, or a training task failed.
        asyncio.CancelledError: Cancellation was requested.
    """
    cfg = self._current_config
    if not cfg:
        raise RuntimeError("No active workflow configuration")
    workflow_path = await self._resolve_path(cluster['edge_name'],
                                             cluster['workflow_path'])
    sensor_dir = f"{workflow_path}/data"
    sim_dir = f"{workflow_path}/simulations"
    output_dir = f"{workflow_path}/models"
    rhapsody = await asyncio.to_thread(self._get_plugin, cluster, 'rhapsody')

    subs = dict(workflow_path=workflow_path, sensor_dir=sensor_dir,
                sim_dir=sim_dir, output_dir=output_dir)

    for i, model in enumerate(cfg.train_models):
        if self._cancel_requested:
            raise asyncio.CancelledError()

        if model not in cfg.training_tasks:
            log.warning("[XGFabric] No training_task defined for model '%s' — skipping",
                        model)
            continue

        # Training covers the 60-85% progress band.
        progress = 60 + int((i + 1) / len(cfg.train_models) * 25)
        self._update_state('training', f'Training {model.upper()} model...',
                           progress)

        task = self._render_task(cfg.training_tasks[model], model=model, **subs)
        t = await self._submit_and_wait(rhapsody, task)
        if not self._task_succeeded(t):
            self._log_task_error(f"training/{model}", t, task)
            raise RuntimeError(
                f"Training {model} failed (exit={t.get('exit_code')})")


# -------------------------------------------------------------------------
# Evaluation
# -------------------------------------------------------------------------

async def _run_evaluation(self, cluster: Dict):
    """Run evaluation metrics computation.

    Raises:
        RuntimeError: No active config, missing ``evaluation_task``, or
            the evaluation task failed.
    """
    if not self._current_config:
        raise RuntimeError("No active workflow configuration")
    cfg = self._current_config
    workflow_path = await self._resolve_path(cluster['edge_name'],
                                             cluster['workflow_path'])
    sensor_file = f"{workflow_path}/data/sensor_out.csv"
    eval_output = f"{workflow_path}/evaluation"
    rhapsody = await asyncio.to_thread(self._get_plugin, cluster, 'rhapsody')

    if not cfg.evaluation_task:
        raise RuntimeError("Config missing 'evaluation_task' — cannot run evaluation")

    task = self._render_task(cfg.evaluation_task,
                             workflow_path=workflow_path,
                             sensor_file=sensor_file,
                             eval_output=eval_output)

    self._update_state('evaluation', 'Computing metrics...', 90)

    t = await self._submit_and_wait(rhapsody, task)
    if not self._task_succeeded(t):
        self._log_task_error("evaluation", t, task)
        raise RuntimeError(f"Evaluation failed (exit={t.get('exit_code')})")


# -------------------------------------------------------------------------
# Cleanup
# -------------------------------------------------------------------------

async def _cleanup_on_failure(self):
    """Clean up resources on failure.

    Best effort: cancels any tasks registered in ``_pending_tasks`` and
    every pilot job recorded in the state. Individual failures are logged
    and never propagate.
    """
    # Cancel any pending rhapsody tasks from the current batch
    if self._pending_tasks:
        rhapsody, pending = self._pending_tasks
        self._pending_tasks = None
        for uid in list(pending):
            try:
                await asyncio.to_thread(rhapsody.cancel_task, uid)
                log.info("[XGFabric] Cancelled task %s", uid)
            except Exception as e:
                log.warning("[XGFabric] Failed to cancel task %s: %s", uid, e)

    # Cancel pilot jobs
    all_clusters = (self._state.immediate_clusters +
                    self._state.allocate_clusters)
    for cluster_name, pilot_id in list(self._state.pilot_jobs.items()):
        try:
            if not self._bc:
                raise RuntimeError("No active bridge connection")

            # Find the cluster the pilot was submitted to
            for c in all_clusters:
                if c.get('name') == cluster_name:
                    ec = self._bc.get_edge_client(c['edge_name'])
                    psij = await asyncio.to_thread(ec.get_plugin, 'psij')
                    await asyncio.to_thread(psij.cancel_job, pilot_id)
                    log.info("Cancelled pilot job %s", pilot_id)
                    break
            else:
                log.warning("Failed to cancel pilot %s: cluster '%s' not found",
                            pilot_id, cluster_name)
        except Exception as e:
            log.warning("Failed to cancel pilot %s: %s", pilot_id, e)
async def close(self) -> dict:
    """Close the session.

    Cancels a still-running workflow and waits for it to finish its
    cancellation handling, then closes the bridge client and delegates to
    the base class.

    Returns:
        Whatever the base class ``close()`` returns.
    """
    task = self._workflow_task
    if task and not task.done():
        self._cancel_requested = True
        task.cancel()
        # The workflow's cancellation handler cancels remote tasks and
        # pilot jobs through the bridge client, so let it finish before
        # the client is closed below.
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            log.exception("[XGFabric] Workflow task failed during close: %s", e)

    if self._bc:
        try:
            self._bc.close()
        except Exception as e:
            log.exception("[XGFabric] Error closing bridge client: %s", e)

    return await super().close()
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class XGFabricClient(PluginClient):
    """Client-side interface for the XGFabric plugin.

    Every method issues one HTTP request against the plugin's REST routes
    for this client's session and returns the decoded JSON response.
    """

    def _json_request(self, verb: str, route: str, **kwargs) -> Any:
        """Send ``verb`` to ``route``, raise on error, return the JSON body."""
        send = getattr(self._http, verb)
        reply = send(self._url(route), **kwargs)
        self._raise(reply)
        return reply.json()

    def get_workdir(self) -> Dict:
        """Get current working directory."""
        return self._json_request('get', f"workdir/{self.sid}")

    def set_workdir(self, path: str) -> Dict:
        """Set working directory."""
        return self._json_request('post', f"workdir/{self.sid}",
                                  json={'path': path})

    def list_configs(self) -> List[Dict]:
        """List all saved configurations."""
        return self._json_request('get', f"configs/{self.sid}")

    def load_config(self, name: str) -> Dict:
        """Load a configuration by name."""
        return self._json_request('get', f"config/{self.sid}/{name}")

    def save_config(self, config: Dict) -> Dict:
        """Save a configuration."""
        return self._json_request('post', f"config/{self.sid}", json=config)

    def delete_config(self, name: str) -> Dict:
        """Delete a configuration."""
        return self._json_request('post', f"config/{self.sid}/{name}/delete")

    def get_default_config(self) -> Dict:
        """Get default configuration template."""
        return self._json_request('get', f"config/{self.sid}/default")

    def get_test_config(self) -> Dict:
        """Get test configuration template (stub tasks, no CSPOT required)."""
        return self._json_request('get', f"config/{self.sid}/test")

    def get_status(self) -> Dict:
        """Get current workflow status."""
        return self._json_request('get', f"status/{self.sid}")

    def start_workflow(self, workflow: str = '__default__',
                       resource: str = '__default__') -> Dict:
        """Start workflow execution."""
        payload = {'workflow': workflow, 'resource': resource}
        return self._json_request('post', f"start/{self.sid}", json=payload)

    def stop_workflow(self) -> Dict:
        """Stop running workflow."""
        return self._json_request('post', f"stop/{self.sid}")
# -----------------------------------------------------------------------------
# Plugin
# -----------------------------------------------------------------------------
class PluginXGFabric(Plugin):
    """
    XGFabric plugin for Radical Edge.

    Orchestrates CFDaAI workflows across multiple HPC clusters.
    Provides configuration management and workflow execution via REST API.
    """

    plugin_name = "xgfabric"
    session_class = XGFabricSession
    client_class = XGFabricClient
    version = '0.1.0'

    ui_config = {
        "icon": "🌊",
        "title": "XGFabric Workflow",
        "description": "CFDaAI workflow orchestrator for HPC clusters.",
        "custom_template": True,
    }

    def __init__(self, app: FastAPI, workdir: Optional[str] = None):
        """Register the plugin's routes.

        Args:
            app: FastAPI application the plugin is attached to.
            workdir: Working directory; falls back to the
                ``XGFABRIC_WORKDIR`` environment variable, then the
                current directory.
        """
        super().__init__(app, 'xgfabric')

        self._workdir = workdir or os.environ.get('XGFABRIC_WORKDIR') or os.getcwd()
        self._connected_edges: Dict[str, Any] = {}  # Cache of connected edges

        # Config directory endpoints
        self.add_route_get('workdir/{sid}', self.get_workdir)
        self.add_route_post('workdir/{sid}', self.set_workdir)

        # Config endpoints. The fixed 'default'/'test' routes are registered
        # before the '{name}' route.
        self.add_route_get('configs/{sid}', self.list_configs)
        self.add_route_get('config/{sid}/default', self.get_default_config)
        self.add_route_get('config/{sid}/test', self.get_test_config)
        self.add_route_get('config/{sid}/{name}', self.load_config)
        self.add_route_post('config/{sid}', self.save_config)
        self.add_route_post('config/{sid}/{name}/delete', self.delete_config)

        # Workflow endpoints
        self.add_route_get('status/{sid}', self.get_status)
        self.add_route_post('start/{sid}', self.start_workflow)
        self.add_route_post('stop/{sid}', self.stop_workflow)

    def _create_session(self, sid: str, **_) -> XGFabricSession:
        """Create session with workdir, edge name, and bridge connection info."""
        edge_name = getattr(self._app.state, 'edge_name', 'local')

        # Get bridge URL from edge service
        edge_service = getattr(self._app.state, 'edge_service', None)
        bridge_url = getattr(edge_service, '_bridge_url', None) if edge_service else None
        bridge_cert = os.environ.get('RADICAL_BRIDGE_CERT')

        log.info("[XGFabric] _create_session: sid=%s edge=%s bridge_url=%s cached_edges=%s",
                 sid, edge_name, bridge_url, list(self._connected_edges.keys()))

        # Use super() so the base class injects the _notify callback into the session
        session = super()._create_session(sid,
                                          workdir=self._workdir,
                                          edge_name=edge_name,
                                          bridge_url=bridge_url,
                                          bridge_cert=bridge_cert)
        if not isinstance(session, XGFabricSession):
            raise RuntimeError(
                f"Expected XGFabricSession, got {type(session).__name__}")

        # Seed session with current topology so get_status() classifies edges correctly
        if self._connected_edges:
            session.update_connected_edges(self._connected_edges)
        return session

    async def on_topology_change(self, edges: dict):
        """Handle topology updates from the bridge.

        Logs newly connected and disconnected edges, caches the new
        topology, and pushes it to every active XGFabric session.
        """
        prev = set(self._connected_edges or {})
        curr = set(edges)
        self._connected_edges = edges

        for name in curr - prev:
            plugins = list(edges[name].get('plugins', []))
            log.info("[XGFabric] Edge connected: %s plugins=%s", name, plugins)
        for name in prev - curr:
            log.info("[XGFabric] Edge disconnected: %s", name)

        # Update all active sessions and push updated cluster list to clients
        for session in self._sessions.values():
            if isinstance(session, XGFabricSession):
                session.update_connected_edges(edges)
                if session._plugin:
                    session._notify_state()

    # -- Route handlers -------------------------------------------------------

    async def get_workdir(self, request: Request) -> JSONResponse:
        """GET workdir/{sid}: forward to the session's ``get_config_dir``."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.get_config_dir)

    async def set_workdir(self, request: Request) -> JSONResponse:
        """POST workdir/{sid}: forward the body's ``path`` to ``set_config_dir``."""
        sid = request.path_params['sid']
        data = await request.json()
        path = data.get('path', '')
        return await self._forward(sid, XGFabricSession.set_config_dir, path=path)

    async def list_configs(self, request: Request) -> JSONResponse:
        """GET configs/{sid}: forward to the session's ``list_configs``."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.list_configs)

    async def get_default_config(self, request: Request) -> JSONResponse:
        """GET config/{sid}/default: load the config named 'default'."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.load_config, name='default')

    async def get_test_config(self, request: Request) -> JSONResponse:
        """GET config/{sid}/test: load the config named 'test'."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.load_config, name='test')

    async def load_config(self, request: Request) -> JSONResponse:
        """GET config/{sid}/{name}: load the named config."""
        sid = request.path_params['sid']
        name = request.path_params['name']
        return await self._forward(sid, XGFabricSession.load_config, name=name)

    async def save_config(self, request: Request) -> JSONResponse:
        """POST config/{sid}: forward the JSON body to ``save_config``."""
        sid = request.path_params['sid']
        data = await request.json()
        return await self._forward(sid, XGFabricSession.save_config, data=data)

    async def delete_config(self, request: Request) -> JSONResponse:
        """POST config/{sid}/{name}/delete: delete the named config."""
        sid = request.path_params['sid']
        name = request.path_params['name']
        return await self._forward(sid, XGFabricSession.delete_config, name=name)

    async def get_status(self, request: Request) -> JSONResponse:
        """GET status/{sid}: forward to the session's ``get_status``."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.get_status)

    async def start_workflow(self, request: Request) -> JSONResponse:
        """POST start/{sid}: start a workflow with the requested configs.

        Missing, null, or empty ``workflow``/``resource`` values fall back
        to ``'__default__'``.
        """
        sid = request.path_params['sid']
        data = await request.json()
        # Accept both new-style {workflow, resource} and legacy {config_name} from explorer
        workflow = data.get('workflow') or data.get('config_name') or '__default__'
        # `or` instead of a .get() default: an explicit null/empty resource
        # must fall back as well, mirroring how `workflow` is handled.
        resource = data.get('resource') or '__default__'
        return await self._forward(sid, XGFabricSession.start_workflow,
                                   workflow=workflow, resource=resource)

    async def stop_workflow(self, request: Request) -> JSONResponse:
        """POST stop/{sid}: forward to the session's ``stop_workflow``."""
        sid = request.path_params['sid']
        return await self._forward(sid, XGFabricSession.stop_workflow)