gmf_forge_ai_orchestration.protocols

Protocol adapters and clients for interoperability surfaces.

 1"""Protocol adapters and clients for interoperability surfaces."""
 2
 3from gmf_forge_ai_orchestration.protocols.a2a import (
 4    A2AAdapter,
 5    A2AAdapterError,
 6    A2AClient,
 7    A2AClientError,
 8)
 9
10__all__ = [
11    "A2AClient",
12    "A2AClientError",
13    "A2AAdapter",
14    "A2AAdapterError",
15]
class A2AClient(gmf_forge_ai_orchestration.agents.base.BaseAgent):
 76class A2AClient(BaseAgent):
 77    """
 78    Proxies ``execute()`` calls to a remote A2A-compliant agent service.
 79
 80    Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 ``tasks/send`` over HTTP.
 81    Context (user_assertion, obo_token, locale, etc.) is forwarded as
 82    ``message.metadata`` in the JSON-RPC payload.
 83
 84    This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.)
 85    treat a separately-deployed A2A agent service as a first-class agent — no code
 86    changes needed in the orchestrator.
 87
 88    Args:
 89        endpoint_url: Base URL of the remote agent service
 90            (e.g. ``"http://search-agent:8080"``).
 91            ``/rpc`` is appended automatically.
 92        timeout: HTTP request timeout in seconds (default: 30).
 93        headers: Extra HTTP headers sent with every request (e.g. ``Authorization``).
 94        agent_id: Stable identifier used in logging/metrics. Defaults to ``"A2AClient"``.
 95        logger: Optional :class:`BasicLogger`.
 96        metrics: Optional :class:`BasicMetricsCollector`.
 97
 98    Example::
 99
100        agent = A2AClient(
101            endpoint_url="http://search-agent-service:8080",
102            agent_id="search_agent",
103        )
104        result = await agent.execute("Find recent AI papers")
105
106    Discovery::
107
108        card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
109        agent = A2AClient(
110            endpoint_url=card["url"],
111            agent_id=card["name"],
112        )
113    """
114
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
135
136    # ------------------------------------------------------------------
137    # BaseAgent implementation
138    # ------------------------------------------------------------------
139
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result
162
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step
170
171    # ------------------------------------------------------------------
172    # A2A JSON-RPC transport
173    # ------------------------------------------------------------------
174
175    async def _call_remote(self, task: str, context: Dict[str, Any]) -> AgentResult:
176        """Send an A2A tasks/send JSON-RPC 2.0 request and return an AgentResult."""
177        url = f"{self.endpoint_url}/rpc"
178        request_id = str(uuid4())
179        task_id = str(uuid4())
180
181        payload = JsonRpcRequest(
182            id=request_id,
183            method="tasks/send",
184            params=A2ATaskParams(
185                id=task_id,
186                message=A2AMessage(
187                    parts=[A2APart(type="text", text=task)],
188                    # Context (user_assertion, obo_token, locale, etc.) is carried
189                    # in message.metadata — A2A agents read it from there.
190                    metadata=context,
191                ),
192            ),
193        ).model_dump()
194
195        self._logger.info(
196            "A2AClient tasks/send",
197            url=url,
198            agent_id=self.agent_id,
199            task_id=task_id,
200        )
201
202        async with httpx.AsyncClient(timeout=self.timeout) as client:
203            try:
204                response = await client.post(url, json=payload, headers=self._headers)
205            except httpx.ConnectError as exc:
206                raise A2AClientError(
207                    f"Cannot reach remote agent at {url}: {exc}"
208                ) from exc
209            except httpx.TimeoutException as exc:
210                raise A2AClientError(
211                    f"Remote agent timed out after {self.timeout}s: {exc}"
212                ) from exc
213
214        if response.status_code != 200:
215            raise A2AClientError(
216                f"Remote agent returned HTTP {response.status_code}: {response.text[:200]}"
217            )
218
219        return self._parse_response(response.text)
220
221    def _parse_response(self, body: str) -> AgentResult:
222        """Parse an A2A JSON-RPC tasks/send response into an AgentResult."""
223        try:
224            envelope = json.loads(body)
225        except json.JSONDecodeError as exc:
226            raise A2AClientError(f"Remote agent returned non-JSON body: {body[:200]}") from exc
227
228        if "error" in envelope:
229            err = JsonRpcErrorResponse.model_validate(envelope)
230            raise A2AClientError(
231                f"A2A JSON-RPC error {err.error.code}: {err.error.message}"
232            )
233
234        try:
235            response = JsonRpcResponse.model_validate(envelope)
236        except Exception as exc:
237            raise A2AClientError(f"Malformed A2A response: {exc}") from exc
238
239        task = response.result
240        success = task.status.state == "completed"
241        error_msg: Optional[str] = task.status.message if not success else None
242
243        # Extract text output from the first artifact's first text part
244        output = ""
245        for artifact in task.artifacts:
246            for part in artifact.parts:
247                if part.type == "text":
248                    output = part.text or ""
249                    break
250            if output:
251                break
252
253        # Steps — validate each raw step dict through AgentStepModel
254        steps: List[AgentStep] = []
255        for raw_step in task.metadata.get("steps", []):
256            s = AgentStepModel.model_validate(raw_step)
257            steps.append(
258                AgentStep(
259                    thought=s.thought,
260                    action=s.action,
261                    action_input=s.action_input,
262                    observation=s.observation,
263                    metadata=s.metadata,
264                )
265            )
266
267        # Propagate all task metadata except "steps" (already parsed above)
268        metadata = {k: v for k, v in task.metadata.items() if k != "steps"}
269        metadata["a2a_task_id"] = task.id
270
271        return AgentResult(
272            output=output,
273            steps=steps,
274            metadata=metadata,
275            success=success,
276            error=error_msg,
277        )
278
279    # ------------------------------------------------------------------
280    # A2A discovery helper
281    # ------------------------------------------------------------------
282
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Proxies execute() calls to a remote A2A-compliant agent service.

Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 tasks/send over HTTP. Context (user_assertion, obo_token, locale, etc.) is forwarded as message.metadata in the JSON-RPC payload.

This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) treat a separately-deployed A2A agent service as a first-class agent — no code changes needed in the orchestrator.

Args: endpoint_url: Base URL of the remote agent service (e.g. "http://search-agent:8080"). /rpc is appended automatically. timeout: HTTP request timeout in seconds (default: 30). headers: Extra HTTP headers sent with every request (e.g. Authorization). agent_id: Stable identifier used in logging/metrics. Defaults to "A2AClient". logger: Optional BasicLogger. metrics: Optional BasicMetricsCollector.

Example::

agent = A2AClient(
    endpoint_url="http://search-agent-service:8080",
    agent_id="search_agent",
)
result = await agent.execute("Find recent AI papers")

Discovery::

card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
agent = A2AClient(
    endpoint_url=card["url"],
    agent_id=card["name"],
)
A2AClient( endpoint_url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None, behaviors: Optional[List[gmf_forge_ai_orchestration.BaseBehavior]] = None, agent_id: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None)
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
endpoint_url
timeout
async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> gmf_forge_ai_orchestration.AgentResult:
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[gmf_forge_ai_orchestration.AgentStep]:
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step

Executes remotely then yields each step from the response.

@staticmethod
async def fetch_agent_card( base_url: str, timeout: float = 10.0, headers: Optional[Dict[str, str]] = None) -> gmf_forge_ai_orchestration.protocols.a2a.AgentCard:
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Fetch the A2A Agent Card from GET /.well-known/agent.json.

Returns an AgentCard instance. Standard fields: name, description, url, version, skills.

Args: base_url: Base URL of the remote agent (e.g. "http://agent:8080"). timeout: HTTP timeout in seconds. headers: Optional extra request headers.

Raises: A2AClientError: if the endpoint is unreachable or returns non-200.

class A2AClientError(builtins.RuntimeError):
72class A2AClientError(RuntimeError):
73    """Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol."""

Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.

class A2AAdapter:
 43class A2AAdapter:
 44    """Protocol-only A2A receiver adapter.
 45
 46    The adapter does not know about tools, checkpoints, routing, or any other
 47    orchestration concerns. It only translates between the A2A wire protocol and
 48    the host agent's execution callback.
 49    """
 50
 51    def __init__(
 52        self,
 53        agent_id: str,
 54        description: str,
 55        url: str,
 56        version: str = "0.1.0",
 57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
 58        logger: Optional[BasicLogger] = None,
 59    ) -> None:
 60        self.agent_id = agent_id
 61        self.description = description
 62        self.url = url.rstrip("/")
 63        self.version = version
 64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
 65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
 66
 67    def agent_card(self) -> AgentCard:
 68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
 69        return AgentCard(
 70            name=self.agent_id,
 71            description=self.description,
 72            url=self.url,
 73            version=self.version,
 74            skills=[AgentSkill(**s) for s in self._skills],
 75        )
 76
 77    def build_jsonrpc_error(
 78        self,
 79        rpc_id: Optional[str],
 80        code: int,
 81        message: str,
 82        data: Optional[Any] = None,
 83    ) -> Dict[str, Any]:
 84        """Build a JSON-RPC 2.0 error response."""
 85        return JsonRpcErrorResponse(
 86            id=rpc_id,
 87            error=JsonRpcError(code=code, message=message, data=data),
 88        ).model_dump()
 89
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)
117
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()
147
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)
185
186    def _extract_task_text(self, parts: Any) -> str:
187        if not isinstance(parts, Sequence):
188            raise A2AAdapterError("Invalid message parts payload")
189
190        for part in parts:
191            if isinstance(part, Mapping) and part.get("type") == "text":
192                return str(part.get("text", ""))
193        return ""
194
195    def _serialize_step(self, step: AgentStep) -> Dict[str, Any]:
196        return AgentStepModel(
197            thought=step.thought,
198            action=step.action,
199            action_input=step.action_input,
200            observation=step.observation,
201            metadata=step.metadata,
202        ).model_dump()

Protocol-only A2A receiver adapter.

The adapter does not know about tools, checkpoints, routing, or any other orchestration concerns. It only translates between the A2A wire protocol and the host agent's execution callback.

A2AAdapter( agent_id: str, description: str, url: str, version: str = '0.1.0', skills: Optional[Sequence[Mapping[str, Any]]] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None)
51    def __init__(
52        self,
53        agent_id: str,
54        description: str,
55        url: str,
56        version: str = "0.1.0",
57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
58        logger: Optional[BasicLogger] = None,
59    ) -> None:
60        self.agent_id = agent_id
61        self.description = description
62        self.url = url.rstrip("/")
63        self.version = version
64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
agent_id
description
url
version
def agent_card(self) -> gmf_forge_ai_orchestration.protocols.a2a.AgentCard:
67    def agent_card(self) -> AgentCard:
68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
69        return AgentCard(
70            name=self.agent_id,
71            description=self.description,
72            url=self.url,
73            version=self.version,
74            skills=[AgentSkill(**s) for s in self._skills],
75        )

Return the A2A Agent Card for GET /.well-known/agent.json.

def build_jsonrpc_error( self, rpc_id: Optional[str], code: int, message: str, data: Optional[Any] = None) -> Dict[str, Any]:
77    def build_jsonrpc_error(
78        self,
79        rpc_id: Optional[str],
80        code: int,
81        message: str,
82        data: Optional[Any] = None,
83    ) -> Dict[str, Any]:
84        """Build a JSON-RPC 2.0 error response."""
85        return JsonRpcErrorResponse(
86            id=rpc_id,
87            error=JsonRpcError(code=code, message=message, data=data),
88        ).model_dump()

Build a JSON-RPC 2.0 error response.

def parse_tasks_send( self, body: Mapping[str, typing.Any]) -> tuple[typing.Optional[str], str, str, typing.Dict[str, typing.Any]]:
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)

Parse a tasks/send JSON-RPC request body.

Returns: A tuple of (rpc_id, task_id, task_text, context).

def build_task_response( self, rpc_id: Optional[str], task_id: str, result: gmf_forge_ai_orchestration.AgentResult) -> Dict[str, Any]:
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()

Convert an AgentResult into an A2A JSON-RPC success response.

async def handle_rpc( self, body: Mapping[str, typing.Any], execute: Callable[[str, typing.Dict[str, typing.Any]], Awaitable[gmf_forge_ai_orchestration.AgentResult]]) -> Dict[str, Any]:
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)

Handle an incoming A2A JSON-RPC body and return the response payload.

class A2AAdapterError(builtins.RuntimeError):
39class A2AAdapterError(RuntimeError):
40    """Raised when an incoming A2A request is malformed or unsupported."""

Raised when an incoming A2A request is malformed or unsupported.