gmf_forge_ai_orchestration.agents

Agents — ReAct, Plan-Execute, Reflexion, Chain-of-Thought, and A2A.

Choosing the right agent

ReActAgent Use when the task requires tool calls (search, lookup, API calls). Interleaves reasoning (Thought) and acting (Action/Observation) in a loop. The system prompt MUST include the Thought:/Action:/Action Input: format contract — without it the agent degrades to a single-pass answer with no tool calls. Supports a custom system_prompt.

ChainOfThoughtAgent Use when the task requires structured reasoning but NO tool calls. A single LLM call that reasons inside <thinking> tags before answering. Simpler and cheaper than ReAct — good for classification, analysis, or multi-step math. Supports a custom system_prompt.

PlanExecuteAgent Use when the task has clearly separable sequential steps known upfront. Phase 1 asks the LLM to decompose the task into a JSON step list. Phase 2 executes each step sequentially; each step can call tools via an inner Thought/Action loop. Good for structured workflows (e.g., search → summarise → write). Supports custom plan_prompt and execute_prompt.

ReflexionAgent Wrapper — not a standalone agent. Adds a self-critique-and-retry loop around any inner agent. Use when output quality matters more than latency and a second-pass correction is acceptable.

A2AClient Use to call a separately-deployed A2A-compliant agent service. Sends tasks via JSON-RPC 2.0 tasks/send (A2A protocol). Discovers agent capabilities via GET /.well-known/agent.json (Agent Card). No local LLM or tools needed.

A2AAdapter Use on the receiving side of an A2A service. Handles the A2A protocol boilerplate for discovery and tasks/send dispatch while leaving business logic to the host agent.

 1"""Agents — ReAct, Plan-Execute, Reflexion, Chain-of-Thought, and A2A.
 2
 3Choosing the right agent
 4------------------------
 5
 6:class:`ReActAgent`
 7    **Use when the task requires tool calls** (search, lookup, API calls).
 8    Interleaves reasoning (Thought) and acting (Action/Observation) in a loop.
 9    The system prompt MUST include the ``Thought:/Action:/Action Input:`` format
10    contract — without it the agent degrades to a single-pass answer with no
11    tool calls. Supports a custom ``system_prompt``.
12
13:class:`ChainOfThoughtAgent`
14    **Use when the task requires structured reasoning but NO tool calls.**
15    A single LLM call that reasons inside ``<thinking>`` tags before answering.
16    Simpler and cheaper than ReAct — good for classification, analysis, or
17    multi-step math. Supports a custom ``system_prompt``.
18
19:class:`PlanExecuteAgent`
20    **Use when the task has clearly separable sequential steps known upfront.**
21    Phase 1 asks the LLM to decompose the task into a JSON step list.
22    Phase 2 executes each step sequentially; each step can call tools via
23    an inner Thought/Action loop. Good for structured workflows (e.g., search
24    → summarise → write). Supports custom ``plan_prompt`` and ``execute_prompt``.
25
26:class:`ReflexionAgent`
27    **Wrapper — not a standalone agent.** Adds a self-critique-and-retry loop
28    around any inner agent. Use when output quality matters more than latency
29    and a second-pass correction is acceptable.
30
31:class:`A2AClient`
32    **Use to call a separately-deployed A2A-compliant agent service.**
33    Sends tasks via JSON-RPC 2.0 ``tasks/send`` (A2A protocol).
34    Discovers agent capabilities via ``GET /.well-known/agent.json`` (Agent Card).
35    No local LLM or tools needed.
36
37:class:`A2AAdapter`
38    **Use on the receiving side of an A2A service.**
39    Handles the A2A protocol boilerplate for discovery and ``tasks/send``
40    dispatch while leaving business logic to the host agent.
41"""
42
43from gmf_forge_ai_orchestration.agents.base import AgentResult, AgentStep, BaseAgent
44from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
45from gmf_forge_ai_orchestration.agents.plan_execute_agent import PlanExecuteAgent
46from gmf_forge_ai_orchestration.agents.reflexion_agent import ReflexionAgent
47from gmf_forge_ai_orchestration.agents.chain_of_thought_agent import ChainOfThoughtAgent
48from gmf_forge_ai_orchestration.protocols.a2a.a2a_client import (
49    A2AClient,
50    A2AClientError,
51)
52from gmf_forge_ai_orchestration.protocols.a2a.a2a_adapter import (
53    A2AAdapter,
54    A2AAdapterError,
55)
56
57__all__ = [
58    "AgentResult",
59    "AgentStep",
60    "BaseAgent",
61    "ReActAgent",
62    "PlanExecuteAgent",
63    "ReflexionAgent",
64    "ChainOfThoughtAgent",
65    "A2AClient",
66    "A2AClientError",
67    "A2AAdapter",
68    "A2AAdapterError",
69]
@dataclass
class AgentResult:
34@dataclass
35class AgentResult:
36    """The final result of an agent execution."""
37
38    output: str
39    steps: List[AgentStep] = field(default_factory=list)
40    metadata: Dict[str, Any] = field(default_factory=dict)
41    success: bool = True
42    error: Optional[str] = None

The final result of an agent execution.

AgentResult( output: str, steps: List[AgentStep] = <factory>, metadata: Dict[str, Any] = <factory>, success: bool = True, error: Optional[str] = None)
output: str
steps: List[AgentStep]
metadata: Dict[str, Any]
success: bool = True
error: Optional[str] = None
@dataclass
class AgentStep:
23@dataclass
24class AgentStep:
25    """One thought/action/observation cycle within an agent execution."""
26
27    thought: str
28    action: str
29    action_input: Dict[str, Any] = field(default_factory=dict)
30    observation: str = ""
31    metadata: Dict[str, Any] = field(default_factory=dict)

One thought/action/observation cycle within an agent execution.

AgentStep( thought: str, action: str, action_input: Dict[str, Any] = <factory>, observation: str = '', metadata: Dict[str, Any] = <factory>)
thought: str
action: str
action_input: Dict[str, Any]
observation: str = ''
metadata: Dict[str, Any]
class BaseAgent(abc.ABC):
 45class BaseAgent(ABC):
 46    """
 47    Abstract base class for all agents.
 48
 49    Wires together:
 50    - :class:`UnifiedLLMGateway` for LLM calls
 51    - :class:`ToolRegistry` for tool discovery and execution
 52    - :class:`BaseBehavior` pipeline applied around every execution
 53    - :class:`BaseStateStore` for persisting conversation/step state
 54    - Full shared-core observability stack (Logger, Metrics, PerformanceMonitor, Tracing)
 55
 56    Args:
 57        llm_gateway: Required. The LLM gateway to use for completions.
 58        tool_registry: Optional tool registry. Provides available tools to the agent.
 59        behaviors: Ordered list of behaviors applied around each execution.
 60        state_store: Optional state store for persisting steps and conversation.
 61        agent_id: Stable identifier used in logging and metrics. Defaults to class name.
 62        logger: Optional :class:`BasicLogger`. Created automatically if omitted.
 63        metrics: Optional :class:`BasicMetricsCollector`.
 64        performance_monitor: Optional :class:`BasicPerformanceMonitor`.
 65        tracer: Optional :class:`TracingProvider`. Falls back to ``get_tracer()``.
 66    """
 67
 68    def __init__(
 69        self,
 70        llm_gateway: Optional["UnifiedLLMGateway"] = None,
 71        tool_registry: Optional["ToolRegistry"] = None,
 72        behaviors: Optional[List["BaseBehavior"]] = None,
 73        state_store: Optional["BaseStateStore"] = None,
 74        checkpoint_manager: Optional["CheckpointManager"] = None,
 75        agent_id: Optional[str] = None,
 76        logger: Optional[BasicLogger] = None,
 77        metrics: Optional[BasicMetricsCollector] = None,
 78        performance_monitor: Optional[BasicPerformanceMonitor] = None,
 79        tracer: Optional[TracingProvider] = None,
 80    ) -> None:
 81        self.llm_gateway = llm_gateway
 82        self.tool_registry = tool_registry
 83        self.behaviors: List["BaseBehavior"] = behaviors or []
 84        self.state_store = state_store
 85        self.checkpoint_manager = checkpoint_manager
 86        self.agent_id = agent_id or self.__class__.__name__
 87        self._logger = logger or BasicLogger(f"gmf_forge_ai.agent.{self.agent_id}")
 88        self._metrics = metrics
 89        self._performance_monitor = performance_monitor
 90        self._tracer = tracer or get_tracer()
 91
 92    # ------------------------------------------------------------------
 93    # Abstract interface
 94    # ------------------------------------------------------------------
 95
 96    @abstractmethod
 97    async def execute(
 98        self, task: str, context: Optional[Dict[str, Any]] = None
 99    ) -> AgentResult:
100        """Execute the task and return a result."""
101
102    @abstractmethod
103    async def stream_execute(
104        self, task: str, context: Optional[Dict[str, Any]] = None
105    ) -> AsyncIterator[AgentStep]:
106        """Execute the task, yielding each step as it completes."""
107
108    # ------------------------------------------------------------------
109    # Behavior pipeline helpers (called by subclasses)
110    # ------------------------------------------------------------------
111
112    async def _apply_behaviors_before(
113        self, context: "BehaviorContext"
114    ) -> "BehaviorContext":
115        for behavior in self.behaviors:
116            context = await behavior.before_execute(context)
117        return context
118
119    async def _apply_behaviors_after(
120        self, context: "BehaviorContext", result: AgentResult
121    ) -> AgentResult:
122        for behavior in self.behaviors:
123            result = await behavior.after_execute(context, result)
124        return result
125
126    async def _apply_behaviors_on_error(
127        self, context: "BehaviorContext", error: Exception
128    ) -> Optional[AgentResult]:
129        for behavior in self.behaviors:
130            fallback = await behavior.on_error(context, error)
131            if fallback is not None:
132                return fallback
133        return None
134
135    # ------------------------------------------------------------------
136    # Shared observability helpers (called by subclasses)
137    # ------------------------------------------------------------------
138
139    def _log_execution_start(self, task: str) -> None:
140        self._logger.info(
141            "Agent execution started", agent_id=self.agent_id, task=task
142        )
143        if self._metrics:
144            self._metrics.increment("agent.executions", agent_id=self.agent_id)
145
146    def _log_execution_end(self, task: str, success: bool, steps: int) -> None:
147        self._logger.info(
148            "Agent execution finished",
149            agent_id=self.agent_id,
150            task=task,
151            success=success,
152            steps=steps,
153        )
154        if self._metrics:
155            self._metrics.histogram("agent.steps", steps, agent_id=self.agent_id)
156
157    def _log_execution_error(self, task: str, error: Exception) -> None:
158        self._logger.error(
159            "Agent execution error",
160            agent_id=self.agent_id,
161            task=task,
162            error=str(error),
163            error_type=type(error).__name__,
164        )
165        if self._metrics:
166            self._metrics.increment("agent.errors", agent_id=self.agent_id)

Abstract base class for all agents.

Wires together:

  • UnifiedLLMGateway for LLM calls
  • ToolRegistry for tool discovery and execution
  • BaseBehavior pipeline applied around every execution
  • BaseStateStore for persisting conversation/step state
  • Full shared-core observability stack (Logger, Metrics, PerformanceMonitor, Tracing)

Args: llm_gateway: Required. The LLM gateway to use for completions. tool_registry: Optional tool registry. Provides available tools to the agent. behaviors: Ordered list of behaviors applied around each execution. state_store: Optional state store for persisting steps and conversation. agent_id: Stable identifier used in logging and metrics. Defaults to class name. logger: Optional BasicLogger. Created automatically if omitted. metrics: Optional BasicMetricsCollector. performance_monitor: Optional BasicPerformanceMonitor. tracer: Optional TracingProvider. Falls back to get_tracer().

llm_gateway
tool_registry
state_store
checkpoint_manager
agent_id
@abstractmethod
async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
 96    @abstractmethod
 97    async def execute(
 98        self, task: str, context: Optional[Dict[str, Any]] = None
 99    ) -> AgentResult:
100        """Execute the task and return a result."""

Execute the task and return a result.

@abstractmethod
async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
102    @abstractmethod
103    async def stream_execute(
104        self, task: str, context: Optional[Dict[str, Any]] = None
105    ) -> AsyncIterator[AgentStep]:
106        """Execute the task, yielding each step as it completes."""

Execute the task, yielding each step as it completes.

class ReActAgent(gmf_forge_ai_orchestration.agents.BaseAgent):
 46class ReActAgent(BaseAgent):
 47    """
 48    ReAct agent: interleaves Reasoning (Thought) and Acting (Action/Observation).
 49
 50    **When to use:** Any task that requires tool calls (search, API lookup,
 51    database queries). The LLM reasons step by step and acts on each step.
 52
 53    **When NOT to use:** Pure reasoning/analysis tasks with no tools — use
 54    :class:`ChainOfThoughtAgent` instead (cheaper: one LLM call vs. many).
 55
 56    On each step the LLM produces a Thought, an Action (tool name or
 57    ``"Final Answer"``), and an Action Input. If the action is a tool call,
 58    the tool's output is fed back as an Observation and the loop continues.
 59
 60    .. warning::
 61        The system prompt MUST instruct the LLM to respond using the
 62        ``Thought:/Action:/Action Input:`` format. Without this contract the
 63        regex parser will not match, the agent will treat the first response
 64        as a Final Answer, and **no tool calls will ever be made**.
 65        Always include ``{tool_descriptions}`` in a custom prompt so the LLM
 66        knows what tools are available.
 67
 68    Args:
 69        max_steps: Maximum thought/action cycles before stopping (default: 10).
 70        model: LLM model name passed to the gateway (optional).
 71        temperature: Sampling temperature (default: 0.0 for determinism).
 72        system_prompt: Override the default ReAct system prompt. Use
 73            ``{tool_descriptions}`` as a placeholder if you want the agent's
 74            available tools listed in your prompt. The task is always appended
 75            separately and does not need a placeholder here.
 76
 77    All other args inherited from :class:`BaseAgent`.
 78    """
 79
 80    def __init__(self, *args: Any, max_steps: int = 10, model: Optional[str] = None,
 81                 temperature: float = 0.0, system_prompt: Optional[str] = None,
 82                 **kwargs: Any) -> None:
 83        super().__init__(*args, **kwargs)
 84        self.max_steps = max_steps
 85        self.model = model
 86        self.temperature = temperature
 87        self._system_prompt = system_prompt
 88
 89    #: Default ReAct system prompt used when no ``system_prompt`` is passed.
 90    #: Inspect this to understand the expected format or use it as a base
 91    #: for your own customisations.
 92    DEFAULT_SYSTEM_PROMPT: str = _REACT_SYSTEM
 93
 94    # ------------------------------------------------------------------
 95    # Internal prompt helpers
 96    # ------------------------------------------------------------------
 97
 98    def _tool_descriptions(self) -> str:
 99        if not self.tool_registry:
100            return "No tools available."
101        tools = self.tool_registry.list_tools()
102        if not tools:
103            return "No tools available."
104        lines = []
105        for t in tools:
106            lines.append(f"- {t.name}: {t.description}")
107        return "\n".join(lines)
108
109    def _build_prompt(self, task: str, history: List[AgentStep]) -> str:
110        template = self._system_prompt if self._system_prompt is not None else _REACT_SYSTEM
111        if "{tool_descriptions}" in template:
112            system = template.format(tool_descriptions=self._tool_descriptions())
113        else:
114            system = template
115        turns = [f"Task: {task}\n"]
116        for step in history:
117            turns.append(f"Thought: {step.thought}")
118            turns.append(f"Action: {step.action}")
119            turns.append(f"Action Input: {json.dumps(step.action_input)}")
120            if step.observation:
121                turns.append(f"Observation: {step.observation}")
122        return system + "\n" + "\n".join(turns)
123
124    def _parse_step(self, text: str) -> Optional[AgentStep]:
125        def _coerce_action_input(raw_input: str) -> Dict[str, Any]:
126            try:
127                action_input = json.loads(raw_input)
128                if isinstance(action_input, dict):
129                    return action_input
130            except (json.JSONDecodeError, ValueError):
131                pass
132            return {"raw": raw_input}
133
134        match = _REACT_STEP_PATTERN.search(text)
135        if match:
136            thought = match.group("thought").strip()
137            action = match.group("action").strip()
138            raw_input = match.group("action_input").strip()
139            action_input = _coerce_action_input(raw_input)
140            return AgentStep(thought=thought, action=action, action_input=action_input)
141
142        # Some models omit "Thought:" and return only Action/Action Input.
143        final_only = _REACT_FINAL_ONLY_PATTERN.search(text)
144        if final_only:
145            raw_input = final_only.group("action_input").strip()
146            action_input = _coerce_action_input(raw_input)
147            return AgentStep(thought="", action="Final Answer", action_input=action_input)
148
149        return None
150
151    def _extract_final_answer(self, step: AgentStep) -> str:
152        """Return normalized final answer text from a Final Answer step."""
153        answer_obj: Any
154        if isinstance(step.action_input, dict) and "answer" in step.action_input:
155            answer_obj = step.action_input["answer"]
156        else:
157            answer_obj = step.action_input
158
159        answer = str(answer_obj).strip()
160
161        # Unwrap nested ReAct control text accidentally returned as answer body.
162        for _ in range(3):
163            nested = _REACT_FINAL_ONLY_PATTERN.search(answer)
164            if nested:
165                raw_input = nested.group("action_input").strip()
166            elif answer.startswith("Final Answer:"):
167                raw_input = answer[len("Final Answer:"):].strip()
168            else:
169                break
170            try:
171                parsed = json.loads(raw_input)
172                if isinstance(parsed, dict) and "answer" in parsed:
173                    answer = str(parsed["answer"]).strip()
174                else:
175                    answer = raw_input
176            except (json.JSONDecodeError, ValueError):
177                answer = raw_input
178
179        return answer.strip()
180
181    async def _save_checkpoint(
182        self,
183        execution_id: str,
184        task: str,
185        context: Dict[str, Any],
186        steps: List[AgentStep],
187        step_number: int,
188    ) -> None:
189        if not self.checkpoint_manager:
190            return
191        await self.checkpoint_manager.save(
192            agent_id=self.agent_id,
193            execution_id=execution_id,
194            state={
195                "execution_id": execution_id,
196                "task": task,
197                "context": context,
198                "step_number": step_number,
199                "steps": [dataclasses.asdict(s) for s in steps],
200            },
201            metadata={"step_number": step_number},
202        )
203
204    async def resume_from(
205        self, execution_id: str, from_step: Optional[int] = None
206    ) -> AgentResult:
207        """Resume a ReAct execution from its latest (or selected) checkpoint."""
208        if not self.checkpoint_manager:
209            raise ValueError("Checkpoint manager is not configured for this agent")
210
211        checkpoint = None
212        if from_step is not None:
213            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
214            for ckpt in checkpoints:
215                if ckpt.state.get("step_number") == from_step:
216                    checkpoint = ckpt
217                    break
218        else:
219            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
220
221        if checkpoint is None:
222            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
223
224        state = checkpoint.state
225        task = str(state.get("task", ""))
226        context = state.get("context") or {}
227        raw_steps = state.get("steps") or []
228        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
229
230        # If the checkpoint already contains a final answer, return idempotently.
231        if steps and steps[-1].action == "Final Answer":
232            answer = self._extract_final_answer(steps[-1])
233            return AgentResult(
234                output=str(answer),
235                steps=steps,
236                success=True,
237                metadata={"execution_id": execution_id},
238            )
239
240        ctx = BehaviorContext(
241            agent_id=self.agent_id,
242            task=task,
243            execution_id=execution_id,
244            last_completed_step=int(state.get("step_number", len(steps) - 1)),
245            metadata={"resume_agent": self},
246        )
247        ctx = await self._apply_behaviors_before(ctx)
248        self._log_execution_start(task)
249
250        for step_num in range(len(steps), self.max_steps):
251            prompt = self._build_prompt(task, steps)
252            response = await self.llm_gateway.complete(
253                prompt, model=self.model, temperature=self.temperature
254            )
255            step = self._parse_step(response.content)
256            if step is None:
257                step = AgentStep(
258                    thought="Could not parse structured response.",
259                    action="Final Answer",
260                    action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
261                )
262
263            if step.action == "Final Answer":
264                answer = self._extract_final_answer(step)
265                step.observation = answer
266                steps.append(step)
267                await self._save_checkpoint(execution_id, task, context, steps, step_num)
268                ctx.last_completed_step = step_num
269                result = AgentResult(
270                    output=answer,
271                    steps=steps,
272                    success=True,
273                    metadata={"execution_id": execution_id},
274                )
275                result = await self._apply_behaviors_after(ctx, result)
276                self._log_execution_end(task, success=True, steps=len(steps))
277                return result
278
279            observation = ""
280            if self.tool_registry:
281                try:
282                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
283                    observation = str(tool_result)
284                except Exception as tool_exc:
285                    observation = f"Tool error: {tool_exc}"
286            else:
287                observation = f"No tool registry — cannot execute '{step.action}'"
288
289            step.observation = observation
290            steps.append(step)
291            await self._save_checkpoint(execution_id, task, context, steps, step_num)
292            ctx.last_completed_step = step_num
293
294        final_output = steps[-1].observation if steps else "No output"
295        result = AgentResult(
296            output=final_output,
297            steps=steps,
298            success=False,
299            error=f"Reached max_steps={self.max_steps} without Final Answer",
300            metadata={"execution_id": execution_id},
301        )
302        result = await self._apply_behaviors_after(ctx, result)
303        self._log_execution_end(task, success=False, steps=len(steps))
304        return result
305
306    # ------------------------------------------------------------------
307    # BaseAgent implementation
308    # ------------------------------------------------------------------
309
310    async def execute(
311        self, task: str, context: Optional[Dict[str, Any]] = None
312    ) -> AgentResult:
313        execution_id = str(uuid4())
314        run_context = context or {}
315        ctx = BehaviorContext(
316            agent_id=self.agent_id,
317            task=task,
318            execution_id=execution_id,
319            metadata={"resume_agent": self, **run_context},
320        )
321        ctx = await self._apply_behaviors_before(ctx)
322        self._log_execution_start(task)
323
324        while True:
325            _retry = False
326            _control_exit = None  # HumanApprovalRequired or PendingApproval — not errors
327            steps: List[AgentStep] = []
328
329            with self._tracer.trace(
330                "react_agent.execute", input=task, metadata={"agent_id": self.agent_id}
331            ) as trace:
332                try:
333                    for _ in range(self.max_steps):
334                        prompt = self._build_prompt(task, steps)
335
336                        with trace.generation(
337                            "llm_call", model=self.model or "default", input=prompt
338                        ) as gen:
339                            perf_id = None
340                            if self._performance_monitor:
341                                perf_id = self._performance_monitor.start_request(
342                                    provider="llm_gateway", model=self.model or "default"
343                                )
344                            try:
345                                response = await self.llm_gateway.complete(
346                                    prompt, model=self.model, temperature=self.temperature
347                                )
348                                if self._performance_monitor and perf_id:
349                                    self._performance_monitor.end_request(
350                                        request_id=perf_id,
351                                        prompt_tokens=response.usage.get("prompt_tokens", 0),
352                                        completion_tokens=response.usage.get("completion_tokens", 0),
353                                        success=True,
354                                    )
355                                gen.set_output(response.content)
356                                gen.set_token_usage(**response.usage)
357                            except Exception as exc:
358                                if self._performance_monitor and perf_id:
359                                    self._performance_monitor.end_request(
360                                        request_id=perf_id,
361                                        prompt_tokens=0,
362                                        completion_tokens=0,
363                                        success=False,
364                                        error=str(exc),
365                                    )
366                                raise
367
368                        step = self._parse_step(response.content)
369                        if step is None:
370                            # Unparseable response — treat as final answer
371                            step = AgentStep(
372                                thought="Could not parse structured response.",
373                                action="Final Answer",
374                                action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
375                            )
376
377                        with trace.span("step", input=step.action) as span:
378                            if step.action == "Final Answer":
379                                answer = self._extract_final_answer(step)
380                                step.observation = answer
381                                steps.append(step)
382                                span.set_output(answer)
383
384                                # Persist to state store if available
385                                if self.state_store:
386                                    await self.state_store.set(
387                                        f"agent:{self.agent_id}:last_steps",
388                                        [vars(s) for s in steps],
389                                        ttl=self.checkpoint_manager.default_ttl if self.checkpoint_manager else None,
390                                    )
391
392                                await self._save_checkpoint(
393                                    execution_id,
394                                    task,
395                                    run_context,
396                                    steps,
397                                    len(steps) - 1,
398                                )
399                                ctx.last_completed_step = len(steps) - 1
400
401                                result = AgentResult(
402                                    output=answer,
403                                    steps=steps,
404                                    success=True,
405                                    metadata={"execution_id": execution_id},
406                                )
407                                try:
408                                    result = await self._apply_behaviors_after(ctx, result)
409                                except (HumanApprovalRequired, PendingApproval) as ctrl_exc:
410                                    _control_exit = ctrl_exc
411
412                                if _control_exit is None:
413                                    self._log_execution_end(task, success=True, steps=len(steps))
414                                    trace.set_output(answer)
415                                    return result
416                                else:
417                                    # Control-flow exit (HumanApprovalRequired / PendingApproval)
418                                    # span output already set above; also set trace output so
419                                    # it logs the answer rather than finishing with no output.
420                                    trace.set_output(answer)
421
422                            else:
423                                # Tool call
424                                observation = ""
425                                if self.tool_registry:
426                                    try:
427                                        tool_result = await self.tool_registry.execute(
428                                            step.action, **step.action_input
429                                        )
430                                        observation = str(tool_result)
431                                    except Exception as tool_exc:
432                                        observation = f"Tool error: {tool_exc}"
433                                else:
434                                    observation = f"No tool registry — cannot execute '{step.action}'"
435
436                                step.observation = observation
437                                span.set_output(observation)
438                                steps.append(step)
439                                await self._save_checkpoint(
440                                    execution_id,
441                                    task,
442                                    run_context,
443                                    steps,
444                                    len(steps) - 1,
445                                )
446                                ctx.last_completed_step = len(steps) - 1
447
448                        if _control_exit is not None:
449                            break  # exit for loop; span already closed cleanly
450
451                    # Max steps reached (or for loop broken by control-flow signal)
452                    if _control_exit is None:
453                        final_output = steps[-1].observation if steps else "No output"
454                        result = AgentResult(
455                            output=final_output,
456                            steps=steps,
457                            success=False,
458                            error=f"Reached max_steps={self.max_steps} without Final Answer",
459                            metadata={"execution_id": execution_id},
460                        )
461                        result = await self._apply_behaviors_after(ctx, result)
462                        self._log_execution_end(task, success=False, steps=len(steps))
463                        trace.set_output(final_output)
464                        return result
465                    # else: control-flow exit — trace closes cleanly, re-raised below
466
467                except Exception as exc:
468                    self._log_execution_error(task, exc)
469                    fallback = await self._apply_behaviors_on_error(ctx, exc)
470                    if fallback is RETRY_SENTINEL:
471                        # RetryBehavior slept and wants us to retry from scratch.
472                        # Mark the trace as failed so it logs "failed" not "finished".
473                        trace.set_error(exc)
474                        ctx.attempt += 1
475                        _retry = True
476                    elif fallback is not None:
477                        return fallback
478                    else:
479                        raise
480
481            if _control_exit is not None:
482                raise _control_exit
483
484            if not _retry:
485                break
486
487    async def stream_execute(
488        self, task: str, context: Optional[Dict[str, Any]] = None
489    ) -> AsyncIterator[AgentStep]:
490        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
491        ctx = await self._apply_behaviors_before(ctx)
492        self._log_execution_start(task)
493
494        steps: List[AgentStep] = []
495        for _ in range(self.max_steps):
496            prompt = self._build_prompt(task, steps)
497            response = await self.llm_gateway.complete(
498                prompt, model=self.model, temperature=self.temperature
499            )
500            step = self._parse_step(response.content)
501            if step is None:
502                step = AgentStep(
503                    thought="Unparseable response.",
504                    action="Final Answer",
505                    action_input={"answer": response.content},
506                )
507
508            if step.action == "Final Answer":
509                step.observation = self._extract_final_answer(step)
510                steps.append(step)
511                yield step
512                return
513
514            if self.tool_registry:
515                try:
516                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
517                    step.observation = str(tool_result)
518                except Exception as tool_exc:
519                    step.observation = f"Tool error: {tool_exc}"
520            else:
521                step.observation = f"No tool registry — cannot execute '{step.action}'"
522
523            steps.append(step)
524            yield step

ReAct agent: interleaves Reasoning (Thought) and Acting (Action/Observation).

When to use: Any task that requires tool calls (search, API lookup, database queries). The LLM reasons step by step and acts on each step.

When NOT to use: Pure reasoning/analysis tasks with no tools — use ChainOfThoughtAgent instead (cheaper: one LLM call vs. many).

On each step the LLM produces a Thought, an Action (tool name or "Final Answer"), and an Action Input. If the action is a tool call, the tool's output is fed back as an Observation and the loop continues.

The system prompt MUST instruct the LLM to respond using the Thought:/Action:/Action Input: format. Without this contract the regex parser will not match, the agent will treat the first response as a Final Answer, and no tool calls will ever be made. Always include {tool_descriptions} in a custom prompt so the LLM knows what tools are available.

Args: max_steps: Maximum thought/action cycles before stopping (default: 10). model: LLM model name passed to the gateway (optional). temperature: Sampling temperature (default: 0.0 for determinism). system_prompt: Override the default ReAct system prompt. Use {tool_descriptions} as a placeholder if you want the agent's available tools listed in your prompt. The task is always appended separately and does not need a placeholder here.

All other args inherited from BaseAgent.

ReActAgent( *args: Any, max_steps: int = 10, model: Optional[str] = None, temperature: float = 0.0, system_prompt: Optional[str] = None, **kwargs: Any)
80    def __init__(self, *args: Any, max_steps: int = 10, model: Optional[str] = None,
81                 temperature: float = 0.0, system_prompt: Optional[str] = None,
82                 **kwargs: Any) -> None:
83        super().__init__(*args, **kwargs)
84        self.max_steps = max_steps
85        self.model = model
86        self.temperature = temperature
87        self._system_prompt = system_prompt
max_steps
model
temperature
DEFAULT_SYSTEM_PROMPT: str = 'You are an AI assistant that reasons step by step before acting.\n\nYou have access to the following tools:\n{tool_descriptions}\n\nRespond using EXACTLY this format for each step:\nThought: <your reasoning>\nAction: <tool name or "Final Answer">\nAction Input: <JSON object with tool arguments, or your final answer string>\n\nWhen you have enough information, use:\nAction: Final Answer\nAction Input: {{"answer": "<your complete answer>"}}\n'
async def resume_from( self, execution_id: str, from_step: Optional[int] = None) -> AgentResult:
204    async def resume_from(
205        self, execution_id: str, from_step: Optional[int] = None
206    ) -> AgentResult:
207        """Resume a ReAct execution from its latest (or selected) checkpoint."""
208        if not self.checkpoint_manager:
209            raise ValueError("Checkpoint manager is not configured for this agent")
210
211        checkpoint = None
212        if from_step is not None:
213            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
214            for ckpt in checkpoints:
215                if ckpt.state.get("step_number") == from_step:
216                    checkpoint = ckpt
217                    break
218        else:
219            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
220
221        if checkpoint is None:
222            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
223
224        state = checkpoint.state
225        task = str(state.get("task", ""))
226        context = state.get("context") or {}
227        raw_steps = state.get("steps") or []
228        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
229
230        # If the checkpoint already contains a final answer, return idempotently.
231        if steps and steps[-1].action == "Final Answer":
232            answer = self._extract_final_answer(steps[-1])
233            return AgentResult(
234                output=str(answer),
235                steps=steps,
236                success=True,
237                metadata={"execution_id": execution_id},
238            )
239
240        ctx = BehaviorContext(
241            agent_id=self.agent_id,
242            task=task,
243            execution_id=execution_id,
244            last_completed_step=int(state.get("step_number", len(steps) - 1)),
245            metadata={"resume_agent": self},
246        )
247        ctx = await self._apply_behaviors_before(ctx)
248        self._log_execution_start(task)
249
250        for step_num in range(len(steps), self.max_steps):
251            prompt = self._build_prompt(task, steps)
252            response = await self.llm_gateway.complete(
253                prompt, model=self.model, temperature=self.temperature
254            )
255            step = self._parse_step(response.content)
256            if step is None:
257                step = AgentStep(
258                    thought="Could not parse structured response.",
259                    action="Final Answer",
260                    action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
261                )
262
263            if step.action == "Final Answer":
264                answer = self._extract_final_answer(step)
265                step.observation = answer
266                steps.append(step)
267                await self._save_checkpoint(execution_id, task, context, steps, step_num)
268                ctx.last_completed_step = step_num
269                result = AgentResult(
270                    output=answer,
271                    steps=steps,
272                    success=True,
273                    metadata={"execution_id": execution_id},
274                )
275                result = await self._apply_behaviors_after(ctx, result)
276                self._log_execution_end(task, success=True, steps=len(steps))
277                return result
278
279            observation = ""
280            if self.tool_registry:
281                try:
282                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
283                    observation = str(tool_result)
284                except Exception as tool_exc:
285                    observation = f"Tool error: {tool_exc}"
286            else:
287                observation = f"No tool registry — cannot execute '{step.action}'"
288
289            step.observation = observation
290            steps.append(step)
291            await self._save_checkpoint(execution_id, task, context, steps, step_num)
292            ctx.last_completed_step = step_num
293
294        final_output = steps[-1].observation if steps else "No output"
295        result = AgentResult(
296            output=final_output,
297            steps=steps,
298            success=False,
299            error=f"Reached max_steps={self.max_steps} without Final Answer",
300            metadata={"execution_id": execution_id},
301        )
302        result = await self._apply_behaviors_after(ctx, result)
303        self._log_execution_end(task, success=False, steps=len(steps))
304        return result

Resume a ReAct execution from its latest (or selected) checkpoint.

async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
310    async def execute(
311        self, task: str, context: Optional[Dict[str, Any]] = None
312    ) -> AgentResult:
313        execution_id = str(uuid4())
314        run_context = context or {}
315        ctx = BehaviorContext(
316            agent_id=self.agent_id,
317            task=task,
318            execution_id=execution_id,
319            metadata={"resume_agent": self, **run_context},
320        )
321        ctx = await self._apply_behaviors_before(ctx)
322        self._log_execution_start(task)
323
324        while True:
325            _retry = False
326            _control_exit = None  # HumanApprovalRequired or PendingApproval — not errors
327            steps: List[AgentStep] = []
328
329            with self._tracer.trace(
330                "react_agent.execute", input=task, metadata={"agent_id": self.agent_id}
331            ) as trace:
332                try:
333                    for _ in range(self.max_steps):
334                        prompt = self._build_prompt(task, steps)
335
336                        with trace.generation(
337                            "llm_call", model=self.model or "default", input=prompt
338                        ) as gen:
339                            perf_id = None
340                            if self._performance_monitor:
341                                perf_id = self._performance_monitor.start_request(
342                                    provider="llm_gateway", model=self.model or "default"
343                                )
344                            try:
345                                response = await self.llm_gateway.complete(
346                                    prompt, model=self.model, temperature=self.temperature
347                                )
348                                if self._performance_monitor and perf_id:
349                                    self._performance_monitor.end_request(
350                                        request_id=perf_id,
351                                        prompt_tokens=response.usage.get("prompt_tokens", 0),
352                                        completion_tokens=response.usage.get("completion_tokens", 0),
353                                        success=True,
354                                    )
355                                gen.set_output(response.content)
356                                gen.set_token_usage(**response.usage)
357                            except Exception as exc:
358                                if self._performance_monitor and perf_id:
359                                    self._performance_monitor.end_request(
360                                        request_id=perf_id,
361                                        prompt_tokens=0,
362                                        completion_tokens=0,
363                                        success=False,
364                                        error=str(exc),
365                                    )
366                                raise
367
368                        step = self._parse_step(response.content)
369                        if step is None:
370                            # Unparseable response — treat as final answer
371                            step = AgentStep(
372                                thought="Could not parse structured response.",
373                                action="Final Answer",
374                                action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
375                            )
376
377                        with trace.span("step", input=step.action) as span:
378                            if step.action == "Final Answer":
379                                answer = self._extract_final_answer(step)
380                                step.observation = answer
381                                steps.append(step)
382                                span.set_output(answer)
383
384                                # Persist to state store if available
385                                if self.state_store:
386                                    await self.state_store.set(
387                                        f"agent:{self.agent_id}:last_steps",
388                                        [vars(s) for s in steps],
389                                        ttl=self.checkpoint_manager.default_ttl if self.checkpoint_manager else None,
390                                    )
391
392                                await self._save_checkpoint(
393                                    execution_id,
394                                    task,
395                                    run_context,
396                                    steps,
397                                    len(steps) - 1,
398                                )
399                                ctx.last_completed_step = len(steps) - 1
400
401                                result = AgentResult(
402                                    output=answer,
403                                    steps=steps,
404                                    success=True,
405                                    metadata={"execution_id": execution_id},
406                                )
407                                try:
408                                    result = await self._apply_behaviors_after(ctx, result)
409                                except (HumanApprovalRequired, PendingApproval) as ctrl_exc:
410                                    _control_exit = ctrl_exc
411
412                                if _control_exit is None:
413                                    self._log_execution_end(task, success=True, steps=len(steps))
414                                    trace.set_output(answer)
415                                    return result
416                                else:
417                                    # Control-flow exit (HumanApprovalRequired / PendingApproval)
418                                    # span output already set above; also set trace output so
419                                    # it logs the answer rather than finishing with no output.
420                                    trace.set_output(answer)
421
422                            else:
423                                # Tool call
424                                observation = ""
425                                if self.tool_registry:
426                                    try:
427                                        tool_result = await self.tool_registry.execute(
428                                            step.action, **step.action_input
429                                        )
430                                        observation = str(tool_result)
431                                    except Exception as tool_exc:
432                                        observation = f"Tool error: {tool_exc}"
433                                else:
434                                    observation = f"No tool registry — cannot execute '{step.action}'"
435
436                                step.observation = observation
437                                span.set_output(observation)
438                                steps.append(step)
439                                await self._save_checkpoint(
440                                    execution_id,
441                                    task,
442                                    run_context,
443                                    steps,
444                                    len(steps) - 1,
445                                )
446                                ctx.last_completed_step = len(steps) - 1
447
448                        if _control_exit is not None:
449                            break  # exit for loop; span already closed cleanly
450
451                    # Max steps reached (or for loop broken by control-flow signal)
452                    if _control_exit is None:
453                        final_output = steps[-1].observation if steps else "No output"
454                        result = AgentResult(
455                            output=final_output,
456                            steps=steps,
457                            success=False,
458                            error=f"Reached max_steps={self.max_steps} without Final Answer",
459                            metadata={"execution_id": execution_id},
460                        )
461                        result = await self._apply_behaviors_after(ctx, result)
462                        self._log_execution_end(task, success=False, steps=len(steps))
463                        trace.set_output(final_output)
464                        return result
465                    # else: control-flow exit — trace closes cleanly, re-raised below
466
467                except Exception as exc:
468                    self._log_execution_error(task, exc)
469                    fallback = await self._apply_behaviors_on_error(ctx, exc)
470                    if fallback is RETRY_SENTINEL:
471                        # RetryBehavior slept and wants us to retry from scratch.
472                        # Mark the trace as failed so it logs "failed" not "finished".
473                        trace.set_error(exc)
474                        ctx.attempt += 1
475                        _retry = True
476                    elif fallback is not None:
477                        return fallback
478                    else:
479                        raise
480
481            if _control_exit is not None:
482                raise _control_exit
483
484            if not _retry:
485                break

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
487    async def stream_execute(
488        self, task: str, context: Optional[Dict[str, Any]] = None
489    ) -> AsyncIterator[AgentStep]:
490        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
491        ctx = await self._apply_behaviors_before(ctx)
492        self._log_execution_start(task)
493
494        steps: List[AgentStep] = []
495        for _ in range(self.max_steps):
496            prompt = self._build_prompt(task, steps)
497            response = await self.llm_gateway.complete(
498                prompt, model=self.model, temperature=self.temperature
499            )
500            step = self._parse_step(response.content)
501            if step is None:
502                step = AgentStep(
503                    thought="Unparseable response.",
504                    action="Final Answer",
505                    action_input={"answer": response.content},
506                )
507
508            if step.action == "Final Answer":
509                step.observation = self._extract_final_answer(step)
510                steps.append(step)
511                yield step
512                return
513
514            if self.tool_registry:
515                try:
516                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
517                    step.observation = str(tool_result)
518                except Exception as tool_exc:
519                    step.observation = f"Tool error: {tool_exc}"
520            else:
521                step.observation = f"No tool registry — cannot execute '{step.action}'"
522
523            steps.append(step)
524            yield step

Execute the task, yielding each step as it completes.

class PlanExecuteAgent(gmf_forge_ai_orchestration.agents.BaseAgent):
 54class PlanExecuteAgent(BaseAgent):
 55    """
 56    Two-phase agent: LLM plans all steps first, then executes each step sequentially.
 57
 58    Phase 1 — Plan: Ask the LLM to decompose the task into a JSON list of steps.
 59    Phase 2 — Execute: Feed each step back to the LLM (with accumulated context) to
 60    produce a result. Each step runs an inner tool-calling loop — if the LLM emits a
 61    ``Thought/Action/Action Input`` block referencing a registered tool, the tool is
 62    invoked and its observation is fed back for the LLM to produce a final step result.
 63
 64    Args:
 65        max_plan_steps: Maximum number of planned steps allowed (default: 10).
 66        max_tool_calls_per_step: Maximum tool calls allowed within a single plan step
 67            (default: 3).
 68        model: LLM model name (optional).
 69        temperature: Sampling temperature (default: 0.1).
 70        plan_prompt: Override the planning prompt. Must contain ``{task}``.
 71        execute_prompt: Override the step-execution prompt. Must contain
 72            ``{task}``, ``{plan}``, ``{previous_results}``, ``{step_num}``,
 73            ``{current_step}``, and ``{tool_section}``.
 74
 75    All other args inherited from :class:`BaseAgent`.
 76    """
 77
 78    def __init__(
 79        self,
 80        *args: Any,
 81        max_plan_steps: int = 10,
 82        max_tool_calls_per_step: int = 3,
 83        model: Optional[str] = None,
 84        temperature: float = 0.1,
 85        plan_prompt: Optional[str] = None,
 86        execute_prompt: Optional[str] = None,
 87        **kwargs: Any,
 88    ) -> None:
 89        super().__init__(*args, **kwargs)
 90        self.max_plan_steps = max_plan_steps
 91        self.max_tool_calls_per_step = max_tool_calls_per_step
 92        self.model = model
 93        self.temperature = temperature
 94        self._plan_prompt = plan_prompt
 95        self._execute_prompt = execute_prompt
 96
 97    #: Default planning prompt. Must contain ``{task}`` if overriding.
 98    DEFAULT_PLAN_PROMPT: str = _PLAN_PROMPT
 99    #: Default step-execution prompt. Must contain ``{task}``, ``{plan}``,
100    #: ``{previous_results}``, ``{step_num}``, ``{current_step}``, and
101    #: ``{tool_section}`` if overriding (``{tool_section}`` is auto-populated
102    #: from the tool registry; pass an empty string if no tools are needed).
103    DEFAULT_EXECUTE_PROMPT: str = _EXECUTE_PROMPT
104
105    # ------------------------------------------------------------------
106    # Phase 1: Plan
107    # ------------------------------------------------------------------
108
109    async def _plan(self, task: str) -> List[str]:
110        template = self._plan_prompt if self._plan_prompt is not None else _PLAN_PROMPT
111        prompt = template.format(task=task)
112        response = await self.llm_gateway.complete(
113            prompt, model=self.model, temperature=0.0
114        )
115        raw = response.content.strip()
116        # Extract JSON array even if wrapped in markdown fences
117        match = re.search(r"\[.*\]", raw, re.DOTALL)
118        if match:
119            try:
120                steps = json.loads(match.group())
121                if isinstance(steps, list):
122                    return [str(s) for s in steps[: self.max_plan_steps]]
123            except (json.JSONDecodeError, ValueError):
124                pass
125        # Fallback: treat each line as a step
126        lines = [ln.strip().lstrip("0123456789.-) ") for ln in raw.splitlines() if ln.strip()]
127        return lines[: self.max_plan_steps]
128
129    # ------------------------------------------------------------------
130    # Phase 2: Execute a single step (with optional tool calls)
131    # ------------------------------------------------------------------
132
133    def _tool_descriptions(self) -> str:
134        if not self.tool_registry:
135            return ""
136        tools = self.tool_registry.list_tools()
137        if not tools:
138            return ""
139        return "\n".join(f"- {t.name}: {t.description}" for t in tools)
140
141    async def _execute_step(
142        self, task: str, plan: List[str], step_num: int, previous: List[str]
143    ) -> str:
144        tool_descriptions = self._tool_descriptions()
145        tool_section = (
146            _TOOL_SECTION.format(tool_descriptions=tool_descriptions)
147            if tool_descriptions
148            else ""
149        )
150        template = self._execute_prompt if self._execute_prompt is not None else _EXECUTE_PROMPT
151        prompt = template.format(
152            task=task,
153            plan="\n".join(f"{i+1}. {s}" for i, s in enumerate(plan)),
154            previous_results="\n".join(
155                f"Step {i+1} result: {r}" for i, r in enumerate(previous)
156            ) or "None yet.",
157            step_num=step_num,
158            current_step=plan[step_num - 1],
159            tool_section=tool_section,
160        )
161
162        # Inner tool-calling loop: allow up to max_tool_calls tool calls per step
163        conversation = prompt
164        for _ in range(self.max_tool_calls_per_step):
165            response = await self.llm_gateway.complete(
166                conversation, model=self.model, temperature=self.temperature
167            )
168            raw = response.content.strip()
169
170            # Check if the LLM wants to call a tool
171            match = _STEP_PATTERN.search(raw)
172            if match and self.tool_registry:
173                action = match.group("action").strip()
174                raw_input = match.group("action_input").strip()
175                try:
176                    action_input = json.loads(raw_input)
177                    if not isinstance(action_input, dict):
178                        action_input = {"raw": raw_input}
179                except (json.JSONDecodeError, ValueError):
180                    action_input = {"raw": raw_input}
181
182                try:
183                    tool_result = await self.tool_registry.execute(action, **action_input)
184                    observation = str(tool_result)
185                except Exception as exc:
186                    observation = f"Tool error: {exc}"
187
188                # Append the tool call + observation to the conversation and continue
189                conversation = (
190                    conversation
191                    + f"\n{raw}\nObservation: {observation}\n"
192                    + "Now provide the final result for this step based on the above observation."
193                )
194            else:
195                # No tool call pattern — treat the response as the step result
196                return raw
197
198        # Exhausted tool call budget — return last response
199        return raw
200
201    async def _save_checkpoint(
202        self,
203        execution_id: str,
204        task: str,
205        context: Dict[str, Any],
206        plan: List[str],
207        steps: List[AgentStep],
208        step_results: List[str],
209        step_number: int,
210    ) -> None:
211        if not self.checkpoint_manager:
212            return
213        await self.checkpoint_manager.save(
214            agent_id=self.agent_id,
215            execution_id=execution_id,
216            state={
217                "execution_id": execution_id,
218                "task": task,
219                "context": context,
220                "plan": plan,
221                "step_number": step_number,
222                "step_results": list(step_results),
223                "steps": [dataclasses.asdict(s) for s in steps],
224            },
225            metadata={"step_number": step_number},
226            ttl=86400,
227        )
228
229    async def resume_from(
230        self, execution_id: str, from_step: Optional[int] = None
231    ) -> AgentResult:
232        """Resume a Plan-Execute run from latest (or selected) checkpoint."""
233        if not self.checkpoint_manager:
234            raise ValueError("Checkpoint manager is not configured for this agent")
235
236        checkpoint = None
237        if from_step is not None:
238            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
239            for ckpt in checkpoints:
240                if ckpt.state.get("step_number") == from_step:
241                    checkpoint = ckpt
242                    break
243        else:
244            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
245
246        if checkpoint is None:
247            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
248
249        state = checkpoint.state
250        task = str(state.get("task", ""))
251        context = state.get("context") or {}
252        plan = [str(s) for s in state.get("plan") or []]
253        raw_steps = state.get("steps") or []
254        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
255        step_results = [str(r) for r in state.get("step_results") or []]
256        current_step = int(state.get("step_number", len(step_results) - 1)) + 1
257
258        ctx = BehaviorContext(
259            agent_id=self.agent_id,
260            task=task,
261            execution_id=execution_id,
262            last_completed_step=max(current_step - 1, -1),
263            metadata={"resume_agent": self},
264        )
265        ctx = await self._apply_behaviors_before(ctx)
266
267        for i in range(current_step, len(plan)):
268            step_num = i + 1
269            result_text = await self._execute_step(task, plan, step_num, step_results)
270            step_results.append(result_text)
271            agent_step = AgentStep(
272                thought=f"Executing plan step {step_num}: {plan[i]}",
273                action="llm_execution",
274                action_input={"step": plan[i]},
275                observation=result_text,
276            )
277            steps.append(agent_step)
278            await self._save_checkpoint(
279                execution_id,
280                task,
281                context,
282                plan,
283                steps,
284                step_results,
285                i,
286            )
287            ctx.last_completed_step = i
288
289        final_output = step_results[-1] if step_results else ""
290        result = AgentResult(
291            output=final_output,
292            steps=steps,
293            success=True,
294            metadata={"plan": plan, "execution_id": execution_id},
295        )
296        return await self._apply_behaviors_after(ctx, result)
297
298    # ------------------------------------------------------------------
299    # BaseAgent implementation
300    # ------------------------------------------------------------------
301
302    async def execute(
303        self, task: str, context: Optional[Dict[str, Any]] = None
304    ) -> AgentResult:
305        execution_id = str(uuid4())
306        run_context = context or {}
307        ctx = BehaviorContext(
308            agent_id=self.agent_id,
309            task=task,
310            execution_id=execution_id,
311            metadata={"resume_agent": self, **run_context},
312        )
313        ctx = await self._apply_behaviors_before(ctx)
314        self._log_execution_start(task)
315
316        with self._tracer.trace(
317            "plan_execute_agent.execute", input=task, metadata={"agent_id": self.agent_id}
318        ) as trace:
319            try:
320                # Phase 1 — Plan
321                with trace.span("plan", input=task) as plan_span:
322                    plan = await self._plan(task)
323                    plan_span.set_output({"steps": plan})
324                    self._logger.info(
325                        "Plan created",
326                        agent_id=self.agent_id,
327                        step_count=len(plan),
328                    )
329                    await self._save_checkpoint(
330                        execution_id,
331                        task,
332                        run_context,
333                        plan,
334                        [],
335                        [],
336                        -1,
337                    )
338
339                # Phase 2 — Execute
340                steps: List[AgentStep] = []
341                step_results: List[str] = []
342                for i, plan_step in enumerate(plan):
343                    step_num = i + 1
344                    with trace.span(f"execute_step_{step_num}", input=plan_step) as step_span:
345                        perf_id = None
346                        if self._performance_monitor:
347                            perf_id = self._performance_monitor.start_request(
348                                provider="llm_gateway",
349                                model=self.model or "default",
350                            )
351                        try:
352                            result_text = await self._execute_step(
353                                task, plan, step_num, step_results
354                            )
355                            if self._performance_monitor and perf_id:
356                                self._performance_monitor.end_request(
357                                    request_id=perf_id,
358                                    prompt_tokens=0,
359                                    completion_tokens=0,
360                                    success=True,
361                                )
362                        except Exception as exc:
363                            if self._performance_monitor and perf_id:
364                                self._performance_monitor.end_request(
365                                    request_id=perf_id,
366                                    prompt_tokens=0,
367                                    completion_tokens=0,
368                                    success=False,
369                                    error=str(exc),
370                                )
371                            raise
372
373                        step_results.append(result_text)
374                        agent_step = AgentStep(
375                            thought=f"Executing plan step {step_num}: {plan_step}",
376                            action="llm_execution",
377                            action_input={"step": plan_step},
378                            observation=result_text,
379                        )
380                        steps.append(agent_step)
381                        step_span.set_output(result_text)
382                        await self._save_checkpoint(
383                            execution_id,
384                            task,
385                            run_context,
386                            plan,
387                            steps,
388                            step_results,
389                            i,
390                        )
391                        ctx.last_completed_step = i
392
393                final_output = step_results[-1] if step_results else ""
394                if self.state_store:
395                    await self.state_store.set(
396                        f"agent:{self.agent_id}:last_steps",
397                        [vars(s) for s in steps],
398                        ttl=3600,
399                    )
400
401                result = AgentResult(
402                    output=final_output,
403                    steps=steps,
404                    success=True,
405                    metadata={"plan": plan, "execution_id": execution_id},
406                )
407                result = await self._apply_behaviors_after(ctx, result)
408                self._log_execution_end(task, success=True, steps=len(steps))
409                trace.set_output(final_output)
410                return result
411
412            except Exception as exc:
413                self._log_execution_error(task, exc)
414                fallback = await self._apply_behaviors_on_error(ctx, exc)
415                if fallback is not None:
416                    return fallback
417                raise
418
419    async def stream_execute(
420        self, task: str, context: Optional[Dict[str, Any]] = None
421    ) -> AsyncIterator[AgentStep]:
422        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
423        ctx = await self._apply_behaviors_before(ctx)
424        self._log_execution_start(task)
425
426        plan = await self._plan(task)
427        step_results: List[str] = []
428        for i, plan_step in enumerate(plan):
429            result_text = await self._execute_step(task, plan, i + 1, step_results)
430            step_results.append(result_text)
431            step = AgentStep(
432                thought=f"Executing plan step {i+1}: {plan_step}",
433                action="llm_execution",
434                action_input={"step": plan_step},
435                observation=result_text,
436            )
437            yield step

Two-phase agent: LLM plans all steps first, then executes each step sequentially.

Phase 1 — Plan: Ask the LLM to decompose the task into a JSON list of steps. Phase 2 — Execute: Feed each step back to the LLM (with accumulated context) to produce a result. Each step runs an inner tool-calling loop — if the LLM emits a Thought/Action/Action Input block referencing a registered tool, the tool is invoked and its observation is fed back for the LLM to produce a final step result.

Args: max_plan_steps: Maximum number of planned steps allowed (default: 10). max_tool_calls_per_step: Maximum tool calls allowed within a single plan step (default: 3). model: LLM model name (optional). temperature: Sampling temperature (default: 0.1). plan_prompt: Override the planning prompt. Must contain {task}. execute_prompt: Override the step-execution prompt. Must contain {task}, {plan}, {previous_results}, {step_num}, {current_step}, and {tool_section}.

All other args inherited from BaseAgent.

PlanExecuteAgent( *args: Any, max_plan_steps: int = 10, max_tool_calls_per_step: int = 3, model: Optional[str] = None, temperature: float = 0.1, plan_prompt: Optional[str] = None, execute_prompt: Optional[str] = None, **kwargs: Any)
78    def __init__(
79        self,
80        *args: Any,
81        max_plan_steps: int = 10,
82        max_tool_calls_per_step: int = 3,
83        model: Optional[str] = None,
84        temperature: float = 0.1,
85        plan_prompt: Optional[str] = None,
86        execute_prompt: Optional[str] = None,
87        **kwargs: Any,
88    ) -> None:
89        super().__init__(*args, **kwargs)
90        self.max_plan_steps = max_plan_steps
91        self.max_tool_calls_per_step = max_tool_calls_per_step
92        self.model = model
93        self.temperature = temperature
94        self._plan_prompt = plan_prompt
95        self._execute_prompt = execute_prompt
max_plan_steps
max_tool_calls_per_step
model
temperature
DEFAULT_PLAN_PROMPT: str = 'You are a planning assistant. Given a task, decompose it into a numbered\nlist of clear, sequential steps. Each step should be a single actionable instruction.\n\nTask: {task}\n\nRespond with ONLY a JSON array of step strings:\n["step 1 description", "step 2 description", ...]\n'
DEFAULT_EXECUTE_PROMPT: str = 'You are executing step {step_num} of a multi-step plan.\n\nOriginal task: {task}\nFull plan: {plan}\nPrevious results:\n{previous_results}\n\nCurrent step: {current_step}\n\n{tool_section}Complete this step and provide the result. Be concise and factual.\n\nIf you need to use a tool, respond with:\nThought: <your reasoning>\nAction: <tool name>\nAction Input: <JSON object with tool arguments>\n\nOtherwise, just provide the result directly.\n'
async def resume_from( self, execution_id: str, from_step: Optional[int] = None) -> AgentResult:
229    async def resume_from(
230        self, execution_id: str, from_step: Optional[int] = None
231    ) -> AgentResult:
232        """Resume a Plan-Execute run from latest (or selected) checkpoint."""
233        if not self.checkpoint_manager:
234            raise ValueError("Checkpoint manager is not configured for this agent")
235
236        checkpoint = None
237        if from_step is not None:
238            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
239            for ckpt in checkpoints:
240                if ckpt.state.get("step_number") == from_step:
241                    checkpoint = ckpt
242                    break
243        else:
244            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
245
246        if checkpoint is None:
247            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
248
249        state = checkpoint.state
250        task = str(state.get("task", ""))
251        context = state.get("context") or {}
252        plan = [str(s) for s in state.get("plan") or []]
253        raw_steps = state.get("steps") or []
254        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
255        step_results = [str(r) for r in state.get("step_results") or []]
256        current_step = int(state.get("step_number", len(step_results) - 1)) + 1
257
258        ctx = BehaviorContext(
259            agent_id=self.agent_id,
260            task=task,
261            execution_id=execution_id,
262            last_completed_step=max(current_step - 1, -1),
263            metadata={"resume_agent": self},
264        )
265        ctx = await self._apply_behaviors_before(ctx)
266
267        for i in range(current_step, len(plan)):
268            step_num = i + 1
269            result_text = await self._execute_step(task, plan, step_num, step_results)
270            step_results.append(result_text)
271            agent_step = AgentStep(
272                thought=f"Executing plan step {step_num}: {plan[i]}",
273                action="llm_execution",
274                action_input={"step": plan[i]},
275                observation=result_text,
276            )
277            steps.append(agent_step)
278            await self._save_checkpoint(
279                execution_id,
280                task,
281                context,
282                plan,
283                steps,
284                step_results,
285                i,
286            )
287            ctx.last_completed_step = i
288
289        final_output = step_results[-1] if step_results else ""
290        result = AgentResult(
291            output=final_output,
292            steps=steps,
293            success=True,
294            metadata={"plan": plan, "execution_id": execution_id},
295        )
296        return await self._apply_behaviors_after(ctx, result)

Resume a Plan-Execute run from latest (or selected) checkpoint.

async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
302    async def execute(
303        self, task: str, context: Optional[Dict[str, Any]] = None
304    ) -> AgentResult:
305        execution_id = str(uuid4())
306        run_context = context or {}
307        ctx = BehaviorContext(
308            agent_id=self.agent_id,
309            task=task,
310            execution_id=execution_id,
311            metadata={"resume_agent": self, **run_context},
312        )
313        ctx = await self._apply_behaviors_before(ctx)
314        self._log_execution_start(task)
315
316        with self._tracer.trace(
317            "plan_execute_agent.execute", input=task, metadata={"agent_id": self.agent_id}
318        ) as trace:
319            try:
320                # Phase 1 — Plan
321                with trace.span("plan", input=task) as plan_span:
322                    plan = await self._plan(task)
323                    plan_span.set_output({"steps": plan})
324                    self._logger.info(
325                        "Plan created",
326                        agent_id=self.agent_id,
327                        step_count=len(plan),
328                    )
329                    await self._save_checkpoint(
330                        execution_id,
331                        task,
332                        run_context,
333                        plan,
334                        [],
335                        [],
336                        -1,
337                    )
338
339                # Phase 2 — Execute
340                steps: List[AgentStep] = []
341                step_results: List[str] = []
342                for i, plan_step in enumerate(plan):
343                    step_num = i + 1
344                    with trace.span(f"execute_step_{step_num}", input=plan_step) as step_span:
345                        perf_id = None
346                        if self._performance_monitor:
347                            perf_id = self._performance_monitor.start_request(
348                                provider="llm_gateway",
349                                model=self.model or "default",
350                            )
351                        try:
352                            result_text = await self._execute_step(
353                                task, plan, step_num, step_results
354                            )
355                            if self._performance_monitor and perf_id:
356                                self._performance_monitor.end_request(
357                                    request_id=perf_id,
358                                    prompt_tokens=0,
359                                    completion_tokens=0,
360                                    success=True,
361                                )
362                        except Exception as exc:
363                            if self._performance_monitor and perf_id:
364                                self._performance_monitor.end_request(
365                                    request_id=perf_id,
366                                    prompt_tokens=0,
367                                    completion_tokens=0,
368                                    success=False,
369                                    error=str(exc),
370                                )
371                            raise
372
373                        step_results.append(result_text)
374                        agent_step = AgentStep(
375                            thought=f"Executing plan step {step_num}: {plan_step}",
376                            action="llm_execution",
377                            action_input={"step": plan_step},
378                            observation=result_text,
379                        )
380                        steps.append(agent_step)
381                        step_span.set_output(result_text)
382                        await self._save_checkpoint(
383                            execution_id,
384                            task,
385                            run_context,
386                            plan,
387                            steps,
388                            step_results,
389                            i,
390                        )
391                        ctx.last_completed_step = i
392
393                final_output = step_results[-1] if step_results else ""
394                if self.state_store:
395                    await self.state_store.set(
396                        f"agent:{self.agent_id}:last_steps",
397                        [vars(s) for s in steps],
398                        ttl=3600,
399                    )
400
401                result = AgentResult(
402                    output=final_output,
403                    steps=steps,
404                    success=True,
405                    metadata={"plan": plan, "execution_id": execution_id},
406                )
407                result = await self._apply_behaviors_after(ctx, result)
408                self._log_execution_end(task, success=True, steps=len(steps))
409                trace.set_output(final_output)
410                return result
411
412            except Exception as exc:
413                self._log_execution_error(task, exc)
414                fallback = await self._apply_behaviors_on_error(ctx, exc)
415                if fallback is not None:
416                    return fallback
417                raise

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
419    async def stream_execute(
420        self, task: str, context: Optional[Dict[str, Any]] = None
421    ) -> AsyncIterator[AgentStep]:
422        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
423        ctx = await self._apply_behaviors_before(ctx)
424        self._log_execution_start(task)
425
426        plan = await self._plan(task)
427        step_results: List[str] = []
428        for i, plan_step in enumerate(plan):
429            result_text = await self._execute_step(task, plan, i + 1, step_results)
430            step_results.append(result_text)
431            step = AgentStep(
432                thought=f"Executing plan step {i+1}: {plan_step}",
433                action="llm_execution",
434                action_input={"step": plan_step},
435                observation=result_text,
436            )
437            yield step

Execute the task, yielding each step as it completes.

class ReflexionAgent(gmf_forge_ai_orchestration.agents.BaseAgent):
 31class ReflexionAgent(BaseAgent):
 32    """
 33    Wraps another agent and applies self-reflection on failure.
 34
 35    After each attempt, a separate LLM call critiques the output. If the
 36    critique says the result is unsatisfactory, the agent reflects and retries
 37    with an improved prompt up to ``max_reflections`` times.
 38
 39    Args:
 40        inner_agent: The underlying agent to run and reflect on.
 41        max_reflections: Maximum reflection/retry cycles (default: 2).
 42        model: Model for critique and reflection calls (defaults to inner agent
 43            model or gateway default).
 44        critique_prompt: Override the critique prompt. Must contain ``{task}``
 45            and ``{response}``.
 46        reflect_prompt: Override the reflection/retry prompt. Must contain
 47            ``{task}``, ``{previous_output}``, and ``{critique}``.
 48
 49    All other args inherited from :class:`BaseAgent`.
 50    """
 51
 52    def __init__(
 53        self,
 54        inner_agent: BaseAgent,
 55        *args: Any,
 56        max_reflections: int = 2,
 57        model: Optional[str] = None,
 58        critique_prompt: Optional[str] = None,
 59        reflect_prompt: Optional[str] = None,
 60        **kwargs: Any,
 61    ) -> None:
 62        super().__init__(inner_agent.llm_gateway, *args, **kwargs)
 63        self.inner_agent = inner_agent
 64        self.max_reflections = max_reflections
 65        self.model = model
 66        self._critique_prompt = critique_prompt
 67        self._reflect_prompt = reflect_prompt
 68
 69    #: Default critique prompt. Must contain ``{task}`` and ``{response}`` if overriding.
 70    DEFAULT_CRITIQUE_PROMPT: str = _CRITIQUE_PROMPT
 71    #: Default reflection prompt. Must contain ``{task}``, ``{previous_output}``,
 72    #: and ``{critique}`` if overriding.
 73    DEFAULT_REFLECT_PROMPT: str = _REFLECT_PROMPT
 74
 75    async def _critique(self, task: str, output: str) -> Optional[str]:
 76        """Returns the critique reason string, or None if the output is satisfactory."""
 77        template = self._critique_prompt if self._critique_prompt is not None else _CRITIQUE_PROMPT
 78        prompt = template.format(task=task, response=output)
 79        response = await self.llm_gateway.complete(prompt, model=self.model, temperature=0.0)
 80        text = response.content.strip()
 81        if text.upper().startswith("YES"):
 82            return None
 83        # Extract reason after "NO:"
 84        if ":" in text:
 85            return text.split(":", 1)[1].strip()
 86        return text
 87
 88    async def _reflect_and_retry(
 89        self, task: str, previous_output: str, critique: str
 90    ) -> AgentResult:
 91        template = self._reflect_prompt if self._reflect_prompt is not None else _REFLECT_PROMPT
 92        improved_task = template.format(
 93            task=task, previous_output=previous_output, critique=critique
 94        )
 95        return await self.inner_agent.execute(improved_task)
 96
 97    async def _save_checkpoint(
 98        self,
 99        execution_id: str,
100        task: str,
101        context: Dict[str, Any],
102        reflection_round: int,
103        result: AgentResult,
104        critique: Optional[str] = None,
105    ) -> None:
106        if not self.checkpoint_manager:
107            return
108        await self.checkpoint_manager.save(
109            agent_id=self.agent_id,
110            execution_id=execution_id,
111            state={
112                "execution_id": execution_id,
113                "task": task,
114                "context": context,
115                "reflection_round": reflection_round,
116                "critique": critique,
117                "result": {
118                    "output": result.output,
119                    "success": result.success,
120                    "error": result.error,
121                    "metadata": result.metadata,
122                    "steps": [dataclasses.asdict(s) for s in result.steps],
123                },
124            },
125            metadata={"reflection_round": reflection_round},
126            ttl=86400,
127        )
128
129    async def resume_from(
130        self, execution_id: str, from_reflection: Optional[int] = None
131    ) -> AgentResult:
132        """Resume a reflexion execution from latest (or selected) reflection checkpoint."""
133        if not self.checkpoint_manager:
134            raise ValueError("Checkpoint manager is not configured for this agent")
135
136        checkpoint = None
137        if from_reflection is not None:
138            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
139            for ckpt in checkpoints:
140                if ckpt.state.get("reflection_round") == from_reflection:
141                    checkpoint = ckpt
142                    break
143        else:
144            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
145
146        if checkpoint is None:
147            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
148
149        state = checkpoint.state
150        task = str(state.get("task", ""))
151        context = state.get("context") or {}
152        reflection_round = int(state.get("reflection_round", 0))
153        raw = state.get("result") or {}
154
155        steps = [
156            AgentStep(**s) for s in raw.get("steps", []) if isinstance(s, dict)
157        ]
158        result = AgentResult(
159            output=str(raw.get("output", "")),
160            steps=steps,
161            success=bool(raw.get("success", True)),
162            error=raw.get("error"),
163            metadata=raw.get("metadata", {}),
164        )
165
166        ctx = BehaviorContext(
167            agent_id=self.agent_id,
168            task=task,
169            execution_id=execution_id,
170            last_completed_step=reflection_round,
171            metadata={"resume_agent": self},
172        )
173        ctx = await self._apply_behaviors_before(ctx)
174
175        all_steps: List[AgentStep] = list(result.steps)
176        for idx in range(reflection_round, self.max_reflections):
177            critique = await self._critique(task, result.output)
178            if critique is None:
179                break
180            result = await self._reflect_and_retry(task, result.output, critique)
181            all_steps.extend(result.steps)
182            await self._save_checkpoint(
183                execution_id,
184                task,
185                context,
186                idx + 1,
187                result,
188                critique,
189            )
190            ctx.last_completed_step = idx + 1
191
192        final = AgentResult(
193            output=result.output,
194            steps=all_steps,
195            success=result.success,
196            error=result.error,
197            metadata={**result.metadata, "execution_id": execution_id},
198        )
199        return await self._apply_behaviors_after(ctx, final)
200
201    # ------------------------------------------------------------------
202    # BaseAgent implementation
203    # ------------------------------------------------------------------
204
205    async def execute(
206        self, task: str, context: Optional[Dict[str, Any]] = None
207    ) -> AgentResult:
208        execution_id = str(uuid4())
209        run_context = context or {}
210        ctx = BehaviorContext(
211            agent_id=self.agent_id,
212            task=task,
213            execution_id=execution_id,
214            metadata={"resume_agent": self, **run_context},
215        )
216        ctx = await self._apply_behaviors_before(ctx)
217        self._log_execution_start(task)
218
219        with self._tracer.trace(
220            "reflexion_agent.execute", input=task, metadata={"agent_id": self.agent_id}
221        ) as trace:
222            try:
223                result = await self.inner_agent.execute(task, context)
224                all_steps: List[AgentStep] = list(result.steps)
225                await self._save_checkpoint(execution_id, task, run_context, 0, result)
226                ctx.last_completed_step = 0
227
228                for reflection_num in range(self.max_reflections):
229                    with trace.span(
230                        f"critique_{reflection_num + 1}", input=result.output
231                    ) as cspan:
232                        critique = await self._critique(task, result.output)
233                        cspan.set_output({"satisfactory": critique is None, "critique": critique})
234
235                    if critique is None:
236                        self._logger.info(
237                            "Reflexion: output accepted",
238                            agent_id=self.agent_id,
239                            reflection=reflection_num + 1,
240                        )
241                        break
242
243                    self._logger.info(
244                        "Reflexion: retrying",
245                        agent_id=self.agent_id,
246                        reflection=reflection_num + 1,
247                        critique=critique,
248                    )
249                    if self._metrics:
250                        self._metrics.increment(
251                            "agent.reflexions", agent_id=self.agent_id
252                        )
253
254                    with trace.span(f"reflect_{reflection_num + 1}", input=critique) as rspan:
255                        result = await self._reflect_and_retry(task, result.output, critique)
256                        all_steps.extend(result.steps)
257                        await self._save_checkpoint(
258                            execution_id,
259                            task,
260                            run_context,
261                            reflection_num + 1,
262                            result,
263                            critique,
264                        )
265                        ctx.last_completed_step = reflection_num + 1
266                        rspan.set_output(result.output)
267
268                final = AgentResult(
269                    output=result.output,
270                    steps=all_steps,
271                    success=result.success,
272                    error=result.error,
273                    metadata={**result.metadata, "execution_id": execution_id},
274                )
275                final = await self._apply_behaviors_after(ctx, final)
276                self._log_execution_end(task, success=final.success, steps=len(all_steps))
277                trace.set_output(final.output)
278                return final
279
280            except Exception as exc:
281                self._log_execution_error(task, exc)
282                fallback = await self._apply_behaviors_on_error(ctx, exc)
283                if fallback is not None:
284                    return fallback
285                raise
286
287    async def stream_execute(
288        self, task: str, context: Optional[Dict[str, Any]] = None
289    ) -> AsyncIterator[AgentStep]:
290        # Reflexion does full-cycle reflection, so streaming yields inner steps
291        async for step in self.inner_agent.stream_execute(task, context):
292            yield step

Wraps another agent and applies self-reflection on failure.

After each attempt, a separate LLM call critiques the output. If the critique says the result is unsatisfactory, the agent reflects and retries with an improved prompt up to max_reflections times.

Args: inner_agent: The underlying agent to run and reflect on. max_reflections: Maximum reflection/retry cycles (default: 2). model: Model for critique and reflection calls (defaults to inner agent model or gateway default). critique_prompt: Override the critique prompt. Must contain {task} and {response}. reflect_prompt: Override the reflection/retry prompt. Must contain {task}, {previous_output}, and {critique}.

All other args inherited from BaseAgent.

ReflexionAgent( inner_agent: BaseAgent, *args: Any, max_reflections: int = 2, model: Optional[str] = None, critique_prompt: Optional[str] = None, reflect_prompt: Optional[str] = None, **kwargs: Any)
52    def __init__(
53        self,
54        inner_agent: BaseAgent,
55        *args: Any,
56        max_reflections: int = 2,
57        model: Optional[str] = None,
58        critique_prompt: Optional[str] = None,
59        reflect_prompt: Optional[str] = None,
60        **kwargs: Any,
61    ) -> None:
62        super().__init__(inner_agent.llm_gateway, *args, **kwargs)
63        self.inner_agent = inner_agent
64        self.max_reflections = max_reflections
65        self.model = model
66        self._critique_prompt = critique_prompt
67        self._reflect_prompt = reflect_prompt
inner_agent
max_reflections
model
DEFAULT_CRITIQUE_PROMPT: str = 'Evaluate the following response to the task.\n\nTask: {task}\nResponse: {response}\n\nIs this response satisfactory? Reply with ONLY:\n- "YES" if the response fully and correctly addresses the task.\n- "NO: <brief reason>" if it does not.\n'
DEFAULT_REFLECT_PROMPT: str = 'You just attempted a task and the result was unsatisfactory.\n\nTask: {task}\nPrevious attempt output: {previous_output}\nReason it was unsatisfactory: {critique}\n\nReflect on what went wrong and provide an improved, more thorough answer.\n'
async def resume_from( self, execution_id: str, from_reflection: Optional[int] = None) -> AgentResult:
129    async def resume_from(
130        self, execution_id: str, from_reflection: Optional[int] = None
131    ) -> AgentResult:
132        """Resume a reflexion execution from latest (or selected) reflection checkpoint."""
133        if not self.checkpoint_manager:
134            raise ValueError("Checkpoint manager is not configured for this agent")
135
136        checkpoint = None
137        if from_reflection is not None:
138            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
139            for ckpt in checkpoints:
140                if ckpt.state.get("reflection_round") == from_reflection:
141                    checkpoint = ckpt
142                    break
143        else:
144            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
145
146        if checkpoint is None:
147            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
148
149        state = checkpoint.state
150        task = str(state.get("task", ""))
151        context = state.get("context") or {}
152        reflection_round = int(state.get("reflection_round", 0))
153        raw = state.get("result") or {}
154
155        steps = [
156            AgentStep(**s) for s in raw.get("steps", []) if isinstance(s, dict)
157        ]
158        result = AgentResult(
159            output=str(raw.get("output", "")),
160            steps=steps,
161            success=bool(raw.get("success", True)),
162            error=raw.get("error"),
163            metadata=raw.get("metadata", {}),
164        )
165
166        ctx = BehaviorContext(
167            agent_id=self.agent_id,
168            task=task,
169            execution_id=execution_id,
170            last_completed_step=reflection_round,
171            metadata={"resume_agent": self},
172        )
173        ctx = await self._apply_behaviors_before(ctx)
174
175        all_steps: List[AgentStep] = list(result.steps)
176        for idx in range(reflection_round, self.max_reflections):
177            critique = await self._critique(task, result.output)
178            if critique is None:
179                break
180            result = await self._reflect_and_retry(task, result.output, critique)
181            all_steps.extend(result.steps)
182            await self._save_checkpoint(
183                execution_id,
184                task,
185                context,
186                idx + 1,
187                result,
188                critique,
189            )
190            ctx.last_completed_step = idx + 1
191
192        final = AgentResult(
193            output=result.output,
194            steps=all_steps,
195            success=result.success,
196            error=result.error,
197            metadata={**result.metadata, "execution_id": execution_id},
198        )
199        return await self._apply_behaviors_after(ctx, final)

Resume a reflexion execution from latest (or selected) reflection checkpoint.

async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
205    async def execute(
206        self, task: str, context: Optional[Dict[str, Any]] = None
207    ) -> AgentResult:
208        execution_id = str(uuid4())
209        run_context = context or {}
210        ctx = BehaviorContext(
211            agent_id=self.agent_id,
212            task=task,
213            execution_id=execution_id,
214            metadata={"resume_agent": self, **run_context},
215        )
216        ctx = await self._apply_behaviors_before(ctx)
217        self._log_execution_start(task)
218
219        with self._tracer.trace(
220            "reflexion_agent.execute", input=task, metadata={"agent_id": self.agent_id}
221        ) as trace:
222            try:
223                result = await self.inner_agent.execute(task, context)
224                all_steps: List[AgentStep] = list(result.steps)
225                await self._save_checkpoint(execution_id, task, run_context, 0, result)
226                ctx.last_completed_step = 0
227
228                for reflection_num in range(self.max_reflections):
229                    with trace.span(
230                        f"critique_{reflection_num + 1}", input=result.output
231                    ) as cspan:
232                        critique = await self._critique(task, result.output)
233                        cspan.set_output({"satisfactory": critique is None, "critique": critique})
234
235                    if critique is None:
236                        self._logger.info(
237                            "Reflexion: output accepted",
238                            agent_id=self.agent_id,
239                            reflection=reflection_num + 1,
240                        )
241                        break
242
243                    self._logger.info(
244                        "Reflexion: retrying",
245                        agent_id=self.agent_id,
246                        reflection=reflection_num + 1,
247                        critique=critique,
248                    )
249                    if self._metrics:
250                        self._metrics.increment(
251                            "agent.reflexions", agent_id=self.agent_id
252                        )
253
254                    with trace.span(f"reflect_{reflection_num + 1}", input=critique) as rspan:
255                        result = await self._reflect_and_retry(task, result.output, critique)
256                        all_steps.extend(result.steps)
257                        await self._save_checkpoint(
258                            execution_id,
259                            task,
260                            run_context,
261                            reflection_num + 1,
262                            result,
263                            critique,
264                        )
265                        ctx.last_completed_step = reflection_num + 1
266                        rspan.set_output(result.output)
267
268                final = AgentResult(
269                    output=result.output,
270                    steps=all_steps,
271                    success=result.success,
272                    error=result.error,
273                    metadata={**result.metadata, "execution_id": execution_id},
274                )
275                final = await self._apply_behaviors_after(ctx, final)
276                self._log_execution_end(task, success=final.success, steps=len(all_steps))
277                trace.set_output(final.output)
278                return final
279
280            except Exception as exc:
281                self._log_execution_error(task, exc)
282                fallback = await self._apply_behaviors_on_error(ctx, exc)
283                if fallback is not None:
284                    return fallback
285                raise

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
287    async def stream_execute(
288        self, task: str, context: Optional[Dict[str, Any]] = None
289    ) -> AsyncIterator[AgentStep]:
290        # Reflexion does full-cycle reflection, so streaming yields inner steps
291        async for step in self.inner_agent.stream_execute(task, context):
292            yield step

Execute the task, yielding each step as it completes.

class ChainOfThoughtAgent(gmf_forge_ai_orchestration.agents.BaseAgent):
 28class ChainOfThoughtAgent(BaseAgent):
 29    """
 30    Prompts the LLM to reason via a structured ``<thinking>`` scratchpad before
 31    producing a final answer.
 32
 33    **When to use:** Tasks requiring structured multi-step reasoning with no
 34    external tool calls — classification, analysis, summarisation, math, or
 35    any question answerable from the LLM's own knowledge. Single LLM call:
 36    lower latency and cost than :class:`ReActAgent`.
 37
 38    **When NOT to use:** Tasks that require searching a database, calling an
 39    API, or any live data lookup — use :class:`ReActAgent` instead.
 40
 41    .. note::
 42        Passing a ``tool_registry`` to this agent has no effect — tools are
 43        never invoked. If you need tool calls, use :class:`ReActAgent`.
 44
 45    The scratchpad is extracted and stored in the returned :class:`AgentStep`
 46    as the ``thought`` field, while the final answer becomes ``observation``.
 47
 48    Args:
 49        model: LLM model name (optional).
 50        temperature: Sampling temperature (default: 0.1).
 51        system_prompt: Override the default Chain-of-Thought prompt template.
 52            Must contain ``{task}`` as a placeholder where the task will be
 53            inserted.
 54
 55    All other args inherited from :class:`BaseAgent`.
 56    """
 57
 58    def __init__(
 59        self,
 60        *args: Any,
 61        model: Optional[str] = None,
 62        temperature: float = 0.1,
 63        system_prompt: Optional[str] = None,
 64        **kwargs: Any,
 65    ) -> None:
 66        super().__init__(*args, **kwargs)
 67        self.model = model
 68        self.temperature = temperature
 69        self._system_prompt = system_prompt
 70
 71    #: Default CoT prompt template used when no ``system_prompt`` is passed.
 72    #: Must contain ``{task}`` if overriding.
 73    DEFAULT_SYSTEM_PROMPT: str = _COT_PROMPT
 74
 75    def _build_prompt(self, task: str) -> str:
 76        template = self._system_prompt if self._system_prompt is not None else _COT_PROMPT
 77        return template.format(task=task)
 78
 79    def _parse_response(self, text: str) -> tuple[str, str]:
 80        """Returns (thinking_scratchpad, final_answer)."""
 81        match = _THINKING_PATTERN.search(text)
 82        if match:
 83            thinking = match.group(1).strip()
 84            answer = _THINKING_PATTERN.sub("", text).strip()
 85        else:
 86            # No <thinking> block — treat everything as the answer
 87            thinking = ""
 88            answer = text.strip()
 89        return thinking, answer
 90
 91    async def _save_checkpoint(
 92        self,
 93        execution_id: str,
 94        task: str,
 95        context: Dict[str, Any],
 96        thinking: str,
 97        answer: str,
 98    ) -> None:
 99        if not self.checkpoint_manager:
100            return
101        await self.checkpoint_manager.save(
102            agent_id=self.agent_id,
103            execution_id=execution_id,
104            state={
105                "execution_id": execution_id,
106                "task": task,
107                "context": context,
108                "step_number": 0,
109                "steps": [
110                    {
111                        "thought": thinking,
112                        "action": "chain_of_thought",
113                        "action_input": {"task": task},
114                        "observation": answer,
115                        "metadata": {},
116                    }
117                ],
118                "thinking": thinking,
119                "answer": answer,
120            },
121            metadata={"step_number": 0},
122            ttl=86400,
123        )
124
125    async def resume_from(self, execution_id: str) -> AgentResult:
126        """Resume (idempotently) from a completed Chain-of-Thought checkpoint."""
127        if not self.checkpoint_manager:
128            raise ValueError("Checkpoint manager is not configured for this agent")
129
130        checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
131        if checkpoint is None:
132            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
133
134        state = checkpoint.state
135        thinking = str(state.get("thinking", ""))
136        answer = str(state.get("answer", ""))
137        task = str(state.get("task", ""))
138
139        ctx = BehaviorContext(
140            agent_id=self.agent_id,
141            task=task,
142            execution_id=execution_id,
143            last_completed_step=0,
144            metadata={"resume_agent": self},
145        )
146        ctx = await self._apply_behaviors_before(ctx)
147
148        step = AgentStep(
149            thought=thinking,
150            action="chain_of_thought",
151            action_input={"task": task},
152            observation=answer,
153        )
154        result = AgentResult(
155            output=answer,
156            steps=[step],
157            success=True,
158            metadata={"thinking": thinking, "execution_id": execution_id},
159        )
160        return await self._apply_behaviors_after(ctx, result)
161
162    # ------------------------------------------------------------------
163    # BaseAgent implementation
164    # ------------------------------------------------------------------
165
166    async def execute(
167        self, task: str, context: Optional[Dict[str, Any]] = None
168    ) -> AgentResult:
169        execution_id = str(uuid4())
170        run_context = context or {}
171        ctx = BehaviorContext(
172            agent_id=self.agent_id,
173            task=task,
174            execution_id=execution_id,
175            metadata={"resume_agent": self, **run_context},
176        )
177        ctx = await self._apply_behaviors_before(ctx)
178        self._log_execution_start(task)
179
180        with self._tracer.trace(
181            "cot_agent.execute", input=task, metadata={"agent_id": self.agent_id}
182        ) as trace:
183            try:
184                prompt = self._build_prompt(task)
185
186                perf_id = None
187                if self._performance_monitor:
188                    perf_id = self._performance_monitor.start_request(
189                        provider="llm_gateway", model=self.model or "default"
190                    )
191
192                with trace.generation("llm_call", model=self.model or "default", input=prompt) as gen:
193                    try:
194                        response = await self.llm_gateway.complete(
195                            prompt, model=self.model, temperature=self.temperature
196                        )
197                        if self._performance_monitor and perf_id:
198                            self._performance_monitor.end_request(
199                                request_id=perf_id,
200                                prompt_tokens=response.usage.get("prompt_tokens", 0),
201                                completion_tokens=response.usage.get("completion_tokens", 0),
202                                success=True,
203                            )
204                        gen.set_output(response.content)
205                        gen.set_token_usage(**response.usage)
206                    except Exception as exc:
207                        if self._performance_monitor and perf_id:
208                            self._performance_monitor.end_request(
209                                request_id=perf_id,
210                                prompt_tokens=0,
211                                completion_tokens=0,
212                                success=False,
213                                error=str(exc),
214                            )
215                        raise
216
217                thinking, answer = self._parse_response(response.content)
218                step = AgentStep(
219                    thought=thinking,
220                    action="chain_of_thought",
221                    action_input={"task": task},
222                    observation=answer,
223                )
224
225                if self.state_store:
226                    await self.state_store.set(
227                        f"agent:{self.agent_id}:last_steps",
228                        [vars(step)],
229                        ttl=3600,
230                    )
231
232                await self._save_checkpoint(
233                    execution_id=execution_id,
234                    task=task,
235                    context=run_context,
236                    thinking=thinking,
237                    answer=answer,
238                )
239                ctx.last_completed_step = 0
240
241                result = AgentResult(
242                    output=answer,
243                    steps=[step],
244                    success=True,
245                    metadata={"thinking": thinking, "execution_id": execution_id},
246                )
247                result = await self._apply_behaviors_after(ctx, result)
248                self._log_execution_end(task, success=True, steps=1)
249                trace.set_output(answer)
250                return result
251
252            except Exception as exc:
253                self._log_execution_error(task, exc)
254                fallback = await self._apply_behaviors_on_error(ctx, exc)
255                if fallback is not None:
256                    return fallback
257                raise
258
259    async def stream_execute(
260        self, task: str, context: Optional[Dict[str, Any]] = None
261    ) -> AsyncIterator[AgentStep]:
262        result = await self.execute(task, context)
263        for step in result.steps:
264            yield step

Prompts the LLM to reason via a structured <thinking> scratchpad before producing a final answer.

When to use: Tasks requiring structured multi-step reasoning with no external tool calls — classification, analysis, summarisation, math, or any question answerable from the LLM's own knowledge. Single LLM call: lower latency and cost than ReActAgent.

When NOT to use: Tasks that require searching a database, calling an API, or any live data lookup — use ReActAgent instead.

Passing a tool_registry to this agent has no effect — tools are never invoked. If you need tool calls, use ReActAgent.

The scratchpad is extracted and stored in the returned AgentStep as the thought field, while the final answer becomes observation.

Args: model: LLM model name (optional). temperature: Sampling temperature (default: 0.1). system_prompt: Override the default Chain-of-Thought prompt template. Must contain {task} as a placeholder where the task will be inserted.

All other args inherited from BaseAgent.

ChainOfThoughtAgent( *args: Any, model: Optional[str] = None, temperature: float = 0.1, system_prompt: Optional[str] = None, **kwargs: Any)
58    def __init__(
59        self,
60        *args: Any,
61        model: Optional[str] = None,
62        temperature: float = 0.1,
63        system_prompt: Optional[str] = None,
64        **kwargs: Any,
65    ) -> None:
66        super().__init__(*args, **kwargs)
67        self.model = model
68        self.temperature = temperature
69        self._system_prompt = system_prompt
model
temperature
DEFAULT_SYSTEM_PROMPT: str = 'You are a careful, step-by-step reasoning assistant.\n\nBefore answering, work through the problem thoroughly inside <thinking> tags.\nThen provide your final answer outside the tags.\n\nFormat:\n<thinking>\n[Your detailed step-by-step reasoning here]\n</thinking>\n[Your final, concise answer here]\n\nTask: {task}\n'
async def resume_from( self, execution_id: str) -> AgentResult:
125    async def resume_from(self, execution_id: str) -> AgentResult:
126        """Resume (idempotently) from a completed Chain-of-Thought checkpoint."""
127        if not self.checkpoint_manager:
128            raise ValueError("Checkpoint manager is not configured for this agent")
129
130        checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)
131        if checkpoint is None:
132            raise ValueError(f"No checkpoint found for execution_id={execution_id}")
133
134        state = checkpoint.state
135        thinking = str(state.get("thinking", ""))
136        answer = str(state.get("answer", ""))
137        task = str(state.get("task", ""))
138
139        ctx = BehaviorContext(
140            agent_id=self.agent_id,
141            task=task,
142            execution_id=execution_id,
143            last_completed_step=0,
144            metadata={"resume_agent": self},
145        )
146        ctx = await self._apply_behaviors_before(ctx)
147
148        step = AgentStep(
149            thought=thinking,
150            action="chain_of_thought",
151            action_input={"task": task},
152            observation=answer,
153        )
154        result = AgentResult(
155            output=answer,
156            steps=[step],
157            success=True,
158            metadata={"thinking": thinking, "execution_id": execution_id},
159        )
160        return await self._apply_behaviors_after(ctx, result)

Resume (idempotently) from a completed Chain-of-Thought checkpoint.

async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
166    async def execute(
167        self, task: str, context: Optional[Dict[str, Any]] = None
168    ) -> AgentResult:
169        execution_id = str(uuid4())
170        run_context = context or {}
171        ctx = BehaviorContext(
172            agent_id=self.agent_id,
173            task=task,
174            execution_id=execution_id,
175            metadata={"resume_agent": self, **run_context},
176        )
177        ctx = await self._apply_behaviors_before(ctx)
178        self._log_execution_start(task)
179
180        with self._tracer.trace(
181            "cot_agent.execute", input=task, metadata={"agent_id": self.agent_id}
182        ) as trace:
183            try:
184                prompt = self._build_prompt(task)
185
186                perf_id = None
187                if self._performance_monitor:
188                    perf_id = self._performance_monitor.start_request(
189                        provider="llm_gateway", model=self.model or "default"
190                    )
191
192                with trace.generation("llm_call", model=self.model or "default", input=prompt) as gen:
193                    try:
194                        response = await self.llm_gateway.complete(
195                            prompt, model=self.model, temperature=self.temperature
196                        )
197                        if self._performance_monitor and perf_id:
198                            self._performance_monitor.end_request(
199                                request_id=perf_id,
200                                prompt_tokens=response.usage.get("prompt_tokens", 0),
201                                completion_tokens=response.usage.get("completion_tokens", 0),
202                                success=True,
203                            )
204                        gen.set_output(response.content)
205                        gen.set_token_usage(**response.usage)
206                    except Exception as exc:
207                        if self._performance_monitor and perf_id:
208                            self._performance_monitor.end_request(
209                                request_id=perf_id,
210                                prompt_tokens=0,
211                                completion_tokens=0,
212                                success=False,
213                                error=str(exc),
214                            )
215                        raise
216
217                thinking, answer = self._parse_response(response.content)
218                step = AgentStep(
219                    thought=thinking,
220                    action="chain_of_thought",
221                    action_input={"task": task},
222                    observation=answer,
223                )
224
225                if self.state_store:
226                    await self.state_store.set(
227                        f"agent:{self.agent_id}:last_steps",
228                        [vars(step)],
229                        ttl=3600,
230                    )
231
232                await self._save_checkpoint(
233                    execution_id=execution_id,
234                    task=task,
235                    context=run_context,
236                    thinking=thinking,
237                    answer=answer,
238                )
239                ctx.last_completed_step = 0
240
241                result = AgentResult(
242                    output=answer,
243                    steps=[step],
244                    success=True,
245                    metadata={"thinking": thinking, "execution_id": execution_id},
246                )
247                result = await self._apply_behaviors_after(ctx, result)
248                self._log_execution_end(task, success=True, steps=1)
249                trace.set_output(answer)
250                return result
251
252            except Exception as exc:
253                self._log_execution_error(task, exc)
254                fallback = await self._apply_behaviors_on_error(ctx, exc)
255                if fallback is not None:
256                    return fallback
257                raise

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
259    async def stream_execute(
260        self, task: str, context: Optional[Dict[str, Any]] = None
261    ) -> AsyncIterator[AgentStep]:
262        result = await self.execute(task, context)
263        for step in result.steps:
264            yield step

Execute the task, yielding each step as it completes.

class A2AClient(gmf_forge_ai_orchestration.agents.BaseAgent):
 76class A2AClient(BaseAgent):
 77    """
 78    Proxies ``execute()`` calls to a remote A2A-compliant agent service.
 79
 80    Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 ``tasks/send`` over HTTP.
 81    Context (user_assertion, obo_token, locale, etc.) is forwarded as
 82    ``message.metadata`` in the JSON-RPC payload.
 83
 84    This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.)
 85    treat a separately-deployed A2A agent service as a first-class agent — no code
 86    changes needed in the orchestrator.
 87
 88    Args:
 89        endpoint_url: Base URL of the remote agent service
 90            (e.g. ``"http://search-agent:8080"``).
 91            ``/rpc`` is appended automatically.
 92        timeout: HTTP request timeout in seconds (default: 30).
 93        headers: Extra HTTP headers sent with every request (e.g. ``Authorization``).
 94        agent_id: Stable identifier used in logging/metrics. Defaults to ``"A2AClient"``.
 95        logger: Optional :class:`BasicLogger`.
 96        metrics: Optional :class:`BasicMetricsCollector`.
 97
 98    Example::
 99
100        agent = A2AClient(
101            endpoint_url="http://search-agent-service:8080",
102            agent_id="search_agent",
103        )
104        result = await agent.execute("Find recent AI papers")
105
106    Discovery::
107
108        card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
109        agent = A2AClient(
110            endpoint_url=card["url"],
111            agent_id=card["name"],
112        )
113    """
114
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
135
136    # ------------------------------------------------------------------
137    # BaseAgent implementation
138    # ------------------------------------------------------------------
139
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result
162
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step
170
171    # ------------------------------------------------------------------
172    # A2A JSON-RPC transport
173    # ------------------------------------------------------------------
174
175    async def _call_remote(self, task: str, context: Dict[str, Any]) -> AgentResult:
176        """Send an A2A tasks/send JSON-RPC 2.0 request and return an AgentResult."""
177        url = f"{self.endpoint_url}/rpc"
178        request_id = str(uuid4())
179        task_id = str(uuid4())
180
181        payload = JsonRpcRequest(
182            id=request_id,
183            method="tasks/send",
184            params=A2ATaskParams(
185                id=task_id,
186                message=A2AMessage(
187                    parts=[A2APart(type="text", text=task)],
188                    # Context (user_assertion, obo_token, locale, etc.) is carried
189                    # in message.metadata — A2A agents read it from there.
190                    metadata=context,
191                ),
192            ),
193        ).model_dump()
194
195        self._logger.info(
196            "A2AClient tasks/send",
197            url=url,
198            agent_id=self.agent_id,
199            task_id=task_id,
200        )
201
202        async with httpx.AsyncClient(timeout=self.timeout) as client:
203            try:
204                response = await client.post(url, json=payload, headers=self._headers)
205            except httpx.ConnectError as exc:
206                raise A2AClientError(
207                    f"Cannot reach remote agent at {url}: {exc}"
208                ) from exc
209            except httpx.TimeoutException as exc:
210                raise A2AClientError(
211                    f"Remote agent timed out after {self.timeout}s: {exc}"
212                ) from exc
213
214        if response.status_code != 200:
215            raise A2AClientError(
216                f"Remote agent returned HTTP {response.status_code}: {response.text[:200]}"
217            )
218
219        return self._parse_response(response.text)
220
221    def _parse_response(self, body: str) -> AgentResult:
222        """Parse an A2A JSON-RPC tasks/send response into an AgentResult."""
223        try:
224            envelope = json.loads(body)
225        except json.JSONDecodeError as exc:
226            raise A2AClientError(f"Remote agent returned non-JSON body: {body[:200]}") from exc
227
228        if "error" in envelope:
229            err = JsonRpcErrorResponse.model_validate(envelope)
230            raise A2AClientError(
231                f"A2A JSON-RPC error {err.error.code}: {err.error.message}"
232            )
233
234        try:
235            response = JsonRpcResponse.model_validate(envelope)
236        except Exception as exc:
237            raise A2AClientError(f"Malformed A2A response: {exc}") from exc
238
239        task = response.result
240        success = task.status.state == "completed"
241        error_msg: Optional[str] = task.status.message if not success else None
242
243        # Extract text output from the first artifact's first text part
244        output = ""
245        for artifact in task.artifacts:
246            for part in artifact.parts:
247                if part.type == "text":
248                    output = part.text or ""
249                    break
250            if output:
251                break
252
253        # Steps — validate each raw step dict through AgentStepModel
254        steps: List[AgentStep] = []
255        for raw_step in task.metadata.get("steps", []):
256            s = AgentStepModel.model_validate(raw_step)
257            steps.append(
258                AgentStep(
259                    thought=s.thought,
260                    action=s.action,
261                    action_input=s.action_input,
262                    observation=s.observation,
263                    metadata=s.metadata,
264                )
265            )
266
267        # Propagate all task metadata except "steps" (already parsed above)
268        metadata = {k: v for k, v in task.metadata.items() if k != "steps"}
269        metadata["a2a_task_id"] = task.id
270
271        return AgentResult(
272            output=output,
273            steps=steps,
274            metadata=metadata,
275            success=success,
276            error=error_msg,
277        )
278
279    # ------------------------------------------------------------------
280    # A2A discovery helper
281    # ------------------------------------------------------------------
282
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Proxies execute() calls to a remote A2A-compliant agent service.

Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 tasks/send over HTTP. Context (user_assertion, obo_token, locale, etc.) is forwarded as message.metadata in the JSON-RPC payload.

This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) treat a separately-deployed A2A agent service as a first-class agent — no code changes needed in the orchestrator.

Args: endpoint_url: Base URL of the remote agent service (e.g. "http://search-agent:8080"). /rpc is appended automatically. timeout: HTTP request timeout in seconds (default: 30). headers: Extra HTTP headers sent with every request (e.g. Authorization). agent_id: Stable identifier used in logging/metrics. Defaults to "A2AClient". logger: Optional BasicLogger. metrics: Optional BasicMetricsCollector.

Example::

agent = A2AClient(
    endpoint_url="http://search-agent-service:8080",
    agent_id="search_agent",
)
result = await agent.execute("Find recent AI papers")

Discovery::

card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
agent = A2AClient(
    endpoint_url=card["url"],
    agent_id=card["name"],
)
A2AClient( endpoint_url: str, timeout: float = 30.0, headers: Optional[Dict[str, str]] = None, behaviors: Optional[List[gmf_forge_ai_orchestration.BaseBehavior]] = None, agent_id: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None)
115    def __init__(
116        self,
117        endpoint_url: str,
118        timeout: float = 30.0,
119        headers: Optional[Dict[str, str]] = None,
120        behaviors: Optional[List["BaseBehavior"]] = None,
121        agent_id: Optional[str] = None,
122        logger: Optional[BasicLogger] = None,
123        metrics: Optional[BasicMetricsCollector] = None,
124    ) -> None:
125        super().__init__(
126            llm_gateway=None,
127            behaviors=behaviors,
128            agent_id=agent_id or "A2AClient",
129            logger=logger,
130            metrics=metrics,
131        )
132        self.endpoint_url = endpoint_url.rstrip("/")
133        self.timeout = timeout
134        self._headers = {"Content-Type": "application/json", **(headers or {})}
endpoint_url
timeout
async def execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AgentResult:
140    async def execute(
141        self, task: str, context: Optional[Dict[str, Any]] = None
142    ) -> AgentResult:
143        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
144        bctx = await self._apply_behaviors_before(bctx)
145
146        while True:
147            self._log_execution_start(task)
148            try:
149                result = await self._call_remote(task, context or {})
150            except Exception as exc:
151                fallback = await self._apply_behaviors_on_error(bctx, exc)
152                if fallback is RETRY_SENTINEL:
153                    bctx.attempt += 1
154                    continue
155                if fallback is not None:
156                    return fallback  # type: ignore[return-value]
157                raise
158
159            result = await self._apply_behaviors_after(bctx, result)
160            self._log_execution_end(task, result.success, len(result.steps))
161            return result

Execute the task and return a result.

async def stream_execute( self, task: str, context: Optional[Dict[str, Any]] = None) -> AsyncIterator[AgentStep]:
163    async def stream_execute(
164        self, task: str, context: Optional[Dict[str, Any]] = None
165    ) -> AsyncIterator[AgentStep]:
166        """Executes remotely then yields each step from the response."""
167        result = await self.execute(task, context)
168        for step in result.steps:
169            yield step

Executes remotely then yields each step from the response.

@staticmethod
async def fetch_agent_card( base_url: str, timeout: float = 10.0, headers: Optional[Dict[str, str]] = None) -> gmf_forge_ai_orchestration.protocols.a2a.AgentCard:
283    @staticmethod
284    async def fetch_agent_card(
285        base_url: str,
286        timeout: float = 10.0,
287        headers: Optional[Dict[str, str]] = None,
288    ) -> AgentCard:
289        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.
290
291        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
292        ``description``, ``url``, ``version``, ``skills``.
293
294        Args:
295            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
296            timeout: HTTP timeout in seconds.
297            headers: Optional extra request headers.
298
299        Raises:
300            A2AClientError: if the endpoint is unreachable or returns non-200.
301        """
302        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
303        async with httpx.AsyncClient(timeout=timeout) as client:
304            try:
305                response = await client.get(url, headers=headers or {})
306            except httpx.ConnectError as exc:
307                raise A2AClientError(
308                    f"Cannot reach agent card at {url}: {exc}"
309                ) from exc
310            except httpx.TimeoutException as exc:
311                raise A2AClientError(
312                    f"Timed out fetching agent card from {url}: {exc}"
313                ) from exc
314
315        if response.status_code != 200:
316            raise A2AClientError(
317                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
318            )
319
320        try:
321            return AgentCard.model_validate(response.json())
322        except json.JSONDecodeError as exc:
323            raise A2AClientError(
324                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
325            ) from exc
326        except Exception as exc:
327            raise A2AClientError(
328                f"Agent card at {url} has invalid schema: {exc}"
329            ) from exc

Fetch the A2A Agent Card from GET /.well-known/agent.json.

Returns an AgentCard instance. Standard fields: name, description, url, version, skills.

Args: base_url: Base URL of the remote agent (e.g. "http://agent:8080"). timeout: HTTP timeout in seconds. headers: Optional extra request headers.

Raises: A2AClientError: if the endpoint is unreachable or returns non-200.

class A2AClientError(builtins.RuntimeError):
72class A2AClientError(RuntimeError):
73    """Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol."""

Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.

class A2AAdapter:
 43class A2AAdapter:
 44    """Protocol-only A2A receiver adapter.
 45
 46    The adapter does not know about tools, checkpoints, routing, or any other
 47    orchestration concerns. It only translates between the A2A wire protocol and
 48    the host agent's execution callback.
 49    """
 50
 51    def __init__(
 52        self,
 53        agent_id: str,
 54        description: str,
 55        url: str,
 56        version: str = "0.1.0",
 57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
 58        logger: Optional[BasicLogger] = None,
 59    ) -> None:
 60        self.agent_id = agent_id
 61        self.description = description
 62        self.url = url.rstrip("/")
 63        self.version = version
 64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
 65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
 66
 67    def agent_card(self) -> AgentCard:
 68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
 69        return AgentCard(
 70            name=self.agent_id,
 71            description=self.description,
 72            url=self.url,
 73            version=self.version,
 74            skills=[AgentSkill(**s) for s in self._skills],
 75        )
 76
 77    def build_jsonrpc_error(
 78        self,
 79        rpc_id: Optional[str],
 80        code: int,
 81        message: str,
 82        data: Optional[Any] = None,
 83    ) -> Dict[str, Any]:
 84        """Build a JSON-RPC 2.0 error response."""
 85        return JsonRpcErrorResponse(
 86            id=rpc_id,
 87            error=JsonRpcError(code=code, message=message, data=data),
 88        ).model_dump()
 89
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)
117
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()
147
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)
185
186    def _extract_task_text(self, parts: Any) -> str:
187        if not isinstance(parts, Sequence):
188            raise A2AAdapterError("Invalid message parts payload")
189
190        for part in parts:
191            if isinstance(part, Mapping) and part.get("type") == "text":
192                return str(part.get("text", ""))
193        return ""
194
195    def _serialize_step(self, step: AgentStep) -> Dict[str, Any]:
196        return AgentStepModel(
197            thought=step.thought,
198            action=step.action,
199            action_input=step.action_input,
200            observation=step.observation,
201            metadata=step.metadata,
202        ).model_dump()

Protocol-only A2A receiver adapter.

The adapter does not know about tools, checkpoints, routing, or any other orchestration concerns. It only translates between the A2A wire protocol and the host agent's execution callback.

A2AAdapter( agent_id: str, description: str, url: str, version: str = '0.1.0', skills: Optional[Sequence[Mapping[str, Any]]] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None)
51    def __init__(
52        self,
53        agent_id: str,
54        description: str,
55        url: str,
56        version: str = "0.1.0",
57        skills: Optional[Sequence[Mapping[str, Any]]] = None,
58        logger: Optional[BasicLogger] = None,
59    ) -> None:
60        self.agent_id = agent_id
61        self.description = description
62        self.url = url.rstrip("/")
63        self.version = version
64        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
65        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")
agent_id
description
url
version
def agent_card(self) -> gmf_forge_ai_orchestration.protocols.a2a.AgentCard:
67    def agent_card(self) -> AgentCard:
68        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
69        return AgentCard(
70            name=self.agent_id,
71            description=self.description,
72            url=self.url,
73            version=self.version,
74            skills=[AgentSkill(**s) for s in self._skills],
75        )

Return the A2A Agent Card for GET /.well-known/agent.json.

def build_jsonrpc_error( self, rpc_id: Optional[str], code: int, message: str, data: Optional[Any] = None) -> Dict[str, Any]:
77    def build_jsonrpc_error(
78        self,
79        rpc_id: Optional[str],
80        code: int,
81        message: str,
82        data: Optional[Any] = None,
83    ) -> Dict[str, Any]:
84        """Build a JSON-RPC 2.0 error response."""
85        return JsonRpcErrorResponse(
86            id=rpc_id,
87            error=JsonRpcError(code=code, message=message, data=data),
88        ).model_dump()

Build a JSON-RPC 2.0 error response.

def parse_tasks_send( self, body: Mapping[str, typing.Any]) -> tuple[typing.Optional[str], str, str, typing.Dict[str, typing.Any]]:
 90    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
 91        """Parse a ``tasks/send`` JSON-RPC request body.
 92
 93        Returns:
 94            A tuple of ``(rpc_id, task_id, task_text, context)``.
 95        """
 96        method = str(body.get("method", ""))
 97        if method != "tasks/send":
 98            raise A2AAdapterError(f"Method not found: {method}")
 99
100        params = body.get("params", {})
101        if not isinstance(params, Mapping):
102            raise A2AAdapterError("Invalid params payload")
103
104        rpc_id = body.get("id")
105        task_id = str(params.get("id") or uuid4())
106
107        message = params.get("message", {})
108        if not isinstance(message, Mapping):
109            raise A2AAdapterError("Invalid message payload")
110
111        task_text = self._extract_task_text(message.get("parts", []))
112        context = message.get("metadata", {})
113        if not isinstance(context, Mapping):
114            raise A2AAdapterError("Invalid message metadata payload")
115
116        return rpc_id, task_id, task_text, dict(context)

Parse a tasks/send JSON-RPC request body.

Returns: A tuple of (rpc_id, task_id, task_text, context).

def build_task_response( self, rpc_id: Optional[str], task_id: str, result: AgentResult) -> Dict[str, Any]:
118    def build_task_response(
119        self,
120        rpc_id: Optional[str],
121        task_id: str,
122        result: AgentResult,
123    ) -> Dict[str, Any]:
124        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
125        metadata = dict(result.metadata)
126        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
127        metadata.setdefault("a2a_task_id", task_id)
128
129        artifacts = (
130            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
131            if result.output
132            else []
133        )
134
135        return JsonRpcResponse(
136            id=rpc_id,
137            result=A2ATask(
138                id=task_id,
139                status=A2ATaskStatus(
140                    state="completed" if result.success else "failed",
141                    message=result.error if not result.success else None,
142                ),
143                artifacts=artifacts,
144                metadata=metadata,
145            ),
146        ).model_dump()

Convert an AgentResult into an A2A JSON-RPC success response.

async def handle_rpc( self, body: Mapping[str, typing.Any], execute: Callable[[str, typing.Dict[str, typing.Any]], Awaitable[AgentResult]]) -> Dict[str, Any]:
148    async def handle_rpc(
149        self,
150        body: Mapping[str, Any],
151        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
152    ) -> Dict[str, Any]:
153        """Handle an incoming A2A JSON-RPC body and return the response payload."""
154        rpc_id = body.get("id") if isinstance(body, Mapping) else None
155
156        try:
157            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
158        except A2AAdapterError as exc:
159            code = -32601 if str(exc).startswith("Method not found") else -32602
160            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))
161
162        self._logger.info(
163            "A2A tasks/send received",
164            agent_id=self.agent_id,
165            task_id=task_id,
166            task_preview=task_text[:80],
167        )
168
169        try:
170            result = await execute(task_text, context)
171        except Exception as exc:
172            self._logger.error(
173                "Agent execution failed",
174                agent_id=self.agent_id,
175                task_id=task_id,
176                error=str(exc),
177            )
178            return self.build_jsonrpc_error(
179                rpc_id if isinstance(rpc_id, str) else None,
180                -32603,
181                str(exc),
182            )
183
184        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)

Handle an incoming A2A JSON-RPC body and return the response payload.

class A2AAdapterError(builtins.RuntimeError):
39class A2AAdapterError(RuntimeError):
40    """Raised when an incoming A2A request is malformed or unsupported."""

Raised when an incoming A2A request is malformed or unsupported.