streams.loop
WebSocket pub/sub infrastructure and monitoring snapshot builder.
This module provides the generic streaming loop used by the agent-runtimes WebSocket endpoints. It was extracted from routes/tool_approvals.py so that the common logic (subscriber management, snapshot assembly, stream loop) is decoupled from the tool-approval CRUD routes.
subscribe_stream
def subscribe_stream(
agent_id: str | None) -> asyncio.Queue[AgentStreamMessage]
Register a subscriber queue for agent_id (or global).
unsubscribe_stream
def unsubscribe_stream(agent_id: str | None,
queue: asyncio.Queue[AgentStreamMessage]) -> None
Remove a previously registered subscriber queue.
enqueue_stream_message
def enqueue_stream_message(agent_id: str | None,
message: AgentStreamMessage) -> None
Push message to every queue subscribed to agent_id.
build_context_snapshot
def build_context_snapshot(agent_id: str) -> dict[str, Any] | None
Build the lightweight context snapshot dict for agent_id.
build_full_context
def build_full_context(agent_id: str) -> dict[str, Any] | None
Build the full context snapshot (tools, messages, model config).
build_mcp_status
def build_mcp_status() -> dict[str, Any] | None
Build MCP toolsets status dict.
build_codemode_status
def build_codemode_status() -> dict[str, Any] | None
Build codemode status dict (sync — safe to call from async context).
build_monitoring_snapshot_payload
async def build_monitoring_snapshot_payload(
agent_id: str | None,
*,
list_approvals: Any | None = None) -> AgentMonitoringSnapshotPayload
Assemble a full monitoring snapshot for agent_id.
Parameters
list_approvals : callable, optional
An async callable (agent_id, status) -> list[Record] that returns
the current pending tool approvals. When None the approval fields
are left empty (useful when the caller does not have access to the
approval store).
publish_stream_event
async def publish_stream_event(*,
event_type: str,
payload: dict[str, Any],
agent_id: str | None,
list_approvals: Any | None = None) -> None
Publish a stream event and follow it with a fresh snapshot.
stream_loop
async def stream_loop(
websocket: WebSocket,
agent_id: str | None,
*,
list_approvals: Any | None = None,
decide_approval: Callable[[str, bool, str | None], Awaitable[Any]]
| None = None
) -> None
Run the main WebSocket stream loop.
Accepts the connection, streams an initial snapshot, then pushes periodic diffs and reactive messages until the client disconnects.