Source code for radical.edge.service


import asyncio
import base64
import json
import logging
import os
import pathlib
import random
import ssl
import socket
import threading
from importlib.metadata import entry_points
from typing import Any, Dict, Optional

import httpx
import websockets
from websockets import exceptions as ws_exc

from fastapi import FastAPI
from httpx import ASGITransport

import radical.edge.logging_config  # noqa: F401 # pylint: disable=unused-import

from radical.edge.plugin_base import Plugin
from radical.edge.models import (
    RequestMessage, PingMessage, ErrorMessage, ShutdownMessage, TopologyMessage,
    ResponseMessage, NotificationMessage, RegisterMessage,
    parse_bridge_message
)
from radical.edge.ui_schema import ui_config_to_dict

log = logging.getLogger("radical.edge")


def _resolve_plugin_names(requested: list, available: list) -> list:
    """Resolve a requested plugin list against the available plugin names.

    Supports prefix matching: 'sys' matches 'sysinfo', 'q' matches 'queue_info'.
    Exact matches take priority over prefix matches.

    Args:
        requested: List of tokens from the user, or ['all'].
        available: Full list of registered plugin names.

    Returns:
        Ordered list of resolved plugin names.

    Raises:
        ValueError: If a token matches nothing or is ambiguous.
    """
    if requested == ['all']:
        return list(available)

    result = []
    for token in requested:
        if token in available:
            result.append(token)
            continue
        matches = [p for p in available if p.startswith(token)]
        if not matches:
            raise ValueError(
                f"No plugin matches '{token}'. "
                f"Available: {', '.join(sorted(available))}"
            )
        if len(matches) > 1:
            raise ValueError(
                f"Ambiguous plugin name '{token}': matches {sorted(matches)}"
            )
        result.append(matches[0])
    return result


class EdgeService:
    """
    Embedded Radical Edge Service.

    This class runs the Edge Service logic within an application, supporting
    both asyncio-based applications (``await service.run()``) and synchronous
    applications (``service.start_background()``).  It manages the WebSocket
    connection to the Bridge and hosts the local plugin execution environment.

    An internal FastAPI application (``self._app``) is handed to every plugin
    instance; requests arriving from the Bridge are replayed against that app
    in-process through an ``httpx`` ASGI transport.  Which plugins are loaded
    is controlled by the ``plugins`` constructor argument (default: all
    registered plugins).
    """

    def __init__(self, bridge_url: Optional[str] = None,
                 name: Optional[str] = None,
                 plugins: Optional[list] = None,
                 tunnel: bool = False):
        """
        Initialize the Edge Service and load its plugins.

        Args:
            bridge_url: URL of the Bridge.  Falls back to the environment
                variable 'RADICAL_BRIDGE_URL' when not given.
            name: Edge service name for identification. Defaults to hostname.
            plugins: Plugin names (or unique prefixes) to load.  ``None`` or
                an empty list means ``['all']``.
            tunnel: When true, `run()` waits for a relay port file and
                redirects the bridge URL to ``localhost:<port>``.

        Raises:
            ValueError: If no bridge URL is available, or if a requested
                plugin name matches nothing or is ambiguous.
        """
        self._bridge_url: str = bridge_url or os.environ.get("RADICAL_BRIDGE_URL", "")
        self._app: FastAPI = FastAPI(title="Embedded Edge Service")
        self._app.state.bridge_url = self._bridge_url

        if not self._bridge_url:
            raise ValueError("Bridge URL missing as argument or RADICAL_BRIDGE_URL")

        self._plugins: Dict[str, Plugin] = {}
        self._name: str = name or socket.gethostname()
        self._plugin_filter: list = plugins or ['all']

        # Exposed on app.state so that plugins can reach back to the service
        self._app.state.edge_name = self._name
        self._app.state.edge_service = self

        self._tunnel: bool = tunnel
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._http_client: Optional[httpx.AsyncClient] = None
        self._send_lock: asyncio.Lock = asyncio.Lock()
        self._stop_event: asyncio.Event = asyncio.Event()
        self._running_task: Optional[asyncio.Task] = None
        self._thread: Optional[threading.Thread] = None

        # Event loop executing run(); lets stop() hop onto the right thread
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        # Strong references to fire-and-forget handler tasks (see _spawn)
        self._tasks: set = set()

        self._load_plugins()

    @property
    def bridge_url(self) -> str:
        """Get the current Bridge URL."""
        return self._bridge_url

    def _load_plugins(self) -> None:
        """
        Load all known and enabled plugins into the service.

        This method:
        1. Discovers external plugins via 'radical.edge.plugins' entry points
        2. Instantiates all registered plugins (built-in and external) that
           match the plugin filter given to the constructor

        A plugin that fails to import or to instantiate is logged and
        skipped; it does not abort loading of the remaining plugins.

        Raises:
            ValueError: If the plugin filter cannot be resolved.
        """
        # Discover and import external plugins via entry points
        try:
            eps = entry_points(group='radical.edge.plugins')
            for ep in eps:
                try:
                    ep.load()  # This imports the module and triggers auto-registration
                    log.info("[Edge] Discovered external plugin: %s", ep.name)
                except Exception:
                    log.exception("[Edge] Failed to load entry point: %s", ep.name)
        except Exception:
            log.debug("[Edge] No external plugins found via entry points")

        # Resolve requested plugin list (supports prefix matching)
        to_load = _resolve_plugin_names(self._plugin_filter,
                                        Plugin.get_plugin_names())
        log.info("[Edge] Loading plugins: %s", to_load)

        # Instantiate resolved plugins
        for pname in to_load:
            try:
                pclass = Plugin.get_plugin_class(pname)
                pinstance = pclass(app=self._app)
                self._plugins[pname] = pinstance
                if pinstance.is_enabled():
                    log.info("[Edge] Loaded plugin: %s", pname)
                else:
                    log.info("[Edge] Loaded plugin (disabled): %s", pname)
            except Exception:
                log.exception("[Edge] Failed to load plugin: %s", pname)

    # ── Small internal helpers ───────────────────────────────────────────────

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule *coro* as a task and keep it referenced until it is done.

        The event loop only holds weak references to tasks, so a task whose
        result is discarded may be garbage-collected before it finishes.
        """
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _send(self, payload: str) -> bool:
        """Send *payload* over the bridge socket under the send lock.

        Returns:
            True if the payload was handed to the socket, False if there is
            currently no connection.  Socket errors propagate to the caller.
        """
        async with self._send_lock:
            ws = self._ws
            if ws is None:
                return False
            await ws.send(payload)
            return True

    @staticmethod
    def _encode_response_body(raw: bytes, encoding: Optional[str]) -> tuple:
        """Turn raw response bytes into a ``(body, is_binary)`` pair.

        The bytes are decoded strictly with *encoding* (UTF-8 when falsy).
        If that is not possible the payload is treated as binary and returned
        base64-encoded, so that no byte is lost or replaced.
        """
        try:
            return raw.decode(encoding or "utf-8"), False
        except (UnicodeDecodeError, LookupError):
            # LookupError: the declared charset is unknown to Python
            return base64.b64encode(raw).decode("ascii"), True

    @staticmethod
    def _register_url(bridge_url: str) -> str:
        """Derive the WebSocket registration URL from the bridge URL.

        ``http(s)://`` is mapped to ``ws(s)://``, trailing slashes are
        removed and ``/register`` is appended unless already present.
        """
        if bridge_url.startswith("https://"):
            ws_url = "wss://" + bridge_url[len("https://"):]
        elif bridge_url.startswith("http://"):
            ws_url = "ws://" + bridge_url[len("http://"):]
        else:
            ws_url = bridge_url

        ws_url = ws_url.rstrip("/")
        if not ws_url.endswith("/register"):
            ws_url += "/register"
        return ws_url

    @staticmethod
    def _ssl_context(ws_url: str) -> Optional[ssl.SSLContext]:
        """Return the TLS context for *ws_url*, or None for plain ``ws://``."""
        if not ws_url.startswith("wss://"):
            return None

        ssl_ctx = ssl.create_default_context()
        # NOTE(review): hostname checking and certificate verification are
        # switched off here, so the certificate loaded below is not actually
        # enforced.  Left unchanged to keep existing (self-signed)
        # deployments working - confirm whether this is intended.
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE
        certfile = os.environ.get("RADICAL_BRIDGE_CERT")
        if certfile and os.path.exists(certfile):
            ssl_ctx.load_verify_locations(certfile)
        return ssl_ctx

    def _registration_payload(self) -> str:
        """Build the JSON registration message for the edge and its plugins."""
        plugins_data = {}
        for pname, plugin in self._plugins.items():
            # Optional UI module: its file content is shipped inline
            ui_module_content = None
            ui_module_path = getattr(plugin.__class__, 'ui_module', None)
            if ui_module_path and os.path.isfile(ui_module_path):
                try:
                    with open(ui_module_path, encoding='utf-8') as f:
                        ui_module_content = f.read()
                except Exception:
                    log.warning("[Edge] Could not read ui_module for %s: %s",
                                pname, ui_module_path)

            plugins_data[pname] = {
                "type": pname,
                "namespace": f"/{self._name}{plugin.namespace}",
                "version": getattr(plugin, 'version', '0.0.1'),
                "enabled": plugin.is_enabled(),
                "ui_config": ui_config_to_dict(
                    getattr(plugin, 'ui_config', None)
                ),
                "ui_module": ui_module_content,
            }

        reg = RegisterMessage(
            edge_name=self._name,
            endpoint={"type": "radical.edge"},
            plugins=plugins_data,
        )
        return reg.model_dump_json()

    # ── Message handlers ─────────────────────────────────────────────────────

    async def _handle_request(self, msg: "RequestMessage") -> None:
        """
        Forward messages received from Bridge to local internal API.

        The request is replayed against the in-process FastAPI app and the
        result is sent back as a ResponseMessage.  Any failure is reported to
        the bridge as a 502 response with an 'edge-invoke-failed' error body.

        Args:
            msg: Validated request message from bridge.
        """
        req_id = msg.req_id
        try:
            log.debug("[Edge] [req:%s] Handling %s %s", req_id, msg.method, msg.path)

            # Rehydrate body
            content: Optional[bytes] = None
            if msg.body is not None:
                if msg.is_binary:
                    content = base64.b64decode(msg.body)
                else:
                    content = msg.body.encode("utf-8")

            # Call the local Edge FastAPI server (in-memory)
            # URL host doesn't matter for ASGITransport, but must be valid URL
            url = f"http://local{msg.path}"
            resp = await self._http_client.request(
                msg.method,
                url,
                content=content,
                headers=msg.headers,
                timeout=40.0
            )

            # `resp.text` decodes with errors="replace" and therefore never
            # fails on binary data; decode strictly so that binary payloads
            # are detected and shipped as base64 instead of being mangled.
            out_body, resp_is_binary = self._encode_response_body(
                resp.content, resp.encoding)

            response = ResponseMessage(
                req_id=req_id,
                status=resp.status_code,
                headers=dict(resp.headers),
                is_binary=resp_is_binary,
                body=out_body
            )
            log.debug("[Edge] [req:%s] Response status=%d", req_id, resp.status_code)
            await self._send(response.model_dump_json())

        except Exception as e:
            log.exception("[Edge] [req:%s] Error handling request", req_id)
            error_body = {"error": "edge-invoke-failed", "detail": str(e)}
            response = ResponseMessage(
                req_id=req_id,
                status=502,
                headers={"content-type": "application/json"},
                is_binary=False,
                body=json.dumps(error_body)
            )
            try:
                await self._send(response.model_dump_json())
            except Exception as send_err:
                # Typically the connection is gone; nothing left to report to
                log.warning("[Edge] [req:%s] Could not send error response: %s",
                            req_id, send_err)

    async def _handle_topology(self, msg: "TopologyMessage") -> None:
        """
        Handle topology update from bridge (edge connect/disconnect).

        Every plugin that defines ``on_topology_change`` is notified.  A
        failing plugin handler is logged and does not affect other plugins.

        Args:
            msg: Validated topology message from bridge.
        """
        log.debug("[Edge] Topology update: %d edges", len(msg.edges))

        # Notify all plugins about the topology change
        for pname, plugin in self._plugins.items():
            try:
                if hasattr(plugin, 'on_topology_change'):
                    await plugin.on_topology_change(msg.edges)
            except Exception as e:
                log.warning("[Edge] Plugin %s topology handler failed: %s", pname, e)

    async def send_notification(self, plugin_name: str, topic: str,
                                data: Dict[str, Any]) -> None:
        """
        Send an unsolicited notification to the bridge to broadcast to UI clients.

        The call never raises: when there is no connection or the send fails,
        a warning is logged and the notification is dropped.

        Args:
            plugin_name: Name of the plugin sending the notification.
            topic: Notification topic (e.g., "task_status", "job_status").
            data: Notification payload data.
        """
        if not self._ws:
            log.warning("[Edge] Cannot send notification, not connected")
            return

        notification = NotificationMessage(
            edge=self._name,
            plugin=plugin_name,
            topic=topic,
            data=data
        )

        try:
            sent = await self._send(notification.model_dump_json())
        except Exception as e:
            log.warning("[Edge] Failed to send notification: %s", e)
            return

        if sent:
            log.debug("[Edge] Sent notification: %s/%s", plugin_name, topic)
        else:
            # Connection dropped while waiting for the send lock
            log.warning("[Edge] Cannot send notification, not connected")

    # ── Connection life cycle ────────────────────────────────────────────────

    async def _apply_tunnel_relay(self) -> None:
        """Wait for the relay port file and point the bridge URL at localhost.

        When --tunnel is passed, a parent edge has set up a reverse SSH
        tunnel.  The tunnel port is written to a shared file derived from
        this edge's name; we wait for it (up to 60 s) and rewrite the bridge
        URL to go through ``localhost:<port>``.

        Raises:
            RuntimeError: If the port file does not appear in time.
        """
        relay_file = (
            pathlib.Path.home() / '.radical' / 'edge' / 'tunnels'
            / f'{self._name}.port'
        )
        log.info("[Edge] --tunnel: waiting for relay port file: %s", relay_file)

        for _ in range(30):  # 30 × 2s = 60 s
            if relay_file.exists():
                break
            await asyncio.sleep(2)
        else:
            raise RuntimeError(
                f"Relay port file never appeared after 60 s: {relay_file}\n"
                f" The parent edge's SSH reverse-tunnel watcher writes this file "
                f"once the tunnel is active. Bridge URL: {self._bridge_url}")

        relay_port = int(relay_file.read_text().strip())

        from urllib.parse import urlparse, urlunparse
        parsed = urlparse(self._bridge_url)
        self._bridge_url = urlunparse(
            parsed._replace(netloc=f'localhost:{relay_port}'))
        log.info("[Edge] Relay active; using %s", self._bridge_url)

    async def _serve(self, ws) -> None:
        """Process bridge messages on *ws* until stop, shutdown or disconnect.

        Returns normally when the stop event is set, which includes a
        shutdown request or a fatal error message from the bridge.

        Raises:
            websockets.exceptions.ConnectionClosed: If the connection drops
                while the service is not stopping (caller reconnects).
        """
        # Use asyncio.wait so the loop wakes immediately on either a new
        # message or the stop signal, without any polling timeout.
        recv_task = asyncio.ensure_future(ws.recv())
        stop_fut = asyncio.ensure_future(self._stop_event.wait())
        try:
            while not self._stop_event.is_set():
                done, _ = await asyncio.wait(
                    {recv_task, stop_fut},
                    return_when=asyncio.FIRST_COMPLETED)

                if stop_fut in done:
                    break

                # recv_task completed — retrieve result
                try:
                    raw_msg = recv_task.result()
                except ws_exc.ConnectionClosed:
                    if self._stop_event.is_set():
                        break
                    log.info("[Edge] Connection closed")
                    raise  # Reconnect

                # Arm next recv immediately
                recv_task = asyncio.ensure_future(ws.recv())

                try:
                    # json.JSONDecodeError is a ValueError too, so a frame
                    # that is not valid JSON is skipped instead of tearing
                    # down the connection.
                    msg = parse_bridge_message(json.loads(raw_msg))
                except ValueError as ve:
                    log.warning("[Edge] Invalid message: %s", ve)
                    continue

                if isinstance(msg, ErrorMessage):
                    log.error("[Edge] Registration error: %s", msg.message)
                    self._stop_event.set()
                    return  # Fatal error, stop

                if isinstance(msg, PingMessage):
                    async with self._send_lock:
                        await ws.send('{"type": "pong"}')
                    continue

                if isinstance(msg, ShutdownMessage):
                    log.info("[Edge] Shutdown requested: %s", msg.reason)
                    self._stop_event.set()
                    return

                # Handlers run concurrently so a slow request does not
                # block the receive loop.
                if isinstance(msg, RequestMessage):
                    self._spawn(self._handle_request(msg))

                if isinstance(msg, TopologyMessage):
                    self._spawn(self._handle_topology(msg))
        finally:
            # Covers every exit path: make sure neither helper future leaks
            recv_task.cancel()
            stop_fut.cancel()
            await asyncio.gather(recv_task, stop_fut, return_exceptions=True)

    async def run(self) -> None:
        """
        Main async entry point. Connects to Bridge and starts processing loop.

        The method reconnects with jittered exponential backoff after
        connection errors and returns once the service has been stopped
        (via `stop()`, a shutdown request or a fatal bridge error).

        Raises:
            RuntimeError: In tunnel mode, if the relay port file never appears.
            asyncio.CancelledError: If the running task is cancelled, which is
                what `stop()` does.
        """
        PING_INTERVAL = 20
        PING_TIMEOUT = 30
        MAX_BACKOFF = 10
        JITTER_FACTOR = 0.3  # Add up to 30% jitter to prevent thundering herd
        BACKOFF_FACTOR = 1.2
        backoff = 0.5

        self._stop_event.clear()
        self._running_task = asyncio.current_task()
        self._loop = asyncio.get_running_loop()

        if self._tunnel:
            await self._apply_tunnel_relay()

        transport = ASGITransport(app=self._app)

        while not self._stop_event.is_set():
            try:
                async with httpx.AsyncClient(transport=transport,
                                             base_url="http://local") as http_client:
                    self._http_client = http_client
                    try:
                        ws_url = self._register_url(self._bridge_url)
                        ssl_ctx = self._ssl_context(ws_url)

                        async with websockets.connect(ws_url,
                                                      ssl=ssl_ctx,
                                                      ping_interval=PING_INTERVAL,
                                                      ping_timeout=PING_TIMEOUT,
                                                      close_timeout=2) as ws:
                            self._ws = ws
                            try:
                                log.info("[Edge] Connected to %s", self._bridge_url)
                                backoff = 0.5  # Reset backoff on success

                                # Register edge + all plugins in a single
                                # message.  The lock is held while the message
                                # is built so nothing is sent ahead of it.
                                async with self._send_lock:
                                    await ws.send(self._registration_payload())

                                await self._serve(ws)
                            finally:
                                # Never leave a closed socket behind for
                                # handlers and notifications to trip over
                                self._ws = None

                    except (ws_exc.ConnectionClosed, OSError) as e:
                        if self._stop_event.is_set():
                            break  # no reconnect
                        # Add jitter to backoff to prevent thundering herd
                        jitter = backoff * JITTER_FACTOR * random.random()
                        sleep_time = backoff + jitter
                        log.warning("[Edge] Connection lost: %s. Reconnecting in %.1fs...",
                                    e, sleep_time)
                        await asyncio.sleep(sleep_time)
                        backoff = min(backoff * BACKOFF_FACTOR, MAX_BACKOFF)

            except Exception as e:
                # Fatal errors set the stop event, so check that first
                if self._stop_event.is_set():
                    break
                log.exception("[Edge] Unexpected error: %s", e)
                jitter = 2 * JITTER_FACTOR * random.random()
                await asyncio.sleep(2 + jitter)

    def _signal_stop(self) -> None:
        """Set the stop event and cancel the running task (loop thread only)."""
        self._stop_event.set()
        if self._running_task:
            self._running_task.cancel()

    def stop(self):
        """Signal the service to stop.

        May be called from the event loop running the service or from any
        other thread (e.g. after `start_background()`).  asyncio primitives
        are not thread-safe, so a call from a foreign thread is handed over
        to the service's loop instead of touching them directly.
        """
        loop = self._loop
        if loop is not None and loop.is_running():
            try:
                current = asyncio.get_running_loop()
            except RuntimeError:
                current = None  # caller is a plain (non-asyncio) thread
            if current is not loop:
                try:
                    loop.call_soon_threadsafe(self._signal_stop)
                    return
                except RuntimeError:
                    # Loop was closed in the meantime; fall through
                    pass
        self._signal_stop()

    def start_background(self):
        """Start the service in a separate daemon thread (for sync apps).

        Raises:
            RuntimeError: If a background thread is already running.
        """
        if self._thread and self._thread.is_alive():
            raise RuntimeError("Service already running in background")

        self._thread = threading.Thread(target=self._run_thread, daemon=True)
        self._thread.start()

    def _run_thread(self):
        """Entry point for background thread.

        Runs `run()` in a fresh event loop; cancellation (the normal result
        of `stop()`) and unexpected failures are logged, never raised.
        """
        try:
            asyncio.run(self.run())
        except asyncio.CancelledError:
            log.info("[Edge] Background service cancelled")
        except Exception as e:
            log.exception("[Edge] Background thread failed: %s", e)