3. Plugin Development Guide¶
3.1. Overview¶
The Radical Edge plugin system lets you extend edge nodes with domain-specific functionality. Each plugin gets its own URL namespace, session management, and notification support out of the box.
3.2. Architecture¶
3.2.1. Base Classes¶
The plugin system provides three base classes:
Plugin (
plugin_base.py) — Server-side plugin registered on the edge.Manages sessions, routes, and notifications
Auto-registers via
plugin_nameclass attributeProvides
add_route_post()/add_route_get()helpersForwards requests to sessions via
_forward()
PluginSession (
plugin_session_base.py) — Per-client session state.Created when a client calls
register_sessionHolds domain-specific state (jobs, tasks, connections)
Sends notifications via
self._notify(topic, data)
PluginClient (
client.py) — Application-side client helper.Wraps HTTP calls to the bridge/edge REST API
Manages session registration and lifecycle
Optional: only needed for Python client usage
3.2.2. Inheritance Hierarchy¶
Plugin (server-side)
├── PluginSysinfo
├── PluginPSIJ
├── PluginRhapsody
├── PluginQueueInfo
├── PluginLucid
└── PluginXGFabric
PluginSession (server-side)
├── SysinfoSession
├── PSIJSession
├── RhapsodySession
├── QueueInfoSession
└── ...
PluginClient (client-side)
├── PSIJClient
├── RhapsodyClient
├── QueueInfoClient
└── ...
3.3. Creating a New Plugin¶
3.3.1. Step 1: Define Your Session Class¶
Create a session class that inherits from PluginSession:
from radical.edge.plugin_session_base import PluginSession
class MySession(PluginSession):
"""Server-side session for MyPlugin."""
def __init__(self, sid: str):
super().__init__(sid)
self._data = {} # Per-session state
async def do_work(self, param: str) -> dict:
"""Perform a domain-specific operation."""
self._check_active()
result = f"processed: {param}"
self._data[param] = result
# Send real-time notification to clients
if self._notify:
self._notify("work_status", {
"param": param,
"status": "done"
})
return {"result": result}
async def close(self) -> dict:
"""Clean up session resources."""
self._data = {}
return await super().close()
Key Points:
Call
super().__init__(sid)to initialize base functionalityUse
self._check_active()to validate session is openUse
self._notify(topic, data)for real-time notificationsCall
await super().close()in your close method
3.3.2. Step 2: Define Your Plugin Class¶
Create a plugin class that inherits from Plugin:
from fastapi import FastAPI, Request
from starlette.responses import JSONResponse
from radical.edge.plugin_base import Plugin
class PluginMyService(Plugin):
"""MyService plugin for Radical Edge."""
plugin_name = "myservice" # URL namespace and registry key
session_class = MySession # Required!
version = '0.1.0'
def __init__(self, app: FastAPI, instance_name: str = "myservice"):
super().__init__(app, instance_name)
# Add plugin-specific routes
self.add_route_post('do_work/{sid}', self.do_work)
async def do_work(self, request: Request) -> JSONResponse:
"""Route handler — forwards to session method."""
sid = request.path_params['sid']
data = await request.json()
return await self._forward(sid, MySession.do_work,
param=data['param'])
Key Points:
Set
plugin_namefor auto-registration and URL namespaceSet
session_classto your session classUse
self.add_route_post()/self.add_route_get()for routesUse
self._forward(sid, method, **kwargs)to dispatch to sessions_forwardhandles session lookup, error wrapping, and JSON response
3.3.3. Auto-Registered Routes¶
Every plugin automatically gets these routes:
POST /{plugin_name}/register_session— Create a new sessionPOST /{plugin_name}/unregister_session/{sid}— Close a sessionGET /{plugin_name}/version— Plugin versionGET /{plugin_name}/list_sessions— List active sessionsGET /{plugin_name}/health— Health checkGET /{plugin_name}/ui_config— UI configuration for the Explorer
3.3.4. Step 3: Define Your Client Class (Optional)¶
For Python client access, create a client class:
from radical.edge.client import PluginClient
class MyServiceClient(PluginClient):
"""Client-side interface for MyService plugin."""
def do_work(self, param: str) -> dict:
"""Call do_work on the edge."""
if not self.sid:
raise RuntimeError("No active session")
url = self._url(f"do_work/{self.sid}")
resp = self._http.post(url, json={"param": param})
self._raise(resp, f"do_work({param!r})")
return resp.json()
Key Points:
self.sidis set afterregister_session()self._url(path)builds the full URL with namespaceself._httpis the HTTP client (httpx.Client)self._raise(resp)raises on non-2xx status codes
3.4. Advanced Patterns¶
3.4.1. Custom Session Creation¶
Override _create_session() for custom initialization:
class PluginMyService(Plugin):
session_class = MySession
def _create_session(self, sid: str, **kwargs) -> MySession:
"""Pass extra config to sessions."""
return self.session_class(sid, config=self._config)
3.4.2. Custom Session Registration¶
Override register_session() for custom registration logic:
async def register_session(self, request: Request) -> JSONResponse:
"""Register with custom parameters."""
import uuid as _uuid
data = await request.json()
backends = data.get('backends', ['default'])
sid = f"session.{_uuid.uuid4().hex[:8]}"
session = self._create_session(sid, backends=backends)
if hasattr(session, 'initialize'):
await session.initialize()
self._sessions[sid] = session
return JSONResponse({"sid": sid})
3.4.3. Notifications¶
Sessions send notifications via self._notify(topic, data).
Notifications flow: Session → Plugin → Edge → Bridge → SSE clients.
# In your session method:
if self._notify:
self._notify("job_status", {
"job_id": "abc123",
"state": "RUNNING"
})
Clients receive notifications via SSE at /events on the bridge.
See the main CLAUDE.md for subscription examples (JavaScript, Python).
3.4.4. Topology Updates¶
Override on_topology_change to react when edges connect or disconnect:
class PluginMyService(Plugin):
async def on_topology_change(self, edges: dict):
for edge_name, info in edges.items():
plugins = info.get('plugins', [])
print(f"Edge {edge_name}: {plugins}")
3.5. UI Configuration¶
Plugins can provide a ui_config dict that the Explorer UI uses to
render forms, monitors, and notification subscriptions automatically:
class PluginMyService(Plugin):
ui_config = {
"icon": "🔧",
"title": "My Service",
"description": "Does useful things.",
"forms": [{
"id": "submit",
"title": "Submit Work",
"fields": [
{"name": "param", "type": "text", "label": "Parameter",
"default": "hello"},
],
"submit": {"label": "▶ Submit", "style": "success"}
}],
"monitors": [{
"id": "tasks",
"title": "Task Monitor",
"type": "task_list",
"empty_text": "No tasks yet."
}],
"notifications": {
"topic": "work_status",
"id_field": "task_id",
"state_field": "state"
}
}
Alternatively, plugins can provide a custom JS module by setting
ui_module to the path of a .js file. See the next section for
the complete JS Module API reference.
3.6. JS Plugin Module API¶
When ui_module is set to a .js file path, the Explorer loads and runs
the module. The module must be an ES module (type="module") and may export
the following functions and constants:
3.6.1. Required Exports¶
// Unique plugin name — used for routing and session lookup
export const name = 'myplugin';
// Return the HTML for the plugin page (called once per edge)
export function template() { return '<div>...</div>'; }
// Return plugin-scoped CSS (injected into a <style> tag)
export function css() { return '.my-class { ... }'; }
// Called when the plugin page is mounted; bind event listeners here
export function init(page, api) { ... }
3.6.2. Optional Exports¶
// Called when the plugin's tab is shown (page already mounted)
export function onShow(page, api) { ... }
// Called when an SSE notification arrives matching notificationConfig
export function onNotification(data, page, api) { ... }
// Declare which SSE topic this plugin subscribes to
export const notificationConfig = {
topic: 'job_status', // SSE topic to subscribe to
idField: 'job_id', // Field in data.data used as entity ID
};
3.6.3. The api Object¶
The api object is passed to init(), onShow(), and
onNotification(). It exposes:
Session management
api.getSession(pluginName)Returns a Promise resolving to the active session ID for the named plugin, creating one if needed.
HTTP
api.fetch(path, options)Fetch relative to the current plugin namespace on the bridge. Returns parsed JSON. Throws on HTTP errors.
api.fetchRaw(path, options)Same as
fetchbut returns the rawResponseobject. Used when you need headers or streaming (e.g. file download).
UI helpers
api.flash(message, ok=true)Show a transient status message.
ok=falsestyles it as an error.api.escHtml(s)HTML-escape a string for safe
innerHTMLinsertion.api.showOverlay(title, bodyHtml)Open the shared full-screen overlay with the given title and HTML body.
Task tracking
api.registerTask(plugin, id, label)Register a task ID in the global task list (shown in the taskbar).
Queue data cache
api.getQueueData()Return cached queue/allocation data for this edge (populated by the
queue_infoplugin on load), orundefinedif not available.api.setQueueData(data)Store queue data for this edge (called by
queue_info).
Edge info (read-only properties)
api.edgeNameThe name of the current edge (e.g.
"hpc1").api.pluginNameThe plugin module name.
api.bridgeUrlFull URL of the bridge (e.g.
"https://bridge:8000").api.getPluginNames()Returns an array of all plugin names registered on this edge.
Edge management
api.disconnectEdge(event)Initiate graceful disconnection of this edge. Pass the click event to prevent default and stop propagation.
3.6.4. Notifications¶
SSE notifications are delivered to onNotification(data, page, api) only
if the module exports a matching notificationConfig:
export const notificationConfig = {
topic: 'job_status', // Must match the server-side notify() topic
idField: 'job_id', // Field in notification data used as entity key
};
The data argument passed to onNotification has this shape:
{
topic: 'job_status',
data: { job_id: '...', state: 'RUNNING', ... }
}
Buffering pattern: Notifications may arrive before the entity row exists
in the DOM (e.g. a status update arrives before submit returns). Buffer
them in a module-level dict keyed by entity ID, then drain the buffer after
adding the row:
const pending = {}; // id -> notification data
export function onNotification(data, page, api) {
const id = data.data?.job_id;
const row = page.querySelector(`[data-job-id="${CSS.escape(id)}"]`);
if (row) {
updateRow(page, id, data.data.state);
} else if (id) {
pending[id] = data.data; // buffer for later
}
}
// After creating the row:
if (pending[id]) {
updateRow(page, id, pending[id].state);
delete pending[id];
}
See psij.js and rhapsody.js for complete examples of this pattern.
3.7. Session Lifecycle¶
Sessions are created on the first api.getSession() call and persist until:
The browser tab is closed or navigated away
The edge disconnects (all sessions are lost; the edge has no persistence)
The session TTL expires (default 1 hour of inactivity)
The client calls
unregister_session/{sid}
When close() is called on a PluginSession:
The session should release all resources (threads, backend connections, file handles)
Any background polling or watchers must be cancelled
The base
super().close()sets the session status to inactive
Sessions are not persisted across edge restarts. Clients must re-register after an edge reconnects.
3.8. Async / Sync Guidelines¶
All plugin route handlers must be async def. Blocking operations
(file I/O, subprocess calls, network requests) must be offloaded to a thread
pool using asyncio.to_thread:
async def my_handler(self, param: str) -> dict:
# Blocking call — run in thread to avoid blocking the event loop
result = await asyncio.to_thread(subprocess.check_output, ['cmd', param])
return {'output': result.decode()}
Callbacks from external libraries (e.g. PsiJ status callbacks, Rhapsody
backend callbacks) run in background threads, not the event loop. Use
self._notify(topic, data) from those callbacks — it is thread-safe and
schedules the SSE send on the main event loop automatically.
3.9. Testing Your Plugin¶
import pytest
from fastapi import FastAPI
from starlette.testclient import TestClient
@pytest.mark.asyncio
async def test_my_plugin():
app = FastAPI()
plugin = PluginMyService(app)
client = TestClient(app)
# Register session
resp = client.post(f"{plugin.namespace}/register_session")
assert resp.status_code == 200
sid = resp.json()['sid']
# Call plugin endpoint
resp = client.post(
f"{plugin.namespace}/do_work/{sid}",
json={"param": "test"}
)
assert resp.status_code == 200
assert resp.json()['result'] == "processed: test"
3.10. Summary¶
To create a new plugin:
Create a session class inheriting from
PluginSessionCreate a plugin class inheriting from
PluginSet
plugin_nameandsession_classAdd routes in
__init__usingadd_route_post/add_route_getOptionally create a
PluginClientsubclass for Python clientsOptionally provide
ui_configfor the Explorer UI
See the existing plugins (plugin_sysinfo.py, plugin_psij.py,
plugin_rhapsody.py) for real-world examples.