gmf_forge_ai_orchestration.agents
Agents — ReAct, Plan-Execute, Reflexion, Chain-of-Thought, and A2A.
Choosing the right agent
ReActAgent
Use when the task requires tool calls (search, lookup, API calls).
Interleaves reasoning (Thought) and acting (Action/Observation) in a loop.
The system prompt MUST include the Thought:/Action:/Action Input: format
contract — without it the agent degrades to a single-pass answer with no
tool calls. Supports a custom system_prompt.
ChainOfThoughtAgent
Use when the task requires structured reasoning but NO tool calls.
A single LLM call that reasons inside <thinking> tags before answering.
Simpler and cheaper than ReAct — good for classification, analysis, or
multi-step math. Supports a custom system_prompt.
PlanExecuteAgent
Use when the task has clearly separable sequential steps known upfront.
Phase 1 asks the LLM to decompose the task into a JSON step list.
Phase 2 executes each step sequentially; each step can call tools via
an inner Thought/Action loop. Good for structured workflows (e.g., search
→ summarise → write). Supports custom plan_prompt and execute_prompt.
ReflexionAgent
Wrapper — not a standalone agent. Adds a self-critique-and-retry loop
around any inner agent. Use when output quality matters more than latency
and a second-pass correction is acceptable.
A2AClient
Use to call a separately-deployed A2A-compliant agent service.
Sends tasks via JSON-RPC 2.0 tasks/send (A2A protocol).
Discovers agent capabilities via GET /.well-known/agent.json (Agent Card).
No local LLM or tools needed.
A2AAdapter
Use on the receiving side of an A2A service.
Handles the A2A protocol boilerplate for discovery and tasks/send
dispatch while leaving business logic to the host agent.
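The guidance above can be condensed into a small decision function. This is a hypothetical helper for illustration only: `TaskProfile` and `choose_agent` are not part of `gmf_forge_ai_orchestration`, and checking the A2A case first and the "steps known upfront" case before "needs tools" is one reasonable reading of the guide, not a rule the package enforces.

```python
from dataclasses import dataclass


@dataclass
class TaskProfile:
    """Hypothetical summary of a task's needs (not part of the package)."""

    needs_tools: bool = False
    steps_known_upfront: bool = False
    remote_a2a_service: bool = False
    quality_over_latency: bool = False


def choose_agent(profile: TaskProfile) -> str:
    """Return the class name the guide above recommends for ``profile``."""
    if profile.remote_a2a_service:
        # The work happens in a separately deployed A2A service.
        return "A2AClient"
    if profile.steps_known_upfront:
        # Plan once, then execute each step (steps may still call tools).
        base = "PlanExecuteAgent"
    elif profile.needs_tools:
        base = "ReActAgent"
    else:
        # Structured reasoning in a single LLM call, no tools.
        base = "ChainOfThoughtAgent"
    # ReflexionAgent is a wrapper, so it decorates the chosen inner agent.
    return f"ReflexionAgent({base})" if profile.quality_over_latency else base
```

Because `ReflexionAgent` wraps another agent rather than replacing it, the helper returns it as a decoration around whichever base agent was selected.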
"""Agents — ReAct, Plan-Execute, Reflexion, Chain-of-Thought, and A2A.

Choosing the right agent
------------------------

:class:`ReActAgent`
    **Use when the task requires tool calls** (search, lookup, API calls).
    Interleaves reasoning (Thought) and acting (Action/Observation) in a loop.
    The system prompt MUST include the ``Thought:/Action:/Action Input:`` format
    contract — without it the agent degrades to a single-pass answer with no
    tool calls. Supports a custom ``system_prompt``.

:class:`ChainOfThoughtAgent`
    **Use when the task requires structured reasoning but NO tool calls.**
    A single LLM call that reasons inside ``<thinking>`` tags before answering.
    Simpler and cheaper than ReAct — good for classification, analysis, or
    multi-step math. Supports a custom ``system_prompt``.

:class:`PlanExecuteAgent`
    **Use when the task has clearly separable sequential steps known upfront.**
    Phase 1 asks the LLM to decompose the task into a JSON step list.
    Phase 2 executes each step sequentially; each step can call tools via
    an inner Thought/Action loop. Good for structured workflows (e.g., search
    → summarise → write). Supports custom ``plan_prompt`` and ``execute_prompt``.

:class:`ReflexionAgent`
    **Wrapper — not a standalone agent.** Adds a self-critique-and-retry loop
    around any inner agent. Use when output quality matters more than latency
    and a second-pass correction is acceptable.

:class:`A2AClient`
    **Use to call a separately-deployed A2A-compliant agent service.**
    Sends tasks via JSON-RPC 2.0 ``tasks/send`` (A2A protocol).
    Discovers agent capabilities via ``GET /.well-known/agent.json`` (Agent Card).
    No local LLM or tools needed.

:class:`A2AAdapter`
    **Use on the receiving side of an A2A service.**
    Handles the A2A protocol boilerplate for discovery and ``tasks/send``
    dispatch while leaving business logic to the host agent.
"""

from gmf_forge_ai_orchestration.agents.base import AgentResult, AgentStep, BaseAgent
from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.agents.plan_execute_agent import PlanExecuteAgent
from gmf_forge_ai_orchestration.agents.reflexion_agent import ReflexionAgent
from gmf_forge_ai_orchestration.agents.chain_of_thought_agent import ChainOfThoughtAgent
from gmf_forge_ai_orchestration.protocols.a2a.a2a_client import (
    A2AClient,
    A2AClientError,
)
from gmf_forge_ai_orchestration.protocols.a2a.a2a_adapter import (
    A2AAdapter,
    A2AAdapterError,
)

__all__ = [
    "AgentResult",
    "AgentStep",
    "BaseAgent",
    "ReActAgent",
    "PlanExecuteAgent",
    "ReflexionAgent",
    "ChainOfThoughtAgent",
    "A2AClient",
    "A2AClientError",
    "A2AAdapter",
    "A2AAdapterError",
]
@dataclass
class AgentResult:
    """The final result of an agent execution."""

    output: str
    steps: List[AgentStep] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: Optional[str] = None
The final result of an agent execution.
@dataclass
class AgentStep:
    """One thought/action/observation cycle within an agent execution."""

    thought: str
    action: str
    action_input: Dict[str, Any] = field(default_factory=dict)
    observation: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)
One thought/action/observation cycle within an agent execution.
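Because both records are plain dataclasses, a run's steps survive a round trip through dictionaries. `ReActAgent` relies on this: its `_save_checkpoint` stores `dataclasses.asdict(s)` for each step, and `resume_from` rebuilds them with `AgentStep(**raw)`. The sketch below copies the two dataclasses locally so it runs without the package; the example step values are made up.

```python
import dataclasses
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Local copies mirroring the fields listed above, so the sketch is self-contained.
@dataclass
class AgentStep:
    thought: str
    action: str
    action_input: Dict[str, Any] = field(default_factory=dict)
    observation: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class AgentResult:
    output: str
    steps: List[AgentStep] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error: Optional[str] = None


# One (made-up) tool call followed by a final answer.
steps = [
    AgentStep(thought="I should add the numbers.", action="add",
              action_input={"a": 2, "b": 3}, observation="5"),
    AgentStep(thought="The tool returned the sum.", action="Final Answer",
              action_input={"answer": "5"}, observation="5"),
]
result = AgentResult(output="5", steps=steps)

# Serialise as ReActAgent._save_checkpoint does, then rebuild as resume_from does.
raw_steps = [dataclasses.asdict(s) for s in steps]
restored = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
```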
class BaseAgent(ABC):
    """
    Abstract base class for all agents.

    Wires together:
    - :class:`UnifiedLLMGateway` for LLM calls
    - :class:`ToolRegistry` for tool discovery and execution
    - :class:`BaseBehavior` pipeline applied around every execution
    - :class:`BaseStateStore` for persisting conversation/step state
    - Full shared-core observability stack (Logger, Metrics, PerformanceMonitor, Tracing)

    Args:
        llm_gateway: Required. The LLM gateway to use for completions.
        tool_registry: Optional tool registry. Provides available tools to the agent.
        behaviors: Ordered list of behaviors applied around each execution.
        state_store: Optional state store for persisting steps and conversation.
        agent_id: Stable identifier used in logging and metrics. Defaults to class name.
        logger: Optional :class:`BasicLogger`. Created automatically if omitted.
        metrics: Optional :class:`BasicMetricsCollector`.
        performance_monitor: Optional :class:`BasicPerformanceMonitor`.
        tracer: Optional :class:`TracingProvider`. Falls back to ``get_tracer()``.
    """

    def __init__(
        self,
        llm_gateway: Optional["UnifiedLLMGateway"] = None,
        tool_registry: Optional["ToolRegistry"] = None,
        behaviors: Optional[List["BaseBehavior"]] = None,
        state_store: Optional["BaseStateStore"] = None,
        checkpoint_manager: Optional["CheckpointManager"] = None,
        agent_id: Optional[str] = None,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
        performance_monitor: Optional[BasicPerformanceMonitor] = None,
        tracer: Optional[TracingProvider] = None,
    ) -> None:
        self.llm_gateway = llm_gateway
        self.tool_registry = tool_registry
        self.behaviors: List["BaseBehavior"] = behaviors or []
        self.state_store = state_store
        self.checkpoint_manager = checkpoint_manager
        self.agent_id = agent_id or self.__class__.__name__
        self._logger = logger or BasicLogger(f"gmf_forge_ai.agent.{self.agent_id}")
        self._metrics = metrics
        self._performance_monitor = performance_monitor
        self._tracer = tracer or get_tracer()

    # ------------------------------------------------------------------
    # Abstract interface
    # ------------------------------------------------------------------

    @abstractmethod
    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        """Execute the task and return a result."""

    @abstractmethod
    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        """Execute the task, yielding each step as it completes."""

    # ------------------------------------------------------------------
    # Behavior pipeline helpers (called by subclasses)
    # ------------------------------------------------------------------

    async def _apply_behaviors_before(
        self, context: "BehaviorContext"
    ) -> "BehaviorContext":
        for behavior in self.behaviors:
            context = await behavior.before_execute(context)
        return context

    async def _apply_behaviors_after(
        self, context: "BehaviorContext", result: AgentResult
    ) -> AgentResult:
        for behavior in self.behaviors:
            result = await behavior.after_execute(context, result)
        return result

    async def _apply_behaviors_on_error(
        self, context: "BehaviorContext", error: Exception
    ) -> Optional[AgentResult]:
        for behavior in self.behaviors:
            fallback = await behavior.on_error(context, error)
            if fallback is not None:
                return fallback
        return None

    # ------------------------------------------------------------------
    # Shared observability helpers (called by subclasses)
    # ------------------------------------------------------------------

    def _log_execution_start(self, task: str) -> None:
        self._logger.info(
            "Agent execution started", agent_id=self.agent_id, task=task
        )
        if self._metrics:
            self._metrics.increment("agent.executions", agent_id=self.agent_id)

    def _log_execution_end(self, task: str, success: bool, steps: int) -> None:
        self._logger.info(
            "Agent execution finished",
            agent_id=self.agent_id,
            task=task,
            success=success,
            steps=steps,
        )
        if self._metrics:
            self._metrics.histogram("agent.steps", steps, agent_id=self.agent_id)

    def _log_execution_error(self, task: str, error: Exception) -> None:
        self._logger.error(
            "Agent execution error",
            agent_id=self.agent_id,
            task=task,
            error=str(error),
            error_type=type(error).__name__,
        )
        if self._metrics:
            self._metrics.increment("agent.errors", agent_id=self.agent_id)
Abstract base class for all agents.
Wires together:
- UnifiedLLMGateway for LLM calls
- ToolRegistry for tool discovery and execution
- BaseBehavior pipeline applied around every execution
- BaseStateStore for persisting conversation/step state
- Full shared-core observability stack (Logger, Metrics, PerformanceMonitor, Tracing)
Args:
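llm_gateway: The LLM gateway to use for completions. The signature defaults it to None, but it is required in practice because agents call it for every completion.
tool_registry: Optional tool registry. Provides available tools to the agent.
behaviors: Ordered list of behaviors applied around each execution.
state_store: Optional state store for persisting steps and conversation.
checkpoint_manager: Optional CheckpointManager. When set, ReActAgent saves a checkpoint after each step so that resume_from() can continue an interrupted execution.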
agent_id: Stable identifier used in logging and metrics. Defaults to class name.
logger: Optional BasicLogger. Created automatically if omitted.
metrics: Optional BasicMetricsCollector.
performance_monitor: Optional BasicPerformanceMonitor.
tracer: Optional TracingProvider. Falls back to get_tracer().
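The behavior hooks run in a fixed order, which the sketch below makes visible. Reading `_apply_behaviors_before`, `_apply_behaviors_after` and `_apply_behaviors_on_error` in the source above: `before_execute` and `after_execute` both run in list order (the after hooks are not reversed), each hook's return value feeds the next, and `on_error` stops at the first behavior that returns a non-None fallback. `RecordingBehavior`, `run_pipeline` and the trimmed `BehaviorContext` are hypothetical stand-ins written for this illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class BehaviorContext:
    # Hypothetical, trimmed context; the package's BehaviorContext has more fields.
    task: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class RecordingBehavior:
    """Hypothetical behavior that logs each hook call by name."""

    def __init__(self, name: str, log: List[str], fallback: Optional[str] = None) -> None:
        self.name = name
        self.log = log
        self.fallback = fallback

    async def before_execute(self, context: BehaviorContext) -> BehaviorContext:
        self.log.append(f"before:{self.name}")
        return context

    async def after_execute(self, context: BehaviorContext, result: str) -> str:
        self.log.append(f"after:{self.name}")
        return result

    async def on_error(self, context: BehaviorContext, error: Exception) -> Optional[str]:
        self.log.append(f"on_error:{self.name}")
        return self.fallback  # None means "not handled, ask the next behavior"


async def run_pipeline(behaviors: List[RecordingBehavior], fail: bool) -> str:
    """Run a fake agent body through the same loops BaseAgent uses."""
    context = BehaviorContext(task="demo")
    for behavior in behaviors:
        context = await behavior.before_execute(context)
    try:
        if fail:
            raise RuntimeError("agent body failed")
        result = "ok"
    except RuntimeError as exc:
        for behavior in behaviors:
            fallback = await behavior.on_error(context, exc)
            if fallback is not None:
                return fallback  # first non-None fallback wins
        raise
    for behavior in behaviors:
        result = await behavior.after_execute(context, result)
    return result


log: List[str] = []
behaviors = [
    RecordingBehavior("a", log),
    RecordingBehavior("b", log, fallback="recovered"),
    RecordingBehavior("c", log),
]
```

On failure, behavior `c` never sees the error because `b` already supplied a fallback.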
    @abstractmethod
    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        """Execute the task and return a result."""
Execute the task and return a result.
    @abstractmethod
    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        """Execute the task, yielding each step as it completes."""
Execute the task, yielding each step as it completes.
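A concrete agent must provide both abstract methods. The sketch below uses trimmed local stand-ins for `BaseAgent`, `AgentStep` and `AgentResult` (so it runs without the package) and a hypothetical `UppercaseAgent`. It shows one possible pattern, in which `stream_execute` is an async generator and `execute` reuses it; this is an assumption about how a subclass might be structured, not how the package's agents are built (`ReActAgent`, for example, implements the two methods separately).

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Dict, List, Optional


@dataclass
class AgentStep:
    # Trimmed stand-in for the package's AgentStep.
    thought: str
    action: str
    action_input: Dict[str, Any] = field(default_factory=dict)
    observation: str = ""


@dataclass
class AgentResult:
    # Trimmed stand-in for the package's AgentResult.
    output: str
    steps: List[AgentStep] = field(default_factory=list)
    success: bool = True


class BaseAgent(ABC):
    # Trimmed stand-in: only the two abstract methods documented above.
    @abstractmethod
    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        """Execute the task and return a result."""

    @abstractmethod
    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        """Execute the task, yielding each step as it completes."""


class UppercaseAgent(BaseAgent):
    """Hypothetical agent whose single step upper-cases the task."""

    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        # Containing `yield` makes this an async generator, consumed with `async for`.
        answer = task.upper()
        yield AgentStep(
            thought="Normalise the input.",
            action="Final Answer",
            action_input={"answer": answer},
            observation=answer,
        )

    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        # Reuse the streaming path and keep every step in the result.
        steps = [step async for step in self.stream_execute(task, context)]
        return AgentResult(output=steps[-1].observation, steps=steps)


result = asyncio.run(UppercaseAgent().execute("hello"))
```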
class ReActAgent(BaseAgent):
    """
    ReAct agent: interleaves Reasoning (Thought) and Acting (Action/Observation).

    **When to use:** Any task that requires tool calls (search, API lookup,
    database queries). The LLM reasons step by step and acts on each step.

    **When NOT to use:** Pure reasoning/analysis tasks with no tools — use
    :class:`ChainOfThoughtAgent` instead (cheaper: one LLM call vs. many).

    On each step the LLM produces a Thought, an Action (tool name or
    ``"Final Answer"``), and an Action Input. If the action is a tool call,
    the tool's output is fed back as an Observation and the loop continues.

    .. warning::
        The system prompt MUST instruct the LLM to respond using the
        ``Thought:/Action:/Action Input:`` format. Without this contract the
        regex parser will not match, the agent will treat the first response
        as a Final Answer, and **no tool calls will ever be made**.
        Always include ``{tool_descriptions}`` in a custom prompt so the LLM
        knows what tools are available.

    Args:
        max_steps: Maximum thought/action cycles before stopping (default: 10).
        model: LLM model name passed to the gateway (optional).
        temperature: Sampling temperature (default: 0.0 for determinism).
        system_prompt: Override the default ReAct system prompt. Use
            ``{tool_descriptions}`` as a placeholder if you want the agent's
            available tools listed in your prompt. The task is always appended
            separately and does not need a placeholder here.

    All other args inherited from :class:`BaseAgent`.
    """

    def __init__(self, *args: Any, max_steps: int = 10, model: Optional[str] = None,
                 temperature: float = 0.0, system_prompt: Optional[str] = None,
                 **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.max_steps = max_steps
        self.model = model
        self.temperature = temperature
        self._system_prompt = system_prompt

    #: Default ReAct system prompt used when no ``system_prompt`` is passed.
    #: Inspect this to understand the expected format or use it as a base
    #: for your own customisations.
    DEFAULT_SYSTEM_PROMPT: str = _REACT_SYSTEM

    # ------------------------------------------------------------------
    # Internal prompt helpers
    # ------------------------------------------------------------------

    def _tool_descriptions(self) -> str:
        if not self.tool_registry:
            return "No tools available."
        tools = self.tool_registry.list_tools()
        if not tools:
            return "No tools available."
        lines = []
        for t in tools:
            lines.append(f"- {t.name}: {t.description}")
        return "\n".join(lines)

    def _build_prompt(self, task: str, history: List[AgentStep]) -> str:
        template = self._system_prompt if self._system_prompt is not None else _REACT_SYSTEM
        if "{tool_descriptions}" in template:
            system = template.format(tool_descriptions=self._tool_descriptions())
        else:
            system = template
        turns = [f"Task: {task}\n"]
        for step in history:
            turns.append(f"Thought: {step.thought}")
            turns.append(f"Action: {step.action}")
            turns.append(f"Action Input: {json.dumps(step.action_input)}")
            if step.observation:
                turns.append(f"Observation: {step.observation}")
        return system + "\n" + "\n".join(turns)

    def _parse_step(self, text: str) -> Optional[AgentStep]:
        def _coerce_action_input(raw_input: str) -> Dict[str, Any]:
            try:
                action_input = json.loads(raw_input)
                if isinstance(action_input, dict):
                    return action_input
            except (json.JSONDecodeError, ValueError):
                pass
            return {"raw": raw_input}

        match = _REACT_STEP_PATTERN.search(text)
        if match:
            thought = match.group("thought").strip()
            action = match.group("action").strip()
            raw_input = match.group("action_input").strip()
            action_input = _coerce_action_input(raw_input)
            return AgentStep(thought=thought, action=action, action_input=action_input)

        # Some models omit "Thought:" and return only Action/Action Input.
        final_only = _REACT_FINAL_ONLY_PATTERN.search(text)
        if final_only:
            raw_input = final_only.group("action_input").strip()
            action_input = _coerce_action_input(raw_input)
            return AgentStep(thought="", action="Final Answer", action_input=action_input)

        return None

    def _extract_final_answer(self, step: AgentStep) -> str:
        """Return normalized final answer text from a Final Answer step."""
        answer_obj: Any
        if isinstance(step.action_input, dict) and "answer" in step.action_input:
            answer_obj = step.action_input["answer"]
        else:
            answer_obj = step.action_input

        answer = str(answer_obj).strip()

        # Unwrap nested ReAct control text accidentally returned as answer body.
        for _ in range(3):
            nested = _REACT_FINAL_ONLY_PATTERN.search(answer)
            if nested:
                raw_input = nested.group("action_input").strip()
            elif answer.startswith("Final Answer:"):
                raw_input = answer[len("Final Answer:"):].strip()
            else:
                break
            try:
                parsed = json.loads(raw_input)
                if isinstance(parsed, dict) and "answer" in parsed:
                    answer = str(parsed["answer"]).strip()
                else:
                    answer = raw_input
            except (json.JSONDecodeError, ValueError):
                answer = raw_input

        return answer.strip()

    async def _save_checkpoint(
        self,
        execution_id: str,
        task: str,
        context: Dict[str, Any],
        steps: List[AgentStep],
        step_number: int,
    ) -> None:
        if not self.checkpoint_manager:
            return
        await self.checkpoint_manager.save(
            agent_id=self.agent_id,
            execution_id=execution_id,
            state={
                "execution_id": execution_id,
                "task": task,
                "context": context,
                "step_number": step_number,
                "steps": [dataclasses.asdict(s) for s in steps],
            },
            metadata={"step_number": step_number},
        )

    async def resume_from(
        self, execution_id: str, from_step: Optional[int] = None
    ) -> AgentResult:
        """Resume a ReAct execution from its latest (or selected) checkpoint."""
        if not self.checkpoint_manager:
            raise ValueError("Checkpoint manager is not configured for this agent")

        checkpoint = None
        if from_step is not None:
            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
            for ckpt in checkpoints:
                if ckpt.state.get("step_number") == from_step:
                    checkpoint = ckpt
                    break
        else:
            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)

        if checkpoint is None:
            raise ValueError(f"No checkpoint found for execution_id={execution_id}")

        state = checkpoint.state
        task = str(state.get("task", ""))
        context = state.get("context") or {}
        raw_steps = state.get("steps") or []
        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]

        # If the checkpoint already contains a final answer, return idempotently.
        if steps and steps[-1].action == "Final Answer":
            answer = self._extract_final_answer(steps[-1])
            return AgentResult(
                output=str(answer),
                steps=steps,
                success=True,
                metadata={"execution_id": execution_id},
            )

        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            last_completed_step=int(state.get("step_number", len(steps) - 1)),
            metadata={"resume_agent": self},
        )
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        for step_num in range(len(steps), self.max_steps):
            prompt = self._build_prompt(task, steps)
            response = await self.llm_gateway.complete(
                prompt, model=self.model, temperature=self.temperature
            )
            step = self._parse_step(response.content)
            if step is None:
                step = AgentStep(
                    thought="Could not parse structured response.",
                    action="Final Answer",
                    action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
                )

            if step.action == "Final Answer":
                answer = self._extract_final_answer(step)
                step.observation = answer
                steps.append(step)
                await self._save_checkpoint(execution_id, task, context, steps, step_num)
                ctx.last_completed_step = step_num
                result = AgentResult(
                    output=answer,
                    steps=steps,
                    success=True,
                    metadata={"execution_id": execution_id},
                )
                result = await self._apply_behaviors_after(ctx, result)
                self._log_execution_end(task, success=True, steps=len(steps))
                return result

            observation = ""
            if self.tool_registry:
                try:
                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
                    observation = str(tool_result)
                except Exception as tool_exc:
                    observation = f"Tool error: {tool_exc}"
            else:
                observation = f"No tool registry — cannot execute '{step.action}'"

            step.observation = observation
            steps.append(step)
            await self._save_checkpoint(execution_id, task, context, steps, step_num)
            ctx.last_completed_step = step_num

        final_output = steps[-1].observation if steps else "No output"
        result = AgentResult(
            output=final_output,
            steps=steps,
            success=False,
            error=f"Reached max_steps={self.max_steps} without Final Answer",
            metadata={"execution_id": execution_id},
        )
        result = await self._apply_behaviors_after(ctx, result)
        self._log_execution_end(task, success=False, steps=len(steps))
        return result

    # ------------------------------------------------------------------
    # BaseAgent implementation
    # ------------------------------------------------------------------

    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        execution_id = str(uuid4())
        run_context = context or {}
        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            metadata={"resume_agent": self, **run_context},
        )
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        while True:
            _retry = False
            _control_exit = None  # HumanApprovalRequired or PendingApproval — not errors
            steps: List[AgentStep] = []

            with self._tracer.trace(
                "react_agent.execute", input=task, metadata={"agent_id": self.agent_id}
            ) as trace:
                try:
                    for _ in range(self.max_steps):
                        prompt = self._build_prompt(task, steps)

                        with trace.generation(
                            "llm_call", model=self.model or "default", input=prompt
                        ) as gen:
                            perf_id = None
                            if self._performance_monitor:
                                perf_id = self._performance_monitor.start_request(
                                    provider="llm_gateway", model=self.model or "default"
                                )
                            try:
                                response = await self.llm_gateway.complete(
                                    prompt, model=self.model, temperature=self.temperature
                                )
                                if self._performance_monitor and perf_id:
                                    self._performance_monitor.end_request(
                                        request_id=perf_id,
                                        prompt_tokens=response.usage.get("prompt_tokens", 0),
                                        completion_tokens=response.usage.get("completion_tokens", 0),
                                        success=True,
                                    )
                                gen.set_output(response.content)
                                gen.set_token_usage(**response.usage)
                            except Exception as exc:
                                if self._performance_monitor and perf_id:
                                    self._performance_monitor.end_request(
                                        request_id=perf_id,
                                        prompt_tokens=0,
                                        completion_tokens=0,
                                        success=False,
                                        error=str(exc),
                                    )
                                raise

                        step = self._parse_step(response.content)
                        if step is None:
                            # Unparseable response — treat as final answer
                            step = AgentStep(
                                thought="Could not parse structured response.",
                                action="Final Answer",
                                action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
                            )

                        with trace.span("step", input=step.action) as span:
                            if step.action == "Final Answer":
                                answer = self._extract_final_answer(step)
                                step.observation = answer
                                steps.append(step)
                                span.set_output(answer)

                                # Persist to state store if available
                                if self.state_store:
                                    await self.state_store.set(
                                        f"agent:{self.agent_id}:last_steps",
                                        [vars(s) for s in steps],
                                        ttl=self.checkpoint_manager.default_ttl if self.checkpoint_manager else None,
                                    )

                                await self._save_checkpoint(
                                    execution_id,
                                    task,
                                    run_context,
                                    steps,
                                    len(steps) - 1,
                                )
                                ctx.last_completed_step = len(steps) - 1

                                result = AgentResult(
                                    output=answer,
                                    steps=steps,
                                    success=True,
                                    metadata={"execution_id": execution_id},
                                )
                                try:
                                    result = await self._apply_behaviors_after(ctx, result)
                                except (HumanApprovalRequired, PendingApproval) as ctrl_exc:
                                    _control_exit = ctrl_exc

                                if _control_exit is None:
                                    self._log_execution_end(task, success=True, steps=len(steps))
                                    trace.set_output(answer)
                                    return result
                                else:
                                    # Control-flow exit (HumanApprovalRequired / PendingApproval)
                                    # span output already set above; also set trace output so
                                    # it logs the answer rather than finishing with no output.
                                    trace.set_output(answer)

                            else:
                                # Tool call
                                observation = ""
                                if self.tool_registry:
                                    try:
                                        tool_result = await self.tool_registry.execute(
                                            step.action, **step.action_input
                                        )
                                        observation = str(tool_result)
                                    except Exception as tool_exc:
                                        observation = f"Tool error: {tool_exc}"
                                else:
                                    observation = f"No tool registry — cannot execute '{step.action}'"

                                step.observation = observation
                                span.set_output(observation)
                                steps.append(step)
                                await self._save_checkpoint(
                                    execution_id,
                                    task,
                                    run_context,
                                    steps,
                                    len(steps) - 1,
                                )
                                ctx.last_completed_step = len(steps) - 1

                        if _control_exit is not None:
                            break  # exit for loop; span already closed cleanly

                    # Max steps reached (or for loop broken by control-flow signal)
                    if _control_exit is None:
                        final_output = steps[-1].observation if steps else "No output"
                        result = AgentResult(
                            output=final_output,
                            steps=steps,
                            success=False,
                            error=f"Reached max_steps={self.max_steps} without Final Answer",
                            metadata={"execution_id": execution_id},
                        )
                        result = await self._apply_behaviors_after(ctx, result)
                        self._log_execution_end(task, success=False, steps=len(steps))
                        trace.set_output(final_output)
                        return result
                    # else: control-flow exit — trace closes cleanly, re-raised below

                except Exception as exc:
                    self._log_execution_error(task, exc)
                    fallback = await self._apply_behaviors_on_error(ctx, exc)
                    if fallback is RETRY_SENTINEL:
                        # RetryBehavior slept and wants us to retry from scratch.
                        # Mark the trace as failed so it logs "failed" not "finished".
                        trace.set_error(exc)
                        ctx.attempt += 1
                        _retry = True
                    elif fallback is not None:
                        return fallback
                    else:
                        raise

            if _control_exit is not None:
                raise _control_exit

            if not _retry:
                break

    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        steps: List[AgentStep] = []
        for _ in range(self.max_steps):
            prompt = self._build_prompt(task, steps)
            response = await self.llm_gateway.complete(
                prompt, model=self.model, temperature=self.temperature
            )
            step = self._parse_step(response.content)
            if step is None:
                step = AgentStep(
                    thought="Unparseable response.",
                    action="Final Answer",
                    action_input={"answer": response.content},
                )

            if step.action == "Final Answer":
                step.observation = self._extract_final_answer(step)
                steps.append(step)
                yield step
                return

            if self.tool_registry:
                try:
                    tool_result = await self.tool_registry.execute(step.action, **step.action_input)
                    step.observation = str(tool_result)
                except Exception as tool_exc:
                    step.observation = f"Tool error: {tool_exc}"
            else:
                step.observation = f"No tool registry — cannot execute '{step.action}'"

            steps.append(step)
            yield step
ReAct agent: interleaves Reasoning (Thought) and Acting (Action/Observation).
When to use: Any task that requires tool calls (search, API lookup, database queries). The LLM reasons step by step and acts on each step.
When NOT to use: Pure reasoning/analysis tasks with no tools — use
ChainOfThoughtAgent instead (cheaper: one LLM call vs. many).
On each step the LLM produces a Thought, an Action (tool name or
"Final Answer"), and an Action Input. If the action is a tool call,
the tool's output is fed back as an Observation and the loop continues.
Warning: The system prompt MUST instruct the LLM to respond using the
Thought:/Action:/Action Input: format. Without this contract the
regex parser will not match, the agent will treat the first response
as a Final Answer, and no tool calls will ever be made.
Always include {tool_descriptions} in a custom prompt so the LLM
knows what tools are available.
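A custom prompt that satisfies both requirements might look like the one below. The wording of `CUSTOM_REACT_PROMPT` is an illustrative assumption, not the library's default (which is exposed as `ReActAgent.DEFAULT_SYSTEM_PROMPT`). `ToolInfo`, `describe_tools` and `render_system_prompt` are hypothetical stand-ins that copy the tool-listing layout of `_tool_descriptions` and the placeholder rule of `_build_prompt` from the source above, so the rendered prompt can be checked without the package.

```python
from dataclasses import dataclass
from typing import List

# Hypothetical custom prompt: states the Thought/Action/Action Input contract
# and includes the {tool_descriptions} placeholder.
CUSTOM_REACT_PROMPT = (
    "You can use the following tools:\n"
    "{tool_descriptions}\n\n"
    "Always respond in exactly this format:\n"
    "Thought: <your reasoning>\n"
    "Action: <one tool name, or Final Answer>\n"
    "Action Input: <a JSON object of arguments>\n"
)


@dataclass
class ToolInfo:
    # Stand-in for the items returned by ToolRegistry.list_tools().
    name: str
    description: str


def describe_tools(tools: List[ToolInfo]) -> str:
    # Same layout as ReActAgent._tool_descriptions.
    if not tools:
        return "No tools available."
    return "\n".join(f"- {t.name}: {t.description}" for t in tools)


def render_system_prompt(template: str, tools: List[ToolInfo]) -> str:
    # Same placeholder rule as ReActAgent._build_prompt.
    if "{tool_descriptions}" in template:
        return template.format(tool_descriptions=describe_tools(tools))
    return template


prompt = render_system_prompt(CUSTOM_REACT_PROMPT, [ToolInfo("search", "Search the web")])
```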
Args:
max_steps: Maximum thought/action cycles before stopping (default: 10).
model: LLM model name passed to the gateway (optional).
temperature: Sampling temperature (default: 0.0, which keeps outputs as repeatable as the provider allows).
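system_prompt: Override the default ReAct system prompt. Use
{tool_descriptions} as a placeholder if you want the agent's
available tools listed in your prompt. A prompt that contains the placeholder
is filled with str.format, so any other literal braces in it (for example,
JSON examples) must be doubled as {{ and }}. The task is always appended
separately and does not need a placeholder here.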
All other args inherited from BaseAgent.
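The placeholder rule comes from `_build_prompt` in the source above: a prompt containing `{tool_descriptions}` is passed through `str.format`, while one without it is used verbatim. The sketch below reproduces that rule in a hypothetical `fill()` helper to show why literal braces, such as a JSON example, must be doubled in a prompt that also uses the placeholder. The prompt strings and helper names are illustrative only.

```python
# Hypothetical prompts that list tools and also show a JSON example.
ESCAPED_PROMPT = (
    "Tools:\n{tool_descriptions}\n"
    'Example: Action Input: {{"query": "weather in Paris"}}\n'
)
UNESCAPED_PROMPT = (
    "Tools:\n{tool_descriptions}\n"
    'Example: Action Input: {"query": "weather in Paris"}\n'
)


def fill(template: str, tool_text: str) -> str:
    # Same rule as ReActAgent._build_prompt: format only when the placeholder is present.
    if "{tool_descriptions}" in template:
        return template.format(tool_descriptions=tool_text)
    return template


def formats_cleanly(template: str) -> bool:
    """True if fill() succeeds; single braces around JSON make str.format fail."""
    try:
        fill(template, "- search: Search the web")
        return True
    except (KeyError, ValueError, IndexError):
        return False
```

With the unescaped prompt, `str.format` reads `{"query": ...}` as a replacement field and fails; the doubled braces in the escaped prompt render as single braces.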
    def __init__(self, *args: Any, max_steps: int = 10, model: Optional[str] = None,
                 temperature: float = 0.0, system_prompt: Optional[str] = None,
                 **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.max_steps = max_steps
        self.model = model
        self.temperature = temperature
        self._system_prompt = system_prompt
    async def resume_from(
        self, execution_id: str, from_step: Optional[int] = None
    ) -> AgentResult:
ReActAgent.resume_from: Resume a ReAct execution from its latest (or a selected) checkpoint.
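The checkpoint lookup at the top of resume_from can be exercised without a checkpoint backend. A minimal sketch, assuming each checkpoint is a plain dict with a `state` mapping and that the list is ordered oldest first (so its last element stands in for `load_latest_for_execution`); `select_checkpoint` and `resume_index` are hypothetical helpers, not part of the package:

```python
from typing import Any, Dict, List, Optional

# Hypothetical checkpoint shape: {"state": {"step_number": int, "steps": [...]}}
Checkpoint = Dict[str, Any]


def select_checkpoint(
    checkpoints: List[Checkpoint], from_step: Optional[int] = None
) -> Optional[Checkpoint]:
    """Pick a checkpoint the way resume_from does."""
    if from_step is not None:
        # Exact match on the saved step_number; no match means "not found".
        for ckpt in checkpoints:
            if ckpt["state"].get("step_number") == from_step:
                return ckpt
        return None  # resume_from raises ValueError at this point
    # Assumption: the list is oldest-first, so the last entry is the latest.
    return checkpoints[-1] if checkpoints else None


def resume_index(ckpt: Checkpoint) -> int:
    """The resumed loop runs over range(len(steps), max_steps)."""
    return len(ckpt["state"].get("steps") or [])


history = [
    {"state": {"step_number": 0, "steps": [{"action": "search"}]}},
    {"state": {"step_number": 1, "steps": [{"action": "search"}, {"action": "lookup"}]}},
]
latest = select_checkpoint(history)
print(latest["state"]["step_number"], resume_index(latest))  # 1 2
```

Because the resumed loop is `range(len(steps), self.max_steps)`, the step budget is shared with the original run: resuming after two saved steps with `max_steps=5` leaves at most three more LLM calls.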
async def execute(
    self, task: str, context: Optional[Dict[str, Any]] = None
) -> AgentResult:
    execution_id = str(uuid4())
    run_context = context or {}
    ctx = BehaviorContext(
        agent_id=self.agent_id,
        task=task,
        execution_id=execution_id,
        metadata={"resume_agent": self, **run_context},
    )
    ctx = await self._apply_behaviors_before(ctx)
    self._log_execution_start(task)

    while True:
        _retry = False
        _control_exit = None  # HumanApprovalRequired or PendingApproval — not errors
        steps: List[AgentStep] = []

        with self._tracer.trace(
            "react_agent.execute", input=task, metadata={"agent_id": self.agent_id}
        ) as trace:
            try:
                for _ in range(self.max_steps):
                    prompt = self._build_prompt(task, steps)

                    with trace.generation(
                        "llm_call", model=self.model or "default", input=prompt
                    ) as gen:
                        perf_id = None
                        if self._performance_monitor:
                            perf_id = self._performance_monitor.start_request(
                                provider="llm_gateway", model=self.model or "default"
                            )
                        try:
                            response = await self.llm_gateway.complete(
                                prompt, model=self.model, temperature=self.temperature
                            )
                            if self._performance_monitor and perf_id:
                                self._performance_monitor.end_request(
                                    request_id=perf_id,
                                    prompt_tokens=response.usage.get("prompt_tokens", 0),
                                    completion_tokens=response.usage.get("completion_tokens", 0),
                                    success=True,
                                )
                            gen.set_output(response.content)
                            gen.set_token_usage(**response.usage)
                        except Exception as exc:
                            if self._performance_monitor and perf_id:
                                self._performance_monitor.end_request(
                                    request_id=perf_id,
                                    prompt_tokens=0,
                                    completion_tokens=0,
                                    success=False,
                                    error=str(exc),
                                )
                            raise

                    step = self._parse_step(response.content)
                    if step is None:
                        # Unparseable response — treat as final answer
                        step = AgentStep(
                            thought="Could not parse structured response.",
                            action="Final Answer",
                            action_input={"answer": self._extract_final_answer(AgentStep(thought="", action="Final Answer", action_input={"answer": response.content}))},
                        )

                    with trace.span("step", input=step.action) as span:
                        if step.action == "Final Answer":
                            answer = self._extract_final_answer(step)
                            step.observation = answer
                            steps.append(step)
                            span.set_output(answer)

                            # Persist to state store if available
                            if self.state_store:
                                await self.state_store.set(
                                    f"agent:{self.agent_id}:last_steps",
                                    [vars(s) for s in steps],
                                    ttl=self.checkpoint_manager.default_ttl if self.checkpoint_manager else None,
                                )

                            await self._save_checkpoint(
                                execution_id,
                                task,
                                run_context,
                                steps,
                                len(steps) - 1,
                            )
                            ctx.last_completed_step = len(steps) - 1

                            result = AgentResult(
                                output=answer,
                                steps=steps,
                                success=True,
                                metadata={"execution_id": execution_id},
                            )
                            try:
                                result = await self._apply_behaviors_after(ctx, result)
                            except (HumanApprovalRequired, PendingApproval) as ctrl_exc:
                                _control_exit = ctrl_exc

                            if _control_exit is None:
                                self._log_execution_end(task, success=True, steps=len(steps))
                                trace.set_output(answer)
                                return result
                            else:
                                # Control-flow exit (HumanApprovalRequired / PendingApproval)
                                # span output already set above; also set trace output so
                                # it logs the answer rather than finishing with no output.
                                trace.set_output(answer)

                        else:
                            # Tool call
                            observation = ""
                            if self.tool_registry:
                                try:
                                    tool_result = await self.tool_registry.execute(
                                        step.action, **step.action_input
                                    )
                                    observation = str(tool_result)
                                except Exception as tool_exc:
                                    observation = f"Tool error: {tool_exc}"
                            else:
                                observation = f"No tool registry — cannot execute '{step.action}'"

                            step.observation = observation
                            span.set_output(observation)
                            steps.append(step)
                            await self._save_checkpoint(
                                execution_id,
                                task,
                                run_context,
                                steps,
                                len(steps) - 1,
                            )
                            ctx.last_completed_step = len(steps) - 1

                    if _control_exit is not None:
                        break  # exit for loop; span already closed cleanly

                # Max steps reached (or for loop broken by control-flow signal)
                if _control_exit is None:
                    final_output = steps[-1].observation if steps else "No output"
                    result = AgentResult(
                        output=final_output,
                        steps=steps,
                        success=False,
                        error=f"Reached max_steps={self.max_steps} without Final Answer",
                        metadata={"execution_id": execution_id},
                    )
                    result = await self._apply_behaviors_after(ctx, result)
                    self._log_execution_end(task, success=False, steps=len(steps))
                    trace.set_output(final_output)
                    return result
                # else: control-flow exit — trace closes cleanly, re-raised below

            except Exception as exc:
                self._log_execution_error(task, exc)
                fallback = await self._apply_behaviors_on_error(ctx, exc)
                if fallback is RETRY_SENTINEL:
                    # RetryBehavior slept and wants us to retry from scratch.
                    # Mark the trace as failed so it logs "failed" not "finished".
                    trace.set_error(exc)
                    ctx.attempt += 1
                    _retry = True
                elif fallback is not None:
                    return fallback
                else:
                    raise

        if _control_exit is not None:
            raise _control_exit

        if not _retry:
            break
ReActAgent.execute: Execute the task and return a result.
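Stripped of tracing, metrics, checkpoints and behaviors, the loop in execute builds a prompt from earlier steps, asks the LLM, parses a Thought/Action/Action Input block, and then either stops on Final Answer or runs the named tool and records its observation. A minimal synchronous sketch, assuming a regex for the format contract (the real `_parse_step`, `_build_prompt` and `_extract_final_answer` are not shown here, so `STEP_RE`, `parse_step`, `react`, the scripted LLM and the plain `tools` dict are all hypothetical):

```python
import json
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

# Hypothetical regex for the Thought:/Action:/Action Input: contract.
STEP_RE = re.compile(
    r"Thought:\s*(?P<thought>.*?)\s*Action:\s*(?P<action>.*?)\s*"
    r"Action Input:\s*(?P<input>.*)",
    re.DOTALL,
)


@dataclass
class Step:
    thought: str
    action: str
    action_input: Dict[str, Any] = field(default_factory=dict)
    observation: str = ""


def parse_step(text: str) -> Optional[Step]:
    match = STEP_RE.search(text)
    if match is None:
        return None
    raw = match.group("input").strip()
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        payload = {"raw": raw}
    if not isinstance(payload, dict):
        payload = {"raw": raw}
    return Step(match.group("thought"), match.group("action").strip(), payload)


def react(
    task: str,
    llm: Callable[[str], str],
    tools: Dict[str, Callable[..., Any]],
    max_steps: int = 5,
) -> str:
    steps: List[Step] = []
    for _ in range(max_steps):
        # Prompt = task plus every earlier action/observation pair.
        history = "".join(f"\nAction: {s.action}\nObservation: {s.observation}" for s in steps)
        reply = llm(f"Task: {task}{history}")
        # Unparseable replies become the final answer, as in execute().
        step = parse_step(reply) or Step("unparseable", "Final Answer", {"answer": reply})
        if step.action == "Final Answer":
            return str(step.action_input.get("answer", ""))
        tool = tools.get(step.action)
        try:
            step.observation = str(tool(**step.action_input)) if tool else f"Unknown tool '{step.action}'"
        except Exception as exc:  # tool failures become observations, not crashes
            step.observation = f"Tool error: {exc}"
        steps.append(step)
    return "max_steps reached"


replies = iter([
    'Thought: look it up\nAction: lookup\nAction Input: {"country": "France"}',
    'Thought: done\nAction: Final Answer\nAction Input: {"answer": "Paris"}',
])
capitals = {"France": "Paris"}
print(react("Capital of France?", lambda p: next(replies), {"lookup": lambda country: capitals[country]}))  # Paris
```

The retry path is not modelled here. In execute, a `RETRY_SENTINEL` returned by the error behaviors restarts the whole `while True` loop with an empty step list, so steps from the failed attempt are not reused.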
async def stream_execute(
    self, task: str, context: Optional[Dict[str, Any]] = None
) -> AsyncIterator[AgentStep]:
    ctx = BehaviorContext(agent_id=self.agent_id, task=task)
    ctx = await self._apply_behaviors_before(ctx)
    self._log_execution_start(task)

    steps: List[AgentStep] = []
    for _ in range(self.max_steps):
        prompt = self._build_prompt(task, steps)
        response = await self.llm_gateway.complete(
            prompt, model=self.model, temperature=self.temperature
        )
        step = self._parse_step(response.content)
        if step is None:
            step = AgentStep(
                thought="Unparseable response.",
                action="Final Answer",
                action_input={"answer": response.content},
            )

        if step.action == "Final Answer":
            step.observation = self._extract_final_answer(step)
            steps.append(step)
            yield step
            return

        if self.tool_registry:
            try:
                tool_result = await self.tool_registry.execute(step.action, **step.action_input)
                step.observation = str(tool_result)
            except Exception as tool_exc:
                step.observation = f"Tool error: {tool_exc}"
        else:
            step.observation = f"No tool registry — cannot execute '{step.action}'"

        steps.append(step)
        yield step
ReActAgent.stream_execute: Execute the task, yielding each step as it completes.
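stream_execute is an async generator: each step is yielded as soon as its observation is filled in, and the generator returns right after the Final Answer step. As written it applies before-behaviors only; it does not trace, checkpoint, or run after-behaviors. A minimal asyncio sketch of the consumer side, using a hypothetical `fake_stream` generator in place of a configured agent:

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class Step:
    action: str
    observation: str


async def fake_stream(task: str) -> AsyncIterator[Step]:
    """Hypothetical stand-in for agent.stream_execute(task)."""
    script = [
        ("search", "3 hits"),
        ("lookup", "found"),
        ("Final Answer", "done"),
        ("extra", "never reached"),
    ]
    for action, observation in script:
        await asyncio.sleep(0)  # yield control, as an awaited LLM/tool call would
        yield Step(action, observation)
        if action == "Final Answer":
            return  # same early exit as stream_execute


async def consume(task: str) -> List[str]:
    seen: List[str] = []
    async for step in fake_stream(task):
        seen.append(f"{step.action}: {step.observation}")  # e.g. push to a UI
    return seen


print(asyncio.run(consume("demo")))
# ['search: 3 hits', 'lookup: found', 'Final Answer: done']
```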
class PlanExecuteAgent(BaseAgent):
    """
    Two-phase agent: LLM plans all steps first, then executes each step sequentially.

    Phase 1 — Plan: Ask the LLM to decompose the task into a JSON list of steps.
    Phase 2 — Execute: Feed each step back to the LLM (with accumulated context) to
    produce a result. Each step runs an inner tool-calling loop — if the LLM emits a
    ``Thought/Action/Action Input`` block referencing a registered tool, the tool is
    invoked and its observation is fed back for the LLM to produce a final step result.

    Args:
        max_plan_steps: Maximum number of planned steps allowed (default: 10).
        max_tool_calls_per_step: Maximum tool calls allowed within a single plan step
            (default: 3).
        model: LLM model name (optional).
        temperature: Sampling temperature (default: 0.1).
        plan_prompt: Override the planning prompt. Must contain ``{task}``.
        execute_prompt: Override the step-execution prompt. Must contain
            ``{task}``, ``{plan}``, ``{previous_results}``, ``{step_num}``,
            ``{current_step}``, and ``{tool_section}``.

    All other args inherited from :class:`BaseAgent`.
    """

    def __init__(
        self,
        *args: Any,
        max_plan_steps: int = 10,
        max_tool_calls_per_step: int = 3,
        model: Optional[str] = None,
        temperature: float = 0.1,
        plan_prompt: Optional[str] = None,
        execute_prompt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.max_plan_steps = max_plan_steps
        self.max_tool_calls_per_step = max_tool_calls_per_step
        self.model = model
        self.temperature = temperature
        self._plan_prompt = plan_prompt
        self._execute_prompt = execute_prompt

    #: Default planning prompt. Must contain ``{task}`` if overriding.
    DEFAULT_PLAN_PROMPT: str = _PLAN_PROMPT
    #: Default step-execution prompt. Must contain ``{task}``, ``{plan}``,
    #: ``{previous_results}``, ``{step_num}``, ``{current_step}``, and
    #: ``{tool_section}`` if overriding (``{tool_section}`` is auto-populated
    #: from the tool registry; pass an empty string if no tools are needed).
    DEFAULT_EXECUTE_PROMPT: str = _EXECUTE_PROMPT

    # ------------------------------------------------------------------
    # Phase 1: Plan
    # ------------------------------------------------------------------

    async def _plan(self, task: str) -> List[str]:
        template = self._plan_prompt if self._plan_prompt is not None else _PLAN_PROMPT
        prompt = template.format(task=task)
        response = await self.llm_gateway.complete(
            prompt, model=self.model, temperature=0.0
        )
        raw = response.content.strip()
        # Extract JSON array even if wrapped in markdown fences
        match = re.search(r"\[.*\]", raw, re.DOTALL)
        if match:
            try:
                steps = json.loads(match.group())
                if isinstance(steps, list):
                    return [str(s) for s in steps[: self.max_plan_steps]]
            except (json.JSONDecodeError, ValueError):
                pass
        # Fallback: treat each line as a step
        lines = [ln.strip().lstrip("0123456789.-) ") for ln in raw.splitlines() if ln.strip()]
        return lines[: self.max_plan_steps]

    # ------------------------------------------------------------------
    # Phase 2: Execute a single step (with optional tool calls)
    # ------------------------------------------------------------------

    def _tool_descriptions(self) -> str:
        if not self.tool_registry:
            return ""
        tools = self.tool_registry.list_tools()
        if not tools:
            return ""
        return "\n".join(f"- {t.name}: {t.description}" for t in tools)

    async def _execute_step(
        self, task: str, plan: List[str], step_num: int, previous: List[str]
    ) -> str:
        tool_descriptions = self._tool_descriptions()
        tool_section = (
            _TOOL_SECTION.format(tool_descriptions=tool_descriptions)
            if tool_descriptions
            else ""
        )
        template = self._execute_prompt if self._execute_prompt is not None else _EXECUTE_PROMPT
        prompt = template.format(
            task=task,
            plan="\n".join(f"{i+1}. {s}" for i, s in enumerate(plan)),
            previous_results="\n".join(
                f"Step {i+1} result: {r}" for i, r in enumerate(previous)
            ) or "None yet.",
            step_num=step_num,
            current_step=plan[step_num - 1],
            tool_section=tool_section,
        )

        # Inner tool-calling loop: allow up to max_tool_calls tool calls per step
        conversation = prompt
        for _ in range(self.max_tool_calls_per_step):
            response = await self.llm_gateway.complete(
                conversation, model=self.model, temperature=self.temperature
            )
            raw = response.content.strip()

            # Check if the LLM wants to call a tool
            match = _STEP_PATTERN.search(raw)
            if match and self.tool_registry:
                action = match.group("action").strip()
                raw_input = match.group("action_input").strip()
                try:
                    action_input = json.loads(raw_input)
                    if not isinstance(action_input, dict):
                        action_input = {"raw": raw_input}
                except (json.JSONDecodeError, ValueError):
                    action_input = {"raw": raw_input}

                try:
                    tool_result = await self.tool_registry.execute(action, **action_input)
                    observation = str(tool_result)
                except Exception as exc:
                    observation = f"Tool error: {exc}"

                # Append the tool call + observation to the conversation and continue
                conversation = (
                    conversation
                    + f"\n{raw}\nObservation: {observation}\n"
                    + "Now provide the final result for this step based on the above observation."
                )
            else:
                # No tool call pattern — treat the response as the step result
                return raw

        # Exhausted tool call budget — return last response
        return raw

    async def _save_checkpoint(
        self,
        execution_id: str,
        task: str,
        context: Dict[str, Any],
        plan: List[str],
        steps: List[AgentStep],
        step_results: List[str],
        step_number: int,
    ) -> None:
        if not self.checkpoint_manager:
            return
        await self.checkpoint_manager.save(
            agent_id=self.agent_id,
            execution_id=execution_id,
            state={
                "execution_id": execution_id,
                "task": task,
                "context": context,
                "plan": plan,
                "step_number": step_number,
                "step_results": list(step_results),
                "steps": [dataclasses.asdict(s) for s in steps],
            },
            metadata={"step_number": step_number},
            ttl=86400,
        )

    async def resume_from(
        self, execution_id: str, from_step: Optional[int] = None
    ) -> AgentResult:
        """Resume a Plan-Execute run from latest (or selected) checkpoint."""
        if not self.checkpoint_manager:
            raise ValueError("Checkpoint manager is not configured for this agent")

        checkpoint = None
        if from_step is not None:
            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
            for ckpt in checkpoints:
                if ckpt.state.get("step_number") == from_step:
                    checkpoint = ckpt
                    break
        else:
            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)

        if checkpoint is None:
            raise ValueError(f"No checkpoint found for execution_id={execution_id}")

        state = checkpoint.state
        task = str(state.get("task", ""))
        context = state.get("context") or {}
        plan = [str(s) for s in state.get("plan") or []]
        raw_steps = state.get("steps") or []
        steps = [AgentStep(**raw) for raw in raw_steps if isinstance(raw, dict)]
        step_results = [str(r) for r in state.get("step_results") or []]
        current_step = int(state.get("step_number", len(step_results) - 1)) + 1

        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            last_completed_step=max(current_step - 1, -1),
            metadata={"resume_agent": self},
        )
        ctx = await self._apply_behaviors_before(ctx)

        for i in range(current_step, len(plan)):
            step_num = i + 1
            result_text = await self._execute_step(task, plan, step_num, step_results)
            step_results.append(result_text)
            agent_step = AgentStep(
                thought=f"Executing plan step {step_num}: {plan[i]}",
                action="llm_execution",
                action_input={"step": plan[i]},
                observation=result_text,
            )
            steps.append(agent_step)
            await self._save_checkpoint(
                execution_id,
                task,
                context,
                plan,
                steps,
                step_results,
                i,
            )
            ctx.last_completed_step = i

        final_output = step_results[-1] if step_results else ""
        result = AgentResult(
            output=final_output,
            steps=steps,
            success=True,
            metadata={"plan": plan, "execution_id": execution_id},
        )
        return await self._apply_behaviors_after(ctx, result)

    # ------------------------------------------------------------------
    # BaseAgent implementation
    # ------------------------------------------------------------------

    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        execution_id = str(uuid4())
        run_context = context or {}
        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            metadata={"resume_agent": self, **run_context},
        )
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        with self._tracer.trace(
            "plan_execute_agent.execute", input=task, metadata={"agent_id": self.agent_id}
        ) as trace:
            try:
                # Phase 1 — Plan
                with trace.span("plan", input=task) as plan_span:
                    plan = await self._plan(task)
                    plan_span.set_output({"steps": plan})
                self._logger.info(
                    "Plan created",
                    agent_id=self.agent_id,
                    step_count=len(plan),
                )
                await self._save_checkpoint(
                    execution_id,
                    task,
                    run_context,
                    plan,
                    [],
                    [],
                    -1,
                )

                # Phase 2 — Execute
                steps: List[AgentStep] = []
                step_results: List[str] = []
                for i, plan_step in enumerate(plan):
                    step_num = i + 1
                    with trace.span(f"execute_step_{step_num}", input=plan_step) as step_span:
                        perf_id = None
                        if self._performance_monitor:
                            perf_id = self._performance_monitor.start_request(
                                provider="llm_gateway",
                                model=self.model or "default",
                            )
                        try:
                            result_text = await self._execute_step(
                                task, plan, step_num, step_results
                            )
                            if self._performance_monitor and perf_id:
                                self._performance_monitor.end_request(
                                    request_id=perf_id,
                                    prompt_tokens=0,
                                    completion_tokens=0,
                                    success=True,
                                )
                        except Exception as exc:
                            if self._performance_monitor and perf_id:
                                self._performance_monitor.end_request(
                                    request_id=perf_id,
                                    prompt_tokens=0,
                                    completion_tokens=0,
                                    success=False,
                                    error=str(exc),
                                )
                            raise

                        step_results.append(result_text)
                        agent_step = AgentStep(
                            thought=f"Executing plan step {step_num}: {plan_step}",
                            action="llm_execution",
                            action_input={"step": plan_step},
                            observation=result_text,
                        )
                        steps.append(agent_step)
                        step_span.set_output(result_text)
                        await self._save_checkpoint(
                            execution_id,
                            task,
                            run_context,
                            plan,
                            steps,
                            step_results,
                            i,
                        )
                        ctx.last_completed_step = i

                final_output = step_results[-1] if step_results else ""
                if self.state_store:
                    await self.state_store.set(
                        f"agent:{self.agent_id}:last_steps",
                        [vars(s) for s in steps],
                        ttl=3600,
                    )

                result = AgentResult(
                    output=final_output,
                    steps=steps,
                    success=True,
                    metadata={"plan": plan, "execution_id": execution_id},
                )
                result = await self._apply_behaviors_after(ctx, result)
                self._log_execution_end(task, success=True, steps=len(steps))
                trace.set_output(final_output)
                return result

            except Exception as exc:
                self._log_execution_error(task, exc)
                fallback = await self._apply_behaviors_on_error(ctx, exc)
                if fallback is not None:
                    return fallback
                raise

    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        ctx = BehaviorContext(agent_id=self.agent_id, task=task)
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        plan = await self._plan(task)
        step_results: List[str] = []
        for i, plan_step in enumerate(plan):
            result_text = await self._execute_step(task, plan, i + 1, step_results)
            step_results.append(result_text)
            step = AgentStep(
                thought=f"Executing plan step {i+1}: {plan_step}",
                action="llm_execution",
                action_input={"step": plan_step},
                observation=result_text,
            )
            yield step
Two-phase agent: the LLM first plans all steps, then the agent executes them one at a time.
Phase 1 — Plan: Ask the LLM to decompose the task into a JSON list of steps.
Phase 2 — Execute: Feed each step back to the LLM (with accumulated context) to
produce a result; the prompt includes the full numbered plan and the results of all earlier steps.
Each step runs an inner tool-calling loop: if the LLM emits a Thought/Action/Action Input
block, the named tool is invoked through the tool registry and its observation is appended to
the conversation so the LLM can produce the final step result.
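The planning phase tolerates both well-formed and sloppy LLM output. The sketch below copies the response-parsing logic of _plan into a standalone function so both paths can be checked directly; `parse_plan` is a hypothetical name and the LLM call itself is left out:

```python
import json
import re
from typing import List

FENCE = "`" * 3  # built indirectly so this example nests cleanly in markdown


def parse_plan(raw: str, max_plan_steps: int = 10) -> List[str]:
    """Standalone copy of PlanExecuteAgent._plan's response parsing."""
    raw = raw.strip()
    # Greedy match from the first "[" to the last "]", which skips markdown fences.
    match = re.search(r"\[.*\]", raw, re.DOTALL)
    if match:
        try:
            steps = json.loads(match.group())
            if isinstance(steps, list):
                return [str(s) for s in steps[:max_plan_steps]]
        except (json.JSONDecodeError, ValueError):
            pass
    # Fallback: one step per non-empty line, with leading list markers stripped.
    lines = [ln.strip().lstrip("0123456789.-) ") for ln in raw.splitlines() if ln.strip()]
    return lines[:max_plan_steps]


fenced = f'{FENCE}json\n["Search for sources", "Summarise findings", "Write report"]\n{FENCE}'
print(parse_plan(fenced))
# ['Search for sources', 'Summarise findings', 'Write report']
print(parse_plan("1. Search\n2) Summarise\n- Write"))
# ['Search', 'Summarise', 'Write']
```

One side effect of the fallback: `lstrip` removes every leading digit, dot, dash, parenthesis and space, so a line such as `2. 3D render` becomes `D render`. A plan prompt that insists on a JSON array keeps parsing on the first path.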
Args:
max_plan_steps: Maximum number of planned steps allowed (default: 10).
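max_tool_calls_per_step: Maximum number of LLM calls, and therefore tool calls, in a single plan step's
inner loop (default: 3). If the budget runs out, the last LLM reply is used as the step result.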
model: LLM model name (optional).
temperature: Sampling temperature (default: 0.1).
plan_prompt: Override the planning prompt. Must contain {task}.
execute_prompt: Override the step-execution prompt. Must contain
{task}, {plan}, {previous_results}, {step_num},
{current_step}, and {tool_section}.
All other args inherited from BaseAgent.
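Both prompt overrides are filled with `str.format`, which has two consequences for custom templates: literal braces (a JSON sample, for instance) must be doubled as `{{` and `}}`, and a required placeholder that is missing is silently ignored rather than reported. A pre-flight check can be written with the standard library's `string.Formatter`. In this sketch `REQUIRED_EXECUTE_FIELDS`, `missing_fields` and the template are hypothetical; the plan and previous-results strings are built the same way as in _execute_step:

```python
from string import Formatter
from typing import List, Set

REQUIRED_EXECUTE_FIELDS = {
    "task", "plan", "previous_results", "step_num", "current_step", "tool_section",
}


def missing_fields(template: str, required: Set[str]) -> Set[str]:
    """Return the required placeholders that the template never mentions."""
    # Formatter().parse yields (literal, field_name, format_spec, conversion);
    # field_name is None for literal text, including escaped {{ and }}.
    present = {name for _, name, _, _ in Formatter().parse(template) if name}
    return required - present


CUSTOM_EXECUTE_PROMPT = (
    "Task: {task}\nPlan:\n{plan}\nResults so far:\n{previous_results}\n"
    "{tool_section}\nNow do step {step_num}: {current_step}\n"
    'Answer as JSON, e.g. {{"result": "..."}}'
)

plan = ["Search for sources", "Summarise findings"]
previous: List[str] = ["3 sources found"]
step_num = 2
prompt = CUSTOM_EXECUTE_PROMPT.format(
    task="Write a brief",
    plan="\n".join(f"{i+1}. {s}" for i, s in enumerate(plan)),
    previous_results="\n".join(
        f"Step {i+1} result: {r}" for i, r in enumerate(previous)
    ) or "None yet.",
    step_num=step_num,
    current_step=plan[step_num - 1],
    tool_section="",  # empty when no tools are registered
)
print(missing_fields(CUSTOM_EXECUTE_PROMPT, REQUIRED_EXECUTE_FIELDS))  # set()
print(sorted(missing_fields("Do step {step_num}: {current_step}", REQUIRED_EXECUTE_FIELDS)))
# ['plan', 'previous_results', 'task', 'tool_section']
```

The same check applies to `plan_prompt` with `{"task"}` as the required set. A placeholder the agent does not supply (say `{notes}`) fails differently: `str.format` raises `KeyError` at call time.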
PlanExecuteAgent.resume_from: Resume a Plan-Execute run from its latest (or a selected) checkpoint.
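resume_from decides where to restart from the saved `step_number`: the checkpoint written right after planning stores -1, and each later checkpoint stores the 0-based index of the step that just finished. A standalone sketch of that arithmetic; `remaining_steps` is a hypothetical helper, and `state` is a plain dict shaped like the one _save_checkpoint writes:

```python
from typing import Any, Dict, List


def remaining_steps(state: Dict[str, Any]) -> List[str]:
    """Plan steps that resume_from would still execute for this checkpoint state."""
    plan = [str(s) for s in state.get("plan") or []]
    step_results = [str(r) for r in state.get("step_results") or []]
    # -1 after planning, else the index of the last finished step; +1 = next step.
    current_step = int(state.get("step_number", len(step_results) - 1)) + 1
    return plan[current_step:]  # same as range(current_step, len(plan))


plan = ["search", "summarise", "write"]
print(remaining_steps({"plan": plan, "step_number": -1, "step_results": []}))
# ['search', 'summarise', 'write']
print(remaining_steps({"plan": plan, "step_number": 1, "step_results": ["r0", "r1"]}))
# ['write']
```

A checkpoint from a completed run has `step_number == len(plan) - 1`, so nothing is re-executed and the stored last result becomes the output. Choosing an older checkpoint with `from_step` restores only the results saved up to that point, so the later steps are recomputed.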
PlanExecuteAgent.execute: Execute the task and return a result.
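Inside each plan step, _execute_step gives the LLM up to `max_tool_calls_per_step` turns: a reply that matches the tool-call pattern triggers the tool and extends the conversation with the observation, while any other reply becomes the step result. A standalone sketch, assuming a simple regex in place of `_STEP_PATTERN` (whose definition is not shown) and a plain dict of callables in place of the tool registry; `execute_step` and `STEP_PATTERN` here are hypothetical. Unlike _execute_step, this version initialises `raw` before the loop, so a budget of 0 returns an empty string; in the original, `raw` would be unbound in that case and the final `return raw` would raise `UnboundLocalError`.

```python
import json
import re
from typing import Any, Callable, Dict

# Hypothetical stand-in for _STEP_PATTERN, using the same group names.
STEP_PATTERN = re.compile(
    r"Action:\s*(?P<action>[^\n]+)\s*Action Input:\s*(?P<action_input>.*)", re.DOTALL
)


def execute_step(
    prompt: str,
    llm: Callable[[str], str],
    tools: Dict[str, Callable[..., Any]],
    max_tool_calls_per_step: int = 3,
) -> str:
    conversation = prompt
    raw = ""  # defined up front so a zero budget cannot leave it unbound
    for _ in range(max_tool_calls_per_step):
        raw = llm(conversation).strip()
        match = STEP_PATTERN.search(raw)
        if match is None:
            return raw  # no tool call: this reply is the step result
        action = match.group("action").strip()
        raw_input = match.group("action_input").strip()
        try:
            action_input = json.loads(raw_input)
            if not isinstance(action_input, dict):
                action_input = {"raw": raw_input}
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            action_input = {"raw": raw_input}
        try:
            observation = str(tools[action](**action_input))
        except Exception as exc:  # unknown tool or failing tool
            observation = f"Tool error: {exc}"
        conversation += (
            f"\n{raw}\nObservation: {observation}\n"
            "Now provide the final result for this step based on the above observation."
        )
    return raw  # budget exhausted: the last reply is returned as-is


replies = iter([
    'Thought: need the sum\nAction: add\nAction Input: {"a": 2, "b": 3}',
    "The sum is 5.",
])
print(execute_step("Step 1: add 2 and 3", lambda c: next(replies), {"add": lambda a, b: a + b}))
# The sum is 5.
```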
PlanExecuteAgent.stream_execute: Execute the task, yielding each step as it completes.
class ReflexionAgent(BaseAgent):
    """
    Wraps another agent and applies self-reflection on failure.

    After each attempt, a separate LLM call critiques the output. If the
    critique says the result is unsatisfactory, the agent reflects and retries
    with an improved prompt up to ``max_reflections`` times.

    Args:
        inner_agent: The underlying agent to run and reflect on.
        max_reflections: Maximum reflection/retry cycles (default: 2).
        model: Model for critique and reflection calls (defaults to inner agent
            model or gateway default).
        critique_prompt: Override the critique prompt. Must contain ``{task}``
            and ``{response}``.
        reflect_prompt: Override the reflection/retry prompt. Must contain
            ``{task}``, ``{previous_output}``, and ``{critique}``.

    All other args inherited from :class:`BaseAgent`.
    """

    def __init__(
        self,
        inner_agent: BaseAgent,
        *args: Any,
        max_reflections: int = 2,
        model: Optional[str] = None,
        critique_prompt: Optional[str] = None,
        reflect_prompt: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(inner_agent.llm_gateway, *args, **kwargs)
        self.inner_agent = inner_agent
        self.max_reflections = max_reflections
        self.model = model
        self._critique_prompt = critique_prompt
        self._reflect_prompt = reflect_prompt

    #: Default critique prompt. Must contain ``{task}`` and ``{response}`` if overriding.
    DEFAULT_CRITIQUE_PROMPT: str = _CRITIQUE_PROMPT
    #: Default reflection prompt. Must contain ``{task}``, ``{previous_output}``,
    #: and ``{critique}`` if overriding.
    DEFAULT_REFLECT_PROMPT: str = _REFLECT_PROMPT

    async def _critique(self, task: str, output: str) -> Optional[str]:
        """Returns the critique reason string, or None if the output is satisfactory."""
        template = self._critique_prompt if self._critique_prompt is not None else _CRITIQUE_PROMPT
        prompt = template.format(task=task, response=output)
        response = await self.llm_gateway.complete(prompt, model=self.model, temperature=0.0)
        text = response.content.strip()
        if text.upper().startswith("YES"):
            return None
        # Extract reason after "NO:"
        if ":" in text:
            return text.split(":", 1)[1].strip()
        return text

    async def _reflect_and_retry(
        self, task: str, previous_output: str, critique: str
    ) -> AgentResult:
        template = self._reflect_prompt if self._reflect_prompt is not None else _REFLECT_PROMPT
        improved_task = template.format(
            task=task, previous_output=previous_output, critique=critique
        )
        return await self.inner_agent.execute(improved_task)

    async def _save_checkpoint(
        self,
        execution_id: str,
        task: str,
        context: Dict[str, Any],
        reflection_round: int,
        result: AgentResult,
        critique: Optional[str] = None,
    ) -> None:
        if not self.checkpoint_manager:
            return
        await self.checkpoint_manager.save(
            agent_id=self.agent_id,
            execution_id=execution_id,
            state={
                "execution_id": execution_id,
                "task": task,
                "context": context,
                "reflection_round": reflection_round,
                "critique": critique,
                "result": {
                    "output": result.output,
                    "success": result.success,
                    "error": result.error,
                    "metadata": result.metadata,
                    "steps": [dataclasses.asdict(s) for s in result.steps],
                },
            },
            metadata={"reflection_round": reflection_round},
            ttl=86400,
        )

    async def resume_from(
        self, execution_id: str, from_reflection: Optional[int] = None
    ) -> AgentResult:
        """Resume a reflexion execution from latest (or selected) reflection checkpoint."""
        if not self.checkpoint_manager:
            raise ValueError("Checkpoint manager is not configured for this agent")

        checkpoint = None
        if from_reflection is not None:
            checkpoints = await self.checkpoint_manager.list_by_execution(execution_id)
            for ckpt in checkpoints:
                if ckpt.state.get("reflection_round") == from_reflection:
                    checkpoint = ckpt
                    break
        else:
            checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id)

        if checkpoint is None:
            raise ValueError(f"No checkpoint found for execution_id={execution_id}")

        state = checkpoint.state
        task = str(state.get("task", ""))
        context = state.get("context") or {}
        reflection_round = int(state.get("reflection_round", 0))
        raw = state.get("result") or {}

        steps = [
            AgentStep(**s) for s in raw.get("steps", []) if isinstance(s, dict)
        ]
        result = AgentResult(
            output=str(raw.get("output", "")),
            steps=steps,
            success=bool(raw.get("success", True)),
            error=raw.get("error"),
            metadata=raw.get("metadata", {}),
        )

        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            last_completed_step=reflection_round,
            metadata={"resume_agent": self},
        )
        ctx = await self._apply_behaviors_before(ctx)

        all_steps: List[AgentStep] = list(result.steps)
        for idx in range(reflection_round, self.max_reflections):
            critique = await self._critique(task, result.output)
            if critique is None:
                break
            result = await self._reflect_and_retry(task, result.output, critique)
            all_steps.extend(result.steps)
            await self._save_checkpoint(
                execution_id,
                task,
                context,
                idx + 1,
                result,
                critique,
            )
            ctx.last_completed_step = idx + 1

        final = AgentResult(
            output=result.output,
            steps=all_steps,
            success=result.success,
            error=result.error,
            metadata={**result.metadata, "execution_id": execution_id},
        )
        return await self._apply_behaviors_after(ctx, final)

    # ------------------------------------------------------------------
    # BaseAgent implementation
    # ------------------------------------------------------------------

    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        execution_id = str(uuid4())
        run_context = context or {}
        ctx = BehaviorContext(
            agent_id=self.agent_id,
            task=task,
            execution_id=execution_id,
            metadata={"resume_agent": self, **run_context},
        )
        ctx = await self._apply_behaviors_before(ctx)
        self._log_execution_start(task)

        with self._tracer.trace(
            "reflexion_agent.execute", input=task, metadata={"agent_id": self.agent_id}
        ) as trace:
            try:
                result = await self.inner_agent.execute(task, context)
                all_steps: List[AgentStep] = list(result.steps)
                await self._save_checkpoint(execution_id, task, run_context, 0, result)
                ctx.last_completed_step = 0

                for reflection_num in range(self.max_reflections):
                    with trace.span(
                        f"critique_{reflection_num + 1}", input=result.output
                    ) as cspan:
                        critique = await self._critique(task, result.output)
                        cspan.set_output({"satisfactory": critique is None, "critique": critique})

                    if critique is None:
                        self._logger.info(
                            "Reflexion: output accepted",
                            agent_id=self.agent_id,
                            reflection=reflection_num + 1,
                        )
                        break

                    self._logger.info(
                        "Reflexion: retrying",
                        agent_id=self.agent_id,
                        reflection=reflection_num + 1,
                        critique=critique,
                    )
                    if self._metrics:
                        self._metrics.increment(
                            "agent.reflexions", agent_id=self.agent_id
                        )

                    with trace.span(f"reflect_{reflection_num + 1}", input=critique) as rspan:
                        result = await self._reflect_and_retry(task, result.output, critique)
                        all_steps.extend(result.steps)
                        await self._save_checkpoint(
                            execution_id,
                            task,
                            run_context,
                            reflection_num + 1,
                            result,
                            critique,
                        )
                        ctx.last_completed_step = reflection_num + 1
                        rspan.set_output(result.output)

                final = AgentResult(
                    output=result.output,
                    steps=all_steps,
                    success=result.success,
                    error=result.error,
                    metadata={**result.metadata, "execution_id": execution_id},
                )
                final = await self._apply_behaviors_after(ctx, final)
                self._log_execution_end(task, success=final.success, steps=len(all_steps))
                trace.set_output(final.output)
                return final

            except Exception as exc:
                self._log_execution_error(task, exc)
                fallback = await self._apply_behaviors_on_error(ctx, exc)
                if fallback is not None:
                    return fallback
                raise

    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        # Reflexion does full-cycle reflection, so streaming yields inner steps
        async for step in self.inner_agent.stream_execute(task, context):
            yield step
Wraps another agent and applies self-reflection on failure.
After each attempt, a separate LLM call critiques the output. If the
critique says the result is unsatisfactory, the agent reflects and retries
with an improved prompt up to max_reflections times.
Args:
inner_agent: The underlying agent to run and reflect on.
max_reflections: Maximum reflection/retry cycles (default: 2).
model: Model for critique and reflection calls (defaults to inner agent
model or gateway default).
critique_prompt: Override the critique prompt. Must contain {task}
and {response}.
reflect_prompt: Override the reflection/retry prompt. Must contain
{task}, {previous_output}, and {critique}.
All other args inherited from BaseAgent.
52 def __init__( 53 self, 54 inner_agent: BaseAgent, 55 *args: Any, 56 max_reflections: int = 2, 57 model: Optional[str] = None, 58 critique_prompt: Optional[str] = None, 59 reflect_prompt: Optional[str] = None, 60 **kwargs: Any, 61 ) -> None: 62 super().__init__(inner_agent.llm_gateway, *args, **kwargs) 63 self.inner_agent = inner_agent 64 self.max_reflections = max_reflections 65 self.model = model 66 self._critique_prompt = critique_prompt 67 self._reflect_prompt = reflect_prompt
129 async def resume_from( 130 self, execution_id: str, from_reflection: Optional[int] = None 131 ) -> AgentResult: 132 """Resume a reflexion execution from latest (or selected) reflection checkpoint.""" 133 if not self.checkpoint_manager: 134 raise ValueError("Checkpoint manager is not configured for this agent") 135 136 checkpoint = None 137 if from_reflection is not None: 138 checkpoints = await self.checkpoint_manager.list_by_execution(execution_id) 139 for ckpt in checkpoints: 140 if ckpt.state.get("reflection_round") == from_reflection: 141 checkpoint = ckpt 142 break 143 else: 144 checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id) 145 146 if checkpoint is None: 147 raise ValueError(f"No checkpoint found for execution_id={execution_id}") 148 149 state = checkpoint.state 150 task = str(state.get("task", "")) 151 context = state.get("context") or {} 152 reflection_round = int(state.get("reflection_round", 0)) 153 raw = state.get("result") or {} 154 155 steps = [ 156 AgentStep(**s) for s in raw.get("steps", []) if isinstance(s, dict) 157 ] 158 result = AgentResult( 159 output=str(raw.get("output", "")), 160 steps=steps, 161 success=bool(raw.get("success", True)), 162 error=raw.get("error"), 163 metadata=raw.get("metadata", {}), 164 ) 165 166 ctx = BehaviorContext( 167 agent_id=self.agent_id, 168 task=task, 169 execution_id=execution_id, 170 last_completed_step=reflection_round, 171 metadata={"resume_agent": self}, 172 ) 173 ctx = await self._apply_behaviors_before(ctx) 174 175 all_steps: List[AgentStep] = list(result.steps) 176 for idx in range(reflection_round, self.max_reflections): 177 critique = await self._critique(task, result.output) 178 if critique is None: 179 break 180 result = await self._reflect_and_retry(task, result.output, critique) 181 all_steps.extend(result.steps) 182 await self._save_checkpoint( 183 execution_id, 184 task, 185 context, 186 idx + 1, 187 result, 188 critique, 189 ) 190 ctx.last_completed_step = 
idx + 1 191 192 final = AgentResult( 193 output=result.output, 194 steps=all_steps, 195 success=result.success, 196 error=result.error, 197 metadata={**result.metadata, "execution_id": execution_id}, 198 ) 199 return await self._apply_behaviors_after(ctx, final)
Resume a reflexion execution from latest (or selected) reflection checkpoint.
205 async def execute( 206 self, task: str, context: Optional[Dict[str, Any]] = None 207 ) -> AgentResult: 208 execution_id = str(uuid4()) 209 run_context = context or {} 210 ctx = BehaviorContext( 211 agent_id=self.agent_id, 212 task=task, 213 execution_id=execution_id, 214 metadata={"resume_agent": self, **run_context}, 215 ) 216 ctx = await self._apply_behaviors_before(ctx) 217 self._log_execution_start(task) 218 219 with self._tracer.trace( 220 "reflexion_agent.execute", input=task, metadata={"agent_id": self.agent_id} 221 ) as trace: 222 try: 223 result = await self.inner_agent.execute(task, context) 224 all_steps: List[AgentStep] = list(result.steps) 225 await self._save_checkpoint(execution_id, task, run_context, 0, result) 226 ctx.last_completed_step = 0 227 228 for reflection_num in range(self.max_reflections): 229 with trace.span( 230 f"critique_{reflection_num + 1}", input=result.output 231 ) as cspan: 232 critique = await self._critique(task, result.output) 233 cspan.set_output({"satisfactory": critique is None, "critique": critique}) 234 235 if critique is None: 236 self._logger.info( 237 "Reflexion: output accepted", 238 agent_id=self.agent_id, 239 reflection=reflection_num + 1, 240 ) 241 break 242 243 self._logger.info( 244 "Reflexion: retrying", 245 agent_id=self.agent_id, 246 reflection=reflection_num + 1, 247 critique=critique, 248 ) 249 if self._metrics: 250 self._metrics.increment( 251 "agent.reflexions", agent_id=self.agent_id 252 ) 253 254 with trace.span(f"reflect_{reflection_num + 1}", input=critique) as rspan: 255 result = await self._reflect_and_retry(task, result.output, critique) 256 all_steps.extend(result.steps) 257 await self._save_checkpoint( 258 execution_id, 259 task, 260 run_context, 261 reflection_num + 1, 262 result, 263 critique, 264 ) 265 ctx.last_completed_step = reflection_num + 1 266 rspan.set_output(result.output) 267 268 final = AgentResult( 269 output=result.output, 270 steps=all_steps, 271 success=result.success, 272 
error=result.error, 273 metadata={**result.metadata, "execution_id": execution_id}, 274 ) 275 final = await self._apply_behaviors_after(ctx, final) 276 self._log_execution_end(task, success=final.success, steps=len(all_steps)) 277 trace.set_output(final.output) 278 return final 279 280 except Exception as exc: 281 self._log_execution_error(task, exc) 282 fallback = await self._apply_behaviors_on_error(ctx, exc) 283 if fallback is not None: 284 return fallback 285 raise
Execute the task and return a result.
287 async def stream_execute( 288 self, task: str, context: Optional[Dict[str, Any]] = None 289 ) -> AsyncIterator[AgentStep]: 290 # Reflexion does full-cycle reflection, so streaming yields inner steps 291 async for step in self.inner_agent.stream_execute(task, context): 292 yield step
Execute the task, yielding each step as it completes.
28class ChainOfThoughtAgent(BaseAgent): 29 """ 30 Prompts the LLM to reason via a structured ``<thinking>`` scratchpad before 31 producing a final answer. 32 33 **When to use:** Tasks requiring structured multi-step reasoning with no 34 external tool calls — classification, analysis, summarisation, math, or 35 any question answerable from the LLM's own knowledge. Single LLM call: 36 lower latency and cost than :class:`ReActAgent`. 37 38 **When NOT to use:** Tasks that require searching a database, calling an 39 API, or any live data lookup — use :class:`ReActAgent` instead. 40 41 .. note:: 42 Passing a ``tool_registry`` to this agent has no effect — tools are 43 never invoked. If you need tool calls, use :class:`ReActAgent`. 44 45 The scratchpad is extracted and stored in the returned :class:`AgentStep` 46 as the ``thought`` field, while the final answer becomes ``observation``. 47 48 Args: 49 model: LLM model name (optional). 50 temperature: Sampling temperature (default: 0.1). 51 system_prompt: Override the default Chain-of-Thought prompt template. 52 Must contain ``{task}`` as a placeholder where the task will be 53 inserted. 54 55 All other args inherited from :class:`BaseAgent`. 56 """ 57 58 def __init__( 59 self, 60 *args: Any, 61 model: Optional[str] = None, 62 temperature: float = 0.1, 63 system_prompt: Optional[str] = None, 64 **kwargs: Any, 65 ) -> None: 66 super().__init__(*args, **kwargs) 67 self.model = model 68 self.temperature = temperature 69 self._system_prompt = system_prompt 70 71 #: Default CoT prompt template used when no ``system_prompt`` is passed. 72 #: Must contain ``{task}`` if overriding. 
73 DEFAULT_SYSTEM_PROMPT: str = _COT_PROMPT 74 75 def _build_prompt(self, task: str) -> str: 76 template = self._system_prompt if self._system_prompt is not None else _COT_PROMPT 77 return template.format(task=task) 78 79 def _parse_response(self, text: str) -> tuple[str, str]: 80 """Returns (thinking_scratchpad, final_answer).""" 81 match = _THINKING_PATTERN.search(text) 82 if match: 83 thinking = match.group(1).strip() 84 answer = _THINKING_PATTERN.sub("", text).strip() 85 else: 86 # No <thinking> block — treat everything as the answer 87 thinking = "" 88 answer = text.strip() 89 return thinking, answer 90 91 async def _save_checkpoint( 92 self, 93 execution_id: str, 94 task: str, 95 context: Dict[str, Any], 96 thinking: str, 97 answer: str, 98 ) -> None: 99 if not self.checkpoint_manager: 100 return 101 await self.checkpoint_manager.save( 102 agent_id=self.agent_id, 103 execution_id=execution_id, 104 state={ 105 "execution_id": execution_id, 106 "task": task, 107 "context": context, 108 "step_number": 0, 109 "steps": [ 110 { 111 "thought": thinking, 112 "action": "chain_of_thought", 113 "action_input": {"task": task}, 114 "observation": answer, 115 "metadata": {}, 116 } 117 ], 118 "thinking": thinking, 119 "answer": answer, 120 }, 121 metadata={"step_number": 0}, 122 ttl=86400, 123 ) 124 125 async def resume_from(self, execution_id: str) -> AgentResult: 126 """Resume (idempotently) from a completed Chain-of-Thought checkpoint.""" 127 if not self.checkpoint_manager: 128 raise ValueError("Checkpoint manager is not configured for this agent") 129 130 checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id) 131 if checkpoint is None: 132 raise ValueError(f"No checkpoint found for execution_id={execution_id}") 133 134 state = checkpoint.state 135 thinking = str(state.get("thinking", "")) 136 answer = str(state.get("answer", "")) 137 task = str(state.get("task", "")) 138 139 ctx = BehaviorContext( 140 agent_id=self.agent_id, 141 task=task, 
142 execution_id=execution_id, 143 last_completed_step=0, 144 metadata={"resume_agent": self}, 145 ) 146 ctx = await self._apply_behaviors_before(ctx) 147 148 step = AgentStep( 149 thought=thinking, 150 action="chain_of_thought", 151 action_input={"task": task}, 152 observation=answer, 153 ) 154 result = AgentResult( 155 output=answer, 156 steps=[step], 157 success=True, 158 metadata={"thinking": thinking, "execution_id": execution_id}, 159 ) 160 return await self._apply_behaviors_after(ctx, result) 161 162 # ------------------------------------------------------------------ 163 # BaseAgent implementation 164 # ------------------------------------------------------------------ 165 166 async def execute( 167 self, task: str, context: Optional[Dict[str, Any]] = None 168 ) -> AgentResult: 169 execution_id = str(uuid4()) 170 run_context = context or {} 171 ctx = BehaviorContext( 172 agent_id=self.agent_id, 173 task=task, 174 execution_id=execution_id, 175 metadata={"resume_agent": self, **run_context}, 176 ) 177 ctx = await self._apply_behaviors_before(ctx) 178 self._log_execution_start(task) 179 180 with self._tracer.trace( 181 "cot_agent.execute", input=task, metadata={"agent_id": self.agent_id} 182 ) as trace: 183 try: 184 prompt = self._build_prompt(task) 185 186 perf_id = None 187 if self._performance_monitor: 188 perf_id = self._performance_monitor.start_request( 189 provider="llm_gateway", model=self.model or "default" 190 ) 191 192 with trace.generation("llm_call", model=self.model or "default", input=prompt) as gen: 193 try: 194 response = await self.llm_gateway.complete( 195 prompt, model=self.model, temperature=self.temperature 196 ) 197 if self._performance_monitor and perf_id: 198 self._performance_monitor.end_request( 199 request_id=perf_id, 200 prompt_tokens=response.usage.get("prompt_tokens", 0), 201 completion_tokens=response.usage.get("completion_tokens", 0), 202 success=True, 203 ) 204 gen.set_output(response.content) 205 
gen.set_token_usage(**response.usage) 206 except Exception as exc: 207 if self._performance_monitor and perf_id: 208 self._performance_monitor.end_request( 209 request_id=perf_id, 210 prompt_tokens=0, 211 completion_tokens=0, 212 success=False, 213 error=str(exc), 214 ) 215 raise 216 217 thinking, answer = self._parse_response(response.content) 218 step = AgentStep( 219 thought=thinking, 220 action="chain_of_thought", 221 action_input={"task": task}, 222 observation=answer, 223 ) 224 225 if self.state_store: 226 await self.state_store.set( 227 f"agent:{self.agent_id}:last_steps", 228 [vars(step)], 229 ttl=3600, 230 ) 231 232 await self._save_checkpoint( 233 execution_id=execution_id, 234 task=task, 235 context=run_context, 236 thinking=thinking, 237 answer=answer, 238 ) 239 ctx.last_completed_step = 0 240 241 result = AgentResult( 242 output=answer, 243 steps=[step], 244 success=True, 245 metadata={"thinking": thinking, "execution_id": execution_id}, 246 ) 247 result = await self._apply_behaviors_after(ctx, result) 248 self._log_execution_end(task, success=True, steps=1) 249 trace.set_output(answer) 250 return result 251 252 except Exception as exc: 253 self._log_execution_error(task, exc) 254 fallback = await self._apply_behaviors_on_error(ctx, exc) 255 if fallback is not None: 256 return fallback 257 raise 258 259 async def stream_execute( 260 self, task: str, context: Optional[Dict[str, Any]] = None 261 ) -> AsyncIterator[AgentStep]: 262 result = await self.execute(task, context) 263 for step in result.steps: 264 yield step
Prompts the LLM to reason via a structured <thinking> scratchpad before
producing a final answer.
When to use: Tasks requiring structured multi-step reasoning with no
external tool calls — classification, analysis, summarisation, math, or
any question answerable from the LLM's own knowledge. Single LLM call:
lower latency and cost than ReActAgent.
When NOT to use: Tasks that require searching a database, calling an
API, or any live data lookup — use ReActAgent instead.
Passing a tool_registry to this agent has no effect — tools are
never invoked. If you need tool calls, use ReActAgent.
The scratchpad is extracted and stored in the returned AgentStep
as the thought field, while the final answer becomes observation.
Args:
model: LLM model name (optional).
temperature: Sampling temperature (default: 0.1).
system_prompt: Override the default Chain-of-Thought prompt template.
Must contain {task} as a placeholder where the task will be
inserted.
All other args inherited from BaseAgent.
58 def __init__( 59 self, 60 *args: Any, 61 model: Optional[str] = None, 62 temperature: float = 0.1, 63 system_prompt: Optional[str] = None, 64 **kwargs: Any, 65 ) -> None: 66 super().__init__(*args, **kwargs) 67 self.model = model 68 self.temperature = temperature 69 self._system_prompt = system_prompt
125 async def resume_from(self, execution_id: str) -> AgentResult: 126 """Resume (idempotently) from a completed Chain-of-Thought checkpoint.""" 127 if not self.checkpoint_manager: 128 raise ValueError("Checkpoint manager is not configured for this agent") 129 130 checkpoint = await self.checkpoint_manager.load_latest_for_execution(execution_id) 131 if checkpoint is None: 132 raise ValueError(f"No checkpoint found for execution_id={execution_id}") 133 134 state = checkpoint.state 135 thinking = str(state.get("thinking", "")) 136 answer = str(state.get("answer", "")) 137 task = str(state.get("task", "")) 138 139 ctx = BehaviorContext( 140 agent_id=self.agent_id, 141 task=task, 142 execution_id=execution_id, 143 last_completed_step=0, 144 metadata={"resume_agent": self}, 145 ) 146 ctx = await self._apply_behaviors_before(ctx) 147 148 step = AgentStep( 149 thought=thinking, 150 action="chain_of_thought", 151 action_input={"task": task}, 152 observation=answer, 153 ) 154 result = AgentResult( 155 output=answer, 156 steps=[step], 157 success=True, 158 metadata={"thinking": thinking, "execution_id": execution_id}, 159 ) 160 return await self._apply_behaviors_after(ctx, result)
Resume (idempotently) from a completed Chain-of-Thought checkpoint.
166 async def execute( 167 self, task: str, context: Optional[Dict[str, Any]] = None 168 ) -> AgentResult: 169 execution_id = str(uuid4()) 170 run_context = context or {} 171 ctx = BehaviorContext( 172 agent_id=self.agent_id, 173 task=task, 174 execution_id=execution_id, 175 metadata={"resume_agent": self, **run_context}, 176 ) 177 ctx = await self._apply_behaviors_before(ctx) 178 self._log_execution_start(task) 179 180 with self._tracer.trace( 181 "cot_agent.execute", input=task, metadata={"agent_id": self.agent_id} 182 ) as trace: 183 try: 184 prompt = self._build_prompt(task) 185 186 perf_id = None 187 if self._performance_monitor: 188 perf_id = self._performance_monitor.start_request( 189 provider="llm_gateway", model=self.model or "default" 190 ) 191 192 with trace.generation("llm_call", model=self.model or "default", input=prompt) as gen: 193 try: 194 response = await self.llm_gateway.complete( 195 prompt, model=self.model, temperature=self.temperature 196 ) 197 if self._performance_monitor and perf_id: 198 self._performance_monitor.end_request( 199 request_id=perf_id, 200 prompt_tokens=response.usage.get("prompt_tokens", 0), 201 completion_tokens=response.usage.get("completion_tokens", 0), 202 success=True, 203 ) 204 gen.set_output(response.content) 205 gen.set_token_usage(**response.usage) 206 except Exception as exc: 207 if self._performance_monitor and perf_id: 208 self._performance_monitor.end_request( 209 request_id=perf_id, 210 prompt_tokens=0, 211 completion_tokens=0, 212 success=False, 213 error=str(exc), 214 ) 215 raise 216 217 thinking, answer = self._parse_response(response.content) 218 step = AgentStep( 219 thought=thinking, 220 action="chain_of_thought", 221 action_input={"task": task}, 222 observation=answer, 223 ) 224 225 if self.state_store: 226 await self.state_store.set( 227 f"agent:{self.agent_id}:last_steps", 228 [vars(step)], 229 ttl=3600, 230 ) 231 232 await self._save_checkpoint( 233 execution_id=execution_id, 234 task=task, 235 
context=run_context, 236 thinking=thinking, 237 answer=answer, 238 ) 239 ctx.last_completed_step = 0 240 241 result = AgentResult( 242 output=answer, 243 steps=[step], 244 success=True, 245 metadata={"thinking": thinking, "execution_id": execution_id}, 246 ) 247 result = await self._apply_behaviors_after(ctx, result) 248 self._log_execution_end(task, success=True, steps=1) 249 trace.set_output(answer) 250 return result 251 252 except Exception as exc: 253 self._log_execution_error(task, exc) 254 fallback = await self._apply_behaviors_on_error(ctx, exc) 255 if fallback is not None: 256 return fallback 257 raise
Execute the task and return a result.
259 async def stream_execute( 260 self, task: str, context: Optional[Dict[str, Any]] = None 261 ) -> AsyncIterator[AgentStep]: 262 result = await self.execute(task, context) 263 for step in result.steps: 264 yield step
Execute the task, yielding each step as it completes.
76class A2AClient(BaseAgent): 77 """ 78 Proxies ``execute()`` calls to a remote A2A-compliant agent service. 79 80 Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 ``tasks/send`` over HTTP. 81 Context (user_assertion, obo_token, locale, etc.) is forwarded as 82 ``message.metadata`` in the JSON-RPC payload. 83 84 This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) 85 treat a separately-deployed A2A agent service as a first-class agent — no code 86 changes needed in the orchestrator. 87 88 Args: 89 endpoint_url: Base URL of the remote agent service 90 (e.g. ``"http://search-agent:8080"``). 91 ``/rpc`` is appended automatically. 92 timeout: HTTP request timeout in seconds (default: 30). 93 headers: Extra HTTP headers sent with every request (e.g. ``Authorization``). 94 agent_id: Stable identifier used in logging/metrics. Defaults to ``"A2AClient"``. 95 logger: Optional :class:`BasicLogger`. 96 metrics: Optional :class:`BasicMetricsCollector`. 97 98 Example:: 99 100 agent = A2AClient( 101 endpoint_url="http://search-agent-service:8080", 102 agent_id="search_agent", 103 ) 104 result = await agent.execute("Find recent AI papers") 105 106 Discovery:: 107 108 card = await A2AClient.fetch_agent_card("http://search-agent-service:8080") 109 agent = A2AClient( 110 endpoint_url=card["url"], 111 agent_id=card["name"], 112 ) 113 """ 114 115 def __init__( 116 self, 117 endpoint_url: str, 118 timeout: float = 30.0, 119 headers: Optional[Dict[str, str]] = None, 120 behaviors: Optional[List["BaseBehavior"]] = None, 121 agent_id: Optional[str] = None, 122 logger: Optional[BasicLogger] = None, 123 metrics: Optional[BasicMetricsCollector] = None, 124 ) -> None: 125 super().__init__( 126 llm_gateway=None, 127 behaviors=behaviors, 128 agent_id=agent_id or "A2AClient", 129 logger=logger, 130 metrics=metrics, 131 ) 132 self.endpoint_url = endpoint_url.rstrip("/") 133 self.timeout = timeout 134 self._headers = {"Content-Type": "application/json", **(headers or 
{})} 135 136 # ------------------------------------------------------------------ 137 # BaseAgent implementation 138 # ------------------------------------------------------------------ 139 140 async def execute( 141 self, task: str, context: Optional[Dict[str, Any]] = None 142 ) -> AgentResult: 143 bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {}) 144 bctx = await self._apply_behaviors_before(bctx) 145 146 while True: 147 self._log_execution_start(task) 148 try: 149 result = await self._call_remote(task, context or {}) 150 except Exception as exc: 151 fallback = await self._apply_behaviors_on_error(bctx, exc) 152 if fallback is RETRY_SENTINEL: 153 bctx.attempt += 1 154 continue 155 if fallback is not None: 156 return fallback # type: ignore[return-value] 157 raise 158 159 result = await self._apply_behaviors_after(bctx, result) 160 self._log_execution_end(task, result.success, len(result.steps)) 161 return result 162 163 async def stream_execute( 164 self, task: str, context: Optional[Dict[str, Any]] = None 165 ) -> AsyncIterator[AgentStep]: 166 """Executes remotely then yields each step from the response.""" 167 result = await self.execute(task, context) 168 for step in result.steps: 169 yield step 170 171 # ------------------------------------------------------------------ 172 # A2A JSON-RPC transport 173 # ------------------------------------------------------------------ 174 175 async def _call_remote(self, task: str, context: Dict[str, Any]) -> AgentResult: 176 """Send an A2A tasks/send JSON-RPC 2.0 request and return an AgentResult.""" 177 url = f"{self.endpoint_url}/rpc" 178 request_id = str(uuid4()) 179 task_id = str(uuid4()) 180 181 payload = JsonRpcRequest( 182 id=request_id, 183 method="tasks/send", 184 params=A2ATaskParams( 185 id=task_id, 186 message=A2AMessage( 187 parts=[A2APart(type="text", text=task)], 188 # Context (user_assertion, obo_token, locale, etc.) 
is carried 189 # in message.metadata — A2A agents read it from there. 190 metadata=context, 191 ), 192 ), 193 ).model_dump() 194 195 self._logger.info( 196 "A2AClient tasks/send", 197 url=url, 198 agent_id=self.agent_id, 199 task_id=task_id, 200 ) 201 202 async with httpx.AsyncClient(timeout=self.timeout) as client: 203 try: 204 response = await client.post(url, json=payload, headers=self._headers) 205 except httpx.ConnectError as exc: 206 raise A2AClientError( 207 f"Cannot reach remote agent at {url}: {exc}" 208 ) from exc 209 except httpx.TimeoutException as exc: 210 raise A2AClientError( 211 f"Remote agent timed out after {self.timeout}s: {exc}" 212 ) from exc 213 214 if response.status_code != 200: 215 raise A2AClientError( 216 f"Remote agent returned HTTP {response.status_code}: {response.text[:200]}" 217 ) 218 219 return self._parse_response(response.text) 220 221 def _parse_response(self, body: str) -> AgentResult: 222 """Parse an A2A JSON-RPC tasks/send response into an AgentResult.""" 223 try: 224 envelope = json.loads(body) 225 except json.JSONDecodeError as exc: 226 raise A2AClientError(f"Remote agent returned non-JSON body: {body[:200]}") from exc 227 228 if "error" in envelope: 229 err = JsonRpcErrorResponse.model_validate(envelope) 230 raise A2AClientError( 231 f"A2A JSON-RPC error {err.error.code}: {err.error.message}" 232 ) 233 234 try: 235 response = JsonRpcResponse.model_validate(envelope) 236 except Exception as exc: 237 raise A2AClientError(f"Malformed A2A response: {exc}") from exc 238 239 task = response.result 240 success = task.status.state == "completed" 241 error_msg: Optional[str] = task.status.message if not success else None 242 243 # Extract text output from the first artifact's first text part 244 output = "" 245 for artifact in task.artifacts: 246 for part in artifact.parts: 247 if part.type == "text": 248 output = part.text or "" 249 break 250 if output: 251 break 252 253 # Steps — validate each raw step dict through AgentStepModel 
254 steps: List[AgentStep] = [] 255 for raw_step in task.metadata.get("steps", []): 256 s = AgentStepModel.model_validate(raw_step) 257 steps.append( 258 AgentStep( 259 thought=s.thought, 260 action=s.action, 261 action_input=s.action_input, 262 observation=s.observation, 263 metadata=s.metadata, 264 ) 265 ) 266 267 # Propagate all task metadata except "steps" (already parsed above) 268 metadata = {k: v for k, v in task.metadata.items() if k != "steps"} 269 metadata["a2a_task_id"] = task.id 270 271 return AgentResult( 272 output=output, 273 steps=steps, 274 metadata=metadata, 275 success=success, 276 error=error_msg, 277 ) 278 279 # ------------------------------------------------------------------ 280 # A2A discovery helper 281 # ------------------------------------------------------------------ 282 283 @staticmethod 284 async def fetch_agent_card( 285 base_url: str, 286 timeout: float = 10.0, 287 headers: Optional[Dict[str, str]] = None, 288 ) -> AgentCard: 289 """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``. 290 291 Returns an :class:`AgentCard` instance. Standard fields: ``name``, 292 ``description``, ``url``, ``version``, ``skills``. 293 294 Args: 295 base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``). 296 timeout: HTTP timeout in seconds. 297 headers: Optional extra request headers. 298 299 Raises: 300 A2AClientError: if the endpoint is unreachable or returns non-200. 
301 """ 302 url = f"{base_url.rstrip('/')}/.well-known/agent.json" 303 async with httpx.AsyncClient(timeout=timeout) as client: 304 try: 305 response = await client.get(url, headers=headers or {}) 306 except httpx.ConnectError as exc: 307 raise A2AClientError( 308 f"Cannot reach agent card at {url}: {exc}" 309 ) from exc 310 except httpx.TimeoutException as exc: 311 raise A2AClientError( 312 f"Timed out fetching agent card from {url}: {exc}" 313 ) from exc 314 315 if response.status_code != 200: 316 raise A2AClientError( 317 f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}" 318 ) 319 320 try: 321 return AgentCard.model_validate(response.json()) 322 except json.JSONDecodeError as exc: 323 raise A2AClientError( 324 f"Agent card at {url} returned non-JSON: {response.text[:200]}" 325 ) from exc 326 except Exception as exc: 327 raise A2AClientError( 328 f"Agent card at {url} has invalid schema: {exc}" 329 ) from exc
Proxies execute() calls to a remote A2A-compliant agent service.
Uses the Agent2Agent (A2A) protocol — JSON-RPC 2.0 tasks/send over HTTP.
Context (user_assertion, obo_token, locale, etc.) is forwarded as
message.metadata in the JSON-RPC payload.
This lets any orchestrator (PipelineOrchestrator, SupervisorOrchestrator, etc.) treat a separately-deployed A2A agent service as a first-class agent — no code changes needed in the orchestrator.
Args:
endpoint_url: Base URL of the remote agent service
(e.g. "http://search-agent:8080").
/rpc is appended automatically.
timeout: HTTP request timeout in seconds (default: 30).
headers: Extra HTTP headers sent with every request (e.g. Authorization).
agent_id: Stable identifier used in logging/metrics. Defaults to "A2AClient".
logger: Optional BasicLogger.
metrics: Optional BasicMetricsCollector.
Example::
agent = A2AClient(
endpoint_url="http://search-agent-service:8080",
agent_id="search_agent",
)
result = await agent.execute("Find recent AI papers")
Discovery::
card = await A2AClient.fetch_agent_card("http://search-agent-service:8080")
agent = A2AClient(
endpoint_url=card["url"],
agent_id=card["name"],
)
115 def __init__( 116 self, 117 endpoint_url: str, 118 timeout: float = 30.0, 119 headers: Optional[Dict[str, str]] = None, 120 behaviors: Optional[List["BaseBehavior"]] = None, 121 agent_id: Optional[str] = None, 122 logger: Optional[BasicLogger] = None, 123 metrics: Optional[BasicMetricsCollector] = None, 124 ) -> None: 125 super().__init__( 126 llm_gateway=None, 127 behaviors=behaviors, 128 agent_id=agent_id or "A2AClient", 129 logger=logger, 130 metrics=metrics, 131 ) 132 self.endpoint_url = endpoint_url.rstrip("/") 133 self.timeout = timeout 134 self._headers = {"Content-Type": "application/json", **(headers or {})}
    async def execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AgentResult:
        bctx = BehaviorContext(agent_id=self.agent_id, task=task, metadata=context or {})
        bctx = await self._apply_behaviors_before(bctx)

        while True:
            self._log_execution_start(task)
            try:
                result = await self._call_remote(task, context or {})
            except Exception as exc:
                fallback = await self._apply_behaviors_on_error(bctx, exc)
                if fallback is RETRY_SENTINEL:
                    bctx.attempt += 1
                    continue
                if fallback is not None:
                    return fallback  # type: ignore[return-value]
                raise

            result = await self._apply_behaviors_after(bctx, result)
            self._log_execution_end(task, result.success, len(result.steps))
            return result
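Send the task to the remote agent and return its AgentResult. Configured behaviors run before the call, after a successful call, and on error. An on-error behavior may request a retry (the remote call is repeated) or supply a fallback result; if none does, the original exception is re-raised.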
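The retry-or-fallback control flow in `execute()` can be sketched without the library. This is a minimal sketch: `RETRY` stands in for `RETRY_SENTINEL` and `RetryOnce` is a hypothetical behavior. The real hooks receive a `BehaviorContext` (`bctx`) rather than a bare attempt count.

```python
import asyncio
from typing import Any, Awaitable, Callable

RETRY = object()  # hypothetical stand-in for RETRY_SENTINEL


class RetryOnce:
    """Hypothetical behavior: ask for one retry, then give up."""

    async def on_error(self, attempt: int, exc: Exception) -> Any:
        return RETRY if attempt == 0 else None


async def run_with_behaviors(call: Callable[[], Awaitable[str]], behavior: RetryOnce) -> str:
    attempt = 0
    while True:
        try:
            return await call()
        except Exception as exc:
            fallback = await behavior.on_error(attempt, exc)
            if fallback is RETRY:
                attempt += 1
                continue          # call the remote service again
            if fallback is not None:
                return fallback   # behavior supplied a substitute result
            raise                 # no recovery: re-raise the original error


calls = {"n": 0}


async def flaky() -> str:
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("transient")
    return "ok"


result = asyncio.run(run_with_behaviors(flaky, RetryOnce()))
print(result, calls["n"])  # ok 2
```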
    async def stream_execute(
        self, task: str, context: Optional[Dict[str, Any]] = None
    ) -> AsyncIterator[AgentStep]:
        """Executes remotely then yields each step from the response."""
        result = await self.execute(task, context)
        for step in result.steps:
            yield step
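Executes remotely, then yields each step from the response. The output is buffered rather than incremental: no step is yielded until the whole remote call has completed.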
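The buffering is visible in a tiny stand-alone sketch. `fake_execute` is a hypothetical stand-in for the remote call. The generator cannot yield anything until the awaited call has returned all steps.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_execute(task: str) -> List[str]:
    await asyncio.sleep(0.01)  # the entire remote call finishes first
    return [f"step 1 of {task}", f"step 2 of {task}"]


async def stream_execute(task: str) -> AsyncIterator[str]:
    steps = await fake_execute(task)  # waits for the full result
    for step in steps:
        yield step                    # then replays the buffered steps


async def main() -> List[str]:
    return [step async for step in stream_execute("demo")]


print(asyncio.run(main()))  # ['step 1 of demo', 'step 2 of demo']
```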
    @staticmethod
    async def fetch_agent_card(
        base_url: str,
        timeout: float = 10.0,
        headers: Optional[Dict[str, str]] = None,
    ) -> AgentCard:
        """Fetch the A2A Agent Card from ``GET /.well-known/agent.json``.

        Returns an :class:`AgentCard` instance. Standard fields: ``name``,
        ``description``, ``url``, ``version``, ``skills``.

        Args:
            base_url: Base URL of the remote agent (e.g. ``"http://agent:8080"``).
            timeout: HTTP timeout in seconds.
            headers: Optional extra request headers.

        Raises:
            A2AClientError: if the endpoint is unreachable or returns non-200.
        """
        url = f"{base_url.rstrip('/')}/.well-known/agent.json"
        async with httpx.AsyncClient(timeout=timeout) as client:
            try:
                response = await client.get(url, headers=headers or {})
            except httpx.ConnectError as exc:
                raise A2AClientError(
                    f"Cannot reach agent card at {url}: {exc}"
                ) from exc
            except httpx.TimeoutException as exc:
                raise A2AClientError(
                    f"Timed out fetching agent card from {url}: {exc}"
                ) from exc

        if response.status_code != 200:
            raise A2AClientError(
                f"GET {url} returned HTTP {response.status_code}: {response.text[:200]}"
            )

        try:
            return AgentCard.model_validate(response.json())
        except json.JSONDecodeError as exc:
            raise A2AClientError(
                f"Agent card at {url} returned non-JSON: {response.text[:200]}"
            ) from exc
        except Exception as exc:
            raise A2AClientError(
                f"Agent card at {url} has invalid schema: {exc}"
            ) from exc
Fetch the A2A Agent Card from GET /.well-known/agent.json.
Returns an AgentCard instance. Standard fields: name,
description, url, version, skills.
Args:
base_url: Base URL of the remote agent (e.g. "http://agent:8080").
timeout: HTTP timeout in seconds.
headers: Optional extra request headers.
Raises:
A2AClientError: if the endpoint is unreachable or times out, returns a non-200 status, or returns a body that is not JSON or does not match the AgentCard schema.
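The response checks can be exercised offline. The following is a stdlib-only sketch that mirrors the order of checks in `fetch_agent_card` (status, then JSON, then schema), with the HTTP call left out. `AgentCardError`, `Card`, `card_url`, and `parse_card` are hypothetical. `Card` is a simplified dataclass standing in for the pydantic `AgentCard` model, so its validation is much weaker.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


class AgentCardError(RuntimeError):
    """Hypothetical stand-in for A2AClientError."""


@dataclass
class Card:  # simplified stand-in for the pydantic AgentCard model
    name: str
    description: str
    url: str
    version: str
    skills: List[Dict[str, Any]] = field(default_factory=list)


def card_url(base_url: str) -> str:
    return f"{base_url.rstrip('/')}/.well-known/agent.json"


def parse_card(url: str, status: int, body: str) -> Card:
    if status != 200:
        raise AgentCardError(f"GET {url} returned HTTP {status}: {body[:200]}")
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        raise AgentCardError(f"Agent card at {url} returned non-JSON: {body[:200]}") from exc
    try:
        return Card(**data)  # missing/unknown keys or a non-object body raise TypeError
    except TypeError as exc:
        raise AgentCardError(f"Agent card at {url} has invalid schema: {exc}") from exc


url = card_url("http://agent:8080/")
card = parse_card(url, 200, '{"name": "search_agent", "description": "Search",'
                            ' "url": "http://agent:8080", "version": "0.1.0"}')
print(url, card.name)

try:
    parse_card(url, 404, "Not Found")
except AgentCardError as exc:
    print(exc)  # GET http://agent:8080/.well-known/agent.json returned HTTP 404: Not Found
```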
class A2AClientError(RuntimeError):
    """Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol."""
Raised when the remote A2A agent returns an error, is unreachable, or violates the protocol.
class A2AAdapter:
    """Protocol-only A2A receiver adapter.

    The adapter does not know about tools, checkpoints, routing, or any other
    orchestration concerns. It only translates between the A2A wire protocol and
    the host agent's execution callback.
    """

    def __init__(
        self,
        agent_id: str,
        description: str,
        url: str,
        version: str = "0.1.0",
        skills: Optional[Sequence[Mapping[str, Any]]] = None,
        logger: Optional[BasicLogger] = None,
    ) -> None:
        self.agent_id = agent_id
        self.description = description
        self.url = url.rstrip("/")
        self.version = version
        self._skills: List[Dict[str, Any]] = [dict(skill) for skill in (skills or [])]
        self._logger = logger or BasicLogger(f"gmf_forge_ai.a2a.{self.agent_id}")

    def agent_card(self) -> AgentCard:
        """Return the A2A Agent Card for ``GET /.well-known/agent.json``."""
        return AgentCard(
            name=self.agent_id,
            description=self.description,
            url=self.url,
            version=self.version,
            skills=[AgentSkill(**s) for s in self._skills],
        )

    def build_jsonrpc_error(
        self,
        rpc_id: Optional[str],
        code: int,
        message: str,
        data: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response."""
        return JsonRpcErrorResponse(
            id=rpc_id,
            error=JsonRpcError(code=code, message=message, data=data),
        ).model_dump()

    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]:
        """Parse a ``tasks/send`` JSON-RPC request body.

        Returns:
            A tuple of ``(rpc_id, task_id, task_text, context)``.
        """
        method = str(body.get("method", ""))
        if method != "tasks/send":
            raise A2AAdapterError(f"Method not found: {method}")

        params = body.get("params", {})
        if not isinstance(params, Mapping):
            raise A2AAdapterError("Invalid params payload")

        rpc_id = body.get("id")
        task_id = str(params.get("id") or uuid4())

        message = params.get("message", {})
        if not isinstance(message, Mapping):
            raise A2AAdapterError("Invalid message payload")

        task_text = self._extract_task_text(message.get("parts", []))
        context = message.get("metadata", {})
        if not isinstance(context, Mapping):
            raise A2AAdapterError("Invalid message metadata payload")

        return rpc_id, task_id, task_text, dict(context)

    def build_task_response(
        self,
        rpc_id: Optional[str],
        task_id: str,
        result: AgentResult,
    ) -> Dict[str, Any]:
        """Convert an :class:`AgentResult` into an A2A JSON-RPC success response."""
        metadata = dict(result.metadata)
        metadata["steps"] = [self._serialize_step(step) for step in result.steps]
        metadata.setdefault("a2a_task_id", task_id)

        artifacts = (
            [A2AArtifact(parts=[A2APart(type="text", text=result.output)])]
            if result.output
            else []
        )

        return JsonRpcResponse(
            id=rpc_id,
            result=A2ATask(
                id=task_id,
                status=A2ATaskStatus(
                    state="completed" if result.success else "failed",
                    message=result.error if not result.success else None,
                ),
                artifacts=artifacts,
                metadata=metadata,
            ),
        ).model_dump()

    async def handle_rpc(
        self,
        body: Mapping[str, Any],
        execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]],
    ) -> Dict[str, Any]:
        """Handle an incoming A2A JSON-RPC body and return the response payload."""
        rpc_id = body.get("id") if isinstance(body, Mapping) else None

        try:
            rpc_id, task_id, task_text, context = self.parse_tasks_send(body)
        except A2AAdapterError as exc:
            code = -32601 if str(exc).startswith("Method not found") else -32602
            return self.build_jsonrpc_error(rpc_id if isinstance(rpc_id, str) else None, code, str(exc))

        self._logger.info(
            "A2A tasks/send received",
            agent_id=self.agent_id,
            task_id=task_id,
            task_preview=task_text[:80],
        )

        try:
            result = await execute(task_text, context)
        except Exception as exc:
            self._logger.error(
                "Agent execution failed",
                agent_id=self.agent_id,
                task_id=task_id,
                error=str(exc),
            )
            return self.build_jsonrpc_error(
                rpc_id if isinstance(rpc_id, str) else None,
                -32603,
                str(exc),
            )

        return self.build_task_response(rpc_id if isinstance(rpc_id, str) else None, task_id, result)

    def _extract_task_text(self, parts: Any) -> str:
        if not isinstance(parts, Sequence):
            raise A2AAdapterError("Invalid message parts payload")

        for part in parts:
            if isinstance(part, Mapping) and part.get("type") == "text":
                return str(part.get("text", ""))
        return ""

    def _serialize_step(self, step: AgentStep) -> Dict[str, Any]:
        return AgentStepModel(
            thought=step.thought,
            action=step.action,
            action_input=step.action_input,
            observation=step.observation,
            metadata=step.metadata,
        ).model_dump()
Protocol-only A2A receiver adapter.
The adapter does not know about tools, checkpoints, routing, or any other orchestration concerns. It only translates between the A2A wire protocol and the host agent's execution callback.
    def __init__(self, agent_id: str, description: str, url: str, version: str = "0.1.0", skills: Optional[Sequence[Mapping[str, Any]]] = None, logger: Optional[BasicLogger] = None) -> None
    def agent_card(self) -> AgentCard
Return the A2A Agent Card for GET /.well-known/agent.json.
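The following plain-dict sketch shows how the constructor arguments map onto the card fields. `agent_card_dict` is hypothetical. The skill keys (`id`, `name`) are assumed from the public A2A specification, because the `AgentSkill` model is not shown here. The real method returns a pydantic `AgentCard`, not a dict.

```python
import json
from typing import Any, Dict, Mapping, Optional, Sequence


def agent_card_dict(
    agent_id: str,
    description: str,
    url: str,
    version: str = "0.1.0",
    skills: Optional[Sequence[Mapping[str, Any]]] = None,
) -> Dict[str, Any]:
    """Mirror the field mapping of A2AAdapter.agent_card() as a plain dict."""
    return {
        "name": agent_id,           # agent_id becomes the card's name
        "description": description,
        "url": url.rstrip("/"),     # trailing slash stripped, as in __init__
        "version": version,
        "skills": [dict(s) for s in (skills or [])],
    }


card = agent_card_dict(
    "search_agent",
    "Searches the web",
    "http://search-agent:8080/",
    skills=[{"id": "web_search", "name": "Web search"}],  # assumed skill fields
)
print(json.dumps(card, indent=2))
```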
    def build_jsonrpc_error(self, rpc_id: Optional[str], code: int, message: str, data: Optional[Any] = None) -> Dict[str, Any]
Build a JSON-RPC 2.0 error response.
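For reference, here is the error envelope defined by the JSON-RPC 2.0 specification, sketched as a plain dict together with the specification's standard error codes. `jsonrpc_error` is hypothetical. The exact output of the library's `JsonRpcErrorResponse(...).model_dump()` may differ. For example, pydantic's `model_dump()` keeps `None` fields by default, so `data: None` may appear. Whether a `jsonrpc` key is included depends on a model that is not shown here.

```python
from typing import Any, Dict, Optional

# Standard error codes defined by the JSON-RPC 2.0 specification.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603


def jsonrpc_error(rpc_id: Optional[str], code: int, message: str,
                  data: Optional[Any] = None) -> Dict[str, Any]:
    """Illustrative plain-dict JSON-RPC 2.0 error response."""
    error: Dict[str, Any] = {"code": code, "message": message}
    if data is not None:
        error["data"] = data  # "data" is optional in JSON-RPC 2.0
    return {"jsonrpc": "2.0", "id": rpc_id, "error": error}


resp = jsonrpc_error("req-1", METHOD_NOT_FOUND, "Method not found: tasks/get")
print(resp)
# {'jsonrpc': '2.0', 'id': 'req-1', 'error': {'code': -32601, 'message': 'Method not found: tasks/get'}}
```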
    def parse_tasks_send(self, body: Mapping[str, Any]) -> tuple[Optional[str], str, str, Dict[str, Any]]
Parse a tasks/send JSON-RPC request body.
Returns:
A tuple of (rpc_id, task_id, task_text, context).
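The parsing rules can be tested in isolation. This is a stdlib-only sketch that mirrors the checks in `parse_tasks_send` and `_extract_task_text`: the method must be `tasks/send`, a task id is generated when absent, only the first `text` part is used, and metadata becomes the context. `BadRequest` is a hypothetical stand-in for `A2AAdapterError`.

```python
import uuid
from collections.abc import Mapping, Sequence
from typing import Any, Dict, Optional, Tuple


class BadRequest(ValueError):
    """Hypothetical stand-in for A2AAdapterError."""


def parse_tasks_send(body: Mapping) -> Tuple[Optional[Any], str, str, Dict[str, Any]]:
    method = str(body.get("method", ""))
    if method != "tasks/send":
        raise BadRequest(f"Method not found: {method}")
    params = body.get("params", {})
    if not isinstance(params, Mapping):
        raise BadRequest("Invalid params payload")
    task_id = str(params.get("id") or uuid.uuid4())  # generate an id if absent
    message = params.get("message", {})
    if not isinstance(message, Mapping):
        raise BadRequest("Invalid message payload")
    parts = message.get("parts", [])
    if not isinstance(parts, Sequence):
        raise BadRequest("Invalid message parts payload")
    # Only the first text part is used; other part types are skipped.
    text = next((str(p.get("text", "")) for p in parts
                 if isinstance(p, Mapping) and p.get("type") == "text"), "")
    context = message.get("metadata", {})
    if not isinstance(context, Mapping):
        raise BadRequest("Invalid message metadata payload")
    return body.get("id"), task_id, text, dict(context)


body = {
    "jsonrpc": "2.0",
    "id": "1",
    "method": "tasks/send",
    "params": {
        "id": "t-42",
        "message": {
            "parts": [
                {"type": "data", "data": {"doc_id": 7}},
                {"type": "text", "text": "Summarise this"},
            ],
            "metadata": {"locale": "fr-FR"},
        },
    },
}
print(parse_tasks_send(body))  # ('1', 't-42', 'Summarise this', {'locale': 'fr-FR'})
```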
    def build_task_response(self, rpc_id: Optional[str], task_id: str, result: AgentResult) -> Dict[str, Any]
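Convert an AgentResult into an A2A JSON-RPC success response. The JSON-RPC envelope is a success response even when the agent failed: a failed result is reported as status.state "failed", with the error text in status.message. A non-empty output becomes a single text artifact (an empty output yields no artifacts). The serialized steps are stored in metadata["steps"], and a2a_task_id is added unless the result metadata already sets it.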
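The following plain-dict sketch shows the response shape, including how a failed run still travels in a `result` envelope rather than an `error` one. `task_response` is hypothetical. The field names follow the constructor arguments in `build_task_response`, but the library's `A2ATask`, `A2ATaskStatus`, and `A2AArtifact` models may serialize extra or differently named fields, and the `jsonrpc` key is assumed from the JSON-RPC 2.0 specification.

```python
from typing import Any, Dict, List, Optional


def task_response(rpc_id: Optional[str], task_id: str, success: bool, output: str,
                  error: Optional[str] = None,
                  steps: Optional[List[Dict[str, Any]]] = None,
                  metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    meta = dict(metadata or {})
    meta["steps"] = list(steps or [])        # serialized steps always overwrite
    meta.setdefault("a2a_task_id", task_id)  # kept if the agent already set one
    artifacts = [{"parts": [{"type": "text", "text": output}]}] if output else []
    return {
        "jsonrpc": "2.0",
        "id": rpc_id,
        "result": {
            "id": task_id,
            "status": {"state": "completed" if success else "failed",
                       "message": None if success else error},
            "artifacts": artifacts,
            "metadata": meta,
        },
    }


failed = task_response("7", "t-1", success=False, output="", error="upstream timeout")
print(failed["result"]["status"])  # {'state': 'failed', 'message': 'upstream timeout'}
```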
    async def handle_rpc(self, body: Mapping[str, Any], execute: Callable[[str, Dict[str, Any]], Awaitable[AgentResult]]) -> Dict[str, Any]
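Handle an incoming A2A JSON-RPC body and return the response payload. An unsupported method yields a -32601 error, a malformed tasks/send payload yields -32602, and an exception raised by execute yields -32603. Otherwise the AgentResult is converted with build_task_response. Only string JSON-RPC ids are echoed back; any other id (including a numeric one) is returned as null.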
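The dispatch logic and its error-code mapping can be sketched without the library. The following simplified sketch takes a host `execute` callback, as `handle_rpc` does. `handle`, `shout`, and `broken` are hypothetical. The success payload is deliberately reduced to `{"state", "output"}` instead of a full A2A task, and validation is lighter than in `parse_tasks_send`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional


def _err(rpc_id: Optional[str], code: int, message: str) -> Dict[str, Any]:
    return {"jsonrpc": "2.0", "id": rpc_id, "error": {"code": code, "message": message}}


async def handle(body: Dict[str, Any],
                 execute: Callable[[str, Dict[str, Any]], Awaitable[str]]) -> Dict[str, Any]:
    raw_id = body.get("id")
    rpc_id = raw_id if isinstance(raw_id, str) else None  # non-string ids come back as null
    method = str(body.get("method", ""))
    if method != "tasks/send":
        return _err(rpc_id, -32601, f"Method not found: {method}")
    params = body.get("params", {})
    if not isinstance(params, dict):
        return _err(rpc_id, -32602, "Invalid params payload")
    message = params.get("message", {})
    if not isinstance(message, dict):
        return _err(rpc_id, -32602, "Invalid message payload")
    text = next((p.get("text", "") for p in message.get("parts", [])
                 if isinstance(p, dict) and p.get("type") == "text"), "")
    try:
        output = await execute(text, dict(message.get("metadata", {})))
    except Exception as exc:  # any failure inside the host agent -> internal error
        return _err(rpc_id, -32603, str(exc))
    # Simplified success shape; the real adapter builds a full A2A task here.
    return {"jsonrpc": "2.0", "id": rpc_id, "result": {"state": "completed", "output": output}}


async def shout(text: str, context: Dict[str, Any]) -> str:
    return text.upper()


async def broken(text: str, context: Dict[str, Any]) -> str:
    raise RuntimeError("model unavailable")


send = {"jsonrpc": "2.0", "id": "1", "method": "tasks/send",
        "params": {"message": {"parts": [{"type": "text", "text": "hi"}]}}}
ok = asyncio.run(handle(send, shout))
failed = asyncio.run(handle(send, broken))
unknown = asyncio.run(handle({"jsonrpc": "2.0", "id": 2, "method": "tasks/get"}, shout))
print(ok, failed, unknown, sep="\n")
```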
class A2AAdapterError(RuntimeError):
    """Raised when an incoming A2A request is malformed or unsupported."""
Raised when an incoming A2A request is malformed or unsupported.