2. Embedding the Edge Service

The Radical Edge Service can be embedded directly into Python applications, supporting both asyncio and synchronous execution models. This allows you to run the Edge Service as a component within your larger application without managing a separate process or local network ports.

2.1. Architecture

The EdgeService class provides a self-contained service that:

  • Connects to the Radical Bridge via WebSocket.

  • Hosts plugins in an internal, in-memory FastAPI application.

  • Routes requests from the Bridge directly to plugins without opening local TCP ports.

2.2. Usage

2.2.1. Synchronous Application (Threaded)

For standard Python applications (scripts, Flask apps, etc.), use start_background() to run the service in a daemon thread.

import time
from radical.edge import EdgeService, PluginXGFabric

def main():
    # Initialize service with desired plugins
    service = EdgeService(
        bridge_url="wss://radical-pilot.org/bridge/register",
        plugins=[PluginXGFabric]
    )

    print("Starting Edge Service...")
    # Runs the service loop in a separate daemon thread
    service.start_background()

    try:
        # Your main application logic here
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        print("Stopping...")
        service.stop()

if __name__ == "__main__":
    main()

2.2.2. Asyncio Application

For asyncio applications (FastAPI, value-added services), await service.run() in a task.

import asyncio
from radical.edge import EdgeService, PluginLucid

async def main():
    # Initialize service
    service = EdgeService(
        bridge_url="wss://radical-pilot.org/bridge/register",
        plugins=[PluginLucid]
    )

    # Run service concurrently
    service_task = asyncio.create_task(service.run())

    try:
        # Your async application logic
        await asyncio.sleep(3600)

    finally:
        service.stop()
        await service_task

if __name__ == "__main__":
    asyncio.run(main())

2.3. API Reference

class radical.edge.service.EdgeService(bridge_url: str | None = None, name: str | None = None, plugins: list | None = None, tunnel: bool = False, tunnel_via: str | None = None)[source]

Bases: PluginHostBase

Embedded Radical Edge Service.

This class runs the Edge Service logic within an application, supporting both asyncio-based and synchronous applications. It manages the connection to the Bridge and hosts the local plugin execution environment.

The service automatically loads the ‘sysinfo’ plugin to provide system metrics.

Attributes:

app (FastAPI): The internal FastAPI application hosting the plugins.

property bridge_url

Get the current Bridge URL.

async send_notification(plugin_name: str, topic: str, data: Dict[str, Any]) None[source]

Send an unsolicited notification to the bridge to broadcast to UI clients.

Args:

plugin_name: Name of the plugin sending the notification. topic: Notification topic (e.g., “task_status”, “job_status”). data: Notification payload data.

async run() None[source]

Main async entry point. Connects to Bridge and starts processing loop.

stop()[source]

Signal the service to stop.

start_background()[source]

Start the service in a separate daemon thread (for sync apps).

2.4. Configuration

The service respects the following environment variables:

  • BRIDGE_URL: Default URL for the Radical Bridge connection.

  • RADICAL_DEBUG: Enables debug logging if set.

2.5. Notes

  • No Local Ports: The embedded service uses in-memory transport. It does not open a local HTTP port (like 8001).

  • Plugins: Plugins are instantiated with the internal FastAPI app. If passing custom plugin instances, ensure they are compatible.

2.6. Developing External Plugins

You can define custom plugins in your own modules and register them with the Edge Service. Inheriting from radical.edge.ClientManagedPlugin (or Plugin) and defining plugin_name automatically registers your plugin class.

2.6.1. Example: Weather Plugin

1. Define the Plugin

# file: my_project/plugins/weather.py

import radical.edge as re
from starlette.requests import Request
from starlette.responses import JSONResponse

class WeatherPlugin(re.ClientManagedPlugin):
    """A plugin that provides weather data."""

    # Unique name for registry discovery
    plugin_name = "my_org.weather"

    def __init__(self, app, instance_name="weather"):
        # instance_name determines the URL namespace (e.g. /weather)
        super().__init__(app, instance_name)

        self.add_route_get("forecast", self.get_forecast)
        self.add_route_get("current",  self.get_current)

    async def get_forecast(self, request: Request) -> JSONResponse:
        return JSONResponse({"forecast": "sunny", "temp": 72})

    async def get_current(self, request: Request) -> JSONResponse:
        return JSONResponse({"temp": 68, "humidity": 45})

2. Use the Plugin

Simply importing the plugin module registers it. You can then pass it to EdgeService.

# file: app.py

from radical.edge import EdgeService
# Import triggers automatic registration
from my_project.plugins.weather import WeatherPlugin

service = EdgeService(
    bridge_url="ws://localhost:8000/register",
    plugins=[WeatherPlugin]  # Loads the plugin immediately
)

service.start_background()

2.7. Using PSIJ Plugin

The PluginPSIJ provides an interface to submit and manage jobs via the psij-python library. This allows you to interact with various HPC schedulers (Slurm, PBS, LSF, etc.) using a unified API.

2.7.1. Prerequisites

Ensure psij-python is installed in your environment:

pip install psij-python

2.7.2. Usage

To use the PSIJ plugin, simply include it when initializing the EdgeService.

from radical.edge import EdgeService, PluginPSIJ

service = EdgeService(
    bridge_url="wss://radical-pilot.org/bridge/register",
    plugins=[PluginPSIJ]
)
service.start_background()

2.7.3. API Endpoints

The plugin exposes the following endpoints under the /psij namespace (default):

  • POST /psij/{uid}/register_session Registers a new session and returns a session ID (sid).

  • POST /psij/{uid}/submit/{sid} Submits a job. Requires a JSON body with job_spec and optional executor.

    {
        "job_spec": {
            "executable": "/bin/echo",
            "arguments": ["Hello World"],
            "directory": "/tmp",
            "environment": {"MY_VAR": "value"},
            "attributes": {
                "queue_name": "debug",
                "account": "my_project",
                "duration": "600"
            }
        },
        "executor": "slurm"
    }
    
  • GET /psij/{uid}/status/{sid}/{job_id} Retrieves the status of a specific job.

  • POST /psij/{uid}/cancel/{sid}/{job_id} Cancels a specific job.

  • POST /psij/{uid}/unregister_session/{sid} Unregisters a session and cleans up resources.

2.7.4. Registering a Session

Before submitting jobs, you must register a session to get a sid (Session ID):

import requests

BRIDGE_URL = "https://localhost:8000"
EDGE_NAME = "my-edge"

# Register session
resp = requests.post(f"{BRIDGE_URL}/{EDGE_NAME}/psij/register_session")
sid = resp.json()['sid']

# Submit a job
job_spec = {
    "executable": "/bin/sleep",
    "arguments": ["10"],
    "attributes": {"queue_name": "debug"}
}
resp = requests.post(
    f"{BRIDGE_URL}/{EDGE_NAME}/psij/submit/{sid}",
    json={"job_spec": job_spec, "executor": "slurm"}
)
job_id = resp.json()['job_id']

# Check status
resp = requests.get(f"{BRIDGE_URL}/{EDGE_NAME}/psij/status/{sid}/{job_id}")
print(resp.json())