Source code for radical.edge.plugin_rhapsody

'''
Rhapsody Plugin for Radical Edge.

Exposes the RHAPSODY Session/Task API so that remote clients can submit
and monitor compute / AI tasks on edge nodes.
'''

import asyncio
import logging
import threading
import time
import uuid

from fastapi import FastAPI, HTTPException, Request
from starlette.responses import JSONResponse

from .plugin_session_base import PluginSession
from .plugin_base import Plugin
from .client import PluginClient

log = logging.getLogger("radical.edge")

TERMINAL_STATES = {'DONE', 'FAILED', 'CANCELED', 'COMPLETED'}

# Guard rhapsody import — it is an optional dependency
try:
    import rhapsody as rh
except ImportError:
    rh = None


# ---------------------------------------------------------------------------
# Edge-side session
# ---------------------------------------------------------------------------

[docs] class RhapsodySession(PluginSession): """ Rhapsody session (service-side). Wraps a ``rhapsody.Session`` instance, forwarding task submission, monitoring, cancellation and statistics queries. """ def __init__(self, sid: str, backend_names: list[str] | None = None): """ Initialize a RhapsodySession. Args: sid (str): Unique session identifier. backend_names (list[str] | None): Backends to configure. Defaults to ``['concurrent']``. """ super().__init__(sid) if rh is None: raise RuntimeError("rhapsody package is not installed") self.backend_names = backend_names or ['concurrent'] self._rh_session = None self._tasks: dict[str, dict] = {}
[docs] async def initialize(self) -> None: """Asynchronously initialize the session and its backends.""" backends = [] for name in self.backend_names: b = rh.get_backend(name) if hasattr(b, '__await__'): b = await b backends.append(b) self._rh_session = rh.Session(backends=backends, uid=self._sid) # Register state-change callbacks for intermediate notifications self._notified_states: dict[str, str] = {} self._notified_lock = threading.Lock() for b in backends: if hasattr(b, 'register_callback'): orig = getattr(b, '_callback_func', None) def _on_state(task, state, _orig=orig): self._on_task_state_change(task, state) if _orig: _orig(task, state) b.register_callback(_on_state)
def _on_task_state_change(self, task, state): """Fire notification on intermediate state changes (e.g. RUNNING). Called from backend threads — uses lock for _notified_states access. """ uid = self._get_attr(task, 'uid') uid_str = str(uid) if uid else '?' state_str = str(state) with self._notified_lock: # Skip if we already notified this state if self._notified_states.get(uid_str) == state_str: return self._notified_states[uid_str] = state_str # Only fire for non-terminal states; terminal is handled by _watch_task if state_str.upper() in TERMINAL_STATES: return if self._plugin: self._plugin._dispatch_notify("task_status", { "uid": uid_str, "state": state_str, })
[docs] async def submit_tasks(self, task_dicts: list[dict]) -> list[dict]: """ Submit a list of tasks. Each dict is converted to a ``ComputeTask`` or ``AITask`` via ``BaseTask.from_dict()``. Returns: list[dict]: Submitted task representations (uid, state). """ self._check_active() tasks = [rh.BaseTask.from_dict(td) for td in task_dicts] await self._rh_session.submit_tasks(tasks) results = [] for t in tasks: self._tasks[str(t.uid)] = t state = t.get("state") if state is not None: state = str(state) results.append({"uid": t.uid, "state": state}) # Start one background watcher per task so each notifies independently if self._plugin: for t in tasks: asyncio.ensure_future(self._watch_task(t)) return results
async def _watch_task(self, task): """Background watcher for a single task: notify as soon as it completes.""" uid = self._get_attr(task, 'uid') uid_str = str(uid) if uid else '?' log.debug("[%s] Watcher started for task %s", self._sid, uid_str) try: # Check session is still valid if not self._rh_session: log.warning("[%s] Session closed before task %s completed", self._sid, uid_str) self._send_error_notification(uid_str, "Session closed") return await self._rh_session.wait_tasks([task]) # State might be in the object or the dict state = self._get_attr(task, 'state') log.info("[%s] Task %s completed with state: %s", self._sid, uid_str, state) d = self._sanitize_task(task) log.debug("[%s] Sending notification for task %s: %s", self._sid, uid_str, d) if self._plugin: self._plugin._dispatch_notify("task_status", d) except Exception as e: log.warning("[%s] Rhapsody watch error for task %s: %s", self._sid, uid_str, e) # Send error notification so UI doesn't stay stuck in SUBMITTED self._send_error_notification(uid_str, str(e)) def _send_error_notification(self, uid: str, error: str) -> None: """Send a FAILED notification when watcher encounters an error.""" if self._plugin: self._plugin._dispatch_notify("task_status", { "uid": uid, "state": "FAILED", "error": error })
[docs] async def wait_tasks(self, uids: list[str], timeout: float | None = None) -> list[dict]: """ Wait for tasks to reach a terminal state. Args: uids (list[str]): Task UIDs to wait for. timeout (float | None): Seconds to wait (``None`` = forever). Returns: list[dict]: Final task state dicts. """ self._check_active() tasks = [self._tasks[uid] for uid in uids if uid in self._tasks] if not tasks: raise HTTPException(status_code=404, detail="none of the requested tasks found") await self._rh_session.wait_tasks(tasks, timeout=timeout) return [self._sanitize_task(t) for t in tasks]
def _get_attr(self, obj, attr, default=None): """Helper to get attribute from object or dict.""" val = getattr(obj, attr, None) if val is None and isinstance(obj, dict): val = obj.get(attr) return val if val is not None else default def _sanitize_task(self, t) -> dict: """Sanitize a Rhapsody task dict so it's JSON serializable.""" if hasattr(t, 'to_dict'): d = t.to_dict() else: d = dict(t) # Ensure 'uid' is present and a string uid = self._get_attr(t, 'uid') if uid: d['uid'] = str(uid) # Ensure 'state' is present and a string state = self._get_attr(t, 'state') if state: d['state'] = str(state) d.pop('future', None) if 'exception' in d and d['exception'] is not None: d['exception'] = str(d['exception']) return d
[docs] async def list_tasks(self) -> dict: """Return all tasks in this session with current state.""" self._check_active() tasks = [] for uid, task in self._tasks.items(): tasks.append(self._sanitize_task(task)) return {"tasks": tasks}
[docs] async def get_task(self, uid: str) -> dict: """ Return info for a single cached task. """ self._check_active() task = self._tasks.get(uid) if not task: raise HTTPException(status_code=404, detail=f"task {uid} not found") return self._sanitize_task(task)
[docs] async def cancel_task(self, uid: str) -> dict: """ Cancel a running task. """ self._check_active() task = self._tasks.get(uid) if not task: raise HTTPException(status_code=404, detail=f"task {uid} not found") backend_name = task.get("backend") if backend_name and backend_name in self._rh_session.backends: backend = self._rh_session.backends[backend_name] await backend.cancel_task(uid) return {"uid": uid, "status": "canceled"}
[docs] async def get_statistics(self) -> dict: """ Return session-level statistics. """ self._check_active() return self._rh_session.get_statistics()
[docs] async def close(self) -> dict: """ Shutdown RHAPSODY session and clean up. """ if self._rh_session: await self._rh_session.close() self._rh_session = None self._tasks = {} return await super().close()
# --------------------------------------------------------------------------- # Application-side client # ---------------------------------------------------------------------------
[docs] class RhapsodyClient(PluginClient): """ Client-side interface for the Rhapsody plugin. """
[docs] def register_session(self, backends: list[str] | None = None): """ Register a session, optionally specifying backend names. Args: backends: List of backend names (e.g. ``['concurrent']``). Defaults to ``['concurrent']`` on the server side. """ payload = {} if backends: payload['backends'] = backends resp = self._http.post(self._url('register_session'), json=payload) self._raise(resp) self._sid = resp.json()['sid']
[docs] def submit_tasks(self, task_dicts: list[dict]) -> list[dict]: """ Submit tasks to the edge. Args: task_dicts: List of task specification dicts. Returns: list[dict]: Submitted task info (uid, state). """ self._require_session() url = self._url(f"submit/{self.sid}") resp = self._http.post(url, json={"tasks": task_dicts}) exes = [t.get('executable', '?') for t in task_dicts[:3]] self._raise(resp, f"submit {len(task_dicts)} task(s): {exes}") return resp.json()
[docs] def wait_tasks(self, uids: list[str], timeout: float | None = None) -> list[dict]: """ Wait for tasks to complete. Args: uids: Task UIDs to wait for. timeout: Seconds to wait (None = forever). Returns: list[dict]: Completed task dicts. """ self._require_session() url = self._url(f"wait/{self.sid}") payload: dict = {"uids": uids} if timeout is not None: payload["timeout"] = timeout resp = self._http.post(url, json=payload) self._raise(resp, f"wait {len(uids)} task(s)") return resp.json()
[docs] def list_tasks(self) -> dict: """List all tasks in this session.""" self._require_session() resp = self._http.get(self._url(f"list_tasks/{self.sid}")) self._raise(resp) return resp.json()
[docs] def get_task(self, uid: str) -> dict: """ Retrieve info for a single task. """ self._require_session() url = self._url(f"task/{self.sid}/{uid}") resp = self._http.get(url) self._raise(resp) return resp.json()
[docs] def cancel_task(self, uid: str) -> dict: """ Cancel a task. """ self._require_session() url = self._url(f"cancel/{self.sid}/{uid}") resp = self._http.post(url) self._raise(resp) return resp.json()
[docs] def get_statistics(self) -> dict: """ Request session statistics. """ self._require_session() url = self._url(f"statistics/{self.sid}") resp = self._http.get(url) self._raise(resp) return resp.json()
# --------------------------------------------------------------------------- # Server-side plugin # ---------------------------------------------------------------------------
[docs] class PluginRhapsody(Plugin): ''' Rhapsody plugin for Radical Edge. Exposes the RHAPSODY Session / Task API via REST endpoints: - POST /rhapsody/submit/{sid} - POST /rhapsody/wait/{sid} - GET /rhapsody/task/{sid}/{uid} - POST /rhapsody/cancel/{sid}/{uid} - GET /rhapsody/statistics/{sid} ''' plugin_name = "rhapsody" session_class = RhapsodySession client_class = RhapsodyClient version = '0.0.1' ui_config = { "icon": "🎼", "title": "Rhapsody Tasks", "description": "Submit compute tasks, wait for results, view stdout/stderr.", "forms": [{ "id": "submit", "title": "📝 Submit Task", "layout": "single", "fields": [ {"name": "exec", "type": "text", "label": "Executable", "default": "/bin/echo", "css_class": "rh-exec"}, {"name": "args", "type": "text", "label": "Arguments (space-separated)", "default": "hello from rhapsody", "css_class": "rh-args"}, {"name": "backends", "type": "select", "label": "Backend", "options": ["concurrent", "dragon_v3"], "css_class": "rh-backends"}, ], "submit": {"label": "▶ Submit Task", "style": "success"} }], "monitors": [{ "id": "tasks", "title": "📊 Task Monitor", "type": "task_list", "css_class": "rh-output", "empty_text": "No tasks submitted yet." }], "notifications": { "topic": "task_status", "id_field": "uid", "state_field": "state" } } def __init__(self, app: FastAPI, instance_name: str = "rhapsody"): super().__init__(app, instance_name) self.add_route_post('submit/{sid}', self.submit_tasks) self.add_route_post('wait/{sid}', self.wait_tasks) self.add_route_get('list_tasks/{sid}', self.list_tasks) self.add_route_get('task/{sid}/{uid}', self.get_task) self.add_route_post('cancel/{sid}/{uid}', self.cancel_task) self.add_route_get('statistics/{sid}', self.get_statistics)
[docs] async def register_session(self, request: Request) -> JSONResponse: """Register a new Rhapsody session. Accepts an optional JSON body with ``{"backends": ["name", ...]}``. """ try: data = await request.json() except Exception: data = {} backend_names = data.get('backends') # Build session directly to avoid race on shared plugin state self._ensure_cleanup_task() sid = f"session.{uuid.uuid4().hex[:8]}" session = self.session_class(sid, backend_names=backend_names) session._plugin = self self._sessions[sid] = session self._session_last_access[sid] = time.time() log.info("[%s] Registered session %s", self.instance_name, sid) if hasattr(session, 'initialize'): await session.initialize() return JSONResponse({"sid": sid})
# -- route handlers -----------------------------------------------------
[docs] async def submit_tasks(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] data = await request.json() task_dicts = data.get('tasks', []) return await self._forward(sid, RhapsodySession.submit_tasks, task_dicts=task_dicts)
[docs] async def wait_tasks(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] data = await request.json() uids = data.get('uids', []) timeout = data.get('timeout') return await self._forward(sid, RhapsodySession.wait_tasks, uids=uids, timeout=timeout)
[docs] async def list_tasks(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] return await self._forward(sid, RhapsodySession.list_tasks)
[docs] async def get_task(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] uid = request.path_params['uid'] return await self._forward(sid, RhapsodySession.get_task, uid=uid)
[docs] async def cancel_task(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] uid = request.path_params['uid'] return await self._forward(sid, RhapsodySession.cancel_task, uid=uid)
[docs] async def get_statistics(self, request: Request) -> JSONResponse: sid = request.path_params['sid'] return await self._forward(sid, RhapsodySession.get_statistics)