transports.base
Base protocol adapter interface.
AdapterEvent Objects
@dataclass
class AdapterEvent()
Event emitted by protocol adapters.
Attributes:
type- Type of the event.data- Event payload data.metadata- Optional metadata about the event.
BaseTransport Objects
class BaseTransport(ABC)
Abstract base class for protocol adapters.
Protocol adapters translate between agent-runtimes' internal BaseAgent interface and external protocols like ACP, AG-UI, A2A, etc.
Example:
class MyProtocolAdapter(BaseAdapter): async def handle_request(self, request):
Translate protocol request to agent run
response = await self.agent.run(...)
Translate response back to protocol format
return protocol_response
__init__
def __init__(agent: BaseAgent)
Initialize the adapter.
Arguments:
agent- The agent to adapt.
protocol_name
@property
def protocol_name() -> str
Get the protocol name (e.g., 'acp', 'ag-ui', 'a2a').
handle_request
@abstractmethod
async def handle_request(request: dict[str, Any]) -> dict[str, Any]
Handle a protocol request.
Arguments:
request- Protocol-specific request data.
Returns:
Protocol-specific response data.
handle_stream
@abstractmethod
async def handle_stream(
request: dict[str, Any]) -> AsyncIterator[dict[str, Any]]
Handle a streaming protocol request.
Arguments:
request- Protocol-specific request data.
Yields:
Protocol-specific stream events.
initialize
async def initialize() -> None
Initialize the adapter.
Called when the adapter is first set up.
cleanup
async def cleanup() -> None
Clean up adapter resources.
Called when the adapter is being shut down.