4. Plugin API Reference

This section documents all available plugins in the Radical Edge system.

4.1. Base Classes

4.1.1. PluginClient

4.1.2. ClientManagedPlugin

4.1.3. Plugin

class radical.edge.plugin_base.Plugin(app: FastAPI, instance_name: str)

Bases: object

Base class for Edge plugins.

Each plugin gets its own URL namespace and built-in session management. Routes are added with add_route_post / add_route_get.

plugin_name vs instance_name

plugin_name is a class-level attribute that uniquely identifies the plugin type (e.g. "psij", "queue_info"). It is the key used in the global Plugin._registry and in client-side lookups (edge.get_plugin("psij")).

instance_name is set at construction time (defaults to plugin_name when only one instance is needed) and drives the URL namespace: /{instance_name}/…. Multiple instances of the same plugin type on the same edge must be given distinct instance names.

Subclasses that define a plugin_name class attribute will be automatically registered in the global plugin registry.
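The following is a minimal, self-contained sketch of the registration and namespacing rules above. It uses __init_subclass__ to register each subclass under its own plugin_name. ToyPlugin, its two subclasses and the namespace attribute are hypothetical stand-ins, and the real Plugin base class may implement registration differently.

```python
from typing import Dict, List, Optional, Type


class ToyPlugin:
    """Hypothetical stand-in for Plugin, showing registry mechanics only."""

    _registry: Dict[str, Type["ToyPlugin"]] = {}
    plugin_name: Optional[str] = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register only subclasses that define plugin_name themselves.
        name = cls.__dict__.get("plugin_name")
        if name:
            ToyPlugin._registry[name] = cls

    def __init__(self, instance_name: Optional[str] = None):
        # instance_name defaults to plugin_name and drives the URL namespace.
        self.instance_name = instance_name or self.plugin_name
        self.namespace = f"/{self.instance_name}"

    @classmethod
    def get_plugin_class(cls, name: str) -> Optional[Type["ToyPlugin"]]:
        return cls._registry.get(name)

    @classmethod
    def get_plugin_names(cls) -> List[str]:
        return list(cls._registry)


class ToyPSIJ(ToyPlugin):
    plugin_name = "psij"


class ToyQueueInfo(ToyPlugin):
    plugin_name = "queue_info"
```

Two instances of the same type coexist only if their instance names differ. For example, ToyPSIJ() lives under /psij, while ToyPSIJ("psij_gpu") lives under /psij_gpu.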

Subclasses must define:

session_class: The session class to instantiate (must inherit from PluginSession)

Subclasses may define:

client_class: The local helper class for the application-side client.

version: The version string for the plugin.

session_ttl: Session timeout in seconds (default: 3600 = 1 hour; 0 = no timeout).

ui_config: UI configuration dict for portal rendering (see ui_schema.py).

Notifications

Plugins can send real-time notifications to clients via Server-Sent Events (SSE). The notification flow is: Session -> Plugin -> EdgeService -> Bridge -> SSE clients.

Sending notifications from a session:

    # In your PluginSession subclass method:
    if self._plugin:
        self._plugin._dispatch_notify("my_topic", {"key": "value", "status": "running"})

The _plugin reference is automatically injected into sessions by the plugin. _dispatch_notify works from both sync and async contexts, including background threads.
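A common way to make a notify call safe from sync code, async code and background threads is to capture the event loop that owns the broadcast. The caller then either schedules a task on that loop or submits the coroutine with asyncio.run_coroutine_threadsafe. The sketch below assumes that approach. Notifier, _send and dispatch are hypothetical names, and this is not necessarily how _dispatch_notify is written.

```python
import asyncio
import threading
from typing import Any, Dict, List, Set, Tuple


class Notifier:
    """Hypothetical helper that funnels (topic, data) pairs into one event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._tasks: Set[asyncio.Task] = set()
        self.sent: List[Tuple[str, Dict[str, Any]]] = []

    async def _send(self, topic: str, data: Dict[str, Any]) -> None:
        # Stand-in for an async broadcast such as send_notification().
        self.sent.append((topic, data))

    def dispatch(self, topic: str, data: Dict[str, Any]) -> None:
        try:
            running = asyncio.get_running_loop()
        except RuntimeError:              # no loop running in this thread
            running = None
        if running is self._loop:
            # On the loop's own thread (sync or async caller): schedule a task
            # and keep a reference so it is not garbage-collected early.
            task = self._loop.create_task(self._send(topic, data))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)
        else:
            # From any other thread: hand the coroutine to the loop thread-safely.
            asyncio.run_coroutine_threadsafe(self._send(topic, data), self._loop)


async def demo() -> List[Tuple[str, Dict[str, Any]]]:
    notifier = Notifier(asyncio.get_running_loop())
    notifier.dispatch("my_topic", {"status": "running"})     # async context
    worker = threading.Thread(target=notifier.dispatch,
                              args=("my_topic", {"status": "done"}))
    worker.start()
    worker.join()           # background thread (join blocks the loop briefly)
    await asyncio.sleep(0.1)     # give both scheduled sends a chance to run
    return notifier.sent


sent = asyncio.run(demo())
```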

Sending notifications from a plugin:

    # In your Plugin subclass (async) method:
    await self.send_notification("my_topic", {"key": "value"})

Subscribing to notifications (browser/JavaScript):

    const eventSource = new EventSource('/events');
    eventSource.onmessage = (event) => {
        const msg = JSON.parse(event.data);
        if (msg.topic === 'notification') {
            const {edge, plugin, topic, data} = msg.data;
            console.log(`${edge}/${plugin}: ${topic}`, data);
        }
    };

Subscribing to notifications (Python client):

    import json

    import requests
    import sseclient  # provided by the sseclient-py package

    response = requests.get('http://bridge:8000/events', stream=True)
    client = sseclient.SSEClient(response)
    for event in client.events():
        msg = json.loads(event.data)
        if msg['topic'] == 'notification':
            print(msg['data'])
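If sseclient is not available, the same filter can be written against raw text/event-stream lines. Each event is one or more "data:" lines terminated by a blank line. This minimal sketch handles only "data:" fields and ignores event:, id: and retry:. The message shape (topic, data) is taken from the examples above, and iter_notifications is a hypothetical helper that accepts any iterable of decoded text lines.

```python
import json
from typing import Dict, Iterable, Iterator, List


def iter_notifications(lines: Iterable[str]) -> Iterator[Dict]:
    """Yield msg['data'] for every SSE event whose topic is 'notification'."""
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[5:]
            # Per the SSE format, one leading space after the colon is dropped.
            buffer.append(value[1:] if value.startswith(" ") else value)
        elif line == "" and buffer:
            # A blank line ends the event; multi-line data is joined with "\n".
            msg = json.loads("\n".join(buffer))
            buffer = []
            if msg.get("topic") == "notification":
                yield msg["data"]
```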

Topology Updates

Plugins can receive notifications when edges connect or disconnect by overriding the on_topology_change method:

    async def on_topology_change(self, edges: dict):
        '''Called when edges connect/disconnect.

        Args:
            edges: Dict mapping edge names to their plugin info.
                Example: {"edge1": {"plugins": ["sysinfo", "psij"]}}
        '''
        for edge_name, info in edges.items():
            print(f"Edge {edge_name} has plugins: {info.get('plugins', [])}")
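The edges argument appears to be a full snapshot of connected edges rather than a delta. If so, a plugin that needs to react to individual joins and departures must diff successive snapshots. The following minimal sketch assumes the mapping shape from the example above. TopologyTracker is a hypothetical helper that a plugin could hold and call from on_topology_change, for example with joined, left = self._tracker.update(edges).

```python
from typing import Dict, Set, Tuple


class TopologyTracker:
    """Hypothetical helper that turns full snapshots into join/leave sets."""

    def __init__(self) -> None:
        self._known: Set[str] = set()

    def update(self, edges: Dict[str, dict]) -> Tuple[Set[str], Set[str]]:
        current = set(edges)
        joined = current - self._known    # present now, absent last time
        left = self._known - current      # present last time, absent now
        self._known = current
        return joined, left
```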

session_class: Type[PluginSession] | None = None
client_class: Type | None = None
version: str = '0.0.1'
session_ttl: int = 3600
ui_config: Dict | UIConfig | None = None
ui_module: str | None = None
classmethod get_plugin_class(name: str) → Type | None

Look up a registered plugin class by name.

classmethod get_plugin_names() → list[str]

Get a list of registered plugin names.

property is_bridge: bool

True when this plugin is hosted on the bridge (not on an edge).

property is_compute_node: bool

True when running inside a batch job allocation (compute node).

property is_login_node: bool

True when running on a login node (not inside a batch job).

property namespace: str

Get the namespace of the plugin.

property instance_name: str

Get the instance name of the plugin.

property uid: str

Get the unique ID of the plugin instance.

add_route_post(path: str, method: Callable)

Add a POST route to the plugin’s namespace.

add_route_get(path: str, method: Callable)

Add a GET route to the plugin’s namespace.

async register_session(request: Request) → dict

Register a new session and return its unique session ID.

async unregister_session(request: Request) → dict

Unregister a session by its session ID and close it.

async get_version(request: Request) → dict

Return the plugin version.

async get_ui_config(request: Request) → dict

Return UI configuration for portal rendering.

External plugins can define ui_config to describe their forms, monitors, and notification handlers, enabling seamless portal integration.

async list_sessions(request: Request) → dict

Return a list of active session IDs.

async health_check(request: Request) → dict

Health check endpoint for monitoring.

Returns plugin status including:

  • Plugin name and version

  • Uptime in seconds

  • Number of active sessions

  • Whether the plugin is healthy

classmethod is_enabled(app: FastAPI) → bool

Return False to skip loading this plugin on this host.

Checked before instantiation so no routes are registered when the plugin is not applicable. Override in subclasses to gate on host type (bridge vs edge) or runtime conditions (e.g. scheduler presence). Default: always load.

async send_notification(topic: str, data: dict)

Broadcast a UI event over the bridge SSE channels. Depends on app.state.edge_service having been injected by EdgeService.

async on_topology_change(edges: dict)

Called when the bridge topology changes (edge connect/disconnect).

Subclasses can override this to react to topology changes. Default implementation does nothing.

Args:

edges: Dict mapping edge names to their plugin info.

4.2. Plugins

4.2.1. PluginLucid

4.2.2. PluginXGFabric

XGFabric Plugin for Radical Edge.

Orchestrates CFDaAI workflows across multiple HPC clusters. Provides:

  • Configuration management (load/save workflow configs)

  • Workflow execution (start/stop/status)

  • Real-time progress notifications via SSE

The plugin runs on a local edge and communicates with remote edges (UCSB, Perlmutter) via the bridge.

class radical.edge.plugin_xgfabric.ResourceConfig(name: str = 'default', bridge_url: str = 'https://localhost:8000', bridge_cert: str | None = None, cluster_configs: Dict[str, Dict] = <factory>)

Bases: object

Resource configuration — bridge connection and per-cluster scheduler settings.

name: str = 'default'
bridge_url: str = 'https://localhost:8000'
bridge_cert: str | None = None
cluster_configs: Dict[str, Dict]
radical.edge.plugin_xgfabric.dict_to_resource_config(d: Dict) → ResourceConfig

Convert dict to ResourceConfig, ignoring unknown fields.

class radical.edge.plugin_xgfabric.WorkflowConfig(name: str = 'default', description: str = '', local_workspace: str = '/tmp/xgfabric_workspace', cspot_woof_url: str = 'woof://128.111.45.61/davisstations/daviscupsout', cspot_limit: int = 10, num_simulations: int = 16, batch_size: int = 4, train_models: List[str] = <factory>, simulation_task: Dict | None = None, training_tasks: Dict[str, Dict] = <factory>, evaluation_task: Dict | None = None, mock_sensor_data: bool = False)

Bases: object

Workflow configuration — task templates and execution parameters.

name: str = 'default'
description: str = ''
local_workspace: str = '/tmp/xgfabric_workspace'
cspot_woof_url: str = 'woof://128.111.45.61/davisstations/daviscupsout'
cspot_limit: int = 10
num_simulations: int = 16
batch_size: int = 4
train_models: List[str]
simulation_task: Dict | None = None
training_tasks: Dict[str, Dict]
evaluation_task: Dict | None = None
mock_sensor_data: bool = False
radical.edge.plugin_xgfabric.config_to_dict(cfg: WorkflowConfig) → Dict

Convert config to JSON-serializable dict.

radical.edge.plugin_xgfabric.dict_to_config(d: Dict) → WorkflowConfig

Convert dict to WorkflowConfig, filtering unknown fields.
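Both dict_to_resource_config and dict_to_config drop keys that are not dataclass fields. One way to do that is to compare keys against dataclasses.fields. The sketch below uses a hypothetical MiniConfig that carries only a few of WorkflowConfig's fields, and the real functions may differ in detail.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class MiniConfig:
    """Hypothetical subset of WorkflowConfig, for illustration only."""
    name: str = "default"
    num_simulations: int = 16
    batch_size: int = 4
    train_models: List[str] = field(default_factory=list)


def dict_to_mini_config(d: Dict[str, Any]) -> MiniConfig:
    # Keep only keys that correspond to declared dataclass fields.
    known = {f.name for f in fields(MiniConfig)}
    return MiniConfig(**{k: v for k, v in d.items() if k in known})


def mini_config_to_dict(cfg: MiniConfig) -> Dict[str, Any]:
    # asdict() copies into plain dicts/lists, JSON-serializable if the values are.
    return asdict(cfg)
```

Filtering this way lets configs saved by newer or older plugin versions still load. Unknown keys are silently ignored, and missing keys fall back to the dataclass defaults.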

class radical.edge.plugin_xgfabric.ClusterStatus(name: str, edge_name: str, cluster_type: str, has_gpu: bool = False, online: bool = False, tasks_running: int = 0, pilot_job_id: str | None = None, pilot_status: str | None = None)

Bases: object

Status of a single cluster.

name: str
edge_name: str
cluster_type: str
has_gpu: bool = False
online: bool = False
tasks_running: int = 0
pilot_job_id: str | None = None
pilot_status: str | None = None
class radical.edge.plugin_xgfabric.WorkflowState(status: str = 'idle', phase: str = '', progress: int = 0, message: str = '', start_time: str | None = None, end_time: str | None = None, error: str | None = None, active_cluster: str | None = None, completed_simulations: int = 0, total_simulations: int = 0, current_batch: int = 0, total_batches: int = 0, pilot_jobs: Dict[str, str] = <factory>, immediate_clusters: List[Dict] = <factory>, allocate_clusters: List[Dict] = <factory>, log: List[Dict] = <factory>, config_name: str | None = None, config_dir: str | None = None)

Bases: object

Runtime state of a workflow execution.

status: str = 'idle'
phase: str = ''
progress: int = 0
message: str = ''
start_time: str | None = None
end_time: str | None = None
error: str | None = None
active_cluster: str | None = None
completed_simulations: int = 0
total_simulations: int = 0
current_batch: int = 0
total_batches: int = 0
pilot_jobs: Dict[str, str]
immediate_clusters: List[Dict]
allocate_clusters: List[Dict]
log: List[Dict]
config_name: str | None = None
config_dir: str | None = None
class radical.edge.plugin_xgfabric.XGFabricSession(sid: str, workdir: str | None = None, edge_name: str | None = None, bridge_url: str | None = None, bridge_cert: str | None = None)

Bases: PluginSession

XGFabric session - manages workflow configuration and execution.

update_connected_edges(edges: Dict[str, Any])

Update the cached list of connected edges.

async get_config_dir() → Dict

Get current config directory.

async set_config_dir(path: str) → Dict

Set config directory.

async list_configs() → List[Dict]

List all saved configurations.

async load_config(name: str) → Dict

Load a workflow config by name, path, or builtin alias ('default', 'test').

async save_config(data: Dict) → Dict

Save a configuration.

async delete_config(name: str) → Dict

Delete a configuration.

async get_status() → Dict

Get current workflow status including cluster info.

async start_workflow(workflow: str = '__default__', resource: str = '__default__') → Dict

Start workflow execution.

async stop_workflow() → Dict

Stop running workflow.

async close() → dict

Close the session.

class radical.edge.plugin_xgfabric.XGFabricClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)

Bases: PluginClient

Client-side interface for the XGFabric plugin.

get_workdir() → Dict

Get current working directory.

set_workdir(path: str) → Dict

Set working directory.

list_configs() → List[Dict]

List all saved configurations.

load_config(name: str) → Dict

Load a configuration by name.

save_config(config: Dict) → Dict

Save a configuration.

delete_config(name: str) → Dict

Delete a configuration.

get_default_config() → Dict

Get default configuration template.

get_test_config() → Dict

Get test configuration template (stub tasks, no CSPOT required).

get_status() → Dict

Get current workflow status.

start_workflow(workflow: str = '__default__', resource: str = '__default__') → Dict

Start workflow execution.

stop_workflow() → Dict

Stop running workflow.

class radical.edge.plugin_xgfabric.PluginXGFabric(app: FastAPI, workdir: str | None = None)

Bases: Plugin

XGFabric plugin for Radical Edge.

Orchestrates CFDaAI workflows across multiple HPC clusters. Provides configuration management and workflow execution via REST API.

plugin_name = 'xgfabric'
session_class

alias of XGFabricSession

client_class

alias of XGFabricClient

version: str = '0.1.0'
ui_config: Dict | UIConfig | None = {'custom_template': True, 'description': 'CFDaAI workflow orchestrator for HPC clusters.', 'icon': '🌊', 'title': 'XGFabric Workflow'}
classmethod is_enabled(app: FastAPI) → bool

XGFabric loads on edge nodes (login or compute) — not on the bridge.

async on_topology_change(edges: dict)

Handle topology updates from the bridge.

async get_workdir(request: Request) → dict
async set_workdir(request: Request) → dict
async list_configs(request: Request) → dict
async get_default_config(request: Request) → dict
async get_test_config(request: Request) → dict
async load_config(request: Request) → dict
async save_config(request: Request) → dict
async delete_config(request: Request) → dict
async get_status(request: Request) → dict
async start_workflow(request: Request) → dict
async stop_workflow(request: Request) → dict

4.2.3. PluginQueueInfo

class radical.edge.plugin_queue_info.QueueInfoSession(sid: str, backend: QueueInfo)

Bases: PluginSession

QueueInfo session with shared backend.

All sessions share a single backend instance for cache efficiency.
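Sharing one backend means one cache serves every session, while force=True skips it for a single call. The following minimal sketch of that caching pattern assumes a time-based expiry. CachedQuery, the 60-second TTL and the fetch callable are hypothetical and are not taken from the QueueInfo backends.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class CachedQuery:
    """Hypothetical TTL cache, meant to be shared by all sessions of one plugin."""

    def __init__(self, fetch: Callable[..., Any], ttl: float = 60.0):
        self._fetch = fetch
        self._ttl = ttl
        self._cache: Dict[Tuple[Hashable, ...], Tuple[float, Any]] = {}

    def get(self, *key: Hashable, force: bool = False) -> Any:
        now = time.monotonic()
        hit = self._cache.get(key)
        if not force and hit is not None and now - hit[0] < self._ttl:
            return hit[1]                 # fresh cached value
        value = self._fetch(*key)         # miss, stale entry, or force=True
        self._cache[key] = (now, value)
        return value
```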

async close() → dict

Close this session.

Note: Backend is shared and not cleaned up here.

Returns:

dict: An empty dictionary indicating successful closure.

async get_info(user=None, force=False)

Return queue/partition info.

Args:

user (str): User to filter partitions for. When None (the default), the current user is used. Pass user='*' to return all partitions (admin view).

force (bool): Bypass cache if True.

Returns:

dict: Queue information from the backend.

async list_jobs(queue, user=None, force=False)

List jobs in a queue.

Args:

queue (str): Partition name.

user (str): User to filter jobs for. When None (the default), the current user is used. Pass user='*' to return all jobs (admin view).

force (bool): Bypass cache if True.

Returns:

dict: Job listing from the backend.

async list_all_jobs(user=None, force=False)

List all jobs for the user across all partitions.

Args:

user (str): User to filter jobs for. When None (the default), the current user is used. Pass user='*' to return all jobs (admin view).

force (bool): Bypass cache if True.

Returns:

dict: Job listing from the backend.
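The user argument follows the same convention across get_info, list_jobs and list_all_jobs: None means the current user, and '*' means everyone. The sketch below applies that convention to a plain list of job records. It assumes each record has a user key, which is a hypothetical field name, and it treats getpass.getuser() in the edge process as the current user.

```python
import getpass
from typing import Dict, List, Optional


def filter_jobs(jobs: List[Dict], user: Optional[str] = None) -> List[Dict]:
    if user == "*":
        return list(jobs)              # admin view: no filtering
    if user is None:
        user = getpass.getuser()       # default: the user running this process
    return [job for job in jobs if job.get("user") == user]
```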

async cancel_job(job_id: str) → dict

Cancel a job via the active batch system (scancel/qdel/…).

async list_allocations(user=None, force=False)

List allocations/projects.

Args:

user (str): Optional user name to filter on.

force (bool): Bypass cache if True.

Returns:

dict: Allocation listing from the backend.

class radical.edge.plugin_queue_info.QueueInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)

Bases: PluginClient

Client-side interface for the QueueInfo plugin.

get_info(user: str | None = None, force: bool = False) → dict

Return queue/partition information.

Args:

user (str): User to filter partitions for. When None (the default), the edge service user is used. Pass user='*' to return all partitions (admin view).

force (bool): Bypass cache if True.

Returns:

dict: Queue information filtered by user access.

list_jobs(queue: str, user: str | None = None, force: bool = False) → dict

List jobs in a specified queue/partition.

Args:

queue (str): Partition name to list jobs for.

user (str): User to filter jobs for. When None (the default), the edge service user is used. Pass user='*' to return all jobs (admin view).

force (bool): Bypass cache if True.

Returns:

dict: Job listing filtered by user.

list_all_jobs(user: str | None = None, force: bool = False) → dict

List all jobs for the user across all partitions.

Args:

user (str): User to filter jobs for.

force (bool): Bypass cache if True.

Returns:

dict: Job listing.

cancel_job(job_id: str) → dict

Cancel a job by ID.

list_allocations(user: str | None = None, force: bool = False) → dict

List allocations/projects.

job_allocation() → dict | None

Return edge job allocation info, or None if not inside a batch job.

No session is required. The information reflects the environment of the edge process, not the client.

Returns:

None: Edge is running on a login node.

dict: Allocation summary with keys job_id, partition, n_nodes, nodelist, cpus_per_node, gpus_per_node, account, job_name, runtime.

Raises:

RuntimeError: Edge is inside an allocation but the scheduler did not provide enough info to summarise it.

backend() → str

Return the active batch backend name on the edge.

Returns:

str: 'slurm', 'pbs', or 'none'.

class radical.edge.plugin_queue_info.PluginQueueInfo(app: FastAPI, instance_name='queue_info', backend_conf=None, slurm_conf=None)

Bases: Plugin

QueueInfo plugin for Radical Edge.

This plugin exposes batch system queue information, job listings, and allocation data via REST endpoints. is_enabled() prevents loading on edges where no recognised batch system is installed.

Backend selection is automatic: the plugin uses make_queue_info() which dispatches to QueueInfoSlurm, QueueInfoPBSPro, or QueueInfoNone based on what’s available.

plugin_name = 'queue_info'
session_class

alias of QueueInfoSession

client_class

alias of QueueInfoClient

version: str = '0.0.1'
ui_config: Dict | UIConfig | None = {'description': 'Inspect batch partitions, jobs and allocations.', 'icon': '📋', 'monitors': [{'auto_load': 'get_info/{sid}', 'css_class': 'queueinfo-content', 'id': 'partitions', 'title': 'Partitions / Queues', 'type': 'table'}], 'refresh_button': True, 'title': 'Queue Info'}
classmethod is_enabled(app: FastAPI) → bool

Load on edges with a recognised batch system (SLURM or PBSPro).

get_job_allocation() → dict | None

Return edge job allocation info, or None if not inside a job.

Delegates to the active BatchSystem.

Returns:

None: Edge is running on a login node.

dict: Allocation details (see BatchSystem.job_allocation).

Raises:

RuntimeError: Edge is inside an allocation but the scheduler did not provide enough info to summarise it.

async backend_endpoint(request: Request) → dict

Session-less endpoint: report which batch backend is active.

Response:

{"backend": "slurm" | "pbs" | "none"}
async job_allocation_endpoint(request: Request) → dict

Session-less endpoint: returns current edge job allocation info.

Response:

{"allocation": null}                              # login node
{"allocation": {"n_nodes": 4, "runtime": 3600}}  # inside a job
{"allocation": {"n_nodes": 4, "runtime": null}}  # unlimited walltime
async get_info(request: Request) → dict

Return queue/partition information.

async list_jobs(request: Request) → dict

List jobs in a specified queue/partition.

async list_all_jobs(request: Request) → dict

List all jobs for the user across all partitions.

async list_allocations(request: Request) → dict

List allocations/projects.

async cancel_job(request: Request) → dict

Cancel a job by ID.

4.2.4. PluginSysInfo

class radical.edge.plugin_sysinfo.SysInfoProvider

Bases: object

Helper class to gather system information using psutil and standard tools.

start_prefetch()

Start a background thread to prefetch hardware detection.

This lazily fills the detection cache so later queries are faster.

get_metrics() → Dict[str, Any]

Collect current system metrics.

class radical.edge.plugin_sysinfo.SysInfoSession(sid: str, provider: SysInfoProvider)

Bases: PluginSession

SysInfo session (Service-side).

Provides methods to gather system metrics.

async get_metrics() → dict

Return current system metrics.

class radical.edge.plugin_sysinfo.SysInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)

Bases: PluginClient

Client-side interface for the SysInfo plugin.

homedir() → str

Return the home directory of the edge-side process.

No session is required.

host_role() → dict

Return {'role', 'scheduler', 'job_id'} for the edge host.

role is one of 'bridge', 'login' or 'compute'. scheduler is 'slurm' | 'pbs' | 'lsf' | None and job_id is the allocation id (None outside an allocation).

No session is required.

get_metrics() → dict

Return current system metrics.

class radical.edge.plugin_sysinfo.PluginSysInfo(app: FastAPI)

Bases: Plugin

SysInfo plugin for Radical Edge.

Provides system hardware configuration and resource utilization metrics.

plugin_name = 'sysinfo'
session_class

alias of SysInfoSession

client_class

alias of SysInfoClient

version: str = '0.0.1'
ui_config: Dict | UIConfig | None = {'description': 'Live CPU, memory, disk, network and GPU metrics.', 'icon': '🖥️', 'monitors': [{'auto_load': 'metrics/{sid}', 'css_class': 'sysinfo-content', 'id': 'metrics', 'title': 'System Metrics', 'type': 'metrics'}], 'refresh_button': True, 'title': 'System Info'}
async homedir_endpoint(request: Request) → dict

Return the home directory of the edge-side process.

async host_role_endpoint(request: Request) → dict

Return the role of the host this edge runs on.

Role is one of bridge / login / compute. When the edge is running inside a batch allocation, scheduler and job_id carry the detected scheduler name and the allocation id; otherwise both are None (login nodes with a scheduler installed but no active job report scheduler=None).
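Batch systems export the allocation id into the job environment: SLURM_JOB_ID for SLURM, PBS_JOBID for PBS and LSB_JOBID for LSF. A rough sketch of the login/compute decision can therefore inspect those variables. This is an assumption about one possible detection strategy, not the plugin's actual logic. detect_host_role is hypothetical, and the bridge role is passed in rather than detected.

```python
import os
from typing import Dict, Mapping, Optional

# Environment variables that each batch system sets inside a running job.
_JOB_ID_VARS = (
    ("slurm", "SLURM_JOB_ID"),
    ("pbs", "PBS_JOBID"),
    ("lsf", "LSB_JOBID"),
)


def detect_host_role(env: Optional[Mapping[str, str]] = None,
                     is_bridge: bool = False) -> Dict[str, Optional[str]]:
    env = os.environ if env is None else env
    if is_bridge:
        return {"role": "bridge", "scheduler": None, "job_id": None}
    for scheduler, var in _JOB_ID_VARS:
        job_id = env.get(var)
        if job_id:
            return {"role": "compute", "scheduler": scheduler, "job_id": job_id}
    # Login node: even with a scheduler installed, no active job means None.
    return {"role": "login", "scheduler": None, "job_id": None}
```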

async get_metrics_endpoint(request: Request) → dict

Return current system metrics for the specified session.

4.2.5. PluginPSIJ

PsiJ plugin for RADICAL Edge — HPC job submission.

4.2.5.1. Three-class pattern

PSIJSession: Edge-side session. Holds one PsiJ Executor per submit call, manages job state via callbacks and background polling, and streams stdout/stderr incrementally.

PSIJClient: Application-side thin HTTP wrapper. Delegates to the edge service over the bridge (submit_job, get_job_status, list_jobs, cancel_job, submit_tunneled, tunnel_status).

PluginPSIJ: Registers the plugin with the edge, adds URL routes, and wires requests to the correct PSIJSession via _forward().
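The third role, routing each request to the right session through _forward(), amounts to looking up the session ID and then calling a named method. The framework-free sketch below is hypothetical. SessionRouter and EchoSession are invented names, and signalling an unknown session with KeyError is an assumption (an HTTP handler would typically map it to a 404), not the PluginPSIJ code.

```python
import asyncio
from typing import Any, Dict


class EchoSession:
    """Hypothetical session exposing one async method."""

    async def get_job_status(self, job_id: str) -> Dict[str, Any]:
        return {"job_id": job_id, "state": "QUEUED"}


class SessionRouter:
    def __init__(self) -> None:
        self._sessions: Dict[str, Any] = {}

    def register(self, sid: str, session: Any) -> None:
        self._sessions[sid] = session

    async def forward(self, sid: str, method: str, **kwargs: Any) -> Any:
        session = self._sessions.get(sid)
        if session is None:
            raise KeyError(f"unknown session: {sid}")
        # Look up the session method by name and await its result.
        return await getattr(session, method)(**kwargs)
```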

class radical.edge.plugin_psij.PSIJSession(sid: str, **kwargs: Any)

Bases: PluginSession

Session-specific PSIJ state.

poll_interval = 5.0
async submit_job(job_spec_dict: Dict[str, Any], executor_name: str = 'local') → Dict[str, Any]

Submit a job via PSIJ.

async get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) → Dict[str, Any]

Get job status with metadata and optional stdout/stderr offset.

async list_jobs() → Dict[str, Any]

List all jobs in this session with current state and metadata.

async cancel_job(job_id: str) → Dict[str, Any]

Cancel a job.

async close() → dict

Close the session and stop polling.

class radical.edge.plugin_psij.PSIJClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)

Bases: PluginClient

Client-side interface for the PSIJ plugin.

submit_job(job_spec: Dict[str, Any], executor: str = 'local') → Dict[str, Any]

Submit a job.

Args:

job_spec (dict): The job specification.

executor (str): The executor to use.

Returns:

dict: Job submission result (job_id, native_id).

get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) → Dict[str, Any]

Get the status of a job.

Args:

job_id: The job ID to query.

stdout_offset: Byte offset for stdout (0 = full).

stderr_offset: Byte offset for stderr (0 = full).

Returns:

Job status info including metadata and stdout/stderr.
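The stdout_offset and stderr_offset parameters let a client fetch only output it has not seen yet. On the edge side this reduces to seeking to the byte offset and reading to the end of the file. The following minimal sketch is hypothetical: read_from_offset is an invented name, and returning a (text, next_offset) pair is an assumption, not the documented response format.

```python
import os
import tempfile
from typing import Tuple


def read_from_offset(path: str, offset: int = 0) -> Tuple[str, int]:
    """Return output written after `offset` bytes, plus the next offset."""
    if not os.path.exists(path):
        return "", offset                 # output file not created yet
    with open(path, "rb") as fh:
        fh.seek(offset)
        chunk = fh.read()
    # A multi-byte UTF-8 character split at the boundary is replaced, not raised.
    return chunk.decode("utf-8", errors="replace"), offset + len(chunk)


# Demo: the job appends output between two polls; the second poll sees only new text.
with tempfile.TemporaryDirectory() as tmp:
    log = os.path.join(tmp, "job.out")
    with open(log, "wb") as fh:
        fh.write(b"step 1\n")
    first, pos = read_from_offset(log)
    with open(log, "ab") as fh:
        fh.write(b"step 2\n")
    second, pos = read_from_offset(log, pos)
```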

list_jobs() → Dict[str, Any]

List all jobs in this session.

Returns:

dict with a 'jobs' list.

cancel_job(job_id: str) → Dict[str, Any]

Cancel a job.

Args:

job_id: The job ID to cancel.

Returns:

Cancellation result.

submit_tunneled(job_spec: Dict[str, Any], executor: str = 'local', tunnel: bool = False) → Dict[str, Any]

Submit a job that launches a child Edge service on a compute node.

The job_spec.arguments list must contain -n <edge_name> or --name <edge_name> so the child edge can register under the correct name.

When tunnel is True the server automatically appends --tunnel to the job arguments. The plugin-side watcher then opens a reverse SSH tunnel (login node → compute node) once the job is running and writes the port to ~/.radical/edge/tunnels/{edge_name}.port. The child edge service reads that file at startup and rewrites its bridge URL to connect through the tunnel.

Args:

job_spec: PsiJ job specification dict. arguments must include -n <edge_name>.

executor: PsiJ executor name (default: "local").

tunnel: Whether to set up a reverse SSH tunnel (default: False).

Returns:

dict with job_id, native_id, and edge_name.

Raises:

RuntimeError: If the server returns an error response.
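The argument handling described for submit_tunneled can be sketched as a small pure function. It requires -n/--name and appends --tunnel when requested. The function name and the use of ValueError are hypothetical; according to the server-side documentation, the server reports a missing name as HTTP 422.

```python
from typing import List, Optional, Tuple


def prepare_tunneled_args(arguments: List[str],
                          tunnel: bool = False) -> Tuple[str, List[str]]:
    """Return (edge_name, arguments), appending --tunnel when requested."""
    edge_name: Optional[str] = None
    for i, arg in enumerate(arguments):
        if arg in ("-n", "--name") and i + 1 < len(arguments):
            edge_name = arguments[i + 1]
            break
    if not edge_name:
        raise ValueError("job_spec.arguments must contain -n/--name <edge_name>")
    args = list(arguments)            # do not mutate the caller's list
    if tunnel and "--tunnel" not in args:
        args.append("--tunnel")
    return edge_name, args
```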

tunnel_status(edge_name: str) → Dict[str, Any]

Return the current tunnel status for a named edge.

This endpoint is session-less (no session required).

Args:

edge_name: The logical name of the child edge service.

Returns:

dict with fields:

  • edge_name — echoed back.

  • status — one of "pending", "active", "failed", "done", or "no_tunnel".

  • port — assigned tunnel port (int) once active, else null.

  • pid — SSH process PID, once spawned, else null.
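A caller that needs the tunnel port can poll tunnel_status until the status leaves "pending". In the sketch below, fetch is assumed to be a zero-argument callable, for example lambda: client.tunnel_status("node01") on a PSIJClient; the edge name, timeout and interval are hypothetical. Which statuses end the wait is an assumption based on the list above.

```python
import time
from typing import Any, Callable, Dict

# Statuses after which further polling is pointless (assumed from the list above).
SETTLED = {"active", "failed", "done", "no_tunnel"}


def wait_for_tunnel(fetch: Callable[[], Dict[str, Any]],
                    timeout: float = 300.0,
                    interval: float = 2.0) -> Dict[str, Any]:
    """Poll until the status is no longer 'pending'; return the last reply."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch()
        if status.get("status") in SETTLED:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"tunnel still {status.get('status')!r}")
        time.sleep(interval)
```

The caller still has to check whether the returned status is "active" before using the port.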

class radical.edge.plugin_psij.PluginPSIJ(app: FastAPI, instance_name: str = 'psij')

Bases: Plugin

PSIJ plugin for Radical Edge.

This plugin provides an interface to submit and manage jobs via the psij-python library.

plugin_name = 'psij'
session_class

alias of PSIJSession

client_class

alias of PSIJClient

version: str = '0.0.1'
ui_config: Dict | UIConfig | None = {'description': 'Submit and monitor HPC batch jobs via PsiJ.', 'forms': [{'fields': [{'column': 0, 'css_class': 'p-exec', 'default': 'radical-edge-wrapper.sh', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'column': 0, 'css_class': 'p-args', 'label': 'Arguments (space-separated)', 'name': 'args', 'placeholder': 'auto-filled with --url and --name', 'type': 'text'}, {'column': 0, 'css_class': 'p-executor', 'label': 'Executor', 'name': 'executor', 'options': ['local', 'slurm', 'pbs', 'lsf'], 'type': 'select'}, {'column': 1, 'css_class': 'p-queue', 'label': 'Queue / Partition', 'name': 'queue', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-account', 'label': 'Account / Project', 'name': 'account', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-duration', 'label': 'Duration (seconds)', 'name': 'duration', 'placeholder': 'e.g. 600', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-node-count', 'label': 'Number of Nodes', 'name': 'node_count', 'placeholder': 'e.g. 1', 'required': False, 'type': 'number'}, {'column': 1, 'css_class': 'p-custom-attr', 'label': '🔧 Custom Attributes', 'name': 'custom', 'required': False, 'type': 'custom_attributes'}], 'id': 'submit', 'layout': 'grid2', 'submit': {'label': '🚀 Submit Job', 'style': 'success'}, 'title': '📝 Submit Job'}], 'icon': '🚀', 'monitors': [{'css_class': 'psij-output', 'empty_text': 'No jobs submitted yet.', 'id': 'jobs', 'title': '📊 Job Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'job_id', 'state_field': 'state', 'topic': 'job_status'}, 'title': 'PsiJ Jobs'}
classmethod is_enabled(app: FastAPI) → bool

PsiJ loads on edge nodes (login or compute) — not on the bridge.

async submit_job(request: Request) → dict
async get_job_status(request: Request) → dict
async list_jobs(request: Request) → dict
async cancel_job(request: Request) → dict
async submit_tunneled(request: Request) → dict

Submit a job that starts a new Edge service on a compute node.

The job must pass -n/--name <edge_name> in its arguments so the child edge service can register under the correct name.

When tunnel=true the plugin automatically appends --tunnel to the job’s argument list. The child edge service reads this flag at startup and waits for a relay port file at the hardcoded path ~/.radical/edge/tunnels/{edge_name}.port before connecting to the bridge. The watcher on the parent edge writes that file once the reverse SSH tunnel is established.

When tunnel=true the plugin:

  1. Cleans up any stale relay file from a previous run.

  2. Injects --tunnel into the job arguments.

  3. Spawns an async watcher that waits for the SLURM job to reach RUNNING, then opens a reverse SSH tunnel (login → compute) and writes the allocated port to the relay file.

Request body JSON fields:

  • job_spec (dict) — PsiJ job specification.

  • executor (str) — PsiJ executor name (default: "local").

  • tunnel (bool) — Whether to set up a reverse SSH tunnel (default: false).

Returns:

JSON with job_id, native_id, and edge_name.

Raises:

422 if -n/--name is missing from job_spec.arguments.

409 if a tunnel watcher for the same edge name is already active.

async tunnel_status(request: Request) → dict

Return the current tunnel status for a named edge.

Path param: edge_name

Returns a JSON object with fields:

  • edge_name — echoed back.

  • status — one of "pending", "active", "failed", "done", or "no_tunnel".

  • port — allocated tunnel port (int) once the child edge has published it, else null.

  • pid — SSH process PID on the compute node (read from the pid rendezvous file) once active, else null.

4.2.6. PluginRhapsody

Rhapsody Plugin for Radical Edge.

Exposes the RHAPSODY Session/Task API so that remote clients can submit and monitor compute / AI tasks on edge nodes.

class radical.edge.plugin_rhapsody.RhapsodySession(sid: str, backend_names: list[str] | None = None, allow_pickled_tasks: bool = True, notify_batch_window: float = 0.25, notify_batch_size: int = 1024)

Bases: PluginSession

Rhapsody session (service-side).

Wraps a rhapsody.Session instance, forwarding task submission, monitoring, cancellation and statistics queries.

property prof: Profiler
async initialize() → None

Asynchronously initialize the session and its backends.

async submit_tasks(task_dicts: list[dict], pre_expanded: bool = False) → list[dict]

Submit a list of tasks.

Each dict is converted to a ComputeTask or AITask via BaseTask.from_dict(). Function fields encoded as cloudpickle blobs or import-path strings are deserialized first.

Uses a pipeline: deserialization of chunk N+1 runs concurrently with backend submission of chunk N, so the two dominant costs overlap.

Returns:

list[dict]: Minimal ack dicts {uid, state}.
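Overlapping the deserialization of chunk N+1 with the submission of chunk N is a two-stage pipeline. The minimal asyncio sketch below illustrates the idea. In it, deserialize runs in a worker thread via asyncio.to_thread (Python 3.9+), and submit is an async callable. Both callables and the chunk size are hypothetical stand-ins rather than the plugin's real functions.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def pipelined_submit(
        items: List[Any],
        deserialize: Callable[[List[Any]], List[Any]],
        submit: Callable[[List[Any]], Awaitable[List[Any]]],
        chunk_size: int = 256) -> List[Any]:
    """Submit chunk N while chunk N+1 is deserialized in a worker thread."""
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    acks: List[Any] = []
    if not chunks:
        return acks
    # The first chunk has nothing to overlap with, so deserialize it up front.
    ready = await asyncio.to_thread(deserialize, chunks[0])
    for n in range(len(chunks)):
        nxt: Optional[asyncio.Task] = None
        if n + 1 < len(chunks):
            # Stage 1 for chunk n+1 starts in a thread ...
            nxt = asyncio.create_task(asyncio.to_thread(deserialize, chunks[n + 1]))
        # ... while stage 2 (submission) for chunk n runs on the event loop.
        acks.extend(await submit(ready))
        if nxt is not None:
            ready = await nxt
    return acks
```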

async wait_tasks(uids: list[str], timeout: float | None = None) → list[dict]

Return current task states (non-blocking snapshot).

This method no longer blocks until tasks complete. Clients should rely on SSE task_status notifications for real-time completion events, and call this endpoint only to fetch the current state snapshot.

Args:

uids (list[str]): Task UIDs to query.

timeout (float | None): Ignored (kept for API compatibility).

Returns:

list[dict]: Current task state dicts.

async list_tasks() → dict

Return all tasks in this session with current state.

async get_task(uid: str) → dict

Return info for a single cached task.

async cancel_task(uid: str) → dict

Cancel a running task.

async cancel_all_tasks() → dict

Cancel all non-terminal tasks in this session.

Best-effort: Dragon V3 marks tasks as CANCELED but cannot truly abort running work. Per-task errors are swallowed. Cancels are issued concurrently via asyncio.gather.
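Best-effort concurrent cancellation maps naturally onto asyncio.gather(..., return_exceptions=True). That call waits for every awaitable and returns exceptions as values instead of raising the first one. The sketch below uses a hypothetical cancel_one coroutine, and its result dict shape is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Iterable


async def cancel_all(uids: Iterable[str],
                     cancel_one: Callable[[str], Awaitable[object]]) -> Dict[str, int]:
    uid_list = list(uids)
    # All cancels run concurrently; failures come back as exception objects.
    results = await asyncio.gather(*(cancel_one(uid) for uid in uid_list),
                                   return_exceptions=True)
    # Per-task errors are counted, not raised.
    failed = sum(isinstance(r, BaseException) for r in results)
    return {"requested": len(uid_list), "failed": failed}
```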

async close() -> dict

Shut down the RHAPSODY session and clean up.

class radical.edge.plugin_rhapsody.RhapsodyClient(*args, **kwargs)

Bases: PluginClient

Client-side interface for the Rhapsody plugin.

register_session(backends: list[str] | None = None, init_timeout: float = 120, notify_batch_window: float | None = None, notify_batch_size: int | None = None)

Register a session, optionally specifying backend names.

The edge initializes the session asynchronously. This method blocks until a session_status SSE notification confirms that the session is ready, or until init_timeout seconds have elapsed.

Falls back to polling when no BridgeClient is available.

Args:

backends: List of backend names (e.g. ['dragon_v3']). Defaults to ['dragon_v3'] on the server side.

init_timeout: Seconds to wait for session init (default 120).

notify_batch_window: Seconds to accumulate notifications before flushing (edge-side).

notify_batch_size: Max notifications per flush (edge-side).
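The blocking behaviour of register_session can be approximated with a `threading.Event` that the notification callback sets, plus a polling loop for the no-BridgeClient case. In this sketch the helper names, the callback signature, the `sid`/`status` payload keys and the `poll_status` callable are hypothetical; only the `session_status` topic and the `"ready"` status come from the docs above.

```python
import threading
import time
from typing import Callable, Optional


def make_status_callback(sid: str, event: threading.Event) -> Callable[[str, dict], None]:
    """Hypothetical SSE handler: set the event once this session reports ready."""
    def on_session_status(topic: str, data: dict) -> None:
        if topic == "session_status" and data.get("sid") == sid \
                and data.get("status") == "ready":
            event.set()
    return on_session_status


def wait_until_ready(
    ready_event: Optional[threading.Event],
    poll_status: Callable[[], str],  # hypothetical: returns e.g. "initializing" or "ready"
    init_timeout: float = 120,
    poll_interval: float = 0.5,
) -> bool:
    """Block until the session is ready or init_timeout elapses."""
    if ready_event is not None:
        # Notification path: Event.wait returns True once the callback fires.
        return ready_event.wait(timeout=init_timeout)
    # Fallback path: no bridge client, so poll the edge periodically.
    deadline = time.monotonic() + init_timeout
    while time.monotonic() < deadline:
        if poll_status() == "ready":
            return True
        time.sleep(poll_interval)
    return False
```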

submit_tasks(task_dicts: list[dict]) -> list[dict]

Submit tasks to the edge.

Large batches are automatically split so each payload stays within the WebSocket frame limit. Batches are submitted concurrently via a thread pool so that network round-trips overlap (pipelining).

UIDs are assigned client-side (if absent) so the caller can start waiting for SSE notifications immediately.

Args:

task_dicts: List of task specification dicts.

Returns:

list[dict]: Submitted task info (uid, state).
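Size-bounded splitting and concurrent batch submission can be sketched as below. `MAX_PAYLOAD_BYTES`, `split_by_size`, `submit_all` and `post_batch` are hypothetical, and JSON size is used as a proxy for the real wire encoding, whose frame limit is not documented here. The UID format is likewise an assumption.

```python
import json
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

# Hypothetical per-message budget; the real WebSocket frame limit is not given here.
MAX_PAYLOAD_BYTES = 1_000_000


def split_by_size(tasks: list[dict], limit: int = MAX_PAYLOAD_BYTES) -> list[list[dict]]:
    """Greedily pack tasks into batches whose JSON encoding stays within limit.

    A single task larger than limit still gets its own (oversized) batch.
    """
    batches: list[list[dict]] = []
    current: list[dict] = []
    size = 2  # the enclosing "[]"
    for task in tasks:
        # Item bytes plus the ", " separator json.dumps inserts (a slight overestimate).
        n = len(json.dumps(task).encode()) + 2
        if current and size + n > limit:
            batches.append(current)
            current, size = [], 2
        current.append(task)
        size += n
    if current:
        batches.append(current)
    return batches


def submit_all(tasks: list[dict], post_batch: Callable[[list[dict]], list[dict]],
               limit: int = MAX_PAYLOAD_BYTES, workers: int = 4) -> list[dict]:
    # Assign UIDs client-side so the caller can start waiting on notifications at once.
    for task in tasks:
        task.setdefault("uid", f"task.{uuid.uuid4().hex}")
    batches = split_by_size(tasks, limit)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Round-trips overlap; map() still yields results in batch order.
        results = list(pool.map(post_batch, batches))
    return [ack for acks in results for ack in acks]
```

Using `pool.map` keeps acknowledgements in input order even though batches may finish out of order, so acks line up with the caller's task list.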

wait_tasks(uids: list[str], timeout: float | None = None) -> list[dict]

Wait for tasks to reach terminal state via SSE notifications.

Purely client-side: the persistent _on_task_done callback (registered at session init) accumulates completions into self._completed. This method checks the accumulator and blocks only until every requested UID appears there.

Falls back to periodic polling when no BridgeClient is available (e.g. direct construction in tests).

Args:

uids: Task UIDs to wait for.

timeout: Seconds to wait (None = forever).

Returns:

list[dict]: Completed task dicts.
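The accumulator pattern behind wait_tasks can be sketched with a `threading.Condition`: the notification callback records each terminal task and wakes waiters, and `wait` re-checks the predicate on every wakeup. The class and method names are hypothetical, and raising `TimeoutError` when the timeout expires is an assumption; the docs above do not say what the real method does on timeout.

```python
import threading
from typing import Optional


class CompletionTracker:
    """Accumulate terminal task notifications and let callers wait on UID sets."""

    def __init__(self) -> None:
        self._completed: dict[str, dict] = {}
        self._cond = threading.Condition()

    def on_task_done(self, task: dict) -> None:
        # Called from the notification thread for each terminal task_status event.
        with self._cond:
            self._completed[task["uid"]] = task
            self._cond.notify_all()

    def wait(self, uids: list[str], timeout: Optional[float] = None) -> list[dict]:
        with self._cond:
            # wait_for returns the predicate's final value (False on timeout).
            ok = self._cond.wait_for(
                lambda: all(u in self._completed for u in uids), timeout=timeout)
            if not ok:
                missing = [u for u in uids if u not in self._completed]
                raise TimeoutError(f"{len(missing)} task(s) not done: {missing[:5]}")
            return [self._completed[u] for u in uids]
```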

list_tasks() -> dict

List all tasks in this session.

get_task(uid: str) -> dict

Retrieve info for a single task.

cancel_task(uid: str) -> dict

Cancel a task.

cancel_all_tasks() -> dict

Cancel all non-terminal tasks in this session.

class radical.edge.plugin_rhapsody.PluginRhapsody(app: FastAPI, instance_name: str = 'rhapsody')

Bases: Plugin

Rhapsody plugin for Radical Edge.

Exposes the RHAPSODY Session / Task API via REST endpoints:

  • POST /rhapsody/register_session – create session

  • POST /rhapsody/submit/{sid} – submit tasks

  • POST /rhapsody/wait/{sid} – query task states

  • GET /rhapsody/list_tasks/{sid} – list all tasks

  • GET /rhapsody/task/{sid}/{uid} – get single task

  • POST /rhapsody/cancel/{sid}/{uid} – cancel single task

  • POST /rhapsody/cancel_all/{sid} – cancel all tasks

Notification topics: session_status, task_status, task_status_batch.
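The notify_batch_window and notify_batch_size parameters suggest a buffer that flushes either when the window expires or when the size cap is reached, which is presumably how task_status_batch events arise. The sketch below shows that policy with the same defaults (0.25 s, 1024 events); the class, the `tick` method and the injected clock are hypothetical, and the sketch is not thread-safe.

```python
import time
from typing import Callable, Optional


class NotifyBatcher:
    """Buffer events; flush them as one batch after a time window or at max size."""

    def __init__(self, flush: Callable[[list[dict]], None],
                 window: float = 0.25, max_size: int = 1024,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush  # e.g. emits a single task_status_batch notification
        self._window = window
        self._max_size = max_size
        self._clock = clock
        self._buf: list[dict] = []
        self._first_ts: Optional[float] = None

    def add(self, event: dict) -> None:
        if not self._buf:
            self._first_ts = self._clock()  # window starts at the first buffered event
        self._buf.append(event)
        if len(self._buf) >= self._max_size:
            self.flush()

    def tick(self) -> None:
        # Call periodically (e.g. from a timer) to flush a buffer whose window expired.
        if self._buf and self._clock() - self._first_ts >= self._window:
            self.flush()

    def flush(self) -> None:
        if self._buf:
            batch, self._buf = self._buf, []
            self._flush(batch)
```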

plugin_name = 'rhapsody'
session_class

alias of RhapsodySession

client_class

alias of RhapsodyClient

version: str = '0.0.1'
ui_config: Dict | UIConfig | None = {'description': 'Submit compute tasks, wait for results, view stdout/stderr.', 'forms': [{'fields': [{'css_class': 'rh-exec', 'default': '/bin/echo', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'css_class': 'rh-args', 'default': 'hello from rhapsody', 'label': 'Arguments (space-separated)', 'name': 'args', 'type': 'text'}, {'css_class': 'rh-backends', 'label': 'Backend', 'name': 'backends', 'options': ['dragon_v3', 'concurrent'], 'type': 'select'}, {'css_class': 'rh-timeout', 'default': '', 'label': 'Timeout (s)', 'name': 'timeout', 'type': 'number'}, {'css_class': 'rh-ranks', 'default': '', 'label': 'MPI Ranks', 'name': 'ranks', 'type': 'number'}, {'css_class': 'rh-type', 'label': 'Task Type', 'name': 'type', 'options': ['', 'mpi'], 'type': 'select'}, {'css_class': 'rh-cwd', 'default': '', 'label': 'Working Dir', 'name': 'cwd', 'type': 'text'}], 'id': 'submit', 'layout': 'single', 'submit': {'label': '▶ Submit Task', 'style': 'success'}, 'title': '📝 Submit Task'}], 'icon': '🎼', 'monitors': [{'css_class': 'rh-output', 'empty_text': 'No tasks submitted yet.', 'id': 'tasks', 'title': '📊 Task Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'uid', 'state_field': 'state', 'topic': 'task_status'}, 'title': 'Rhapsody Tasks'}
classmethod is_enabled(app: FastAPI) -> bool

Rhapsody is enabled only on compute nodes, where tasks execute.

async register_session(request: Request) -> dict

Register a new Rhapsody session.

Accepts an optional JSON body with {"backends": ["name", ...]}.

Session initialization happens asynchronously in the background. The SID is returned immediately. The client should wait for a session_status SSE notification (status: "ready") before submitting tasks, or handle HTTP 409 on early requests.
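Clients that skip the session_status wait can retry on HTTP 409 instead. The sketch below wraps any request callable with exponential backoff; `retry_on_409` is hypothetical, and it uses `urllib.error.HTTPError` only as a stand-in for whatever exception the real HTTP client raises.

```python
import time
import urllib.error
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_on_409(call: Callable[[], T], attempts: int = 6, delay: float = 0.5) -> T:
    """Retry a request while the edge answers 409 (session still initializing)."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for i in range(attempts - 1):
        try:
            return call()
        except urllib.error.HTTPError as exc:
            # Only 409 means "not ready yet"; any other HTTP error is re-raised.
            if exc.code != 409:
                raise
            time.sleep(delay * 2 ** i)  # exponential backoff
    return call()  # final attempt: let any error propagate to the caller
```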

async submit_tasks(request: Request) -> dict
async wait_tasks(request: Request) -> dict
async list_tasks(request: Request) -> dict
async get_task(request: Request) -> dict
async cancel_task(request: Request) -> dict
async cancel_all_tasks(request: Request) -> dict

4.3. Queue Info Backend

4.3.1. QueueInfo

QueueInfo abstract base class, shared helpers, and factory.

Backend implementations live in queue_info_slurm.py and queue_info_pbs.py.

class radical.edge.queue_info.QueueInfo

Bases: ABC

Abstract base class for batch system queue information backends.

Subclasses implement _collect_info, _collect_jobs, _collect_allocations to gather data from a specific batch system. Results are cached with a configurable TTL.
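A TTL cache with a `force` bypass, as used by get_info and the list_* methods, can be sketched as follows. The class name, the 60 s default and the injected clock are assumptions; only the idea of a configurable TTL and a `force=True` bypass comes from the docs. Holding the lock during collection means concurrent callers wait for one refresh rather than triggering several.

```python
import threading
import time
from typing import Any, Callable, Optional


class TTLCache:
    """Cache the result of a collector function for ttl seconds."""

    def __init__(self, collect: Callable[[], Any], ttl: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._collect = collect  # e.g. a backend's _collect_info
        self._ttl = ttl
        self._clock = clock
        self._lock = threading.Lock()
        self._value: Any = None
        self._stamp: Optional[float] = None

    def get(self, force: bool = False) -> Any:
        with self._lock:
            now = self._clock()
            stale = self._stamp is None or now - self._stamp >= self._ttl
            if force or stale:
                # Refresh while holding the lock so only one collection runs.
                self._value = self._collect()
                self._stamp = now
            return self._value
```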

backend_name = 'none'
start_prefetch()

Start background threads to prefetch queue info and allocations in parallel so both caches are warm as quickly as possible.
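Parallel prefetching amounts to starting one daemon thread per cache so that the slow scheduler queries overlap. In this sketch `start_prefetch` takes a hypothetical mapping of loader callables (for example, each could be a cache's `get`); a production version would also catch and log loader exceptions, which here are only printed by the threading module's default exception hook.

```python
import threading
from typing import Any, Callable


def start_prefetch(loaders: dict[str, Callable[[], Any]]) -> list[threading.Thread]:
    """Run each loader in its own daemon thread so the caches warm in parallel."""
    threads = []
    for name, load in loaders.items():
        # daemon=True: a slow scheduler query never blocks interpreter shutdown.
        t = threading.Thread(target=load, name=f"prefetch-{name}", daemon=True)
        t.start()
        threads.append(t)
    return threads
```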

get_info(user=None, force=False)

Return queue/partition info. force=True bypasses the cache.

Args:

user (str): User to filter partitions for. Defaults to the current user when None. Pass user='*' to return all partitions (admin view).

force (bool): Bypass cache if True.

Returns:

dict: {"queues": {<partition_name>: {…}, …}}

list_jobs(queue, user=None, force=False)

List jobs in a queue.

Args:

queue (str): Partition name to list jobs for.

user (str): User to filter jobs for. Defaults to the current user when None. Pass user='*' to return all jobs.

force (bool): Bypass cache if True.

Returns:

dict: {"jobs": [<job_dict>, …]}

list_all_jobs(user=None, force=False)

List all jobs for a user across all partitions.

Args:

user (str): User to filter jobs for. Defaults to the current user when None. Pass user='*' to return all jobs.

force (bool): Bypass cache if True.

Returns:

dict: {"jobs": [<job_dict>, …]}

list_allocations(user=None, force=False)

List allocations/projects. If user is set, filter to that user. When user is None, filter to the current user. To return all rows, pass user='*'.
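The user-filter convention shared by these methods (None means the current user, '*' means everyone) can be sketched as a small helper. The helper name and the row key `user` are hypothetical, and `getpass.getuser()` is assumed to be an acceptable way to find the current user.

```python
import getpass
from typing import Optional


def filter_rows(rows: list[dict], user: Optional[str] = None,
                key: str = "user") -> list[dict]:
    """Apply the user convention: None -> current user, '*' -> all rows."""
    if user == "*":
        return list(rows)  # admin view: no filtering
    target = user if user is not None else getpass.getuser()
    return [r for r in rows if r.get(key) == target]
```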

radical.edge.queue_info.make_queue_info(batch=None, conf_path=None) -> QueueInfo

Factory: return an instance of the QueueInfo subclass matching the active scheduler.

Args:

batch: Optional pre-detected BatchSystem instance. If None, calls batch_system.detect_batch_system().

conf_path: Optional path to the scheduler's configuration file (forwarded to the backend; only SLURM uses it today).

Returns:

QueueInfo: a QueueInfoSlurm, QueueInfoPBSPro, or QueueInfoNone instance, depending on what the local system supports.
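The factory's dispatch logic can be sketched with minimal stand-in classes. Here the detected scheduler is passed as a plain string instead of a BatchSystem instance, `make_queue_info_sketch` is a hypothetical name, and the `backend_name` values other than 'none' are assumptions. The point is the shape: SLURM receives conf_path, PBS Pro does not, and anything unrecognized falls back to QueueInfoNone.

```python
from typing import Optional


class QueueInfoBase:  # hypothetical stand-in for radical.edge.queue_info.QueueInfo
    backend_name = "none"


class QueueInfoSlurm(QueueInfoBase):  # minimal stand-in, not the real backend
    backend_name = "slurm"

    def __init__(self, conf_path: Optional[str] = None) -> None:
        self.conf_path = conf_path  # only the SLURM backend consumes the config path


class QueueInfoPBSPro(QueueInfoBase):  # minimal stand-in
    backend_name = "pbspro"


class QueueInfoNone(QueueInfoBase):  # no-op fallback when no scheduler is found
    backend_name = "none"


def make_queue_info_sketch(batch_name: Optional[str],
                           conf_path: Optional[str] = None) -> QueueInfoBase:
    """Map a detected scheduler name to a backend; fall back to the no-op one."""
    if batch_name == "slurm":
        return QueueInfoSlurm(conf_path)
    if batch_name == "pbspro":
        return QueueInfoPBSPro()
    return QueueInfoNone()
```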