4. Plugin API Reference
This section documents all available plugins in the Radical Edge system.
4.1. Base Classes
4.1.1. PluginClient
4.1.2. ClientManagedPlugin
4.1.3. Plugin
- class radical.edge.plugin_base.Plugin(app: FastAPI, instance_name: str)[source]¶
Bases: object
Base class for Edge plugins.
Each plugin gets its own URL namespace and built-in session management. Routes are added with add_route_post / add_route_get.
plugin_name vs instance_name
plugin_name is a class-level attribute that uniquely identifies the plugin type (e.g. "psij", "queue_info"). It is the key used in the global Plugin._registry and in client-side lookups (edge.get_plugin("psij")).
instance_name is set at construction time (defaults to plugin_name when only one instance is needed) and drives the URL namespace: /{instance_name}/…. Multiple instances of the same plugin type on the same edge must be given distinct instance names.
Subclasses that define a plugin_name class attribute are automatically registered in the global plugin registry.
- Subclasses must define:
session_class: The session class to instantiate (must inherit from PluginSession)
- Subclasses may define:
client_class: The local helper class for the application-side client.
version: The version string for the plugin.
session_ttl: Session timeout in seconds (default: 3600 = 1 hour; 0 = no timeout).
ui_config: UI configuration dict for portal rendering (see ui_schema.py).
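The relationship between plugin_name (registry key) and instance_name (URL namespace) can be illustrated with a small stand-in. This is a sketch only: PluginSketch and QueueInfoSketch are hypothetical classes, and using __init_subclass__ for registration is an assumption about how automatic registration might be implemented, not a description of the actual Plugin class.

```python
from typing import Dict, Optional, Type


class PluginSketch:
    """Hypothetical stand-in for the Plugin base class (not the real one)."""

    plugin_name: Optional[str] = None
    _registry: Dict[str, Type["PluginSketch"]] = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register only subclasses that declare plugin_name in their own body.
        name = cls.__dict__.get("plugin_name")
        if name:
            PluginSketch._registry[name] = cls

    def __init__(self, instance_name: Optional[str] = None):
        # instance_name falls back to plugin_name for the single-instance case.
        self.instance_name = instance_name or self.plugin_name

    @property
    def namespace(self) -> str:
        # The instance name, not the plugin name, drives the URL prefix.
        return f"/{self.instance_name}"

    @classmethod
    def get_plugin_class(cls, name: str) -> Optional[Type["PluginSketch"]]:
        return cls._registry.get(name)


class QueueInfoSketch(PluginSketch):
    plugin_name = "queue_info"


first = QueueInfoSketch()
second = QueueInfoSketch(instance_name="queue_info_cluster2")
```

Both instances share one registry entry under "queue_info" but are served under different URL prefixes, which is why distinct instance names are needed on the same edge.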
4. Notifications
Plugins can send real-time notifications to clients via Server-Sent Events (SSE). The notification flow is: Session -> Plugin -> EdgeService -> Bridge -> SSE clients.
Sending notifications from a session:
    # In your PluginSession subclass method:
    if self._plugin:
        self._plugin._dispatch_notify("my_topic", {"key": "value", "status": "running"})
The _plugin reference is automatically injected into sessions by the plugin. _dispatch_notify works from both sync and async contexts, including background threads.
Sending notifications from a plugin:
    # In your Plugin subclass method:
    await self.send_notification("my_topic", {"key": "value"})
Subscribing to notifications (browser/JavaScript):
    const eventSource = new EventSource('/events');
    eventSource.onmessage = (event) => {
        const msg = JSON.parse(event.data);
        if (msg.topic === 'notification') {
            const {edge, plugin, topic, data} = msg.data;
            // Template literal restored: the backticks were lost in the original.
            console.log(`${edge}/${plugin}: ${topic}`, data);
        }
    };
Subscribing to notifications (Python client):
    import json

    import requests
    import sseclient

    # Open a streaming connection to the bridge's SSE endpoint.
    response = requests.get('http://bridge:8000/events', stream=True)
    client = sseclient.SSEClient(response)
    for event in client.events():
        msg = json.loads(event.data)
        if msg['topic'] == 'notification':
            print(msg['data'])
4. Topology Updates
Plugins can receive notifications when edges connect or disconnect by overriding the on_topology_change method:
    async def on_topology_change(self, edges: dict):
        '''Called when edges connect/disconnect.

        Args:
            edges: Dict mapping edge names to their plugin info.
                Example: {"edge1": {"plugins": ["sysinfo", "psij"]}}
        '''
        for edge_name, info in edges.items():
            print(f"Edge {edge_name} has plugins: {info.get('plugins', [])}")
- classmethod get_plugin_class(name: str) Type | None[source]¶
Look up a registered plugin class by name.
- async register_session(request: Request) JSONResponse[source]¶
Register a new session and return its unique session ID.
- async unregister_session(request: Request) JSONResponse[source]¶
Unregister a session by its session ID and close it.
- async get_ui_config(request: Request) JSONResponse[source]¶
Return UI configuration for portal rendering.
External plugins can define ui_config to describe their forms, monitors, and notification handlers, enabling seamless portal integration.
- async health_check(request: Request) JSONResponse[source]¶
Health check endpoint for monitoring.
Returns plugin status including:
- Plugin name and version
- Uptime in seconds
- Number of active sessions
- Whether the plugin is healthy
- is_enabled() bool[source]¶
Return True if this plugin should be loaded and registered on this edge.
Override in subclasses to gate loading on runtime conditions (e.g. presence of an external binary). Plugins that return False are never instantiated by the edge service and therefore never appear in the Explorer or in /edge/list responses.
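Gating a plugin on the presence of an external binary might look like the sketch below. PluginSketch and NeedsBinaryPlugin are hypothetical stand-ins (the real base class is radical.edge.plugin_base.Plugin), and the required_binary attribute is an assumption introduced for illustration only.

```python
import shutil


class PluginSketch:
    """Hypothetical stand-in for the Plugin base class."""

    def is_enabled(self) -> bool:
        # Assumed default: plugins are enabled unless a subclass says otherwise.
        return True


class NeedsBinaryPlugin(PluginSketch):
    # Hypothetical attribute: name of the executable this plugin depends on.
    required_binary = "sinfo"

    def is_enabled(self) -> bool:
        # shutil.which returns None when the executable is not on PATH.
        return shutil.which(self.required_binary) is not None


class MissingToolPlugin(NeedsBinaryPlugin):
    required_binary = "definitely-not-a-real-binary-xyz"
```

A check like this only confirms that the executable can be found; verifying that it actually works (for example by running it once) would be a stricter variant.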
4.2. Plugins
4.2.1. PluginLucid
4.2.2. PluginXGFabric
XGFabric Plugin for Radical Edge.
Orchestrates CFDaAI workflows across multiple HPC clusters. Provides:
- Configuration management (load/save workflow configs)
- Workflow execution (start/stop/status)
- Real-time progress notifications via SSE
The plugin runs on a local edge and communicates with remote edges (UCSB, Perlmutter) via the bridge.
- class radical.edge.plugin_xgfabric.ResourceConfig(name: str = 'default', bridge_url: str = 'https://localhost:8000', bridge_cert: str | None = None, cluster_configs: ~typing.Dict[str, ~typing.Dict] = <factory>)[source]¶
Bases: object
Resource configuration: bridge connection and per-cluster scheduler settings.
- radical.edge.plugin_xgfabric.dict_to_resource_config(d: Dict) ResourceConfig[source]¶
Convert dict to ResourceConfig, ignoring unknown fields.
- class radical.edge.plugin_xgfabric.WorkflowConfig(name: str = 'default', description: str = '', local_workspace: str = '/tmp/xgfabric_workspace', cspot_woof_url: str = 'woof://128.111.45.61/davisstations/daviscupsout', cspot_limit: int = 10, num_simulations: int = 16, batch_size: int = 4, train_models: ~typing.List[str] = <factory>, simulation_task: ~typing.Dict | None = None, training_tasks: ~typing.Dict[str, ~typing.Dict] = <factory>, evaluation_task: ~typing.Dict | None = None, mock_sensor_data: bool = False)[source]¶
Bases: object
Workflow configuration: task templates and execution parameters.
- radical.edge.plugin_xgfabric.config_to_dict(cfg: WorkflowConfig) Dict[source]¶
Convert config to JSON-serializable dict.
- radical.edge.plugin_xgfabric.dict_to_config(d: Dict) WorkflowConfig[source]¶
Convert dict to WorkflowConfig, filtering unknown fields.
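The "filter unknown fields" behaviour can be sketched with the standard dataclasses module. The `<factory>` defaults in the signatures above suggest the config classes are dataclasses, but that is an inference; WorkflowConfigSketch below is a hypothetical, trimmed-down stand-in and the two helper functions are illustrative, not the library's code.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, List


@dataclass
class WorkflowConfigSketch:
    """Hypothetical subset of WorkflowConfig, for illustration only."""

    name: str = "default"
    num_simulations: int = 16
    batch_size: int = 4
    train_models: List[str] = field(default_factory=list)


def dict_to_config_sketch(d: Dict[str, Any]) -> WorkflowConfigSketch:
    # Keep only keys that match declared dataclass fields; drop the rest.
    known = {f.name for f in fields(WorkflowConfigSketch)}
    return WorkflowConfigSketch(**{k: v for k, v in d.items() if k in known})


def config_to_dict_sketch(cfg: WorkflowConfigSketch) -> Dict[str, Any]:
    # asdict recursively converts the dataclass into plain dicts and lists.
    return asdict(cfg)


cfg = dict_to_config_sketch({"name": "test", "batch_size": 2, "obsolete_key": 1})
```

Dropping unknown keys lets configs saved by a newer or older version still load, at the cost of silently ignoring misspelled field names.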
- class radical.edge.plugin_xgfabric.ClusterStatus(name: str, edge_name: str, cluster_type: str, has_gpu: bool = False, online: bool = False, tasks_running: int = 0, pilot_job_id: str | None = None, pilot_status: str | None = None)[source]¶
Bases: object
Status of a single cluster.
- class radical.edge.plugin_xgfabric.WorkflowState(status: str = 'idle', phase: str = '', progress: int = 0, message: str = '', start_time: str | None = None, end_time: str | None = None, error: str | None = None, active_cluster: str | None = None, completed_simulations: int = 0, total_simulations: int = 0, current_batch: int = 0, total_batches: int = 0, pilot_jobs: ~typing.Dict[str, str] = <factory>, immediate_clusters: ~typing.List[~typing.Dict] = <factory>, allocate_clusters: ~typing.List[~typing.Dict] = <factory>, log: ~typing.List[~typing.Dict] = <factory>, config_name: str | None = None, config_dir: str | None = None)[source]¶
Bases: object
Runtime state of a workflow execution.
- class radical.edge.plugin_xgfabric.XGFabricSession(sid: str, workdir: str | None = None, edge_name: str | None = None, bridge_url: str | None = None, bridge_cert: str | None = None)[source]¶
Bases: PluginSession
XGFabric session: manages workflow configuration and execution.
- async load_config(name: str) Dict[source]¶
Load a workflow config by name, path, or builtin alias (‘default’, ‘test’).
- class radical.edge.plugin_xgfabric.XGFabricClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the XGFabric plugin.
- class radical.edge.plugin_xgfabric.PluginXGFabric(app: FastAPI, workdir: str | None = None)[source]¶
Bases: Plugin
XGFabric plugin for Radical Edge.
Orchestrates CFDaAI workflows across multiple HPC clusters. Provides configuration management and workflow execution via REST API.
- plugin_name = 'xgfabric'¶
- session_class¶
alias of
XGFabricSession
- client_class¶
alias of
XGFabricClient
4.2.3. PluginQueueInfo
- class radical.edge.plugin_queue_info.QueueInfoSession(sid: str, backend: QueueInfoSlurm)[source]¶
Bases: PluginSession
QueueInfo session with shared backend.
All sessions share a single backend instance for cache efficiency.
- async close() dict[source]¶
Close this session.
Note: Backend is shared and not cleaned up here.
- Returns:
dict: An empty dictionary indicating successful closure.
- async get_info(user=None, force=False)[source]¶
Return queue/partition info.
- Args:
user (str): User to filter partitions for. When None (default), the current user is used. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Queue information from the backend.
- async list_jobs(queue, user=None, force=False)[source]¶
List jobs in a queue.
- Args:
queue (str): Partition name.
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing from the backend.
- async list_all_jobs(user=None, force=False)[source]¶
List all jobs for the user across all partitions.
- Args:
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing from the backend.
- class radical.edge.plugin_queue_info.QueueInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the QueueInfo plugin.
- get_info(user: str | None = None, force: bool = False) dict[source]¶
Return queue/partition information.
- Args:
user (str): User to filter partitions for. When None (default), the edge service user is used. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Queue information filtered by user access.
- list_jobs(queue: str, user: str | None = None, force: bool = False) dict[source]¶
List jobs in a specified queue/partition.
- Args:
queue (str): Partition name to list jobs for.
user (str): User to filter jobs for. When None (default), the edge service user is used. Pass user='*' to return all jobs (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: Job listing filtered by user.
- list_all_jobs(user: str | None = None, force: bool = False) dict[source]¶
List all jobs for the user across all partitions.
- Args:
user (str): User to filter jobs for.
force (bool): Bypass cache if True.
- Returns:
dict: Job listing.
- list_allocations(user: str | None = None, force: bool = False) dict[source]¶
List allocations/projects.
- is_enabled() bool[source]¶
Return whether SLURM is available on the edge.
No session is required. Calls the edge-side is_enabled endpoint, which checks for the presence and functionality of sinfo --json.
- job_allocation() dict | None[source]¶
Return edge job allocation info, or None if not inside a SLURM job.
No session is required. The information reflects the environment of the edge process, not the client.
- Returns:
None: Edge is running on a login node (no SLURM_JOB_ID).
dict: {"n_nodes": int, "runtime": int | None} with the number of allocated nodes and the walltime limit in seconds (None for UNLIMITED).
- Raises:
RuntimeError: Edge has SLURM_JOB_ID set but cannot determine allocation details.
- class radical.edge.plugin_queue_info.PluginQueueInfo(app: FastAPI, instance_name='queue_info', slurm_conf=None)[source]¶
Bases: Plugin
QueueInfo plugin for Radical Edge.
This plugin exposes batch system queue information, job listings, and allocation data via REST endpoints. It overrides is_enabled() to return False on edges where SLURM (sinfo) is not present; the edge service will not load or register it on such edges.
- Session-less endpoints (no sid required):
GET /queue_info/is_enabled: returns {"available": bool} indicating whether SLURM (sinfo) is present on this edge. Used by other plugins (e.g. xgfabric) to classify edges as batch-capable without creating a full session.
- plugin_name = 'queue_info'¶
- session_class¶
alias of
QueueInfoSession
- client_class¶
alias of
QueueInfoClient
- ui_config: Dict | UIConfig | None = {'description': 'Inspect Slurm partitions, jobs and allocations.', 'icon': '📋', 'monitors': [{'auto_load': 'get_info/{sid}', 'css_class': 'queueinfo-content', 'id': 'partitions', 'title': 'Partitions / Queues', 'type': 'table'}], 'refresh_button': True, 'title': 'Queue Info'}¶
- async is_enabled_endpoint(request: Request) JSONResponse[source]¶
Session-less endpoint: returns {“available”: bool} for remote callers.
- get_job_allocation() dict | None[source]¶
Return edge job allocation info, or None if not inside a SLURM job.
Checks SLURM environment variables to determine whether the edge process is running inside a batch job allocation. If it is, queries squeue for the walltime limit.
- Returns:
None: If the edge is running on a login node (no SLURM_JOB_ID).
dict: {"n_nodes": int, "runtime": int | None}, where n_nodes is the number of allocated nodes and runtime is the walltime limit in seconds (None for UNLIMITED).
- Raises:
RuntimeError: If SLURM_JOB_ID is set but node count or runtime cannot be determined (missing env var, squeue failure, timeout).
- async job_allocation_endpoint(request: Request) JSONResponse[source]¶
Session-less endpoint: returns current edge job allocation info.
Response:
    {"allocation": null}                              # login node
    {"allocation": {"n_nodes": 4, "runtime": 3600}}   # inside a job
    {"allocation": {"n_nodes": 4, "runtime": null}}   # unlimited walltime
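A caller has to distinguish the three response shapes listed for this endpoint. The helper below is a minimal sketch of that branching; describe_allocation is a hypothetical function name, and the payloads are the ones documented above.

```python
import json


def describe_allocation(payload: str) -> str:
    """Summarize a job_allocation response body (hypothetical helper)."""
    alloc = json.loads(payload)["allocation"]
    if alloc is None:
        # JSON null: the edge runs on a login node, outside any SLURM job.
        return "login node (no SLURM job)"
    runtime = alloc["runtime"]
    # A null runtime means the walltime limit is UNLIMITED.
    limit = "unlimited walltime" if runtime is None else f"{runtime} s walltime"
    return f"{alloc['n_nodes']} nodes, {limit}"


summary = describe_allocation('{"allocation": {"n_nodes": 4, "runtime": 3600}}')
```

Note that "no allocation" and "unlimited walltime" are both encoded as null, but at different levels of the document, so they should be checked separately.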
4.2.4. PluginSysInfo
- class radical.edge.plugin_sysinfo.SysInfoProvider[source]¶
Bases: object
Helper class to gather system information using psutil and standard tools.
- class radical.edge.plugin_sysinfo.SysInfoSession(sid: str, provider: SysInfoProvider)[source]¶
Bases: PluginSession
SysInfo session (Service-side).
Provides methods to gather system metrics.
- class radical.edge.plugin_sysinfo.SysInfoClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the SysInfo plugin.
- class radical.edge.plugin_sysinfo.PluginSysInfo(app: FastAPI)[source]¶
Bases: Plugin
SysInfo plugin for Radical Edge.
Provides system hardware configuration and resource utilization metrics.
- plugin_name = 'sysinfo'¶
- session_class¶
alias of
SysInfoSession
- client_class¶
alias of
SysInfoClient
- ui_config: Dict | UIConfig | None = {'description': 'Live CPU, memory, disk, network and GPU metrics.', 'icon': '🖥️', 'monitors': [{'auto_load': 'metrics/{sid}', 'css_class': 'sysinfo-content', 'id': 'metrics', 'title': 'System Metrics', 'type': 'metrics'}], 'refresh_button': True, 'title': 'System Info'}¶
4.2.5. PluginPSIJ
PsiJ plugin for RADICAL Edge — HPC job submission.
4.2.5.1. Three-class pattern
- PSIJSession: Edge-side session. Holds one PsiJ Executor per submit call, manages job state via callbacks and background polling, and streams stdout/stderr incrementally.
- PSIJClient: Application-side thin HTTP wrapper. Delegates to the edge service over the bridge (submit_job, get_job_status, list_jobs, cancel_job, submit_tunneled, tunnel_status).
- PluginPSIJ: Registers the plugin with the edge, adds URL routes, and wires requests to the correct PSIJSession via _forward().
- class radical.edge.plugin_psij.PSIJSession(sid: str, **kwargs: Any)[source]¶
Bases: PluginSession
Session-specific PSIJ state.
- poll_interval = 5.0¶
- async submit_job(job_spec_dict: Dict[str, Any], executor_name: str = 'local') Dict[str, Any][source]¶
Submit a job via PSIJ.
- async get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) Dict[str, Any][source]¶
Get job status with metadata and optional stdout/stderr offset.
- class radical.edge.plugin_psij.PSIJClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the PSIJ plugin.
- submit_job(job_spec: Dict[str, Any], executor: str = 'local') Dict[str, Any][source]¶
Submit a job.
- Args:
job_spec (dict): The job specification.
executor (str): The executor to use.
- Returns:
dict: Job submission result (job_id, native_id).
- get_job_status(job_id: str, stdout_offset: int = 0, stderr_offset: int = 0) Dict[str, Any][source]¶
Get the status of a job.
- Args:
job_id: The job ID to query.
stdout_offset: Byte offset for stdout (0 = full).
stderr_offset: Byte offset for stderr (0 = full).
- Returns:
Job status info including metadata and stdout/stderr.
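The byte-offset parameters allow a client to fetch only output it has not seen yet. The sketch below illustrates the offset arithmetic on a plain bytes buffer; read_new_output is a hypothetical helper, and the shape of the real status dict is not assumed here.

```python
from typing import Tuple


def read_new_output(captured: bytes, offset: int = 0) -> Tuple[bytes, int]:
    """Return the bytes after `offset` and the offset to use next time."""
    chunk = captured[offset:]
    return chunk, offset + len(chunk)


stdout = b"step 1\n"
first_chunk, offset = read_new_output(stdout)           # offset 0: everything so far
stdout += b"step 2\n"                                    # the job keeps writing
second_chunk, offset = read_new_output(stdout, offset)  # only the new bytes
```

Carrying the returned offset into the next call keeps each poll proportional to the new output rather than to the full log.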
- cancel_job(job_id: str) Dict[str, Any][source]¶
Cancel a job.
- Args:
job_id: The job ID to cancel.
- Returns:
Cancellation result.
- submit_tunneled(job_spec: Dict[str, Any], executor: str = 'local', tunnel: bool = False) Dict[str, Any][source]¶
Submit a job that launches a child Edge service on a compute node.
The job_spec.arguments list must contain -n <edge_name> or --name <edge_name> so the child edge can register under the correct name.
When tunnel is True the server automatically appends --tunnel to the job arguments. The plugin-side watcher then opens a reverse SSH tunnel (login node → compute node) once the job is running and writes the port to ~/.radical/edge/tunnels/{edge_name}.port. The child edge service reads that file at startup and rewrites its bridge URL to connect through the tunnel.
- Args:
job_spec: PsiJ job specification dict. arguments must include -n <edge_name>.
executor: PsiJ executor name (default: "local").
tunnel: Whether to set up a reverse SSH tunnel (default: False).
- Returns:
dict with job_id, native_id, and edge_name.
- Raises:
RuntimeError: If the server returns an error response.
- tunnel_status(edge_name: str) Dict[str, Any][source]¶
Return the current tunnel status for a named edge.
This endpoint is session-less (no session required).
- Args:
edge_name: The logical name of the child edge service.
- Returns:
dict with fields:
edge_name: echoed back.
status: one of "pending", "active", "failed", "done", or "no_tunnel".
port: assigned tunnel port (int) once active, else null.
pid: SSH process PID once spawned, else null.
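Because the status starts at "pending" and later moves to another value, a caller will typically poll until it changes. The loop below is a sketch under assumptions: wait_for_tunnel is a hypothetical helper, get_status stands for any callable with the same shape as tunnel_status, and the timeout and interval values are arbitrary.

```python
import time
from typing import Any, Callable, Dict


def wait_for_tunnel(get_status: Callable[[str], Dict[str, Any]],
                    edge_name: str,
                    timeout: float = 60.0,
                    interval: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> Dict[str, Any]:
    """Poll until the tunnel status is no longer "pending" (hypothetical helper)."""
    deadline = time.monotonic() + timeout
    while True:
        info = get_status(edge_name)
        if info["status"] != "pending":
            # "active", "failed", "done" and "no_tunnel" all end the wait.
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError(f"tunnel for {edge_name!r} still pending")
        sleep(interval)
```

With a real client this would be called as wait_for_tunnel(client.tunnel_status, "my_edge"); the caller still has to inspect the returned status, since "failed" and "no_tunnel" also end the wait.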
- class radical.edge.plugin_psij.PluginPSIJ(app: FastAPI, instance_name: str = 'psij')[source]¶
Bases: Plugin
PSIJ plugin for Radical Edge.
This plugin provides an interface to submit and manage jobs via the psij-python library.
- plugin_name = 'psij'¶
- session_class¶
alias of
PSIJSession
- client_class¶
alias of
PSIJClient
- ui_config: Dict | UIConfig | None = {'description': 'Submit and monitor HPC batch jobs via PsiJ.', 'forms': [{'fields': [{'column': 0, 'css_class': 'p-exec', 'default': 'radical-edge-wrapper.sh', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'column': 0, 'css_class': 'p-args', 'label': 'Arguments (space-separated)', 'name': 'args', 'placeholder': 'auto-filled with --url and --name', 'type': 'text'}, {'column': 0, 'css_class': 'p-executor', 'label': 'Executor', 'name': 'executor', 'options': ['local', 'slurm', 'pbs', 'lsf'], 'type': 'select'}, {'column': 1, 'css_class': 'p-queue', 'label': 'Queue / Partition', 'name': 'queue', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-account', 'label': 'Account / Project', 'name': 'account', 'placeholder': 'optional', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-duration', 'label': 'Duration (seconds)', 'name': 'duration', 'placeholder': 'e.g. 600', 'required': False, 'type': 'text'}, {'column': 1, 'css_class': 'p-node-count', 'label': 'Number of Nodes', 'name': 'node_count', 'placeholder': 'e.g. 1', 'required': False, 'type': 'number'}, {'column': 1, 'css_class': 'p-custom-attr', 'label': '🔧 Custom Attributes', 'name': 'custom', 'required': False, 'type': 'custom_attributes'}], 'id': 'submit', 'layout': 'grid2', 'submit': {'label': '🚀 Submit Job', 'style': 'success'}, 'title': '📝 Submit Job'}], 'icon': '🚀', 'monitors': [{'css_class': 'psij-output', 'empty_text': 'No jobs submitted yet.', 'id': 'jobs', 'title': '📊 Job Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'job_id', 'state_field': 'state', 'topic': 'job_status'}, 'title': 'PsiJ Jobs'}¶
- async submit_tunneled(request: Request) JSONResponse[source]¶
Submit a job that starts a new Edge service on a compute node.
The job must pass -n/--name <edge_name> in its arguments so the child edge service can register under the correct name.
When tunnel=true the plugin automatically appends --tunnel to the job's argument list. The child edge service reads this flag at startup and waits for a relay port file at the hardcoded path ~/.radical/edge/tunnels/{edge_name}.port before connecting to the bridge. The watcher on the parent edge writes that file once the reverse SSH tunnel is established.
When tunnel=true the plugin:
1. Cleans up any stale relay file from a previous run.
2. Injects --tunnel into the job arguments.
3. Spawns an async watcher that waits for the SLURM job to reach RUNNING, then opens a reverse SSH tunnel (login → compute) and writes the allocated port to the relay file.
Request body JSON fields:
job_spec (dict): PsiJ job specification.
executor (str): PsiJ executor name (default: "local").
tunnel (bool): Whether to set up a reverse SSH tunnel (default: false).
- Returns:
JSON with job_id, native_id, and edge_name.
- Raises:
422 if -n/--name is missing from job_spec.arguments.
409 if a tunnel watcher for the same edge name is already active.
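The argument handling described for this endpoint can be sketched as follows. All three helpers are hypothetical and only mirror the documented behaviour: locate the edge name after -n/--name, append --tunnel when requested, and derive the relay file path. Handling of a --name=<value> form is not covered, since the source does not mention it.

```python
from pathlib import Path
from typing import List, Optional


def find_edge_name(arguments: List[str]) -> Optional[str]:
    """Return the value following -n or --name, or None if absent."""
    for index, arg in enumerate(arguments):
        if arg in ("-n", "--name") and index + 1 < len(arguments):
            return arguments[index + 1]
    return None


def prepare_arguments(arguments: List[str], tunnel: bool) -> List[str]:
    """Validate the edge name and append --tunnel when requested."""
    if find_edge_name(arguments) is None:
        # The endpoint reports this case as HTTP 422; a ValueError stands in here.
        raise ValueError("job_spec.arguments must contain -n/--name <edge_name>")
    prepared = list(arguments)  # copy, so the caller's list is not modified
    if tunnel and "--tunnel" not in prepared:
        prepared.append("--tunnel")
    return prepared


def relay_port_file(edge_name: str, home: Optional[Path] = None) -> Path:
    """Path of the relay port file: ~/.radical/edge/tunnels/{edge_name}.port."""
    base = home if home is not None else Path.home()
    return base / ".radical" / "edge" / "tunnels" / f"{edge_name}.port"


args = prepare_arguments(["--url", "https://bridge:8000", "-n", "child1"], tunnel=True)
```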
- async tunnel_status(request: Request) JSONResponse[source]¶
Return the current tunnel status for a named edge.
Path param: edge_name
Returns a JSON object with fields:
edge_name: echoed back.
status: one of "pending", "active", "failed", "done", or "no_tunnel".
port: allocated tunnel port (int) once active, else null.
pid: SSH process PID once spawned, else null.
4.2.6. PluginRhapsody
Rhapsody Plugin for Radical Edge.
Exposes the RHAPSODY Session/Task API so that remote clients can submit and monitor compute / AI tasks on edge nodes.
- class radical.edge.plugin_rhapsody.RhapsodySession(sid: str, backend_names: list[str] | None = None)[source]¶
Bases: PluginSession
Rhapsody session (service-side).
Wraps a rhapsody.Session instance, forwarding task submission, monitoring, cancellation and statistics queries.
- async submit_tasks(task_dicts: list[dict]) list[dict][source]¶
Submit a list of tasks.
Each dict is converted to a ComputeTask or AITask via BaseTask.from_dict().
- Returns:
list[dict]: Submitted task representations (uid, state).
- class radical.edge.plugin_rhapsody.RhapsodyClient(http_client: Client, base_url: str, bridge_client: BridgeClient | None = None, edge_id: str | None = None, plugin_name: str | None = None)[source]¶
Bases: PluginClient
Client-side interface for the Rhapsody plugin.
- register_session(backends: list[str] | None = None)[source]¶
Register a session, optionally specifying backend names.
- Args:
backends: List of backend names (e.g. ['concurrent']). Defaults to ['concurrent'] on the server side.
- submit_tasks(task_dicts: list[dict]) list[dict][source]¶
Submit tasks to the edge.
- Args:
task_dicts: List of task specification dicts.
- Returns:
list[dict]: Submitted task info (uid, state).
- class radical.edge.plugin_rhapsody.PluginRhapsody(app: FastAPI, instance_name: str = 'rhapsody')[source]¶
Bases: Plugin
Rhapsody plugin for Radical Edge.
Exposes the RHAPSODY Session / Task API via REST endpoints:
POST /rhapsody/submit/{sid}
POST /rhapsody/wait/{sid}
GET /rhapsody/task/{sid}/{uid}
POST /rhapsody/cancel/{sid}/{uid}
GET /rhapsody/statistics/{sid}
- plugin_name = 'rhapsody'¶
- session_class¶
alias of
RhapsodySession
- client_class¶
alias of
RhapsodyClient
- ui_config: Dict | UIConfig | None = {'description': 'Submit compute tasks, wait for results, view stdout/stderr.', 'forms': [{'fields': [{'css_class': 'rh-exec', 'default': '/bin/echo', 'label': 'Executable', 'name': 'exec', 'type': 'text'}, {'css_class': 'rh-args', 'default': 'hello from rhapsody', 'label': 'Arguments (space-separated)', 'name': 'args', 'type': 'text'}, {'css_class': 'rh-backends', 'label': 'Backend', 'name': 'backends', 'options': ['concurrent', 'dragon_v3'], 'type': 'select'}], 'id': 'submit', 'layout': 'single', 'submit': {'label': '▶ Submit Task', 'style': 'success'}, 'title': '📝 Submit Task'}], 'icon': '🎼', 'monitors': [{'css_class': 'rh-output', 'empty_text': 'No tasks submitted yet.', 'id': 'tasks', 'title': '📊 Task Monitor', 'type': 'task_list'}], 'notifications': {'id_field': 'uid', 'state_field': 'state', 'topic': 'task_status'}, 'title': 'Rhapsody Tasks'}¶
4.3. Queue Info Backend
4.3.1. QueueInfo
- class radical.edge.queue_info.QueueInfo[source]¶
Bases: ABC
Abstract base class for batch system queue information backends.
Subclasses implement _collect_info, _collect_jobs, _collect_allocations to gather data from a specific batch system. Results are cached with a configurable TTL.
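Caching collector results with a TTL and a force flag can be sketched as below. TTLCache is a hypothetical class written for illustration; the actual caching code in QueueInfo is not shown in this reference, and the injectable clock exists only to make the behaviour easy to exercise.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Minimal time-based cache (hypothetical, not the QueueInfo implementation)."""

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._store: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable, collect: Callable[[], Any], force: bool = False) -> Any:
        now = self._clock()
        hit = self._store.get(key)
        if not force and hit is not None and now - hit[0] < self._ttl:
            # Entry is still fresh: skip the (possibly slow) collector.
            return hit[1]
        value = collect()
        self._store[key] = (now, value)
        return value
```

In this sketch force=True bypasses the freshness check but still stores the new result, so later non-forced calls benefit from it.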
- start_prefetch()[source]¶
Start background threads to prefetch queue info and allocations in parallel so both caches are warm as quickly as possible.
- get_info(user=None, force=False)[source]¶
Return queue/partition info. force=True bypasses cache.
- Args:
user (str): User to filter partitions for. When None (default), the current user is used. Pass user='*' to return all partitions (admin view).
force (bool): Bypass cache if True.
- Returns:
dict: {“queues”: {<partition_name>: {…}, …}}
- list_jobs(queue, user=None, force=False)[source]¶
List jobs in a queue.
- Args:
queue (str): Partition name to list jobs for.
user (str): User to filter jobs for. When None (default), the current user is used. Pass user='*' to return all jobs.
force (bool): Bypass cache if True.
- Returns:
dict: {“jobs”: [<job_dict>, …]}
- class radical.edge.queue_info.QueueInfoSlurm(slurm_conf=None)[source]¶
Bases: QueueInfo
SLURM backend for queue information.
Calls sinfo, squeue, scontrol, and sacctmgr with --json and parses the results. Target SLURM version: 24.11.5+.
- Args:
slurm_conf (str): Optional path to slurm.conf. When set, all subprocess calls run with SLURM_CONF=<path> in their environment, allowing a single edge service to query multiple clusters.
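Running a command with SLURM_CONF set only for that subprocess can be sketched with the standard library. slurm_env and run_with_conf are hypothetical helpers, and the example path in the usage note is illustrative only.

```python
import os
import subprocess
from typing import Dict, List, Optional


def slurm_env(slurm_conf: Optional[str] = None,
              base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Copy the environment and add SLURM_CONF when a path is given."""
    env = dict(os.environ if base is None else base)
    if slurm_conf:
        env["SLURM_CONF"] = slurm_conf
    return env


def run_with_conf(cmd: List[str], slurm_conf: Optional[str] = None) -> str:
    """Run cmd with the per-call environment and return its stdout."""
    result = subprocess.run(cmd, env=slurm_env(slurm_conf),
                            capture_output=True, text=True, check=True)
    return result.stdout
```

For example, run_with_conf(["sinfo", "--json"], "/path/to/slurm.conf") would query the cluster described by that file. Because the variable is set on a copy of the environment, two backends pointing at different slurm.conf files do not interfere with each other or with the parent process.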
- static get_job_nodes(native_id: str, env: dict | None = None) list[source]¶
Return hostnames of nodes allocated to a running SLURM job.
- Args:
native_id: SLURM job ID (string or int).
env: Environment dict (e.g. with SLURM_CONF). Defaults to inheriting the current environment.
- Returns:
List of hostname strings, or empty list if not determinable.