transports.clients.acp_client
ACP (Agent Communication Protocol) client.
Provides:
- WebSocket-based communication with ACP-compatible agent servers
- Session management
- Message streaming
- Permission handling
Uses the official ACP Python SDK from https://github.com/agentclientprotocol/python-sdk
Example:
async with ACPClient("ws://localhost:8000/api/v1/acp/ws/agent-1") as client: session = await client.create_session() async for event in client.run("Hello, agent!", stream=True): print(event)
ACPClientError Objects
class ACPClientError(Exception)
Error raised during ACP client operations.
ACPClient Objects
class ACPClient()
ACP (Agent Communication Protocol) client.
Connects to ACP-compatible agent servers via WebSocket.
Example:
async with ACPClient("ws://localhost:8000/api/v1/acp/ws/agent-1") as client: session = await client.create_session() async for event in client.run("Hello, agent!", stream=True): print(event)
__init__
def __init__(url: str,
headers: dict[str, str] | None = None,
timeout: float = 30.0)
Initialize the ACP client.
Arguments:
url- WebSocket URL of the ACP server.headers- Optional headers for the WebSocket connection.timeout- Connection timeout in seconds.
connect
async def connect() -> "ACPClient"
Connect to the ACP server.
Returns:
Self for chaining.
disconnect
async def disconnect() -> None
Disconnect from the ACP server.
__aenter__
async def __aenter__() -> "ACPClient"
Async context manager entry.
__aexit__
async def __aexit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None
Async context manager exit.
create_session
async def create_session(
cwd: str = ".",
mcp_servers: list[dict[str, Any]] | None = None) -> str
Create a new session with the agent using SDK types.
Arguments:
cwd- Current working directory for the session.mcp_servers- Optional MCP server configurations.
Returns:
Session ID.
run
async def run(
input_text: str,
stream: bool = True
) -> AsyncGenerator[SessionNotification | PromptResponse, None]
Run the agent on the given input using SDK types.
Arguments:
input_text- The input text to send to the agent.stream- Whether to stream responses.
Yields:
SessionNotification updates during streaming, then PromptResponse.
send_message
async def send_message(
content: str
) -> AsyncGenerator[SessionNotification | PromptResponse, None]
Send a message to the agent and stream responses.
Convenience method that wraps run().
Arguments:
content- The message content.
Yields:
SessionNotification updates and PromptResponse.
respond_to_permission
async def respond_to_permission(request_id: str | int,
option_id: str | None = None) -> None
Respond to a permission request from the agent using SDK types.
Arguments:
request_id- The JSON-RPC request ID to respond to.option_id- Optional permission option ID (if None, denies).
shutdown
async def shutdown() -> None
Gracefully shutdown the connection.
session_id
@property
def session_id() -> str | None
Get the current session ID.
agent_info
@property
def agent_info() -> Implementation | None
Get information about the connected agent (SDK type).
agent_capabilities
@property
def agent_capabilities() -> AgentCapabilities | None
Get agent capabilities (SDK type).
is_connected
@property
def is_connected() -> bool
Check if connected to the server.
connect_acp
async def connect_acp(url: str,
headers: dict[str, str] | None = None,
timeout: float = 30.0) -> ACPClient
Connect to an ACP-compatible agent server.
Convenience function for creating and connecting an ACP client.
Arguments:
url- WebSocket URL of the ACP server.headers- Optional headers for the WebSocket connection.timeout- Connection timeout in seconds.
Returns:
Connected ACP client.
Example:
client = await connect_acp("ws://localhost:8000/api/v1/acp/ws/my-agent") async for event in client.run("Hello!"): print(event) await client.disconnect()