streams.loop

WebSocket pub/sub infrastructure and monitoring snapshot builder.

This module provides the generic streaming loop used by the agent-runtimes WebSocket endpoints. It was extracted from routes/tool_approvals.py so that the common logic (subscriber management, snapshot assembly, stream loop) is decoupled from the tool-approval CRUD routes.

subscribe_stream

def subscribe_stream(agent_id: str | None) -> asyncio.Queue[AgentStreamMessage]

Register a subscriber queue for agent_id, or a global subscriber queue when agent_id is None.

unsubscribe_stream

def unsubscribe_stream(
    agent_id: str | None,
    queue: asyncio.Queue[AgentStreamMessage]) -> None

Remove a previously registered subscriber queue.

enqueue_stream_message

def enqueue_stream_message(
    agent_id: str | None,
    message: AgentStreamMessage) -> None

Push message to every queue subscribed to agent_id.

build_context_snapshot

def build_context_snapshot(agent_id: str) -> dict[str, Any] | None

Build the lightweight context snapshot dict for agent_id.

build_full_context

def build_full_context(agent_id: str) -> dict[str, Any] | None

Build the full context snapshot (tools, messages, model config).

build_mcp_status

def build_mcp_status() -> dict[str, Any] | None

Build the MCP toolsets status dict.

build_codemode_status

def build_codemode_status() -> dict[str, Any] | None

Build the codemode status dict. The function is synchronous and safe to call from an async context.

build_monitoring_snapshot_payload

async def build_monitoring_snapshot_payload(
    agent_id: str | None,
    *,
    list_approvals: Any | None = None) -> AgentMonitoringSnapshotPayload

Assemble a full monitoring snapshot for agent_id.

Parameters

list_approvals : callable, optional
    An async callable (agent_id, status) -> list[Record] that returns the current pending tool approvals. When None, the approval fields are left empty (useful when the caller does not have access to the approval store).
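To illustrate the shape described for list_approvals, here is a minimal sketch of a compatible async callable backed by an in-memory list. Everything in it is hypothetical: the `ApprovalRecord` dataclass and its fields, the `_STORE` list, the `"pending"` status string, and the choice to treat `agent_id=None` as "all agents". The real record type and approval store are not shown in this reference.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass
class ApprovalRecord:
    """Hypothetical record type; the real one is not documented here."""

    id: str
    agent_id: str
    status: str


# Hypothetical in-memory store used only for this illustration.
_STORE = [
    ApprovalRecord("r1", "agent-a", "pending"),
    ApprovalRecord("r2", "agent-a", "approved"),
    ApprovalRecord("r3", "agent-b", "pending"),
]


async def list_approvals(agent_id: str | None, status: str) -> list[ApprovalRecord]:
    """Return records with the given status, optionally filtered by agent."""
    return [
        record
        for record in _STORE
        # Assumption: agent_id=None means "do not filter by agent".
        if record.status == status
        and (agent_id is None or record.agent_id == agent_id)
    ]
```

A callable of this shape would be passed by keyword, for example `await build_monitoring_snapshot_payload(agent_id, list_approvals=list_approvals)`.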

publish_stream_event

async def publish_stream_event(
    *,
    event_type: str,
    payload: dict[str, Any],
    agent_id: str | None,
    list_approvals: Any | None = None) -> None

Publish a stream event and follow it with a fresh snapshot.
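The "event, then fresh snapshot" ordering can be sketched as follows. This is an illustration under stated assumptions rather than the module's code: `publish_event_then_snapshot`, the `build_snapshot` parameter, and the message dict layout (`type`, `agent_id`, `payload`) are all hypothetical, and a single queue stands in for the subscriber registry.

```python
from __future__ import annotations

import asyncio
from typing import Any, Awaitable, Callable


async def publish_event_then_snapshot(
    queue: asyncio.Queue,
    *,
    event_type: str,
    payload: dict[str, Any],
    agent_id: str | None,
    build_snapshot: Callable[[str | None], Awaitable[dict[str, Any]]],
) -> None:
    """Enqueue the event first, then a snapshot built after the event."""
    # 1. The event itself, so subscribers can react to what just happened.
    queue.put_nowait({"type": event_type, "agent_id": agent_id, "payload": payload})
    # 2. A snapshot built afterwards, so subscribers can resynchronise state.
    snapshot = await build_snapshot(agent_id)
    queue.put_nowait({"type": "snapshot", "agent_id": agent_id, "payload": snapshot})
```

Following every event with a snapshot means a subscriber that ignores or misinterprets an individual event still converges on the current state.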

stream_loop

async def stream_loop(
    websocket: WebSocket,
    agent_id: str | None,
    *,
    list_approvals: Any | None = None,
    decide_approval: Callable[[str, bool, str | None], Awaitable[Any]] | None = None
) -> None

Run the main WebSocket stream loop.

Accepts the connection, streams an initial snapshot, then pushes periodic diffs and reactive messages until the client disconnects.
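The accept, initial snapshot, periodic diff, and reactive message sequence described above can be sketched as a single loop. This is a simplified illustration, not the real stream_loop: `sketch_stream_loop`, `FakeSocket`, `Disconnected`, the `interval` parameter, the message layout, and the shallow key-by-key diff are all hypothetical, and handling of incoming client messages (such as approval decisions) is left out.

```python
from __future__ import annotations

import asyncio
from typing import Any, Callable


class Disconnected(Exception):
    """Hypothetical signal that the client went away."""


class FakeSocket:
    """Stand-in for a WebSocket: records messages, disconnects after `limit` sends."""

    def __init__(self, limit: int) -> None:
        self.sent: list[dict[str, Any]] = []
        self.limit = limit

    async def accept(self) -> None:
        pass

    async def send_json(self, data: dict[str, Any]) -> None:
        if len(self.sent) >= self.limit:
            raise Disconnected
        self.sent.append(data)


async def sketch_stream_loop(
    socket: FakeSocket,
    queue: asyncio.Queue,
    build_snapshot: Callable[[], dict[str, Any]],
    interval: float = 0.01,
) -> None:
    await socket.accept()
    last = build_snapshot()
    try:
        # Initial full snapshot so the client starts from a known state.
        await socket.send_json({"type": "snapshot", "payload": last})
        while True:
            try:
                # Reactive path: forward a published message as soon as it arrives.
                message = await asyncio.wait_for(queue.get(), timeout=interval)
                await socket.send_json(message)
            except asyncio.TimeoutError:
                # Periodic path: send only the keys that changed since last time.
                current = build_snapshot()
                diff = {k: v for k, v in current.items() if last.get(k) != v}
                last = current
                if diff:
                    await socket.send_json({"type": "diff", "payload": diff})
    except Disconnected:
        # The client is gone; leave the loop quietly.
        return
```

Waiting on the queue with a timeout lets one coroutine serve both paths: a message wakes it immediately, and an idle interval triggers the periodic diff.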