gmf_forge_ai_orchestration.multi_agent
Multi-agent orchestration — Supervisor, Pipeline, Debate, and Swarm.
1"""Multi-agent orchestration — Supervisor, Pipeline, Debate, and Swarm.""" 2 3from gmf_forge_ai_orchestration.multi_agent.base import BaseOrchestrator, OrchestratorResult 4from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator 5from gmf_forge_ai_orchestration.multi_agent.pipeline import PipelineOrchestrator 6from gmf_forge_ai_orchestration.multi_agent.debate import DebateOrchestrator 7from gmf_forge_ai_orchestration.multi_agent.swarm import SwarmOrchestrator 8 9__all__ = [ 10 "BaseOrchestrator", 11 "OrchestratorResult", 12 "SupervisorOrchestrator", 13 "PipelineOrchestrator", 14 "DebateOrchestrator", 15 "SwarmOrchestrator", 16]
30class BaseOrchestrator(ABC): 31 """ 32 Abstract base class for multi-agent orchestrators. 33 34 Args: 35 logger: Optional :class:`BasicLogger`. 36 metrics: Optional :class:`BasicMetricsCollector`. 37 tracer: Optional :class:`TracingProvider`. Falls back to ``get_tracer()``. 38 """ 39 40 def __init__( 41 self, 42 router: Optional["BaseRouter"] = None, 43 logger: Optional[BasicLogger] = None, 44 metrics: Optional[BasicMetricsCollector] = None, 45 tracer: Optional[TracingProvider] = None, 46 ) -> None: 47 self._logger = logger or BasicLogger( 48 f"gmf_forge_ai.orchestrator.{self.__class__.__name__}" 49 ) 50 self._metrics = metrics 51 self._tracer = tracer or get_tracer() 52 self.router: Optional["BaseRouter"] = router 53 54 @abstractmethod 55 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 56 """Execute the multi-agent orchestration and return the aggregated result.""" 57 58 def _log_start(self, task: str) -> None: 59 self._logger.info("Orchestrator started", orchestrator=self.__class__.__name__, task=task) 60 if self._metrics: 61 self._metrics.increment("orchestrator.runs", orchestrator=self.__class__.__name__) 62 63 def _log_agent_dispatch(self, agent_id: str, task: str) -> None: 64 self._logger.info("Dispatching to agent", agent_id=agent_id, task=task[:100]) 65 66 def _log_finished(self, rounds: int, success: bool) -> None: 67 self._logger.info( 68 "Orchestrator finished", 69 orchestrator=self.__class__.__name__, 70 rounds=rounds, 71 success=success, 72 ) 73 if self._metrics: 74 self._metrics.increment("orchestrator.rounds", count=rounds)
Abstract base class for multi-agent orchestrators.
Args:
logger: Optional BasicLogger.
metrics: Optional BasicMetricsCollector.
tracer: Optional TracingProvider. Falls back to get_tracer().
54 @abstractmethod 55 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 56 """Execute the multi-agent orchestration and return the aggregated result."""
Execute the multi-agent orchestration and return the aggregated result.
17@dataclass 18class OrchestratorResult: 19 """The aggregated result of a multi-agent orchestration run.""" 20 21 final_output: str 22 agent_outputs: Dict[str, AgentResult] = field(default_factory=dict) 23 subtask_outputs: List[Tuple[str, str, AgentResult]] = field(default_factory=list) 24 rounds: int = 0 25 success: bool = True 26 error: Optional[str] = None 27 metadata: Dict[str, Any] = field(default_factory=dict)
The aggregated result of a multi-agent orchestration run.
55class SupervisorOrchestrator(BaseOrchestrator): 56 """ 57 LLM-based supervisor that plans subtask assignment and synthesizes results. 58 59 Phase 1 — Plan: The LLM supervisor decomposes the task and assigns each 60 subtask to the most appropriate worker agent. 61 Phase 2 — Execute: All worker agents run (concurrently if not interdependent). 62 Phase 3 — Synthesize: The supervisor LLM merges all results into a final answer. 63 64 Args: 65 supervisor_gateway: The LLM gateway used by the supervisor. 66 agents: Mapping of agent name → :class:`BaseAgent`. 67 agent_descriptions: Optional human-readable descriptions for the supervisor prompt. 68 supervisor_model: LLM model for supervisor calls (optional). 69 logger, metrics, tracer: Observability (optional). 70 """ 71 72 def __init__( 73 self, 74 supervisor_gateway: Any, 75 agents: Optional[Dict[str, BaseAgent]] = None, 76 agent_descriptions: Optional[Dict[str, str]] = None, 77 supervisor_model: Optional[str] = None, 78 router: Optional[BaseRouter] = None, 79 logger: Optional[BasicLogger] = None, 80 metrics: Optional[BasicMetricsCollector] = None, 81 tracer: Optional[TracingProvider] = None, 82 ) -> None: 83 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 84 self.supervisor_gateway = supervisor_gateway 85 self.agents: Dict[str, BaseAgent] = agents or {} 86 self.agent_descriptions: Dict[str, str] = agent_descriptions or {} 87 self.supervisor_model = supervisor_model 88 89 async def _decompose(self, task: str) -> List[str]: 90 """Use LLM to decompose a task into subtask strings (no agent assignment).""" 91 prompt = _SUPERVISOR_DECOMPOSE_PROMPT.format(task=task) 92 response = await self.supervisor_gateway.complete( 93 prompt, model=self.supervisor_model, temperature=0.0 94 ) 95 raw = response.content.strip() 96 match = re.search(r"\[.*\]", raw, re.DOTALL) 97 if match: 98 try: 99 subtasks = json.loads(match.group()) 100 if isinstance(subtasks, list) and all(isinstance(s, str) for s in subtasks): 101 return subtasks 102 except (json.JSONDecodeError, ValueError): 103 pass 104 return [task] # fallback: treat whole task as one subtask 105 106 async def _plan(self, task: str) -> List[Dict[str, str]]: 107 """Decompose task and assign agents. 108 109 When a ``router`` is injected: the LLM only decomposes the task into 110 subtasks; the router selects the best agent for each subtask. 111 112 When no router is provided: the LLM both decomposes and assigns agents 113 (original behaviour). 114 """ 115 if self.router: 116 subtasks = await self._decompose(task) 117 available = list(self.agents.keys()) 118 plan: List[Dict[str, str]] = [] 119 for subtask in subtasks: 120 decision = await self.router.route( 121 RoutingRequest(input=subtask, available_agents=available) 122 ) 123 agent_name = decision.target if decision.target in self.agents else available[0] 124 plan.append({"agent": agent_name, "subtask": subtask}) 125 self._logger.info( 126 "Router assigned subtask", 127 agent=agent_name, 128 confidence=decision.confidence, 129 subtask=subtask[:80], 130 ) 131 return plan 132 133 # --- Original LLM-based plan (no router) --- 134 desc_block = "\n".join( 135 f"- {name}: {self.agent_descriptions.get(name, 'General agent.')}" 136 for name in self.agents 137 ) 138 prompt = _SUPERVISOR_PLAN_PROMPT.format( 139 agent_descriptions=desc_block, task=task 140 ) 141 response = await self.supervisor_gateway.complete( 142 prompt, model=self.supervisor_model, temperature=0.0 143 ) 144 raw = response.content.strip() 145 match = re.search(r"\[.*\]", raw, re.DOTALL) 146 if match: 147 try: 148 llm_plan = json.loads(match.group()) 149 if isinstance(llm_plan, list): 150 return [p for p in llm_plan if "agent" in p and "subtask" in p] 151 except (json.JSONDecodeError, ValueError): 152 pass 153 # Fallback: assign whole task to first agent 154 first_agent = next(iter(self.agents), "") 155 return [{"agent": first_agent, "subtask": task}] 156 157 async def _synthesize(self, task: str, subtask_results: List[Tuple[str, str, AgentResult]]) -> str: 158 results_block = "\n\n".join( 159 f"[{agent}]: {result.output}" for agent, _subtask, result in subtask_results 160 ) 161 prompt = _SUPERVISOR_SYNTHESIS_PROMPT.format( 162 task=task, agent_results=results_block 163 ) 164 response = await self.supervisor_gateway.complete( 165 prompt, model=self.supervisor_model, temperature=0.1 166 ) 167 return response.content.strip() 168 169 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 170 self._log_start(task) 171 172 with self._tracer.trace( 173 "supervisor.run", input=task, metadata={"agent_count": len(self.agents)} 174 ) as trace: 175 try: 176 # Phase 1: Plan 177 with trace.span("supervisor.plan", input=task) as plan_span: 178 plan = await self._plan(task) 179 plan_span.set_output({"plan": plan}) 180 181 # Phase 2: Execute subtasks 182 agent_outputs: Dict[str, AgentResult] = {} 183 subtask_list: List[Tuple[str, str, AgentResult]] = [] 184 for assignment in plan: 185 agent_name = assignment["agent"] 186 subtask = assignment["subtask"] 187 agent = self.agents.get(agent_name) 188 if agent is None: 189 self._logger.warning("Unknown agent in plan", agent=agent_name) 190 continue 191 192 self._log_agent_dispatch(agent_name, subtask) 193 with trace.span(f"agent.{agent_name}", input=subtask) as aspan: 194 result = await agent.execute(subtask, context=context) 195 agent_outputs[agent_name] = result 196 subtask_list.append((agent_name, subtask, result)) 197 aspan.set_output(result.output) 198 199 # Phase 3: Synthesize 200 with trace.span("supervisor.synthesize") as syn_span: 201 final_output = await self._synthesize(task, subtask_list) 202 syn_span.set_output(final_output) 203 204 self._log_finished(rounds=1, success=True) 205 trace.set_output(final_output) 206 return OrchestratorResult( 207 final_output=final_output, 208 agent_outputs=agent_outputs, 209 subtask_outputs=subtask_list, 210 rounds=1, 211 success=True, 212 ) 213 214 except Exception as exc: 215 self._logger.error("SupervisorOrchestrator failed", error=str(exc)) 216 trace.set_error(exc) 217 return OrchestratorResult( 218 final_output="", 219 rounds=1, 220 success=False, 221 error=str(exc), 222 )
LLM-based supervisor that plans subtask assignment and synthesizes results.
Phase 1 — Plan: The LLM supervisor decomposes the task and assigns each subtask to the most appropriate worker agent. Phase 2 — Execute: All worker agents run (concurrently if not interdependent). Phase 3 — Synthesize: The supervisor LLM merges all results into a final answer.
Args:
supervisor_gateway: The LLM gateway used by the supervisor.
agents: Mapping of agent name → BaseAgent.
agent_descriptions: Optional human-readable descriptions for the supervisor prompt.
supervisor_model: LLM model for supervisor calls (optional).
logger, metrics, tracer: Observability (optional).
72 def __init__( 73 self, 74 supervisor_gateway: Any, 75 agents: Optional[Dict[str, BaseAgent]] = None, 76 agent_descriptions: Optional[Dict[str, str]] = None, 77 supervisor_model: Optional[str] = None, 78 router: Optional[BaseRouter] = None, 79 logger: Optional[BasicLogger] = None, 80 metrics: Optional[BasicMetricsCollector] = None, 81 tracer: Optional[TracingProvider] = None, 82 ) -> None: 83 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 84 self.supervisor_gateway = supervisor_gateway 85 self.agents: Dict[str, BaseAgent] = agents or {} 86 self.agent_descriptions: Dict[str, str] = agent_descriptions or {} 87 self.supervisor_model = supervisor_model
169 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 170 self._log_start(task) 171 172 with self._tracer.trace( 173 "supervisor.run", input=task, metadata={"agent_count": len(self.agents)} 174 ) as trace: 175 try: 176 # Phase 1: Plan 177 with trace.span("supervisor.plan", input=task) as plan_span: 178 plan = await self._plan(task) 179 plan_span.set_output({"plan": plan}) 180 181 # Phase 2: Execute subtasks 182 agent_outputs: Dict[str, AgentResult] = {} 183 subtask_list: List[Tuple[str, str, AgentResult]] = [] 184 for assignment in plan: 185 agent_name = assignment["agent"] 186 subtask = assignment["subtask"] 187 agent = self.agents.get(agent_name) 188 if agent is None: 189 self._logger.warning("Unknown agent in plan", agent=agent_name) 190 continue 191 192 self._log_agent_dispatch(agent_name, subtask) 193 with trace.span(f"agent.{agent_name}", input=subtask) as aspan: 194 result = await agent.execute(subtask, context=context) 195 agent_outputs[agent_name] = result 196 subtask_list.append((agent_name, subtask, result)) 197 aspan.set_output(result.output) 198 199 # Phase 3: Synthesize 200 with trace.span("supervisor.synthesize") as syn_span: 201 final_output = await self._synthesize(task, subtask_list) 202 syn_span.set_output(final_output) 203 204 self._log_finished(rounds=1, success=True) 205 trace.set_output(final_output) 206 return OrchestratorResult( 207 final_output=final_output, 208 agent_outputs=agent_outputs, 209 subtask_outputs=subtask_list, 210 rounds=1, 211 success=True, 212 ) 213 214 except Exception as exc: 215 self._logger.error("SupervisorOrchestrator failed", error=str(exc)) 216 trace.set_error(exc) 217 return OrchestratorResult( 218 final_output="", 219 rounds=1, 220 success=False, 221 error=str(exc), 222 )
Execute the multi-agent orchestration and return the aggregated result.
14class PipelineOrchestrator(BaseOrchestrator): 15 """ 16 Runs a sequence of agents where the output of each becomes the input to the next. 17 18 The first agent in the pipeline receives the original ``task``. Each 19 subsequent agent receives the previous agent's ``output`` as its task. 20 21 Args: 22 agents: Ordered list of :class:`BaseAgent` instances. 23 pass_context: If True, the full context dict (including all prior outputs) 24 is also forwarded to each agent (default: True). 25 logger, metrics, tracer: Observability (optional). 26 27 Example:: 28 29 pipeline = PipelineOrchestrator( 30 agents=[search_agent, summarise_agent, translate_agent] 31 ) 32 result = await pipeline.run("Summarise the latest AI news in Spanish") 33 """ 34 35 def __init__( 36 self, 37 agents: Optional[List[BaseAgent]] = None, 38 pass_context: bool = True, 39 router: Optional[BaseRouter] = None, 40 logger: Optional[BasicLogger] = None, 41 metrics: Optional[BasicMetricsCollector] = None, 42 tracer: Optional[TracingProvider] = None, 43 ) -> None: 44 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 45 self.agents: List[BaseAgent] = agents or [] 46 self.pass_context = pass_context 47 48 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 49 self._log_start(task) 50 51 agent_outputs: Dict[str, AgentResult] = {} 52 current_task = task 53 # Merge caller-supplied context (user_assertion, etc.) so it flows to agents; 54 # step outputs are accumulated on top under step_N_output keys. 55 pipeline_context: Dict[str, Any] = {"original_task": task, **(context or {})} 56 57 with self._tracer.trace( 58 "pipeline.run", 59 input=task, 60 metadata={"pipeline_length": len(self.agents)}, 61 ) as trace: 62 try: 63 for i, agent in enumerate(self.agents): 64 # If a router is provided, let it pick the best agent from 65 # the pool for the current task rather than using fixed order. 66 if self.router: 67 available = [a.agent_id for a in self.agents] 68 decision = await self.router.route( 69 RoutingRequest(input=current_task, available_agents=available) 70 ) 71 agent = next( 72 (a for a in self.agents if a.agent_id == decision.target), 73 self.agents[i], # fallback to positional agent 74 ) 75 self._logger.info( 76 "Router selected pipeline agent", 77 step=i, 78 agent=agent.agent_id, 79 confidence=decision.confidence, 80 ) 81 agent_id = agent.agent_id or f"agent_{i}" 82 self._log_agent_dispatch(agent_id, current_task) 83 84 with trace.span(f"pipeline.step_{i}.{agent_id}", input=current_task) as span: 85 # pass_context=True: full accumulated pipeline context (step outputs included) 86 # pass_context=False: only caller's original context (no step outputs) 87 exec_context = pipeline_context if self.pass_context else (context or None) 88 result = await agent.execute(current_task, context=exec_context) 89 agent_outputs[f"step_{i}_{agent_id}"] = result 90 pipeline_context[f"step_{i}_output"] = result.output 91 span.set_output(result.output) 92 93 if not result.success: 94 self._logger.warning( 95 "Pipeline step failed", 96 step=i, 97 agent=agent_id, 98 error=result.error, 99 ) 100 101 # Output of this step becomes the task for the next 102 current_task = result.output 103 104 self._log_finished(rounds=len(self.agents), success=True) 105 trace.set_output(current_task) 106 return OrchestratorResult( 107 final_output=current_task, 108 agent_outputs=agent_outputs, 109 rounds=len(self.agents), 110 success=True, 111 ) 112 113 except Exception as exc: 114 self._logger.error("PipelineOrchestrator failed", error=str(exc)) 115 trace.set_error(exc) 116 return OrchestratorResult( 117 final_output=current_task, 118 agent_outputs=agent_outputs, 119 rounds=len(agent_outputs), 120 success=False, 121 error=str(exc), 122 )
Runs a sequence of agents where the output of each becomes the input to the next.
The first agent in the pipeline receives the original task. Each
subsequent agent receives the previous agent's output as its task.
Args:
agents: Ordered list of BaseAgent instances.
pass_context: If True, the full context dict (including all prior outputs)
is also forwarded to each agent (default: True).
logger, metrics, tracer: Observability (optional).
Example::
pipeline = PipelineOrchestrator(
agents=[search_agent, summarise_agent, translate_agent]
)
result = await pipeline.run("Summarise the latest AI news in Spanish")
35 def __init__( 36 self, 37 agents: Optional[List[BaseAgent]] = None, 38 pass_context: bool = True, 39 router: Optional[BaseRouter] = None, 40 logger: Optional[BasicLogger] = None, 41 metrics: Optional[BasicMetricsCollector] = None, 42 tracer: Optional[TracingProvider] = None, 43 ) -> None: 44 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 45 self.agents: List[BaseAgent] = agents or [] 46 self.pass_context = pass_context
48 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 49 self._log_start(task) 50 51 agent_outputs: Dict[str, AgentResult] = {} 52 current_task = task 53 # Merge caller-supplied context (user_assertion, etc.) so it flows to agents; 54 # step outputs are accumulated on top under step_N_output keys. 55 pipeline_context: Dict[str, Any] = {"original_task": task, **(context or {})} 56 57 with self._tracer.trace( 58 "pipeline.run", 59 input=task, 60 metadata={"pipeline_length": len(self.agents)}, 61 ) as trace: 62 try: 63 for i, agent in enumerate(self.agents): 64 # If a router is provided, let it pick the best agent from 65 # the pool for the current task rather than using fixed order. 66 if self.router: 67 available = [a.agent_id for a in self.agents] 68 decision = await self.router.route( 69 RoutingRequest(input=current_task, available_agents=available) 70 ) 71 agent = next( 72 (a for a in self.agents if a.agent_id == decision.target), 73 self.agents[i], # fallback to positional agent 74 ) 75 self._logger.info( 76 "Router selected pipeline agent", 77 step=i, 78 agent=agent.agent_id, 79 confidence=decision.confidence, 80 ) 81 agent_id = agent.agent_id or f"agent_{i}" 82 self._log_agent_dispatch(agent_id, current_task) 83 84 with trace.span(f"pipeline.step_{i}.{agent_id}", input=current_task) as span: 85 # pass_context=True: full accumulated pipeline context (step outputs included) 86 # pass_context=False: only caller's original context (no step outputs) 87 exec_context = pipeline_context if self.pass_context else (context or None) 88 result = await agent.execute(current_task, context=exec_context) 89 agent_outputs[f"step_{i}_{agent_id}"] = result 90 pipeline_context[f"step_{i}_output"] = result.output 91 span.set_output(result.output) 92 93 if not result.success: 94 self._logger.warning( 95 "Pipeline step failed", 96 step=i, 97 agent=agent_id, 98 error=result.error, 99 ) 100 101 # Output of this step becomes the task for the next 102 current_task = result.output 103 104 self._log_finished(rounds=len(self.agents), success=True) 105 trace.set_output(current_task) 106 return OrchestratorResult( 107 final_output=current_task, 108 agent_outputs=agent_outputs, 109 rounds=len(self.agents), 110 success=True, 111 ) 112 113 except Exception as exc: 114 self._logger.error("PipelineOrchestrator failed", error=str(exc)) 115 trace.set_error(exc) 116 return OrchestratorResult( 117 final_output=current_task, 118 agent_outputs=agent_outputs, 119 rounds=len(agent_outputs), 120 success=False, 121 error=str(exc), 122 )
Execute the multi-agent orchestration and return the aggregated result.
42class DebateOrchestrator(BaseOrchestrator): 43 """ 44 Runs structured multi-agent debate to improve answer quality. 45 46 Each agent generates an initial position. Then for ``debate_rounds`` 47 rounds, each agent critiques other agents' positions and refines its own. 48 A synthesis LLM call (or the first agent's gateway) produces the final answer. 49 50 Args: 51 agents: List of debating :class:`BaseAgent` instances. 52 debate_rounds: Number of critique/refine cycles (default: 1). 53 synthesis_gateway: Optional separate LLM gateway for the final synthesis. 54 If None, uses the first agent's gateway. 55 synthesis_model: Model for synthesis (optional). 56 logger, metrics, tracer: Observability (optional). 57 58 Example:: 59 60 debate = DebateOrchestrator( 61 agents=[agent_a, agent_b, agent_c], 62 debate_rounds=2, 63 ) 64 result = await debate.run("What is the best AI architecture for RAG?") 65 """ 66 67 def __init__( 68 self, 69 agents: Optional[List[BaseAgent]] = None, 70 debate_rounds: int = 1, 71 synthesis_gateway: Optional[Any] = None, 72 synthesis_model: Optional[str] = None, 73 router: Optional[BaseRouter] = None, 74 logger: Optional[BasicLogger] = None, 75 metrics: Optional[BasicMetricsCollector] = None, 76 tracer: Optional[TracingProvider] = None, 77 ) -> None: 78 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 79 self.agents: List[BaseAgent] = agents or [] 80 self.debate_rounds = debate_rounds 81 self._synthesis_gateway = synthesis_gateway 82 self.synthesis_model = synthesis_model 83 84 def _synthesis_gw(self) -> Any: 85 if self._synthesis_gateway: 86 return self._synthesis_gateway 87 if self.agents: 88 return self.agents[0].llm_gateway 89 raise ValueError("DebateOrchestrator: no agents or synthesis_gateway provided.") 90 91 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 92 self._log_start(task) 93 94 with self._tracer.trace( 95 "debate.run", 96 input=task, 97 metadata={"agents": len(self.agents), "rounds": self.debate_rounds}, 98 ) as trace: 99 try: 100 # Phase 1: Initial positions 101 positions: Dict[str, str] = {} 102 agent_outputs: Dict[str, AgentResult] = {} 103 104 with trace.span("debate.initial_positions") as init_span: 105 for agent in self.agents: 106 prompt = _DEBATE_INITIAL_PROMPT.format(task=task) 107 result = await agent.execute(prompt, context=context) 108 positions[agent.agent_id] = result.output 109 agent_outputs[agent.agent_id] = result 110 111 init_span.set_output({"agents": list(positions.keys())}) 112 113 # Phase 2: Critique rounds 114 for round_num in range(self.debate_rounds): 115 self._logger.info("Debate round", round=round_num + 1) 116 if self._metrics: 117 self._metrics.increment("orchestrator.debate_rounds") 118 119 with trace.span(f"debate.round_{round_num + 1}") as round_span: 120 new_positions: Dict[str, str] = {} 121 for agent in self.agents: 122 others = "\n\n".join( 123 f"[{aid}]: {pos}" 124 for aid, pos in positions.items() 125 if aid != agent.agent_id 126 ) 127 critique_prompt = _DEBATE_CRITIQUE_PROMPT.format( 128 task=task, 129 positions=others, 130 own_position=positions.get(agent.agent_id, ""), 131 ) 132 result = await agent.execute(critique_prompt, context=context) 133 new_positions[agent.agent_id] = result.output 134 agent_outputs[agent.agent_id] = result 135 136 positions = new_positions 137 round_span.set_output({"positions_updated": len(positions)}) 138 139 # Phase 3: Synthesis 140 with trace.span("debate.synthesis") as syn_span: 141 all_positions = "\n\n".join( 142 f"[{aid}] (round {self.debate_rounds}):\n{pos}" 143 for aid, pos in positions.items() 144 ) 145 synthesis_prompt = _DEBATE_SYNTHESIS_PROMPT.format( 146 task=task, 147 rounds=self.debate_rounds, 148 all_positions=all_positions, 149 ) 150 synthesis_response = await self._synthesis_gw().complete( 151 synthesis_prompt, model=self.synthesis_model, temperature=0.1 152 ) 153 final_output = synthesis_response.content.strip() 154 syn_span.set_output(final_output) 155 156 self._log_finished(rounds=self.debate_rounds, success=True) 157 trace.set_output(final_output) 158 return OrchestratorResult( 159 final_output=final_output, 160 agent_outputs=agent_outputs, 161 rounds=self.debate_rounds, 162 success=True, 163 ) 164 165 except Exception as exc: 166 self._logger.error("DebateOrchestrator failed", error=str(exc)) 167 trace.set_error(exc) 168 return OrchestratorResult( 169 final_output="", 170 rounds=self.debate_rounds, 171 success=False, 172 error=str(exc), 173 )
Runs structured multi-agent debate to improve answer quality.
Each agent generates an initial position. Then for debate_rounds
rounds, each agent critiques other agents' positions and refines its own.
A synthesis LLM call (or the first agent's gateway) produces the final answer.
Args:
agents: List of debating BaseAgent instances.
debate_rounds: Number of critique/refine cycles (default: 1).
synthesis_gateway: Optional separate LLM gateway for the final synthesis.
If None, uses the first agent's gateway.
synthesis_model: Model for synthesis (optional).
logger, metrics, tracer: Observability (optional).
Example::
debate = DebateOrchestrator(
agents=[agent_a, agent_b, agent_c],
debate_rounds=2,
)
result = await debate.run("What is the best AI architecture for RAG?")
67 def __init__( 68 self, 69 agents: Optional[List[BaseAgent]] = None, 70 debate_rounds: int = 1, 71 synthesis_gateway: Optional[Any] = None, 72 synthesis_model: Optional[str] = None, 73 router: Optional[BaseRouter] = None, 74 logger: Optional[BasicLogger] = None, 75 metrics: Optional[BasicMetricsCollector] = None, 76 tracer: Optional[TracingProvider] = None, 77 ) -> None: 78 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 79 self.agents: List[BaseAgent] = agents or [] 80 self.debate_rounds = debate_rounds 81 self._synthesis_gateway = synthesis_gateway 82 self.synthesis_model = synthesis_model
91 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 92 self._log_start(task) 93 94 with self._tracer.trace( 95 "debate.run", 96 input=task, 97 metadata={"agents": len(self.agents), "rounds": self.debate_rounds}, 98 ) as trace: 99 try: 100 # Phase 1: Initial positions 101 positions: Dict[str, str] = {} 102 agent_outputs: Dict[str, AgentResult] = {} 103 104 with trace.span("debate.initial_positions") as init_span: 105 for agent in self.agents: 106 prompt = _DEBATE_INITIAL_PROMPT.format(task=task) 107 result = await agent.execute(prompt, context=context) 108 positions[agent.agent_id] = result.output 109 agent_outputs[agent.agent_id] = result 110 111 init_span.set_output({"agents": list(positions.keys())}) 112 113 # Phase 2: Critique rounds 114 for round_num in range(self.debate_rounds): 115 self._logger.info("Debate round", round=round_num + 1) 116 if self._metrics: 117 self._metrics.increment("orchestrator.debate_rounds") 118 119 with trace.span(f"debate.round_{round_num + 1}") as round_span: 120 new_positions: Dict[str, str] = {} 121 for agent in self.agents: 122 others = "\n\n".join( 123 f"[{aid}]: {pos}" 124 for aid, pos in positions.items() 125 if aid != agent.agent_id 126 ) 127 critique_prompt = _DEBATE_CRITIQUE_PROMPT.format( 128 task=task, 129 positions=others, 130 own_position=positions.get(agent.agent_id, ""), 131 ) 132 result = await agent.execute(critique_prompt, context=context) 133 new_positions[agent.agent_id] = result.output 134 agent_outputs[agent.agent_id] = result 135 136 positions = new_positions 137 round_span.set_output({"positions_updated": len(positions)}) 138 139 # Phase 3: Synthesis 140 with trace.span("debate.synthesis") as syn_span: 141 all_positions = "\n\n".join( 142 f"[{aid}] (round {self.debate_rounds}):\n{pos}" 143 for aid, pos in positions.items() 144 ) 145 synthesis_prompt = _DEBATE_SYNTHESIS_PROMPT.format( 146 task=task, 147 rounds=self.debate_rounds, 148 all_positions=all_positions, 149 ) 150 synthesis_response = await self._synthesis_gw().complete( 151 synthesis_prompt, model=self.synthesis_model, temperature=0.1 152 ) 153 final_output = synthesis_response.content.strip() 154 syn_span.set_output(final_output) 155 156 self._log_finished(rounds=self.debate_rounds, success=True) 157 trace.set_output(final_output) 158 return OrchestratorResult( 159 final_output=final_output, 160 agent_outputs=agent_outputs, 161 rounds=self.debate_rounds, 162 success=True, 163 ) 164 165 except Exception as exc: 166 self._logger.error("DebateOrchestrator failed", error=str(exc)) 167 trace.set_error(exc) 168 return OrchestratorResult( 169 final_output="", 170 rounds=self.debate_rounds, 171 success=False, 172 error=str(exc), 173 )
Execute the multi-agent orchestration and return the aggregated result.
29class SwarmOrchestrator(BaseOrchestrator): 30 """ 31 Dynamically routes work to the best available agent each round. 32 33 Each round: 34 1. A coordinator LLM call decides whether the task is done or describes the 35 next sub-task. 36 2. The :class:`BaseRouter` selects the agent best suited for that sub-task. 37 3. The selected agent executes the sub-task. 38 4. Results accumulate until ``DONE`` is signalled or ``max_rounds`` is reached. 39 40 Args: 41 coordinator_gateway: LLM gateway used by the coordinator. 42 agents: Mapping of agent name → :class:`BaseAgent`. 43 router: :class:`BaseRouter` that selects agents each round. 44 max_rounds: Maximum dispatch rounds (default: 10). 45 coordinator_model: Model for coordinator calls (optional). 46 logger, metrics, tracer: Observability (optional). 47 """ 48 49 def __init__( 50 self, 51 coordinator_gateway: Any, 52 agents: Optional[Dict[str, BaseAgent]] = None, 53 router: Optional[BaseRouter] = None, 54 max_rounds: int = 10, 55 coordinator_model: Optional[str] = None, 56 logger: Optional[BasicLogger] = None, 57 metrics: Optional[BasicMetricsCollector] = None, 58 tracer: Optional[TracingProvider] = None, 59 ) -> None: 60 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 61 self.coordinator_gateway = coordinator_gateway 62 self.agents: Dict[str, BaseAgent] = agents or {} 63 self.max_rounds = max_rounds 64 self.coordinator_model = coordinator_model 65 66 async def _next_step(self, task: str, history: List[str]) -> Optional[str]: 67 """Returns None to signal DONE, otherwise the next sub-task description. 68 69 When a router is present the coordinator is asked to name an explicit agent 70 using the structured ``AGENT: / TASK:`` format. The router then matches on 71 the agent name directly rather than performing keyword matching on free text. 72 """ 73 history_text = "\n".join( 74 f"Round {i+1}: {h}" for i, h in enumerate(history) 75 ) or "Nothing yet." 76 agents_list = ", ".join(self.agents.keys()) if self.agents else "(none)" 77 prompt = _SWARM_CONTINUE_PROMPT.format( 78 task=task, history=history_text, agents=agents_list 79 ) 80 response = await self.coordinator_gateway.complete( 81 prompt, model=self.coordinator_model, temperature=0.0 82 ) 83 text = response.content.strip() 84 if text.upper().startswith("DONE"): 85 return None # Task complete 86 return text 87 88 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 89 self._log_start(task) 90 91 agent_outputs: Dict[str, AgentResult] = {} 92 history: List[str] = [] 93 final_output = "" 94 available = list(self.agents.keys()) 95 96 with self._tracer.trace( 97 "swarm.run", input=task, metadata={"max_rounds": self.max_rounds} 98 ) as trace: 99 try: 100 for round_num in range(self.max_rounds): 101 with trace.span(f"swarm.round_{round_num + 1}") as round_span: 102 # Coordinator decides next step 103 next_task = await self._next_step(task, history) 104 if next_task is None: 105 # Extract final answer from DONE line 106 if history: 107 done_response = await self.coordinator_gateway.complete( 108 f"Summarise the work done:\n" + "\n".join(history), 109 model=self.coordinator_model, 110 temperature=0.1, 111 ) 112 final_output = done_response.content.strip() 113 round_span.set_output("DONE") 114 break 115 116 # Route to best agent 117 routing_req = RoutingRequest( 118 input=next_task, available_agents=available 119 ) 120 if self.router: 121 decision = await self.router.route(routing_req) 122 agent_name = decision.target 123 else: 124 agent_name = available[round_num % len(available)] 125 126 agent = self.agents.get(agent_name) 127 if agent is None: 128 self._logger.warning("Swarm: agent not found", agent=agent_name) 129 continue 130 131 # Extract just the task description from structured AGENT:/TASK: format 132 agent_task = next_task 133 for line in next_task.splitlines(): 134 stripped = line.strip() 135 if stripped.upper().startswith("TASK:"): 136 agent_task = stripped[5:].strip() 137 break 138 139 self._log_agent_dispatch(agent_name, agent_task) 140 result = await agent.execute(agent_task, context=context) 141 history.append(f"[{agent_name}]: {result.output[:300]}") 142 agent_outputs[f"round_{round_num + 1}_{agent_name}"] = result 143 final_output = result.output 144 round_span.set_output(result.output[:200]) 145 146 if self._metrics: 147 self._metrics.increment( 148 "orchestrator.swarm_dispatches", agent=agent_name 149 ) 150 151 self._log_finished(rounds=len(history), success=True) 152 trace.set_output(final_output) 153 return OrchestratorResult( 154 final_output=final_output, 155 agent_outputs=agent_outputs, 156 rounds=len(history), 157 success=True, 158 ) 159 160 except Exception as exc: 161 self._logger.error("SwarmOrchestrator failed", error=str(exc)) 162 trace.set_error(exc) 163 return OrchestratorResult( 164 final_output=final_output, 165 agent_outputs=agent_outputs, 166 rounds=len(history), 167 success=False, 168 error=str(exc), 169 )
Dynamically routes work to the best available agent each round.
Each round:
- A coordinator LLM call decides whether the task is done or describes the next sub-task.
- The
BaseRouterselects the agent best suited for that sub-task. - The selected agent executes the sub-task.
- Results accumulate until
DONEis signalled ormax_roundsis reached.
Args:
coordinator_gateway: LLM gateway used by the coordinator.
agents: Mapping of agent name → BaseAgent.
router: BaseRouter that selects agents each round.
max_rounds: Maximum dispatch rounds (default: 10).
coordinator_model: Model for coordinator calls (optional).
logger, metrics, tracer: Observability (optional).
49 def __init__( 50 self, 51 coordinator_gateway: Any, 52 agents: Optional[Dict[str, BaseAgent]] = None, 53 router: Optional[BaseRouter] = None, 54 max_rounds: int = 10, 55 coordinator_model: Optional[str] = None, 56 logger: Optional[BasicLogger] = None, 57 metrics: Optional[BasicMetricsCollector] = None, 58 tracer: Optional[TracingProvider] = None, 59 ) -> None: 60 super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer) 61 self.coordinator_gateway = coordinator_gateway 62 self.agents: Dict[str, BaseAgent] = agents or {} 63 self.max_rounds = max_rounds 64 self.coordinator_model = coordinator_model
88 async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult: 89 self._log_start(task) 90 91 agent_outputs: Dict[str, AgentResult] = {} 92 history: List[str] = [] 93 final_output = "" 94 available = list(self.agents.keys()) 95 96 with self._tracer.trace( 97 "swarm.run", input=task, metadata={"max_rounds": self.max_rounds} 98 ) as trace: 99 try: 100 for round_num in range(self.max_rounds): 101 with trace.span(f"swarm.round_{round_num + 1}") as round_span: 102 # Coordinator decides next step 103 next_task = await self._next_step(task, history) 104 if next_task is None: 105 # Extract final answer from DONE line 106 if history: 107 done_response = await self.coordinator_gateway.complete( 108 f"Summarise the work done:\n" + "\n".join(history), 109 model=self.coordinator_model, 110 temperature=0.1, 111 ) 112 final_output = done_response.content.strip() 113 round_span.set_output("DONE") 114 break 115 116 # Route to best agent 117 routing_req = RoutingRequest( 118 input=next_task, available_agents=available 119 ) 120 if self.router: 121 decision = await self.router.route(routing_req) 122 agent_name = decision.target 123 else: 124 agent_name = available[round_num % len(available)] 125 126 agent = self.agents.get(agent_name) 127 if agent is None: 128 self._logger.warning("Swarm: agent not found", agent=agent_name) 129 continue 130 131 # Extract just the task description from structured AGENT:/TASK: format 132 agent_task = next_task 133 for line in next_task.splitlines(): 134 stripped = line.strip() 135 if stripped.upper().startswith("TASK:"): 136 agent_task = stripped[5:].strip() 137 break 138 139 self._log_agent_dispatch(agent_name, agent_task) 140 result = await agent.execute(agent_task, context=context) 141 history.append(f"[{agent_name}]: {result.output[:300]}") 142 agent_outputs[f"round_{round_num + 1}_{agent_name}"] = result 143 final_output = result.output 144 round_span.set_output(result.output[:200]) 145 146 if self._metrics: 147 self._metrics.increment( 148 "orchestrator.swarm_dispatches", agent=agent_name 149 ) 150 151 self._log_finished(rounds=len(history), success=True) 152 trace.set_output(final_output) 153 return OrchestratorResult( 154 final_output=final_output, 155 agent_outputs=agent_outputs, 156 rounds=len(history), 157 success=True, 158 ) 159 160 except Exception as exc: 161 self._logger.error("SwarmOrchestrator failed", error=str(exc)) 162 trace.set_error(exc) 163 return OrchestratorResult( 164 final_output=final_output, 165 agent_outputs=agent_outputs, 166 rounds=len(history), 167 success=False, 168 error=str(exc), 169 )
Execute the multi-agent orchestration and return the aggregated result.