2. Embedding the Edge Service
The Radical Edge Service can be embedded directly into Python applications, supporting both
asyncio and synchronous execution models. This allows you to run the Edge Service as a
component within your larger application without managing a separate process or local network ports.
2.1. Architecture
The EdgeService class provides a self-contained service that:
- Connects to the Radical Bridge via WebSocket.
- Hosts plugins in an internal, in-memory FastAPI application.
- Routes requests from the Bridge directly to plugins without opening local TCP ports.
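To make the "no local TCP ports" point concrete, here is a minimal sketch of calling an ASGI application in memory. It is not the EdgeService implementation: the `app` function stands in for the internal FastAPI application, and `call_in_memory` and the `/sysinfo/metrics` path are hypothetical names chosen for illustration.

```python
import asyncio
import json


async def app(scope, receive, send):
    """Minimal ASGI app standing in for the internal plugin host (assumption)."""
    body = json.dumps({"path": scope["path"], "ok": True}).encode()
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": body})


async def call_in_memory(asgi_app, method, path):
    """Invoke an ASGI app as a plain coroutine; no socket is created."""
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": method,
        "path": path,
        "raw_path": path.encode(),
        "query_string": b"",
        "headers": [],
    }
    messages = []

    async def receive():
        # An empty request body, delivered in a single message
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        messages.append(message)

    await asgi_app(scope, receive, send)
    status = messages[0]["status"]
    payload = b"".join(m.get("body", b"") for m in messages[1:])
    return status, json.loads(payload)


# Hypothetical path, used only to show the round trip
status, data = asyncio.run(call_in_memory(app, "GET", "/sysinfo/metrics"))
print(status, data)
```

Because the request is an ordinary coroutine call, a message arriving over the Bridge WebSocket could in principle be translated into such a scope and dispatched without any listener on the local host.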
2.2. Usage
2.2.1. Synchronous Application (Threaded)
For standard Python applications (scripts, Flask apps, etc.), use start_background()
to run the service in a daemon thread.
    import time
    from radical.edge import EdgeService, PluginXGFabric

    def main():
        # Initialize service with desired plugins
        service = EdgeService(
            bridge_url="wss://radical-pilot.org/bridge/register",
            plugins=[PluginXGFabric]
        )

        print("Starting Edge Service...")
        # Runs the service loop in a separate daemon thread
        service.start_background()

        try:
            # Your main application logic here
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping...")
            service.stop()

    if __name__ == "__main__":
        main()
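For readers curious how a start_background()/stop() pair can be built, the following is a minimal sketch of one common pattern: an asyncio event loop running in a daemon thread. `BackgroundRunner` is a hypothetical stand-in, not the EdgeService class, and its internals are assumptions made for illustration.

```python
import asyncio
import threading
import time


class BackgroundRunner:
    """Hypothetical stand-in: runs a coroutine on its own loop in a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._stop = None                  # asyncio.Event, created on the loop thread
        self._ready = threading.Event()    # signals that the loop is up
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.ticks = 0

    async def _serve(self):
        self._stop = asyncio.Event()
        self._ready.set()
        while not self._stop.is_set():
            self.ticks += 1                # a real service would handle messages here
            await asyncio.sleep(0.01)

    def _run(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_until_complete(self._serve())
        self._loop.close()

    def start_background(self):
        self._thread.start()
        self._ready.wait()                 # return only once the loop is running

    def stop(self):
        # asyncio.Event is not thread-safe, so set it from the loop thread
        self._loop.call_soon_threadsafe(self._stop.set)
        self._thread.join(timeout=5)

    def is_alive(self):
        return self._thread.is_alive()


runner = BackgroundRunner()
runner.start_background()
time.sleep(0.05)                           # main application work
runner.stop()
print(runner.ticks > 0, runner.is_alive())
```

The daemon flag means the thread will not keep the interpreter alive on exit, but calling stop() explicitly still gives the loop a chance to finish its current iteration and close cleanly.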
2.2.2. Asyncio Application
For asyncio applications (FastAPI, value-added services), run service.run()
as a task alongside your own coroutines, then call service.stop() and await the task on shutdown.
    import asyncio
    from radical.edge import EdgeService, PluginLucid

    async def main():
        # Initialize service
        service = EdgeService(
            bridge_url="wss://radical-pilot.org/bridge/register",
            plugins=[PluginLucid]
        )

        # Run service concurrently
        service_task = asyncio.create_task(service.run())

        try:
            # Your async application logic
            await asyncio.sleep(3600)
        finally:
            service.stop()
            await service_task

    if __name__ == "__main__":
        asyncio.run(main())
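The stop-then-await sequence in the finally block matters because awaiting the task lets the service finish its cleanup before the event loop closes. The sketch below illustrates this with `StubService`, a hypothetical class that only mimics the run()/stop() surface; the 5-second bound on shutdown is likewise an assumption, not a documented EdgeService behavior.

```python
import asyncio


class StubService:
    """Hypothetical stand-in exposing run()/stop() like the example above."""

    def __init__(self):
        self._stop = asyncio.Event()
        self.closed = False

    async def run(self):
        try:
            await self._stop.wait()      # a real service would process messages here
        finally:
            self.closed = True           # cleanup, e.g. closing the connection

    def stop(self):
        self._stop.set()


async def main():
    service = StubService()
    task = asyncio.create_task(service.run())
    try:
        await asyncio.sleep(0.01)        # application logic
    finally:
        service.stop()
        # Bound the shutdown so a stuck service cannot hang the application
        await asyncio.wait_for(task, timeout=5)
    return service.closed


closed = asyncio.run(main())
print(closed)
```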
2.3. API Reference
- class radical.edge.service.EdgeService(bridge_url: str | None = None, name: str | None = None, plugins: list | None = None, tunnel: bool = False, tunnel_via: str | None = None)
Bases: PluginHostBase
Embedded Radical Edge Service.
This class runs the Edge Service logic within an application, supporting both asyncio-based and synchronous applications. It manages the connection to the Bridge and hosts the local plugin execution environment.
The service automatically loads the ‘sysinfo’ plugin to provide system metrics.
- Attributes:
app (FastAPI): The internal FastAPI application hosting the plugins.
- property bridge_url
Get the current Bridge URL.
- async send_notification(plugin_name: str, topic: str, data: Dict[str, Any]) -> None
Send an unsolicited notification to the Bridge to broadcast to UI clients.
- Args:
plugin_name: Name of the plugin sending the notification.
topic: Notification topic (e.g., “task_status”, “job_status”).
data: Notification payload data.
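As an illustration of the send_notification call shape, the sketch below uses `RecordingService`, a hypothetical stub that copies only the documented signature so the example runs without a Bridge. The plugin name "psij" and the payload keys `job_id` and `state` are assumptions; the topic "job_status" is taken from the argument description above.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class RecordingService:
    """Hypothetical stub with the documented send_notification signature."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str, Dict[str, Any]]] = []

    async def send_notification(self, plugin_name: str, topic: str,
                                data: Dict[str, Any]) -> None:
        # A real service would forward this to the Bridge; the stub records it
        self.sent.append((plugin_name, topic, data))


async def report_job_status(service, job_id: str, state: str) -> None:
    """Send one notification; payload keys are illustrative assumptions."""
    await service.send_notification(
        plugin_name="psij",
        topic="job_status",
        data={"job_id": job_id, "state": state},
    )


svc = RecordingService()
asyncio.run(report_job_status(svc, "job.0001", "COMPLETED"))
print(svc.sent)
```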
2.4. Configuration
The service respects the following environment variables:
- BRIDGE_URL: Default URL for the Radical Bridge connection.
- RADICAL_DEBUG: Enables debug logging if set.
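A minimal sketch of how these two variables might be resolved follows. The helper names `resolve_bridge_url` and `debug_enabled` are hypothetical, and two behaviors are assumptions rather than documented facts: that an explicit bridge_url argument takes precedence over BRIDGE_URL, and that any non-empty RADICAL_DEBUG value counts as "set".

```python
import os


def resolve_bridge_url(explicit=None, environ=None):
    """Return the explicit URL if given, else fall back to BRIDGE_URL (assumed order)."""
    environ = os.environ if environ is None else environ
    url = explicit or environ.get("BRIDGE_URL")
    if not url:
        raise ValueError("no Bridge URL: pass bridge_url or set BRIDGE_URL")
    return url


def debug_enabled(environ=None):
    """Treat any non-empty RADICAL_DEBUG value as enabled (assumption)."""
    environ = os.environ if environ is None else environ
    return bool(environ.get("RADICAL_DEBUG"))


# A plain dict stands in for os.environ so the example is deterministic
env = {"BRIDGE_URL": "wss://radical-pilot.org/bridge/register"}
print(resolve_bridge_url(environ=env))
print(resolve_bridge_url("ws://localhost:8000/register", environ=env))
print(debug_enabled(env), debug_enabled({"RADICAL_DEBUG": "1"}))
```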
2.5. Notes
- No Local Ports: The embedded service uses in-memory transport. It does not open a local HTTP port (like 8001).
- Plugins: Plugins are instantiated with the internal FastAPI app. If passing custom plugin instances, ensure they are compatible.
2.6. Developing External Plugins
You can define custom plugins in your own modules and register them with the Edge Service.
Inheriting from radical.edge.ClientManagedPlugin (or Plugin) and defining plugin_name
automatically registers your plugin class.
2.6.1. Example: Weather Plugin
1. Define the Plugin
    # file: my_project/plugins/weather.py
    import radical.edge as re
    from starlette.requests import Request
    from starlette.responses import JSONResponse

    class WeatherPlugin(re.ClientManagedPlugin):
        """A plugin that provides weather data."""

        # Unique name for registry discovery
        plugin_name = "my_org.weather"

        def __init__(self, app, instance_name="weather"):
            # instance_name determines the URL namespace (e.g. /weather)
            super().__init__(app, instance_name)
            self.add_route_get("forecast", self.get_forecast)
            self.add_route_get("current", self.get_current)

        async def get_forecast(self, request: Request) -> JSONResponse:
            return JSONResponse({"forecast": "sunny", "temp": 72})

        async def get_current(self, request: Request) -> JSONResponse:
            return JSONResponse({"temp": 68, "humidity": 45})
2. Use the Plugin
Simply importing the plugin module registers it. You can then pass it to EdgeService.
# file: app.py
from radical.edge import EdgeService
# Import triggers automatic registration
from my_project.plugins.weather import WeatherPlugin
    service = EdgeService(
        bridge_url="ws://localhost:8000/register",
        plugins=[WeatherPlugin]  # Loads the plugin immediately
    )
service.start_background()
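Registration on import, as described in this section, is commonly implemented with Python's `__init_subclass__` hook. The sketch below shows that general technique only; it is an assumption about how such a registry could work, not the radical.edge implementation, and the `Plugin` and `ClientManagedPlugin` classes here are hypothetical stand-ins that share nothing but their names with the library.

```python
class Plugin:
    """Hypothetical base class; not the radical.edge implementation."""

    registry = {}
    plugin_name = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Register only classes that define plugin_name themselves
        name = cls.__dict__.get("plugin_name")
        if name is None:
            return
        if name in Plugin.registry:
            raise ValueError(f"duplicate plugin_name: {name!r}")
        Plugin.registry[name] = cls


class ClientManagedPlugin(Plugin):
    """Intermediate base without a plugin_name, so it is not registered."""


class WeatherPlugin(ClientManagedPlugin):
    plugin_name = "my_org.weather"


print(sorted(Plugin.registry))
```

Since the hook runs when the class statement executes, importing the module that defines the subclass is enough to populate the registry, which matches the "import triggers registration" behavior described above.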
2.7. Using PSIJ Plugin
The PluginPSIJ provides an interface to submit and manage jobs via the psij-python library. This allows you to interact with various HPC schedulers (Slurm, PBS, LSF, etc.) using a unified API.
2.7.1. Prerequisites
Ensure psij-python is installed in your environment:
pip install psij-python
2.7.2. Usage
To use the PSIJ plugin, simply include it when initializing the EdgeService.
from radical.edge import EdgeService, PluginPSIJ
    service = EdgeService(
        bridge_url="wss://radical-pilot.org/bridge/register",
        plugins=[PluginPSIJ]
    )
service.start_background()
2.7.3. API Endpoints
The plugin exposes the following endpoints under the /psij namespace (default):
- POST /psij/{uid}/register_session: Registers a new session and returns a session ID (sid).
- POST /psij/{uid}/submit/{sid}: Submits a job. Requires a JSON body with job_spec and an optional executor:
    {
      "job_spec": {
        "executable": "/bin/echo",
        "arguments": ["Hello World"],
        "directory": "/tmp",
        "environment": {"MY_VAR": "value"},
        "attributes": {
          "queue_name": "debug",
          "account": "my_project",
          "duration": "600"
        }
      },
      "executor": "slurm"
    }
- GET /psij/{uid}/status/{sid}/{job_id}: Retrieves the status of a specific job.
- POST /psij/{uid}/cancel/{sid}/{job_id}: Cancels a specific job.
- POST /psij/{uid}/unregister_session/{sid}: Unregisters a session and cleans up resources.
2.7.4. Registering a Session
Before submitting jobs, you must register a session to get a sid (Session ID):
    import requests

    BRIDGE_URL = "https://localhost:8000"
    EDGE_NAME = "my-edge"

    # Register session
    resp = requests.post(f"{BRIDGE_URL}/{EDGE_NAME}/psij/register_session")
    sid = resp.json()['sid']

    # Submit a job
    job_spec = {
        "executable": "/bin/sleep",
        "arguments": ["10"],
        "attributes": {"queue_name": "debug"}
    }
    resp = requests.post(
        f"{BRIDGE_URL}/{EDGE_NAME}/psij/submit/{sid}",
        json={"job_spec": job_spec, "executor": "slurm"}
    )
    job_id = resp.json()['job_id']

    # Check status
    resp = requests.get(f"{BRIDGE_URL}/{EDGE_NAME}/psij/status/{sid}/{job_id}")
    print(resp.json())
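A single status request rarely suffices for a batch job, so a client will usually poll until the job reaches a final state. The following is a minimal sketch of such a loop. The helper `wait_for_job` is hypothetical, and it assumes the status response is a JSON object with a `state` field carrying PSI/J state names (COMPLETED, FAILED, CANCELED as final states); the actual response layout is not specified above and should be checked against a real response.

```python
import time

# Final PSI/J job states; the "state" response key is an assumption
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELED"}


def wait_for_job(fetch_status, interval=2.0, timeout=600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until the job reaches a final state or the timeout expires."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        state = str(status.get("state", "")).upper()
        if state in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still in state {state!r} after {timeout}s")
        sleep(interval)


# Canned responses stand in for the HTTP call so the sketch runs offline
responses = iter([{"state": "QUEUED"}, {"state": "ACTIVE"}, {"state": "COMPLETED"}])
result = wait_for_job(lambda: next(responses), interval=0, sleep=lambda s: None)
print(result)

# With the variables from the example above, fetch_status could be:
#   lambda: requests.get(
#       f"{BRIDGE_URL}/{EDGE_NAME}/psij/status/{sid}/{job_id}").json()
```

Passing the fetch function in as an argument keeps the loop independent of the HTTP client, which also makes it straightforward to test.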