gmf_forge_ai_orchestration.protocols.a2a

A2A protocol client, receiver adapter, and wire models.

 1"""A2A protocol client, receiver adapter, and wire models."""
 2
 3from gmf_forge_ai_orchestration.protocols.a2a.a2a_adapter import (
 4    A2AAdapter,
 5    A2AAdapterError,
 6)
 7from gmf_forge_ai_orchestration.protocols.a2a.a2a_client import (
 8    A2AClient,
 9    A2AClientError,
10)
11from gmf_forge_ai_orchestration.protocols.a2a.models import (
12    AgentCard,
13    AgentSkill,
14    A2APart,
15    A2AMessage,
16    A2ATaskParams,
17    JsonRpcRequest,
18    A2ATaskStatus,
19    A2AArtifact,
20    A2ATask,
21    JsonRpcResponse,
22    JsonRpcError,
23    JsonRpcErrorResponse,
24    AgentStepModel,
25)
26
27__all__ = [
28    "A2AClient",
29    "A2AClientError",
30    "A2AAdapter",
31    "A2AAdapterError",
32    "AgentCard",
33    "AgentSkill",
34    "A2APart",
35    "A2AMessage",
36    "A2ATaskParams",
37    "JsonRpcRequest",
38    "A2ATaskStatus",
39    "A2AArtifact",
40    "A2ATask",
41    "JsonRpcResponse",
42    "JsonRpcError",
43    "JsonRpcErrorResponse",
44    "AgentStepModel",
45]
class A2AClient(gmf_forge_ai_orchestration.agents.base.BaseAgent):
 76class A2AClient(BaseAgent):
 77    """
 78    Proxies ``execute()`` calls to a remote A2A-compliant agent service.
 79
 80    Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 ``tasks/send`` over HTTP.
 81    Context (user_assertion, obo_token, locale, etc.) is forwarded as
 82    ``message.metadata`` in the JSON-RPC payload.
 83
 84    This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.)
 85    treat a separately-deployed A2A agent service as a first-class agent — no code
 86    changes needed in the orchestrator.
 87
 88    Args:
 89        endpoint_url: Base URL of the remote agent service
 90            (e.g. ``"http://search-agent:8080"``).
 91            ``/rpc`` is appended automatically.
 92        timeout: HTTP request timeout in seconds (default: 30).
 93        headers: Extra HTTP headers sent with every request (e.g. ``Authorization``).
 94        agent_id: Stable identifier used in logging/metrics. Defaults to ``"A2AClient"``.
 95        logger: Optional :class:`BasicLogger`.
 96        metrics: Optional :class:`BasicMetricsCollector`.
 97
 98    Example::
 99
100        agent = A2AClient(
101            endpoint_url="http://search-agent-service:8080",
102            agent_id="search_agent",
103        )
104        result = await agent.execute("Find recent AI papers")
105
106    Discovery::
107
108        card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
109        agent = A2AClient(
110            endpoint_url=card["url"],
111            agent_id=card["name"],
112        )
113    """
114
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
135
136    # ------------------------------------------------------------------
137    # BaseAgent implementation
138    # ------------------------------------------------------------------
139
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result
162
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step
170
171    # ------------------------------------------------------------------
172    # A2A JSON-RPC transport
173    # ------------------------------------------------------------------
174
175    async def _call_remote(self, task: str, context: Dict[str, Any]) -> AgentResult:
176        """Send an A2A tasks/send JSON-RPC 2.0 request and return an AgentResult."""
177        url = f"{self.endpoint_url}/rpc"
178        request_id = str(uuid4())
179        task_id = str(uuid4())
180
181        payload = JsonRpcRequest(
182            id=request_id,
183            method="tasks/send",
184            params=A2ATaskParams(
185                id=task_id,
186                message=A2AMessage(
187                    parts=[A2APart(type="text", text=task)],
188                    # Context (user_assertion, obo_token, locale, etc.) is carried
189                    # in message.metadata — A2A agents read it from there.
190                    metadata=context,
191                ),
192            ),
193        ).model_dump()
194
195        self._logger.info(
196            "A2AClient tasks/send",
197            url=url,
198            agent_id=self.agent_id,
199            task_id=task_id,
200        )
201
202        async with httpx.AsyncClient(timeout=self.timeout) as client:
203            try:
204                response = await client.post(url, json=payload, headers=self._headers)
205            except httpx.ConnectError as exc:
206                raise A2AClientError(
207                    f"Cannot reach remote agent at {url}: {exc}"
208                ) from exc
209            except httpx.TimeoutException as exc:
210                raise A2AClientError(
211                    f"Remote agent timed out after {self.timeout}s: {exc}"
212                ) from exc
213
214        if response.status_code != 200:
215            raise A2AClientError(
216                f"Remote agent returned HTTP {response.status_code}: {response.text[:200]}"
217            )
218
219        return self._parse_response(response.text)
220
221    def _parse_response(self, body: str) -> AgentResult:
222        """Parse an A2A JSON-RPC tasks/send response into an AgentResult."""
223        try:
224            envelope = json.loads(body)
225        except json.JSONDecodeError as exc:
226            raise A2AClientError(f"Remote agent returned non-JSON body: {body[:200]}") from exc
227
228        if "error" in envelope:
229            err = JsonRpcErrorResponse.model_validate(envelope)
230            raise A2AClientError(
231                f"A2A JSON-RPC error {err.error.code}: {err.error.message}"
232            )
233
234        try:
235            response = JsonRpcResponse.model_validate(envelope)
236        except Exception as exc:
237            raise A2AClientError(f"Malformed A2A response: {exc}") from exc
238
239        task = response.result
240        success = task.status.state == "completed"
241        error_msg: Optional[str] = task.status.message if not success else None
242
243        # Extract text output from the first artifact's first text part
244        output = ""
245        for artifact in task.artifacts:
246            for part in artifact.parts:
247                if part.type == "text":
248                    output = part.text or ""
249                    break
250            if output:
251                break
252
253        # Steps — validate each raw step dict through AgentStepModel
254        steps: List[AgentStep] = []
255        for raw_step in task.metadata.get("steps", []):
256            s = AgentStepModel.model_validate(raw_step)
257            steps.append(
258                AgentStep(
259                    thought=s.thought,
260                    action=s.action,
261                    action_input=s.action_input,
262                    observation=s.observation,
263                    metadata=s.metadata,
264                )
265            )
266
267        # Propagate all task metadata except "steps" (already parsed above)
268        metadata = {k: v for k, v in task.metadata.items() if k != "steps"}
269        metadata["a2a_task_id"] = task.id
270
271        return AgentResult(
272            output=output,
273            steps=steps,
274            metadata=metadata,
275            success=success,
276            error=error_msg,
277        )
278
279    # ------------------------------------------------------------------
280    # A2A discovery helper
281    # ------------------------------------------------------------------
282
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Proxies execute() calls to a remote A2A-compliant agent service.

Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 tasks/send over HTTP. Context (user_assertion, obo_token, locale, etc.) is forwarded as message.metadata in the JSON-RPC payload.

This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) treat a separately-deployed A2A agent service as a first-class agent — no code changes needed in the orchestrator.

Args:
    endpoint_url: Base URL of the remote agent service (e.g. "http://search-agent:8080"). /rpc is appended automatically.
    timeout: HTTP request timeout in seconds (default: 30).
    headers: Extra HTTP headers sent with every request (e.g. Authorization).
    behaviors: Optional list of behaviors passed through to the base agent.
    agent_id: Stable identifier used in logging/metrics. Defaults to "A2AClient".
    logger: Optional BasicLogger.
    metrics: Optional BasicMetricsCollector.

Example::

agent = A2AClient(
    endpoint_url="http://search-agent-service:8080",
    agent_id="search_agent",
)
result = await agent.execute("Find recent AI papers")

Discovery::

card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
agent = A2AClient(
    endpoint_url=card.url,
    agent_id=card.name,
)
A2AClient( endpoint_url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None, behaviors: Optional[List[gmf_forge_ai_orchestration.BaseBehavior]] = None, agent_id: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None)
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
endpoint_url
timeout
async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> gmf_forge_ai_orchestration.AgentResult:
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[gmf_forge_ai_orchestration.AgentStep]:
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step

Executes remotely then yields each step from the response.

@staticmethod
async def fetch_agent_card( base_url: str, timeout: float = 10.0, headers: Optional[Dict[str, str]] = None) -> AgentCard:
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Fetch the A2A Agent Card from GET /.well-known/agent.json.

Returns an AgentCard instance. Standard fields: name, description, url, version, skills.

Args: base_url: Base URL of the remote agent (e.g. "http://agent:8080"). timeout: HTTP timeout in seconds. headers: Optional extra request headers.

Raises: A2AClientError: if the endpoint is unreachable or returns non-200.

class A2AClientError(builtins.RuntimeError):
class A2AClientError(RuntimeError):
    """Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.

    ``A2AClient`` raises it for connection failures, timeouts, non-200 HTTP
    responses, JSON-RPC error envelopes, and malformed response bodies.
    """

Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.

class A2AAdapter:
 43class A2AAdapter:
 44    """Protocol-only A2A receiver adapter.
 45
 46    The adapter does not know about tools, checkpoints, routing, or any other
 47    orchestration concerns. It only translates between the A2A wire protocol and
 48    the host agent's execution callback.
 49    """
 50
 51    def __init__(
 52        self,
 53        agent_id: str,
 54        description: str,
 55        url: str,
 56        version: str = "0.1.0",
 57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
 58        logger: Optional[BasicLogger] = None,
 59    ) -> None:
 60        self.agent_id = agent_id
 61        self.description = description
 62        self.url = url.rstrip("/")
 63        self.version = version
 64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
 65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
 66
 67    def agent_card(self) -> AgentCard:
 68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
 69        return AgentCard(
 70            name=self.agent_id,
 71            description=self.description,
 72            url=self.url,
 73            version=self.version,
 74            skills=[AgentSkill(**s) for s in self._skills],
 75        )
 76
 77    def build_jsonrpc_error(
 78        self,
 79        rpc_id: Optional[str],
 80        code: int,
 81        message: str,
 82        data: Optional[Any] = None,
 83    ) -> Dict[str, Any]:
 84        """Build a JSON-RPC 2.0 error response."""
 85        return JsonRpcErrorResponse(
 86            id=rpc_id,
 87            error=JsonRpcError(code=code, message=message, data=data),
 88        ).model_dump()
 89
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)
117
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()
147
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)
185
186    def _extract_task_text(self, parts: Any) -> str:
187        if not isinstance(parts, Sequence):
188            raise A2AAdapterError("Invalid message parts payload")
189
190        for part in parts:
191            if isinstance(part, Mapping) and part.get("type") == "text":
192                return str(part.get("text", ""))
193        return ""
194
195    def _serialize_step(self, step: AgentStep) -> Dict[str, Any]:
196        return AgentStepModel(
197            thought=step.thought,
198            action=step.action,
199            action_input=step.action_input,
200            observation=step.observation,
201            metadata=step.metadata,
202        ).model_dump()

Protocol-only A2A receiver adapter.

The adapter does not know about tools, checkpoints, routing, or any other orchestration concerns. It only translates between the A2A wire protocol and the host agent's execution callback.

A2AAdapter( agent_id: str, description: str, url: str, version: str = '0.1.0', skills: Optional[Sequence[Mapping[str, Any]]] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None)
51    def __init__(
52        self,
53        agent_id: str,
54        description: str,
55        url: str,
56        version: str = "0.1.0",
57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
58        logger: Optional[BasicLogger] = None,
59    ) -> None:
60        self.agent_id = agent_id
61        self.description = description
62        self.url = url.rstrip("/")
63        self.version = version
64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
agent_id
description
url
version
def agent_card(self) -> AgentCard:
67    def agent_card(self) -> AgentCard:
68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
69        return AgentCard(
70            name=self.agent_id,
71            description=self.description,
72            url=self.url,
73            version=self.version,
74            skills=[AgentSkill(**s) for s in self._skills],
75        )

Return the A2A Agent Card for GET /.well-known/agent.json.

def build_jsonrpc_error( self, rpc_id: Optional[str], code: int, message: str, data: Optional[Any] = None) -> Dict[str, Any]:
77    def build_jsonrpc_error(
78        self,
79        rpc_id: Optional[str],
80        code: int,
81        message: str,
82        data: Optional[Any] = None,
83    ) -> Dict[str, Any]:
84        """Build a JSON-RPC 2.0 error response."""
85        return JsonRpcErrorResponse(
86            id=rpc_id,
87            error=JsonRpcError(code=code, message=message, data=data),
88        ).model_dump()

Build a JSON-RPC 2.0 error response.

def parse_tasks_send( self, body: Mapping[str, typing.Any]) -> tuple[typing.Optional[str], str, str, typing.Dict[str, typing.Any]]:
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)

Parse a tasks/send JSON-RPC request body.

Returns: A tuple of (rpc_id, task_id, task_text, context).

def build_task_response( self, rpc_id: Optional[str], task_id: str, result: gmf_forge_ai_orchestration.AgentResult) -> Dict[str, Any]:
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()

Convert an AgentResult into an A2A JSON-RPC success response.

async def handle_rpc( self, body: Mapping[str, typing.Any], execute: Callable[[str, typing.Dict[str, typing.Any]], Awaitable[gmf_forge_ai_orchestration.AgentResult]]) -> Dict[str, Any]:
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)

Handle an incoming A2A JSON-RPC body and return the response payload.

class A2AAdapterError(builtins.RuntimeError):
class A2AAdapterError(RuntimeError):
    """Raised when an incoming A2A request is malformed or unsupported.

    ``A2AAdapter.parse_tasks_send`` raises it for an unknown method and for
    wrongly shaped ``params``, ``message``, ``parts`` or ``metadata``.
    """

Raised when an incoming A2A request is malformed or unsupported.

class AgentCard(pydantic.main.BaseModel):
class AgentCard(BaseModel):
    """A2A Agent Card — describes this agent's capabilities and endpoint.

    Returned by ``GET /.well-known/agent.json``.  The supervisor fetches this
    at startup to discover agents without any hardcoded knowledge.
    """

    # Required fields: the Ellipsis default marks them as mandatory.
    name: str = Field(default=..., description="Stable agent identifier.")
    description: str = Field(default=..., description="High-level capability summary.")
    url: str = Field(default=..., description="Base URL of this agent.")

    # Optional fields: version falls back to "0.1.0"; skills to a fresh list
    # built by default_factory.
    version: str = Field(default="0.1.0", description="Agent version.")
    skills: List[AgentSkill] = Field(default_factory=list)

A2A Agent Card — describes this agent's capabilities and endpoint.

Returned by GET /.well-known/agent.json. The supervisor fetches this at startup to discover agents without any hardcoded knowledge.

name: str = PydanticUndefined

Stable agent identifier.

description: str = PydanticUndefined

High-level capability summary.

url: str = PydanticUndefined

Base URL of this agent.

version: str = '0.1.0'

Agent version.

skills: List[AgentSkill] = PydanticUndefined
class AgentSkill(pydantic.main.BaseModel):
class AgentSkill(BaseModel):
    """One skill entry in the Agent Card."""

    # All three fields are required (Ellipsis default).
    id: str = Field(default=..., description="Stable skill identifier.")
    name: str = Field(default=..., description="Human-readable skill name.")
    description: str = Field(
        default=...,
        description="What this skill does — shown to the LLM router.",
    )

One skill entry in the Agent Card.

id: str = PydanticUndefined

Stable skill identifier.

name: str = PydanticUndefined

Human-readable skill name.

description: str = PydanticUndefined

What this skill does — shown to the LLM router.

class A2APart(pydantic.main.BaseModel):
class A2APart(BaseModel):
    """One content part within an A2A message or artifact."""

    # Discriminator string; defaults to "text".
    type: str = Field(default="text", description="Part type: 'text', 'file', or 'data'.")
    # Both payload slots are optional and default to None.
    text: Optional[str] = Field(
        default=None,
        description="Text content (when type='text').",
    )
    data: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Structured data (when type='data').",
    )

One content part within an A2A message or artifact.

type: str = 'text'

Part type: 'text', 'file', or 'data'.

text: Optional[str] = None

Text content (when type='text').

data: Optional[Dict[str, Any]] = None

Structured data (when type='data').

class A2AMessage(pydantic.main.BaseModel):
class A2AMessage(BaseModel):
    """An A2A message — carries the task text and context metadata."""

    # Originator label; defaults to "user".
    role: str = Field(default="user", description="Message originator: 'user' or 'agent'.")
    # Required: no default is declared for the content parts.
    parts: List[A2APart]
    # default_factory gives every instance its own dict.
    metadata: Dict[str, Any] = Field(
        description=(
            "Arbitrary context propagated by the supervisor. "
            "Standard keys: user_assertion, obo_token, locale, language, session_id."
        ),
        default_factory=dict,
    )

An A2A message — carries the task text and context metadata.

role: str = 'user'

Message originator: 'user' or 'agent'.

parts: List[A2APart] = PydanticUndefined
metadata: Dict[str, Any] = PydanticUndefined

Arbitrary context propagated by the supervisor. Standard keys: user_assertion, obo_token, locale, language, session_id.

class A2ATaskParams(pydantic.main.BaseModel):
class A2ATaskParams(BaseModel):
    """``params`` block for the ``tasks/send`` JSON-RPC method."""

    # Both fields are required: id via the Ellipsis default, message by
    # having no default at all.
    id: str = Field(default=..., description="Client-assigned task ID (UUID).")
    message: A2AMessage

params block for the tasks/send JSON-RPC method.

id: str = PydanticUndefined

Client-assigned task ID (UUID).

message: A2AMessage = PydanticUndefined
class JsonRpcRequest(pydantic.main.BaseModel):
class JsonRpcRequest(BaseModel):
    """JSON-RPC 2.0 request envelope."""

    # Protocol version marker; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # The remaining fields have no default and are therefore required.
    id: str
    method: str
    params: A2ATaskParams

JSON-RPC 2.0 request envelope.

jsonrpc: str = '2.0'
id: str = PydanticUndefined
method: str = PydanticUndefined
params: A2ATaskParams = PydanticUndefined
class A2ATaskStatus(pydantic.main.BaseModel):
class A2ATaskStatus(BaseModel):
    """Status of an A2A task."""

    # Required. Typed as a plain str: the listed values are documented in the
    # description only, not enforced by the annotation.
    state: str = Field(
        default=...,
        description="One of: submitted, working, input-required, completed, failed.",
    )
    # Optional free-text detail; defaults to None.
    message: Optional[str] = Field(
        default=None,
        description="Human-readable status detail or error message.",
    )

Status of an A2A task.

state: str = PydanticUndefined

One of: submitted, working, input-required, completed, failed.

message: Optional[str] = None

Human-readable status detail or error message.

class A2AArtifact(pydantic.main.BaseModel):
class A2AArtifact(BaseModel):
    """One result artifact from an A2A task."""

    # Artifact label; defaults to "result".
    name: str = Field(default="result", description="Artifact name.")
    # default_factory gives every instance its own empty list.
    parts: List[A2APart] = Field(default_factory=list)

One result artifact from an A2A task.

name: str = 'result'

Artifact name.

parts: List[A2APart] = PydanticUndefined
class A2ATask(pydantic.main.BaseModel):
class A2ATask(BaseModel):
    """A2A Task object — the result of a ``tasks/send`` call."""

    # Required fields.
    id: str = Field(default=..., description="Task ID (echoed from request params.id).")
    status: A2ATaskStatus

    # Optional containers; default_factory builds a fresh one per instance.
    artifacts: List[A2AArtifact] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(
        description="Internal metadata. 'steps' key carries ReAct thought/action/observation cycles.",
        default_factory=dict,
    )

A2A Task object — the result of a tasks/send call.

id: str = PydanticUndefined

Task ID (echoed from request params.id).

status: A2ATaskStatus = PydanticUndefined
artifacts: List[A2AArtifact] = PydanticUndefined
metadata: Dict[str, Any] = PydanticUndefined

Internal metadata. 'steps' key carries ReAct thought/action/observation cycles.

class JsonRpcResponse(pydantic.main.BaseModel):
class JsonRpcResponse(BaseModel):
    """JSON-RPC 2.0 success response envelope."""

    # Protocol version marker; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # Required: the request id and the task carried as the result.
    id: str
    result: A2ATask

JSON-RPC 2.0 success response envelope.

jsonrpc: str = '2.0'
id: str = PydanticUndefined
result: A2ATask = PydanticUndefined
class JsonRpcError(pydantic.main.BaseModel):
class JsonRpcError(BaseModel):
    """JSON-RPC 2.0 error object."""

    # Required numeric code and message text.
    code: int
    message: str
    # Optional extra payload of any type; defaults to None.
    data: Optional[Any] = Field(default=None)

JSON-RPC 2.0 error object.

code: int = PydanticUndefined
message: str = PydanticUndefined
data: Optional[Any] = None
class JsonRpcErrorResponse(pydantic.main.BaseModel):
class JsonRpcErrorResponse(BaseModel):
    """JSON-RPC 2.0 error response envelope."""

    # Protocol version marker; defaults to "2.0".
    jsonrpc: str = Field(default="2.0")
    # Unlike the success envelope, the id may be absent (None).
    id: Optional[str] = Field(default=None)
    # Required error object.
    error: JsonRpcError

JSON-RPC 2.0 error response envelope.

jsonrpc: str = '2.0'
id: Optional[str] = None
error: JsonRpcError = PydanticUndefined
class AgentStepModel(pydantic.main.BaseModel):
class AgentStepModel(BaseModel):
    """Pydantic wire model for one thought/action/observation cycle.

    Serialised into ``A2ATask.metadata["steps"]`` by :class:`A2AAdapter`,
    deserialised back by :class:`A2AClient`. Structurally mirrors the internal
    :class:`~gmf_forge_ai_orchestration.agents.base.AgentStep` dataclass.

    Note:
        ``action_input`` **must** be a ``dict``. Custom agents that store a plain
        string here should wrap it as ``{"value": "..."}`` before serialisation.
    """

    # Text fields all default to the empty string.
    thought: str = Field(default="")
    action: str = Field(default="")
    observation: str = Field(default="")

    # Mapping fields get a fresh dict per instance via default_factory.
    action_input: Dict[str, Any] = Field(default_factory=dict)
    metadata: Dict[str, Any] = Field(default_factory=dict)

Pydantic wire model for one thought/action/observation cycle.

Serialised into A2ATask.metadata["steps"] by A2AAdapter, deserialised back by A2AClient. Structurally mirrors the internal gmf_forge_ai_orchestration.agents.base.AgentStep dataclass.

Note: action_input must be a dict. Custom agents that store a plain string here should wrap it as {"value": "..."} before serialisation.

thought: str = ''
action: str = ''
action_input: Dict[str, Any] = PydanticUndefined
observation: str = ''
metadata: Dict[str, Any] = PydanticUndefined