__author__ = 'Radical Development Team'
__email__ = 'radical@radical-project.org'
__copyright__ = 'Copyright 2024, RADICAL@Rutgers'
__license__ = 'MIT'
import asyncio
import logging
log = logging.getLogger('radical.edge')
from fastapi import FastAPI, HTTPException
from starlette.requests import Request
from .plugin_session_base import PluginSession
from .plugin_base import Plugin
from .client import PluginClient
from .queue_info import make_queue_info, QueueInfo
# Re-exported for tests / external callers that patch this name on the
# plugin_queue_info module; the real class lives in queue_info_slurm.
from .queue_info_slurm import QueueInfoSlurm # noqa: F401
from .batch_system import detect_batch_system
# Re-exported for tests / external callers that imported this from the
# old location. The real implementation lives in batch_system_slurm.
from .batch_system_slurm import _parse_slurm_time # noqa: F401
[docs]
class QueueInfoSession(PluginSession):
"""
QueueInfo session with shared backend.
All sessions share a single backend instance for cache efficiency.
"""
def __init__(self, sid: str, backend: QueueInfo):
"""
Initialize a QueueInfoSession instance.
Args:
sid (str): The unique session ID.
backend (QueueInfo): Shared backend instance from the plugin.
"""
super().__init__(sid)
self._backend = backend
[docs]
async def close(self) -> dict:
"""
Close this session.
Note: Backend is shared and not cleaned up here.
Returns:
dict: An empty dictionary indicating successful closure.
"""
return await super().close()
[docs]
async def get_info(self, user=None, force=False):
"""
Return queue/partition info.
Args:
user (str): User to filter partitions for. When None (default),
defaults to the current user. Pass user='*' to return all
partitions (admin view).
force (bool): Bypass cache if True.
Returns:
dict: Queue information from the backend.
"""
self._check_active()
return await asyncio.to_thread(self._backend.get_info,
user=user, force=force)
[docs]
async def list_jobs(self, queue, user=None, force=False):
"""
List jobs in a queue.
Args:
queue (str): Partition name.
user (str): User to filter jobs for. When None (default),
defaults to the current user. Pass user='*' to return all
jobs (admin view).
force (bool): Bypass cache if True.
Returns:
dict: Job listing from the backend.
"""
self._check_active()
return await asyncio.to_thread(self._backend.list_jobs,
queue, user, force)
[docs]
async def list_all_jobs(self, user=None, force=False):
"""
List all jobs for the user across all partitions.
Args:
user (str): User to filter jobs for. When None (default),
defaults to the current user. Pass user='*' to return all
jobs (admin view).
force (bool): Bypass cache if True.
Returns:
dict: Job listing from the backend.
"""
self._check_active()
return await asyncio.to_thread(self._backend.list_all_jobs,
user, force)
[docs]
async def cancel_job(self, job_id: str) -> dict:
"""Cancel a job via the active batch system (scancel/qdel/...)."""
self._check_active()
batch = detect_batch_system()
try:
await asyncio.to_thread(batch.cancel, job_id)
except RuntimeError as exc:
raise HTTPException(status_code=500, detail=str(exc))
return {'job_id': job_id, 'status': 'canceled'}
[docs]
async def list_allocations(self, user=None, force=False):
"""
List allocations/projects.
Args:
user (str): Optional user name to filter on.
force (bool): Bypass cache if True.
Returns:
dict: Allocation listing from the backend.
"""
self._check_active()
return await asyncio.to_thread(self._backend.list_allocations,
user, force)
[docs]
class QueueInfoClient(PluginClient):
"""
Client-side interface for the QueueInfo plugin.
"""
[docs]
def get_info(self, user: str = None, force: bool = False) -> dict:
"""
Return queue/partition information.
Args:
user (str): User to filter partitions for. When None (default),
uses the edge service user. Pass user='*' to return all
partitions (admin view).
force (bool): Bypass cache if True.
Returns:
dict: Queue information filtered by user access.
"""
self._require_session()
url = self._url(f"get_info/{self.sid}")
params = {"force": str(force).lower()}
if user:
params["user"] = user
resp = self._http.get(url, params=params)
self._raise(resp)
return resp.json()
[docs]
def list_jobs(self, queue: str, user: str = None, force: bool = False) -> dict:
"""
List jobs in a specified queue/partition.
Args:
queue (str): Partition name to list jobs for.
user (str): User to filter jobs for. When None (default),
uses the edge service user. Pass user='*' to return all
jobs (admin view).
force (bool): Bypass cache if True.
Returns:
dict: Job listing filtered by user.
"""
self._require_session()
url = self._url(f"list_jobs/{self.sid}/{queue}")
params = {"force": str(force).lower()}
if user:
params["user"] = user
resp = self._http.get(url, params=params)
self._raise(resp)
return resp.json()
[docs]
def list_all_jobs(self, user: str = None, force: bool = False) -> dict:
"""
List all jobs for the user across all partitions.
Args:
user (str): User to filter jobs for.
force (bool): Bypass cache if True.
Returns:
dict: Job listing.
"""
self._require_session()
url = self._url(f"list_all_jobs/{self.sid}")
params = {"force": str(force).lower()}
if user:
params["user"] = user
resp = self._http.get(url, params=params)
self._raise(resp)
return resp.json()
[docs]
def cancel_job(self, job_id: str) -> dict:
"""Cancel a job by ID."""
self._require_session()
resp = self._http.post(self._url(f"cancel/{self.sid}/{job_id}"))
self._raise(resp, f"cancel job {job_id!r}")
return resp.json()
[docs]
def list_allocations(self, user: str = None, force: bool = False) -> dict:
"""
List allocations/projects.
"""
self._require_session()
url = self._url(f"list_allocations/{self.sid}")
params = {"force": str(force).lower()}
if user:
params["user"] = user
resp = self._http.get(url, params=params)
self._raise(resp)
return resp.json()
[docs]
def job_allocation(self) -> 'dict | None':
"""Return edge job allocation info, or None if not inside a batch job.
No session is required. The information reflects the environment of
the **edge** process, not the client.
Returns:
None: Edge is running on a login node.
dict: Allocation summary with keys ``job_id``, ``partition``,
``n_nodes``, ``nodelist``, ``cpus_per_node``,
``gpus_per_node``, ``account``, ``job_name``, ``runtime``.
Raises:
RuntimeError: Edge is inside an allocation but the scheduler did
not provide enough info to summarise it.
"""
resp = self._http.get(self._url('job_allocation'))
self._raise(resp, 'job_allocation')
return resp.json().get('allocation')
[docs]
def backend(self) -> str:
"""Return the active batch backend name on the edge.
Returns:
str: ``'slurm'``, ``'pbs'``, or ``'none'``.
"""
resp = self._http.get(self._url('backend'))
self._raise(resp, 'backend')
return resp.json().get('backend', 'none')
[docs]
class PluginQueueInfo(Plugin):
"""
QueueInfo plugin for Radical Edge.
This plugin exposes batch system queue information, job listings, and
allocation data via REST endpoints. ``is_enabled()`` prevents loading on
edges where no recognised batch system is installed.
Backend selection is automatic: the plugin uses
:func:`make_queue_info` which dispatches to ``QueueInfoSlurm``,
``QueueInfoPBSPro``, or ``QueueInfoNone`` based on what's available.
"""
plugin_name = "queue_info"
session_class = QueueInfoSession
client_class = QueueInfoClient
version = '0.0.1'
ui_config = {
"icon": "📋",
"title": "Queue Info",
"description": "Inspect batch partitions, jobs and allocations.",
"refresh_button": True,
"monitors": [{
"id": "partitions",
"title": "Partitions / Queues",
"type": "table",
"css_class": "queueinfo-content",
"auto_load": "get_info/{sid}"
}]
}
def __init__(self, app: FastAPI, instance_name='queue_info',
backend_conf=None, slurm_conf=None):
"""
Initialize the QueueInfo plugin.
Args:
app (FastAPI): The FastAPI application instance.
instance_name (str): Plugin instance name (used in namespace).
Defaults to 'queue_info'. Override for multi-cluster setups.
backend_conf (str): Optional path to a scheduler config file
(e.g. slurm.conf). Forwarded to the backend; only the SLURM
backend uses it today.
slurm_conf (str): Deprecated alias for ``backend_conf``.
"""
super().__init__(app, instance_name)
# Back-compat: prefer backend_conf when both are given.
conf = backend_conf if backend_conf is not None else slurm_conf
# Create shared backend for all sessions
self._backend = make_queue_info(conf_path=conf)
# Start background prefetch to populate cache
self._backend.start_prefetch()
# Register QueueInfo-specific routes
self.add_route_get('job_allocation', self.job_allocation_endpoint)
self.add_route_get('backend', self.backend_endpoint)
self.add_route_get('get_info/{sid}', self.get_info)
self.add_route_get('list_jobs/{sid}/{queue}', self.list_jobs)
self.add_route_get('list_all_jobs/{sid}', self.list_all_jobs)
self.add_route_get('list_allocations/{sid}', self.list_allocations)
self.add_route_post('cancel/{sid}/{job_id}', self.cancel_job)
def _create_session(self, sid: str, **kwargs):
"""
Override to pass shared backend to each session.
Args:
sid (str): The session ID.
**kwargs: Additional keyword arguments (unused).
Returns:
QueueInfoSession: A new session instance using the shared backend.
"""
return self.session_class(sid, backend=self._backend)
[docs]
@classmethod
def is_enabled(cls, app: FastAPI) -> bool:
"""Load on edges with a recognised batch system (SLURM or PBSPro)."""
return detect_batch_system().name != 'none'
[docs]
def get_job_allocation(self) -> 'dict | None':
"""Return edge job allocation info, or None if not inside a job.
Delegates to the active :class:`BatchSystem`.
Returns:
None: Edge is running on a login node.
dict: Allocation details (see ``BatchSystem.job_allocation``).
Raises:
RuntimeError: Edge is inside an allocation but the scheduler did
not provide enough info to summarise it.
"""
alloc = detect_batch_system().job_allocation()
log.debug('[queue_info] get_job_allocation result: %s', alloc)
return alloc
[docs]
async def backend_endpoint(self, request: Request) -> dict:
"""Session-less endpoint: report which batch backend is active.
Response::
{"backend": "slurm" | "pbs" | "none"}
"""
return {'backend': self._backend.backend_name}
[docs]
async def job_allocation_endpoint(self, request: Request) -> dict:
"""Session-less endpoint: returns current edge job allocation info.
Response::
{"allocation": null} # login node
{"allocation": {"n_nodes": 4, "runtime": 3600}} # inside a job
{"allocation": {"n_nodes": 4, "runtime": null}} # unlimited walltime
"""
try:
alloc = await asyncio.to_thread(self.get_job_allocation)
return {'allocation': alloc}
except RuntimeError as exc:
raise HTTPException(status_code=500, detail=str(exc))
[docs]
async def get_info(self, request: Request) -> dict:
"""Return queue/partition information."""
data = request.path_params
sid = data['sid']
user = request.query_params.get('user')
force = request.query_params.get('force', '').lower() == 'true'
return await self._forward(sid, QueueInfoSession.get_info,
user=user, force=force)
[docs]
async def list_jobs(self, request: Request) -> dict:
"""List jobs in a specified queue/partition."""
data = request.path_params
sid = data['sid']
queue = data['queue']
user = request.query_params.get('user')
force = request.query_params.get('force', '').lower() == 'true'
return await self._forward(sid, QueueInfoSession.list_jobs,
queue, user=user, force=force)
[docs]
async def list_all_jobs(self, request: Request) -> dict:
"""List all jobs for the user across all partitions."""
data = request.path_params
sid = data['sid']
user = request.query_params.get('user')
force = request.query_params.get('force', '').lower() == 'true'
return await self._forward(sid, QueueInfoSession.list_all_jobs,
user=user, force=force)
[docs]
async def list_allocations(self, request: Request) -> dict:
"""List allocations/projects."""
data = request.path_params
sid = data['sid']
user = request.query_params.get('user')
force = request.query_params.get('force', '').lower() == 'true'
return await self._forward(sid, QueueInfoSession.list_allocations,
user=user, force=force)
[docs]
async def cancel_job(self, request: Request) -> dict:
"""Cancel a job by ID."""
sid = request.path_params['sid']
job_id = request.path_params['job_id']
return await self._forward(sid, QueueInfoSession.cancel_job, job_id=job_id)