1. Module Documentation

1.1. Overview

The radical.edge module provides a plugin-based REST API framework for managing distributed computing resources and workflows. Built on FastAPI, it offers a standardized way to create plugins that manage multiple client sessions.

1.2. Core Components

The module consists of several key components:

  1. Plugin System: Base classes for creating plugins with client management

  2. Edge Service: Embedded service runner for hosting plugins

  3. Plugins: Pre-built plugins for various services (Lucid, XGFabric, QueueInfo)

  4. Queue Info: Backend for querying batch system information

1.3. Architecture

1.3.1. Plugin Hierarchy

The plugin system uses a three-tier architecture:

Plugin (base class)
  └── ClientManagedPlugin (manages multiple clients)
        ├── PluginLucid (Radical Pilot integration)
        ├── PluginXGFabric (XGFabric integration)
        └── PluginQueueInfo (Batch system queries)

Each plugin manages multiple client sessions, where each client has:

  • Unique client ID

  • Independent session state

  • Isolated resources (or shared backend, depending on plugin)

1.3.2. Client Lifecycle

  1. Registration: Client calls POST /{plugin}/{uid}/register_client

  2. Operations: Client performs plugin-specific operations

  3. Unregistration: Client calls POST /{plugin}/{uid}/unregister_client/{cid}
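The two lifecycle endpoints above can be sketched as plain path templates. Only the path shapes come from this documentation; the helper names and the example values (`queue_info`, `uid.0001`, `client.0000`) are hypothetical and chosen purely for illustration.

```python
def register_path(plugin: str, uid: str) -> str:
    """Path a client POSTs to in order to register (lifecycle step 1)."""
    return f"/{plugin}/{uid}/register_client"


def unregister_path(plugin: str, uid: str, cid: str) -> str:
    """Path a client POSTs to in order to unregister (lifecycle step 3)."""
    return f"/{plugin}/{uid}/unregister_client/{cid}"


if __name__ == "__main__":
    # Hypothetical plugin name, plugin instance uid, and client ID.
    print(register_path("queue_info", "uid.0001"))
    print(unregister_path("queue_info", "uid.0001", "client.0000"))
```

Note that only the unregistration path carries a client ID: the client does not have one until registration has completed.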

All plugins automatically provide:

  • Client registration/unregistration

  • Echo service for testing

  • Error handling and logging

  • Thread-safe ID generation
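As a rough illustration of what client registration with thread-safe ID generation involves, the sketch below keeps a counter and a session table behind a single lock. `ClientRegistry`, its methods, and the `client.NNNN` ID format are assumptions made for this example, not the radical.edge implementation.

```python
import itertools
import threading


class ClientRegistry:
    """Hypothetical stand-in for per-plugin client bookkeeping."""

    def __init__(self, prefix: str = "client") -> None:
        self._prefix = prefix
        self._counter = itertools.count()
        self._lock = threading.Lock()
        self._clients = {}  # client ID -> independent session state

    def register(self) -> str:
        # Draw the next number and store the session under one lock,
        # so concurrent callers can never receive the same ID.
        with self._lock:
            cid = f"{self._prefix}.{next(self._counter):04d}"
            self._clients[cid] = {}
            return cid

    def unregister(self, cid: str) -> None:
        with self._lock:
            if cid not in self._clients:
                raise KeyError(f"unknown client: {cid}")
            del self._clients[cid]

    def __len__(self) -> int:
        with self._lock:
            return len(self._clients)


if __name__ == "__main__":
    registry = ClientRegistry()
    cid = registry.register()
    print(cid, len(registry))
    registry.unregister(cid)
    print(len(registry))
```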

1.4. Module API

1.4.1. Main Module

1.4.2. Edge Service

class radical.edge.service.RequestShim(path_params: dict, query_params: dict, body_bytes: bytes, content_type: str = 'application/json')

Bases: object

Lightweight adapter for starlette Request.

Provides the three interfaces that every plugin handler uses: path_params, query_params, and await .json() / await .body(). Encoding-agnostic: stores raw bytes, decodes lazily based on content_type.

async body() → bytes

Raw body bytes (matches Request.body()).

async json() → dict

Parse body into a Python dict (matches Request.json()).

Content-type-aware: the body is decoded as JSON or msgpack, depending on the Content-Type header.
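The adapter idea can be illustrated with a small stand-in that stores raw bytes and decodes them only when a handler asks. `ShimSketch` and `handler` are hypothetical names, and the rule used to detect msgpack (the substring "msgpack" in the content type) is an assumption; this is not the RequestShim source.

```python
import asyncio
import json


class ShimSketch:
    """Illustrative stand-in; not the radical.edge RequestShim itself."""

    def __init__(self, path_params: dict, query_params: dict,
                 body_bytes: bytes,
                 content_type: str = "application/json") -> None:
        self.path_params = path_params
        self.query_params = query_params
        self._body = body_bytes
        self._content_type = content_type

    async def body(self) -> bytes:
        # Raw bytes are returned untouched.
        return self._body

    async def json(self) -> dict:
        # Decoding is deferred until a handler actually asks for it.
        if "msgpack" in self._content_type:  # assumed detection rule
            import msgpack  # third-party; only needed for msgpack bodies
            return msgpack.unpackb(self._body)
        return json.loads(self._body)


async def handler(request) -> dict:
    # A handler written only against the three shared interfaces.
    payload = await request.json()
    return {"cid": request.path_params["cid"], "echo": payload}


if __name__ == "__main__":
    shim = ShimSketch({"cid": "client.0000"}, {}, b'{"msg": "hi"}')
    print(asyncio.run(handler(shim)))
```

Because the handler touches only `path_params`, `query_params`, `json()` and `body()`, the same handler code can be fed either a real starlette Request or an adapter of this shape.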

class radical.edge.service.EdgeService(bridge_url: str | None = None, name: str | None = None, plugins: list | None = None, tunnel: bool = False, tunnel_via: str | None = None)

Bases: PluginHostBase

Embedded Radical Edge Service.

This class runs the Edge Service logic within an application, supporting both asyncio-based and synchronous applications. It manages the connection to the Bridge and hosts the local plugin execution environment.

The service automatically loads the ‘sysinfo’ plugin to provide system metrics.

Attributes:

app (FastAPI): The internal FastAPI application hosting the plugins.

property bridge_url

Get the current Bridge URL.

async send_notification(plugin_name: str, topic: str, data: Dict[str, Any]) → None

Send an unsolicited notification to the bridge to broadcast to UI clients.

Args:

plugin_name: Name of the plugin sending the notification.

topic: Notification topic (e.g., “task_status”, “job_status”).

data: Notification payload data.

async run() → None

Main async entry point. Connects to Bridge and starts processing loop.

stop()

Signal the service to stop.

start_background()

Start the service in a separate daemon thread (for sync apps).
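The relationship between run(), stop() and start_background() can be illustrated with a generic pattern: an async loop driven by asyncio.run() inside a daemon thread, stopped through a thread-safe flag. `BackgroundLoopSketch` is a hypothetical stand-in with a placeholder loop; how EdgeService actually implements these methods, and what they return, is not stated here.

```python
import asyncio
import threading
import time


class BackgroundLoopSketch:
    """Illustrative stand-in for the run/stop/start_background trio."""

    def __init__(self) -> None:
        self._stop = threading.Event()
        self.ticks = 0

    async def run(self) -> None:
        # Placeholder loop; a real service would process Bridge
        # traffic here instead of counting ticks.
        while not self._stop.is_set():
            self.ticks += 1
            await asyncio.sleep(0.01)

    def stop(self) -> None:
        # Safe to call from any thread.
        self._stop.set()

    def start_background(self) -> threading.Thread:
        # Returning the thread is a choice made for this sketch only.
        thread = threading.Thread(
            target=lambda: asyncio.run(self.run()), daemon=True)
        thread.start()
        return thread


if __name__ == "__main__":
    svc = BackgroundLoopSketch()
    worker = svc.start_background()   # synchronous code continues here
    time.sleep(0.05)
    svc.stop()
    worker.join(timeout=2.0)
    print("loop iterations:", svc.ticks)
```

An asyncio application would instead await run() directly on its own event loop; the daemon thread is only needed when the host application is synchronous.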

1.4.3. Logging Configuration

Logging configuration for radical.edge

This module sets up standard Python logging with:

  • Uvicorn-style colored output

  • Support for correlation IDs in request context

  • Structured log format

Import this module early in your application to configure logging.

radical.edge.logging_config.set_correlation_id(req_id: str) → None

Set the correlation ID for the current async context.

radical.edge.logging_config.get_correlation_id() → str | None

Get the correlation ID for the current async context.

radical.edge.logging_config.clear_correlation_id() → None

Clear the correlation ID for the current async context.
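A value scoped to "the current async context" is commonly built on the standard-library contextvars module. The sketch below re-implements the three functions locally under that assumption; it does not import radical.edge, whose internals may differ. It shows that two concurrent tasks each see only their own ID.

```python
import asyncio
import contextvars
from typing import Optional

# Assumption: one ContextVar holds the ID; asyncio copies the context
# for every task, so values set in one task are invisible to others.
_correlation_id = contextvars.ContextVar("correlation_id", default=None)


def set_correlation_id(req_id: str) -> None:
    _correlation_id.set(req_id)


def get_correlation_id() -> Optional[str]:
    return _correlation_id.get()


def clear_correlation_id() -> None:
    _correlation_id.set(None)


async def handle(req_id: str) -> Optional[str]:
    set_correlation_id(req_id)
    await asyncio.sleep(0)  # yield so the other task runs in between
    return get_correlation_id()


async def main() -> list:
    # gather() wraps each coroutine in its own task and context copy.
    return await asyncio.gather(handle("req-1"), handle("req-2"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```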

class radical.edge.logging_config.ColoredFormatter(fmt: str | None = None, datefmt: str | None = None, style: str = '%', use_colors: bool | None = None)

Bases: Formatter

Log formatter with Uvicorn-style coloring and correlation ID support.

format(record: LogRecord) → str

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
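To make the formatter's role concrete, here is a minimal logging.Formatter subclass that prefixes a correlation ID and wraps the line in an ANSI color. `ColorSketchFormatter`, the `correlation_id` record attribute, the color table, and the choice to color the whole line are all assumptions for this sketch; the actual ColoredFormatter may format records differently.

```python
import logging

RESET = "\033[0m"
COLORS = {
    logging.DEBUG: "\033[36m",       # cyan
    logging.INFO: "\033[32m",        # green
    logging.WARNING: "\033[33m",     # yellow
    logging.ERROR: "\033[31m",       # red
    logging.CRITICAL: "\033[1;31m",  # bold red
}


class ColorSketchFormatter(logging.Formatter):
    """Illustrative stand-in; not the radical.edge ColoredFormatter."""

    def __init__(self, fmt=None, datefmt=None, style="%", use_colors=True):
        super().__init__(fmt=fmt, datefmt=datefmt, style=style)
        self.use_colors = use_colors

    def format(self, record: logging.LogRecord) -> str:
        # Let the base class do message, time and exception formatting.
        text = super().format(record)
        # Hypothetical attribute carrying the request's correlation ID.
        req_id = getattr(record, "correlation_id", None)
        if req_id:
            text = f"[{req_id}] {text}"
        color = COLORS.get(record.levelno, "") if self.use_colors else ""
        return f"{color}{text}{RESET}" if color else text


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(ColorSketchFormatter("%(levelname)s: %(message)s"))
    log = logging.getLogger("sketch")
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    log.info("service started", extra={"correlation_id": "req-1"})
```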

radical.edge.logging_config.configure_logging(level: int = 20, format_string: str | None = None) → None

Configure logging for radical.edge.

Args:

level: Logging level (default: logging.INFO, which is the integer 20).

format_string: Custom format string (optional).