gmf_forge_ai_orchestration.protocols
Protocol adapters and clients for interoperability surfaces.
class A2AClient(BaseAgent):
    """
    Proxies ``execute()`` calls to a remote A2A-compliant agent service.

    Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 ``tasks/send`` over HTTP.
    Context (user_assertion, obo_token, locale, etc.) is forwarded as
    ``message.metadata`` in the JSON-RPC payload.

    This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.)
    treat a separately-deployed A2A agent service as a first-class agent — no code
    changes needed in the orchestrator.

    Args:
        endpoint_url: Base URL of the remote agent service
            (e.g. ``"http://search-agent:8080"``).
            ``/rpc`` is appended automatically.
        timeout: HTTP request timeout in seconds (default: 30).
        headers: Extra HTTP headers sent with every request (e.g. ``Authorization``).
        behaviors: Optional behaviors forwarded to :class:`BaseAgent`;
            ``execute()`` invokes the before / after / on-error behavior hooks
            around each remote call.
        agent_id: Stable identifier used in logging/metrics. Defaults to ``"A2AClient"``.
        logger: Optional :class:`BasicLogger`.
        metrics: Optional :class:`BasicMetricsCollector`.

    Example::

        agent = A2AClient(
            endpoint_url="http://search-agent-service:8080",
            agent_id="search_agent",
        )
        result = await agent.execute("Find recent AI papers")

    Discovery::

        card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
        agent = A2AClient(
            endpoint_url=card.url,
            agent_id=card.name,
        )
    """

    def __init__(
        self,
        endpoint_url: str,
        timeout: float = 30.0,
        headers: Optional[Dict[str, str]] = None,
        behaviors: Optional[List["BaseBehavior"]] = None,
        agent_id: Optional[str] = None,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
    ) -> None:
        # This agent only proxies to a remote service, so no LLM gateway is passed.
        super().__init__(
            llm_gateway=None,
            behaviors=behaviors,
            agent_id=agent_id or "A2AClient",
            logger=logger,
            metrics=metrics,
        )
        self.endpoint_url = endpoint_url.rstrip("/")
        self.timeout = timeout
        # Caller-supplied headers win over the default Content-Type.
        self._headers = {"Content-Type": "application/json", **(headers or {})}

    # ------------------------------------------------------------------
    # BaseAgent implementation
    # ------------------------------------------------------------------

    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        """Send ``task`` to the remote agent and return its result.

        The before-hooks run once; the remote call is then attempted in a loop
        so that an on-error hook can request a retry.

        Args:
            task: Task text sent as the single text part of the A2A message.
            context: Optional context forwarded as ``message.metadata``.

        Returns:
            The remote result after the after-hooks, or the fallback value
            returned by the on-error hooks.

        Raises:
            Exception: the original error, when the on-error hooks return ``None``.
        """
        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
        bctx = await self._apply_behaviors_before(bctx)

        while True:
            self._log_execution_start(task)
            try:
                result = await self._call_remote(task, context or {})
            except Exception as exc:
                fallback = await self._apply_behaviors_on_error(bctx, exc)
                if fallback is RETRY_SENTINEL:
                    bctx.attempt += 1
                    continue
                if fallback is not None:
                    return fallback  # type: ignore[return-value]
                raise

            result = await self._apply_behaviors_after(bctx, result)
            self._log_execution_end(task, result.success, len(result.steps))
            return result

    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        """Executes remotely then yields each step from the response."""
        result = await self.execute(task, context)
        for step in result.steps:
            yield step

    # ------------------------------------------------------------------
    # A2A JSON-RPC transport
    # ------------------------------------------------------------------

    async def _call_remote(self, task: str, context: Dict[str, Any]) -> AgentResult:
        """Send an A2A tasks/send JSON-RPC 2.0 request and return an AgentResult.

        Raises:
            A2AClientError: on any httpx transport failure, a non-200 status,
                or a response body that cannot be parsed as an A2A task.
        """
        url = f"{self.endpoint_url}/rpc"
        request_id = str(uuid4())
        task_id = str(uuid4())

        payload = JsonRpcRequest(
            id=request_id,
            method="tasks/send",
            params=A2ATaskParams(
                id=task_id,
                message=A2AMessage(
                    parts=[A2APart(type="text", text=task)],
                    # Context (user_assertion, obo_token, locale, etc.) is carried
                    # in message.metadata — A2A agents read it from there.
                    metadata=context,
                ),
            ),
        ).model_dump()

        self._logger.info(
            "A2AClient tasks/send",
            url=url,
            agent_id=self.agent_id,
            task_id=task_id,
        )

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            try:
                response = await client.post(url, json=payload, headers=self._headers)
            except httpx.ConnectError as exc:
                raise A2AClientError(
                    f"Cannot reach remote agent at {url}: {exc}"
                ) from exc
            except httpx.TimeoutException as exc:
                raise A2AClientError(
                    f"Remote agent timed out after {self.timeout}s: {exc}"
                ) from exc
            except httpx.HTTPError as exc:
                # Read/write/protocol errors would otherwise escape as raw httpx
                # exceptions; callers only need to handle A2AClientError.
                raise A2AClientError(
                    f"HTTP transport error calling remote agent at {url}: {exc}"
                ) from exc

        if response.status_code != 200:
            raise A2AClientError(
                f"Remote agent returned HTTP {response.status_code}: {response.text[:200]}"
            )

        return self._parse_response(response.text)

    def _parse_response(self, body: str) -> AgentResult:
        """Parse an A2A JSON-RPC tasks/send response into an AgentResult.

        Raises:
            A2AClientError: if the body is not JSON, is not a JSON object,
                carries a JSON-RPC error, or fails model validation.
        """
        try:
            envelope = json.loads(body)
        except json.JSONDecodeError as exc:
            raise A2AClientError(f"Remote agent returned non-JSON body: {body[:200]}") from exc

        # A bare JSON scalar would make the membership test below raise TypeError.
        if not isinstance(envelope, dict):
            raise A2AClientError(
                f"Malformed A2A response: expected a JSON object, got {type(envelope).__name__}"
            )

        # ``"error": null`` next to a result is not an error response.
        if envelope.get("error") is not None:
            try:
                err = JsonRpcErrorResponse.model_validate(envelope)
            except Exception as exc:
                raise A2AClientError(f"Malformed A2A error response: {exc}") from exc
            raise A2AClientError(
                f"A2A JSON-RPC error {err.error.code}: {err.error.message}"
            )

        try:
            response = JsonRpcResponse.model_validate(envelope)
        except Exception as exc:
            raise A2AClientError(f"Malformed A2A response: {exc}") from exc

        task = response.result
        success = task.status.state == "completed"
        error_msg: Optional[str] = task.status.message if not success else None

        # Extract text output from the first artifact's first text part
        output = ""
        for artifact in task.artifacts or []:
            for part in artifact.parts:
                if part.type == "text":
                    output = part.text or ""
                    break
            if output:
                break

        task_metadata = task.metadata or {}

        # Steps — validate each raw step dict through AgentStepModel. Failures are
        # wrapped so that callers see a single exception type for protocol issues.
        try:
            step_models = [
                AgentStepModel.model_validate(raw_step)
                for raw_step in task_metadata.get("steps") or []
            ]
        except Exception as exc:
            raise A2AClientError(f"Malformed A2A step metadata: {exc}") from exc

        steps: List[AgentStep] = [
            AgentStep(
                thought=s.thought,
                action=s.action,
                action_input=s.action_input,
                observation=s.observation,
                metadata=s.metadata,
            )
            for s in step_models
        ]

        # Propagate all task metadata except "steps" (already parsed above)
        metadata = {k: v for k, v in task_metadata.items() if k != "steps"}
        metadata["a2a_task_id"] = task.id

        return AgentResult(
            output=output,
            steps=steps,
            metadata=metadata,
            success=success,
            error=error_msg,
        )

    # ------------------------------------------------------------------
    # A2A discovery helper
    # ------------------------------------------------------------------

    @staticmethod
    async def fetch_agent_card(
        base_url: str,
        timeout: float = 10.0,
        headers: Optional[Dict[str, str]] = None,
    ) -> AgentCard:
        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.

        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
        ``description``, ``url``, ``version``, ``skills``.

        Args:
            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
            timeout: HTTP timeout in seconds.
            headers: Optional extra request headers.

        Raises:
            A2AClientError: if the request fails at the transport level, the
                endpoint returns non-200, or the body is not a valid JSON
                Agent Card.
        """
        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
        async with httpx.AsyncClient(timeout=timeout) as client:
            try:
                response = await client.get(url, headers=headers or {})
            except httpx.ConnectError as exc:
                raise A2AClientError(
                    f"Cannot reach agent card at {url}: {exc}"
                ) from exc
            except httpx.TimeoutException as exc:
                raise A2AClientError(
                    f"Timed out fetching agent card from {url}: {exc}"
                ) from exc
            except httpx.HTTPError as exc:
                raise A2AClientError(
                    f"HTTP transport error fetching agent card from {url}: {exc}"
                ) from exc

        if response.status_code != 200:
            raise A2AClientError(
                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
            )

        try:
            return AgentCard.model_validate(response.json())
        except json.JSONDecodeError as exc:
            raise A2AClientError(
                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
            ) from exc
        except Exception as exc:
            raise A2AClientError(
                f"Agent card at {url} has invalid schema: {exc}"
            ) from exc
Proxies execute() calls to a remote A2A-compliant agent service.
Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 tasks/send over HTTP.
Context (user_assertion, obo_token, locale, etc.) is forwarded as
message.metadata in the JSON-RPC payload.
This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) treat a separately-deployed A2A agent service as a first-class agent — no code changes needed in the orchestrator.
Args:
endpoint_url: Base URL of the remote agent service
(e.g. "http://search-agent:8080").
/rpc is appended automatically.
timeout: HTTP request timeout in seconds (default: 30).
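headers: Extra HTTP headers sent with every request (e.g. Authorization).
behaviors: Optional list of BaseBehavior hooks that execute() runs before the remote call, after it, and on error.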
agent_id: Stable identifier used in logging/metrics. Defaults to "A2AClient".
logger: Optional BasicLogger.
metrics: Optional BasicMetricsCollector.
Example::
agent = A2AClient(
endpoint_url="http://search-agent-service:8080",
agent_id="search_agent",
)
result = await agent.execute("Find recent AI papers")
Discovery::
card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
agent = A2AClient(
endpoint_url=card.url,
agent_id=card.name,
)
def __init__(
    self,
    endpoint_url: str,
    timeout: float = 30.0,
    headers: Optional[Dict[str, str]] = None,
    behaviors: Optional[List["BaseBehavior"]] = None,
    agent_id: Optional[str] = None,
    logger: Optional[BasicLogger] = None,
    metrics: Optional[BasicMetricsCollector] = None,
) -> None:
    """Configure the proxy agent and the HTTP settings used for every call.

    Args:
        endpoint_url: Base URL of the remote agent; a trailing ``/`` is stripped.
        timeout: HTTP request timeout in seconds.
        headers: Extra HTTP headers, merged over the default ``Content-Type``.
        behaviors: Optional behaviors forwarded to the base agent.
        agent_id: Identifier forwarded to the base agent; falls back to
            ``"A2AClient"`` when empty or ``None``.
        logger: Optional logger forwarded to the base agent.
        metrics: Optional metrics collector forwarded to the base agent.
    """
    resolved_agent_id = agent_id if agent_id else "A2AClient"
    # No LLM gateway: this agent only proxies to a remote service.
    super().__init__(
        llm_gateway=None,
        behaviors=behaviors,
        agent_id=resolved_agent_id,
        logger=logger,
        metrics=metrics,
    )
    self.endpoint_url = endpoint_url.rstrip("/")
    self.timeout = timeout

    # Start from the default header, then let caller-supplied values override it.
    merged_headers = {"Content-Type": "application/json"}
    if headers:
        merged_headers.update(headers)
    self._headers = merged_headers
async def execute(
    self, task: str, context: Optional[Dict[str, Any]] = None
) -> AgentResult:
    """Send ``task`` to the remote agent, honouring the configured behavior hooks.

    The before-hooks run once. Each attempt then calls the remote agent; on
    failure the on-error hooks decide whether to retry, return a fallback
    value, or let the original exception propagate.

    Args:
        task: Task text for the remote agent.
        context: Optional context forwarded to the remote call.

    Returns:
        The remote result after the after-hooks, or the on-error fallback.
    """
    bctx = await self._apply_behaviors_before(
        BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
    )

    while True:
        self._log_execution_start(task)
        try:
            remote_result = await self._call_remote(task, context or {})
        except Exception as exc:
            outcome = await self._apply_behaviors_on_error(bctx, exc)
            if outcome is None:
                # No hook handled the failure: surface the original error.
                raise
            if outcome is not RETRY_SENTINEL:
                return outcome  # type: ignore[return-value]
            # A hook asked for another attempt.
            bctx.attempt += 1
        else:
            final_result = await self._apply_behaviors_after(bctx, remote_result)
            self._log_execution_end(
                task, final_result.success, len(final_result.steps)
            )
            return final_result
Execute the task and return a result.
async def stream_execute(
    self, task: str, context: Optional[Dict[str, Any]] = None
) -> AsyncIterator[AgentStep]:
    """Run :meth:`execute` to completion, then replay its steps one at a time.

    Nothing is yielded until the remote call has finished, because the steps
    are read from the completed result.
    """
    completed = await self.execute(task, context)
    for recorded_step in completed.steps:
        yield recorded_step
Executes remotely then yields each step from the response.
@staticmethod
async def fetch_agent_card(
    base_url: str,
    timeout: float = 10.0,
    headers: Optional[Dict[str, str]] = None,
) -> AgentCard:
    """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.

    Returns an :class:`AgentCard` instance. Standard fields: ``name``,
    ``description``, ``url``, ``version``, ``skills``.

    Args:
        base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
            A trailing ``/`` is stripped before the well-known path is appended.
        timeout: HTTP timeout in seconds.
        headers: Optional extra request headers.

    Raises:
        A2AClientError: if the request fails at the transport level
            (connection, timeout, or any other ``httpx.HTTPError``), the
            endpoint returns non-200, the body is not JSON, or the JSON does
            not validate as an :class:`AgentCard`.
    """
    url = f"{base_url.rstrip('/')}/.well-known/agent.json"
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.get(url, headers=headers or {})
        except httpx.ConnectError as exc:
            raise A2AClientError(
                f"Cannot reach agent card at {url}: {exc}"
            ) from exc
        except httpx.TimeoutException as exc:
            raise A2AClientError(
                f"Timed out fetching agent card from {url}: {exc}"
            ) from exc
        except httpx.HTTPError as exc:
            # Read/protocol errors would otherwise escape as raw httpx
            # exceptions, contradicting the documented contract.
            raise A2AClientError(
                f"HTTP transport error fetching agent card from {url}: {exc}"
            ) from exc

    if response.status_code != 200:
        raise A2AClientError(
            f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
        )

    try:
        return AgentCard.model_validate(response.json())
    except json.JSONDecodeError as exc:
        raise A2AClientError(
            f"Agent card at {url} returned non-JSON: {response.text[:200]}"
        ) from exc
    except Exception as exc:
        raise A2AClientError(
            f"Agent card at {url} has invalid schema: {exc}"
        ) from exc
Fetch the A2A Agent Card from GET /.well-known/agent.json.
Returns an AgentCard instance. Standard fields: name,
description, url, version, skills.
Args:
base_url: Base URL of the remote agent (e.g. "http://agent:8080").
timeout: HTTP timeout in seconds.
headers: Optional extra request headers.
Raises: A2AClientError: if the endpoint is unreachable, times out, returns non-200, or returns a body that is not JSON or does not validate as an AgentCard.
class A2AClientError(RuntimeError):
    """Client-side failure while talking to a remote A2A agent.

    Raised when the remote agent returns an error, is unreachable, or
    violates the protocol.
    """
Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.
class A2AAdapter:
    """Protocol-only A2A receiver adapter.

    The adapter does not know about tools, checkpoints, routing, or any other
    orchestration concerns. It only translates between the A2A wire protocol and
    the host agent's execution callback.
    """

    def __init__(
        self,
        agent_id: str,
        description: str,
        url: str,
        version: str = "0.1.0",
        skills: Optional[Sequence[Mapping[str, Any]]] = None,
        logger: Optional[BasicLogger] = None,
    ) -> None:
        """Store the Agent Card fields advertised by this adapter.

        Args:
            agent_id: Identifier used as the Agent Card ``name`` and in logs.
            description: Agent Card description.
            url: Public URL of the agent; a trailing ``/`` is stripped.
            version: Agent Card version string.
            skills: Optional skill definitions; each mapping is copied.
            logger: Optional logger; a :class:`BasicLogger` is created when omitted.
        """
        self.agent_id = agent_id
        self.description = description
        self.url = url.rstrip("/")
        self.version = version
        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")

    def agent_card(self) -> AgentCard:
        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
        return AgentCard(
            name=self.agent_id,
            description=self.description,
            url=self.url,
            version=self.version,
            skills=[AgentSkill(**s) for s in self._skills],
        )

    def build_jsonrpc_error(
        self,
        rpc_id: Optional[str],
        code: int,
        message: str,
        data: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response."""
        return JsonRpcErrorResponse(
            id=rpc_id,
            error=JsonRpcError(code=code, message=message, data=data),
        ).model_dump()

    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
        """Parse a ``tasks/send`` JSON-RPC request body.

        Returns:
            A tuple of ``(rpc_id, task_id, task_text, context)``.

        Raises:
            A2AAdapterError: if the method is not ``tasks/send`` or any of
                ``params``, ``message``, ``parts`` or ``metadata`` has the
                wrong shape.
        """
        method = str(body.get("method", ""))
        if method != "tasks/send":
            raise A2AAdapterError(f"Method not found: {method}")

        params = body.get("params", {})
        if not isinstance(params, Mapping):
            raise A2AAdapterError("Invalid params payload")

        rpc_id = body.get("id")
        # A missing or empty task id gets a freshly generated one.
        task_id = str(params.get("id") or uuid4())

        message = params.get("message", {})
        if not isinstance(message, Mapping):
            raise A2AAdapterError("Invalid message payload")

        task_text = self._extract_task_text(message.get("parts", []))

        # Peers that dump optional fields emit ``"metadata": null``; treat that
        # the same as an absent key rather than rejecting the request.
        context = message.get("metadata")
        if context is None:
            context = {}
        if not isinstance(context, Mapping):
            raise A2AAdapterError("Invalid message metadata payload")

        return rpc_id, task_id, task_text, dict(context)

    def build_task_response(
        self,
        rpc_id: Optional[str],
        task_id: str,
        result: AgentResult,
    ) -> Dict[str, Any]:
        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
        metadata = dict(result.metadata)
        # Steps travel inside task metadata under the "steps" key.
        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
        metadata.setdefault("a2a_task_id", task_id)

        # An empty output produces no artifact at all.
        artifacts = (
            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
            if result.output
            else []
        )

        return JsonRpcResponse(
            id=rpc_id,
            result=A2ATask(
                id=task_id,
                status=A2ATaskStatus(
                    state="completed" if result.success else "failed",
                    message=result.error if not result.success else None,
                ),
                artifacts=artifacts,
                metadata=metadata,
            ),
        ).model_dump()

    async def handle_rpc(
        self,
        body: Mapping[str, Any],
        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
    ) -> Dict[str, Any]:
        """Handle an incoming A2A JSON-RPC body and return the response payload.

        Args:
            body: Decoded JSON-RPC request.
            execute: Async callback that runs the host agent with
                ``(task_text, context)``.

        Returns:
            A JSON-RPC success payload, or an error payload with code
            ``-32600`` (body is not an object), ``-32601`` (unknown method),
            ``-32602`` (malformed params) or ``-32603`` (execution failed).
        """
        # A non-object body (e.g. a JSON array or scalar) cannot be parsed at
        # all; answer with "Invalid Request" instead of crashing on ``.get``.
        if not isinstance(body, Mapping):
            return self.build_jsonrpc_error(
                None, -32600, "Invalid Request: body must be a JSON object"
            )

        rpc_id = body.get("id")

        try:
            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
        except A2AAdapterError as exc:
            code = -32601 if str(exc).startswith("Method not found") else -32602
            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))

        # NOTE(review): JSON-RPC 2.0 also allows numeric ids; non-string ids are
        # echoed back as null here — presumably because the response models
        # type ``id`` as Optional[str]. Confirm against the model definitions.
        reply_id = rpc_id if isinstance(rpc_id, str) else None

        self._logger.info(
            "A2A tasks/send received",
            agent_id=self.agent_id,
            task_id=task_id,
            task_preview=task_text[:80],
        )

        try:
            result = await execute(task_text, context)
        except Exception as exc:
            self._logger.error(
                "Agent execution failed",
                agent_id=self.agent_id,
                task_id=task_id,
                error=str(exc),
            )
            return self.build_jsonrpc_error(reply_id, -32603, str(exc))

        return self.build_task_response(reply_id, task_id, result)

    def _extract_task_text(self, parts: Any) -> str:
        """Return the text of the first ``type == "text"`` part, or ``""``."""
        # str/bytes are Sequences too, but are never a valid list of parts.
        if isinstance(parts, (str, bytes, bytearray)) or not isinstance(parts, Sequence):
            raise A2AAdapterError("Invalid message parts payload")

        for part in parts:
            if isinstance(part, Mapping) and part.get("type") == "text":
                text = part.get("text")
                # ``"text": null`` must not become the literal string "None".
                return "" if text is None else str(text)
        return ""

    def _serialize_step(self, step: AgentStep) -> Dict[str, Any]:
        """Dump one :class:`AgentStep` to a plain dict via :class:`AgentStepModel`."""
        return AgentStepModel(
            thought=step.thought,
            action=step.action,
            action_input=step.action_input,
            observation=step.observation,
            metadata=step.metadata,
        ).model_dump()
Protocol-only A2A receiver adapter.
The adapter does not know about tools, checkpoints, routing, or any other orchestration concerns. It only translates between the A2A wire protocol and the host agent's execution callback.
def __init__(
    self,
    agent_id: str,
    description: str,
    url: str,
    version: str = "0.1.0",
    skills: Optional[Sequence[Mapping[str, Any]]] = None,
    logger: Optional[BasicLogger] = None,
) -> None:
    """Record the Agent Card fields and set up logging.

    Args:
        agent_id: Identifier used as the Agent Card name and in the logger name.
        description: Agent Card description.
        url: Public URL of the agent; a trailing ``/`` is stripped.
        version: Agent Card version string.
        skills: Optional skill definitions; each mapping is shallow-copied.
        logger: Optional logger; a :class:`BasicLogger` is created when omitted.
    """
    self.agent_id = agent_id
    self.description = description
    self.url = url.rstrip("/")
    self.version = version

    # Copy each skill mapping so later caller-side mutation cannot leak in.
    copied_skills: List[Dict[str, Any]] = []
    for skill in skills or []:
        copied_skills.append(dict(skill))
    self._skills = copied_skills

    if not logger:
        logger = BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
    self._logger = logger
def agent_card(self) -> AgentCard:
    """Build the Agent Card served at ``GET /.well-known/agent.json``.

    Each stored skill dict is expanded into an :class:`AgentSkill`.
    """
    skill_models = []
    for skill_fields in self._skills:
        skill_models.append(AgentSkill(**skill_fields))

    return AgentCard(
        name=self.agent_id,
        description=self.description,
        url=self.url,
        version=self.version,
        skills=skill_models,
    )
Return the A2A Agent Card for GET /.well-known/agent.json.
def build_jsonrpc_error(
    self,
    rpc_id: Optional[str],
    code: int,
    message: str,
    data: Optional[Any] = None,
) -> Dict[str, Any]:
    """Assemble a JSON-RPC 2.0 error response and dump it to a dict.

    Args:
        rpc_id: Request id to echo back, or ``None``.
        code: JSON-RPC error code.
        message: Error message.
        data: Optional extra error payload.
    """
    error_detail = JsonRpcError(code=code, message=message, data=data)
    envelope = JsonRpcErrorResponse(id=rpc_id, error=error_detail)
    return envelope.model_dump()
Build a JSON-RPC 2.0 error response.
def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
    """Parse a ``tasks/send`` JSON-RPC request body.

    Args:
        body: Decoded JSON-RPC request object.

    Returns:
        A tuple of ``(rpc_id, task_id, task_text, context)``. ``task_id`` is
        generated when the request does not carry one; ``context`` is a copy
        of ``message.metadata`` (empty when the key is absent or ``null``).

    Raises:
        A2AAdapterError: if the method is not ``tasks/send`` or ``params``,
            ``message`` or ``metadata`` is not a mapping.
    """
    method = str(body.get("method", ""))
    if method != "tasks/send":
        raise A2AAdapterError(f"Method not found: {method}")

    params = body.get("params", {})
    if not isinstance(params, Mapping):
        raise A2AAdapterError("Invalid params payload")

    rpc_id = body.get("id")
    # A missing or empty task id gets a freshly generated one.
    task_id = str(params.get("id") or uuid4())

    message = params.get("message", {})
    if not isinstance(message, Mapping):
        raise A2AAdapterError("Invalid message payload")

    task_text = self._extract_task_text(message.get("parts", []))

    # Peers that dump optional fields emit ``"metadata": null``; treat that
    # the same as an absent key rather than rejecting the request.
    context = message.get("metadata")
    if context is None:
        context = {}
    if not isinstance(context, Mapping):
        raise A2AAdapterError("Invalid message metadata payload")

    return rpc_id, task_id, task_text, dict(context)
Parse a tasks/send JSON-RPC request body.
Returns:
A tuple of (rpc_id, task_id, task_text, context).
def build_task_response(
    self,
    rpc_id: Optional[str],
    task_id: str,
    result: AgentResult,
) -> Dict[str, Any]:
    """Wrap an :class:`AgentResult` in an A2A JSON-RPC success response.

    The result's steps are serialized into ``metadata["steps"]``, and the
    output (when non-empty) becomes a single text artifact.

    Args:
        rpc_id: Request id to echo back, or ``None``.
        task_id: A2A task id; also stored as ``a2a_task_id`` in the metadata
            unless the result already provides one.
        result: Outcome of the host agent's execution.
    """
    response_metadata = dict(result.metadata)
    response_metadata["steps"] = [self._serialize_step(s) for s in result.steps]
    response_metadata.setdefault("a2a_task_id", task_id)

    # An empty output produces no artifact at all.
    artifacts = []
    if result.output:
        artifacts.append(A2AArtifact(parts=[A2APart(type="text", text=result.output)]))

    # The error message is only reported for failed tasks.
    if result.success:
        status = A2ATaskStatus(state="completed", message=None)
    else:
        status = A2ATaskStatus(state="failed", message=result.error)

    task = A2ATask(
        id=task_id,
        status=status,
        artifacts=artifacts,
        metadata=response_metadata,
    )
    return JsonRpcResponse(id=rpc_id, result=task).model_dump()
Convert an AgentResult into an A2A JSON-RPC success response.
async def handle_rpc(
    self,
    body: Mapping[str, Any],
    execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
) -> Dict[str, Any]:
    """Handle an incoming A2A JSON-RPC body and return the response payload.

    Args:
        body: Decoded JSON-RPC request.
        execute: Async callback that runs the host agent with
            ``(task_text, context)``.

    Returns:
        A JSON-RPC success payload, or an error payload with code ``-32600``
        (body is not an object), ``-32601`` (unknown method), ``-32602``
        (malformed params) or ``-32603`` (execution failed).
    """
    # A non-object body (e.g. a JSON array or scalar) cannot be parsed at all;
    # answer with "Invalid Request" instead of crashing on ``.get``.
    if not isinstance(body, Mapping):
        return self.build_jsonrpc_error(
            None, -32600, "Invalid Request: body must be a JSON object"
        )

    rpc_id = body.get("id")

    try:
        rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
    except A2AAdapterError as exc:
        code = -32601 if str(exc).startswith("Method not found") else -32602
        return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))

    # NOTE(review): JSON-RPC 2.0 also allows numeric ids; non-string ids are
    # echoed back as null here — presumably because the response models type
    # ``id`` as Optional[str]. Confirm against the model definitions.
    reply_id = rpc_id if isinstance(rpc_id, str) else None

    self._logger.info(
        "A2A tasks/send received",
        agent_id=self.agent_id,
        task_id=task_id,
        task_preview=task_text[:80],
    )

    try:
        result = await execute(task_text, context)
    except Exception as exc:
        self._logger.error(
            "Agent execution failed",
            agent_id=self.agent_id,
            task_id=task_id,
            error=str(exc),
        )
        return self.build_jsonrpc_error(reply_id, -32603, str(exc))

    return self.build_task_response(reply_id, task_id, result)
Handle an incoming A2A JSON-RPC body and return the response payload.
class A2AAdapterError(RuntimeError):
    """Signals an incoming A2A request that is malformed or unsupported."""
Raised when an incoming A2A request is malformed or unsupported.