gmf_forge_ai_orchestration.multi_agent

Multi-agent orchestration — Supervisor, Pipeline, Debate, and Swarm.

 1"""Multi-agent orchestration — Supervisor, Pipeline, Debate, and Swarm."""
 2
 3from gmf_forge_ai_orchestration.multi_agent.base import BaseOrchestrator, OrchestratorResult
 4from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator
 5from gmf_forge_ai_orchestration.multi_agent.pipeline import PipelineOrchestrator
 6from gmf_forge_ai_orchestration.multi_agent.debate import DebateOrchestrator
 7from gmf_forge_ai_orchestration.multi_agent.swarm import SwarmOrchestrator
 8
 9__all__ = [
10    "BaseOrchestrator",
11    "OrchestratorResult",
12    "SupervisorOrchestrator",
13    "PipelineOrchestrator",
14    "DebateOrchestrator",
15    "SwarmOrchestrator",
16]
class BaseOrchestrator(abc.ABC):
class BaseOrchestrator(ABC):
    """
    Abstract base class for multi-agent orchestrators.

    Stores the collaborators shared by all orchestrators (router, logger,
    metrics collector, tracer) and offers small logging helpers to subclasses.

    Args:
        router: Optional :class:`BaseRouter`; kept on ``self.router`` for
            subclasses that delegate agent selection to it.
        logger: Optional :class:`BasicLogger`. When ``None``, a logger named
            ``gmf_forge_ai.orchestrator.<ClassName>`` is created.
        metrics: Optional :class:`BasicMetricsCollector`. When ``None``, the
            helpers only write log lines.
        tracer: Optional :class:`TracingProvider`. Falls back to ``get_tracer()``.
    """

    def __init__(
        self,
        router: Optional["BaseRouter"] = None,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
        tracer: Optional[TracingProvider] = None,
    ) -> None:
        # Compare against None instead of relying on truthiness: an injected
        # collaborator that happens to be falsy (for example one defining
        # ``__len__`` while still empty) must not be replaced or ignored.
        if logger is None:
            logger = BasicLogger(
                f"gmf_forge_ai.orchestrator.{self.__class__.__name__}"
            )
        self._logger = logger
        self._metrics = metrics
        self._tracer = tracer if tracer is not None else get_tracer()
        self.router: Optional["BaseRouter"] = router

    @abstractmethod
    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
        """Execute the multi-agent orchestration and return the aggregated result."""

    def _log_start(self, task: str) -> None:
        """Log the start of a run and count it under ``orchestrator.runs``."""
        self._logger.info("Orchestrator started", orchestrator=self.__class__.__name__, task=task)
        if self._metrics is not None:
            self._metrics.increment("orchestrator.runs", orchestrator=self.__class__.__name__)

    def _log_agent_dispatch(self, agent_id: str, task: str) -> None:
        """Log that ``task`` is being handed to ``agent_id``."""
        # Only the first 100 characters of the task are logged.
        self._logger.info("Dispatching to agent", agent_id=agent_id, task=task[:100])

    def _log_finished(self, rounds: int, success: bool) -> None:
        """Log the end of a run and report ``rounds`` under ``orchestrator.rounds``."""
        self._logger.info(
            "Orchestrator finished",
            orchestrator=self.__class__.__name__,
            rounds=rounds,
            success=success,
        )
        if self._metrics is not None:
            # NOTE(review): ``count=rounds`` is passed as a keyword, like the
            # ``orchestrator=`` tag in ``_log_start`` -- confirm that the
            # collector treats it as an increment amount rather than a label.
            self._metrics.increment("orchestrator.rounds", count=rounds)

Abstract base class for multi-agent orchestrators.

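Args: router: Optional BaseRouter, stored on the instance as router. logger: Optional BasicLogger; when omitted, a logger named gmf_forge_ai.orchestrator.<ClassName> is created. metrics: Optional BasicMetricsCollector. tracer: Optional TracingProvider. Falls back to get_tracer().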

@abstractmethod
async def run( self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
    @abstractmethod
    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
        """Execute the multi-agent orchestration and return the aggregated result.

        Abstract: every concrete orchestrator provides its own implementation.

        Args:
            task: The task text handed to the orchestrator.
            context: Optional caller-supplied context mapping.

        Returns:
            The aggregated :class:`OrchestratorResult` for the run.
        """

Execute the multi-agent orchestration and return the aggregated result.

@dataclass
class OrchestratorResult:
@dataclass
class OrchestratorResult:
    """The aggregated result of a multi-agent orchestration run.

    Only ``final_output`` is required; every other field has a default, and
    the container fields each default to a fresh empty container.
    """

    # Final text produced by the orchestration.
    final_output: str
    # Per-agent results, keyed by a string chosen by the orchestrator that
    # produced this result.
    agent_outputs: Dict[str, AgentResult] = field(default_factory=dict)
    # Ordered ``(agent, subtask, result)`` triples, one per executed subtask.
    subtask_outputs: List[Tuple[str, str, AgentResult]] = field(default_factory=list)
    # Number of rounds/steps the orchestrator reports for this run.
    rounds: int = 0
    # Whether the run completed without the orchestrator catching an error.
    success: bool = True
    # Error text for a failed run; ``None`` otherwise.
    error: Optional[str] = None
    # Free-form additional information about the run.
    metadata: Dict[str, Any] = field(default_factory=dict)

The aggregated result of a multi-agent orchestration run.

OrchestratorResult( final_output: str, agent_outputs: Dict[str, gmf_forge_ai_orchestration.AgentResult] = <factory>, subtask_outputs: List[Tuple[str, str, gmf_forge_ai_orchestration.AgentResult]] = <factory>, rounds: int = 0, success: bool = True, error: Optional[str] = None, metadata: Dict[str, Any] = <factory>)
final_output: str
agent_outputs: Dict[str, gmf_forge_ai_orchestration.AgentResult]
subtask_outputs: List[Tuple[str, str, gmf_forge_ai_orchestration.AgentResult]]
rounds: int = 0
success: bool = True
error: Optional[str] = None
metadata: Dict[str, Any]
class SupervisorOrchestrator(gmf_forge_ai_orchestration.multi_agent.BaseOrchestrator):
 55class SupervisorOrchestrator(BaseOrchestrator):
 56    """
 57    LLM-based supervisor that plans subtask assignment and synthesizes results.
 58
 59    Phase 1 — Plan: The LLM supervisor decomposes the task and assigns each
 60    subtask to the most appropriate worker agent.
 61    Phase 2 — Execute: All worker agents run (concurrently if not interdependent).
 62    Phase 3 — Synthesize: The supervisor LLM merges all results into a final answer.
 63
 64    Args:
 65        supervisor_gateway: The LLM gateway used by the supervisor.
 66        agents: Mapping of agent name → :class:`BaseAgent`.
 67        agent_descriptions: Optional human-readable descriptions for the supervisor prompt.
 68        supervisor_model: LLM model for supervisor calls (optional).
 69        logger, metrics, tracer: Observability (optional).
 70    """
 71
 72    def __init__(
 73        self,
 74        supervisor_gateway: Any,
 75        agents: Optional[Dict[str, BaseAgent]] = None,
 76        agent_descriptions: Optional[Dict[str, str]] = None,
 77        supervisor_model: Optional[str] = None,
 78        router: Optional[BaseRouter] = None,
 79        logger: Optional[BasicLogger] = None,
 80        metrics: Optional[BasicMetricsCollector] = None,
 81        tracer: Optional[TracingProvider] = None,
 82    ) -> None:
 83        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
 84        self.supervisor_gateway = supervisor_gateway
 85        self.agents: Dict[str, BaseAgent] = agents or {}
 86        self.agent_descriptions: Dict[str, str] = agent_descriptions or {}
 87        self.supervisor_model = supervisor_model
 88
 89    async def _decompose(self, task: str) -> List[str]:
 90        """Use LLM to decompose a task into subtask strings (no agent assignment)."""
 91        prompt = _SUPERVISOR_DECOMPOSE_PROMPT.format(task=task)
 92        response = await self.supervisor_gateway.complete(
 93            prompt, model=self.supervisor_model, temperature=0.0
 94        )
 95        raw = response.content.strip()
 96        match = re.search(r"\[.*\]", raw, re.DOTALL)
 97        if match:
 98            try:
 99                subtasks = json.loads(match.group())
100                if isinstance(subtasks, list) and all(isinstance(s, str) for s in subtasks):
101                    return subtasks
102            except (json.JSONDecodeError, ValueError):
103                pass
104        return [task]  # fallback: treat whole task as one subtask
105
106    async def _plan(self, task: str) -> List[Dict[str, str]]:
107        """Decompose task and assign agents.
108
109        When a ``router`` is injected: the LLM only decomposes the task into
110        subtasks; the router selects the best agent for each subtask.
111
112        When no router is provided: the LLM both decomposes and assigns agents
113        (original behaviour).
114        """
115        if self.router:
116            subtasks = await self._decompose(task)
117            available = list(self.agents.keys())
118            plan: List[Dict[str, str]] = []
119            for subtask in subtasks:
120                decision = await self.router.route(
121                    RoutingRequest(input=subtask, available_agents=available)
122                )
123                agent_name = decision.target if decision.target in self.agents else available[0]
124                plan.append({"agent": agent_name, "subtask": subtask})
125                self._logger.info(
126                    "Router assigned subtask",
127                    agent=agent_name,
128                    confidence=decision.confidence,
129                    subtask=subtask[:80],
130                )
131            return plan
132
133        # --- Original LLM-based plan (no router) ---
134        desc_block = "\n".join(
135            f"- {name}: {self.agent_descriptions.get(name, 'General agent.')}"
136            for name in self.agents
137        )
138        prompt = _SUPERVISOR_PLAN_PROMPT.format(
139            agent_descriptions=desc_block, task=task
140        )
141        response = await self.supervisor_gateway.complete(
142            prompt, model=self.supervisor_model, temperature=0.0
143        )
144        raw = response.content.strip()
145        match = re.search(r"\[.*\]", raw, re.DOTALL)
146        if match:
147            try:
148                llm_plan = json.loads(match.group())
149                if isinstance(llm_plan, list):
150                    return [p for p in llm_plan if "agent" in p and "subtask" in p]
151            except (json.JSONDecodeError, ValueError):
152                pass
153        # Fallback: assign whole task to first agent
154        first_agent = next(iter(self.agents), "")
155        return [{"agent": first_agent, "subtask": task}]
156
157    async def _synthesize(self, task: str, subtask_results: List[Tuple[str, str, AgentResult]]) -> str:
158        results_block = "\n\n".join(
159            f"[{agent}]: {result.output}" for agent, _subtask, result in subtask_results
160        )
161        prompt = _SUPERVISOR_SYNTHESIS_PROMPT.format(
162            task=task, agent_results=results_block
163        )
164        response = await self.supervisor_gateway.complete(
165            prompt, model=self.supervisor_model, temperature=0.1
166        )
167        return response.content.strip()
168
169    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
170        self._log_start(task)
171
172        with self._tracer.trace(
173            "supervisor.run", input=task, metadata={"agent_count": len(self.agents)}
174        ) as trace:
175            try:
176                # Phase 1: Plan
177                with trace.span("supervisor.plan", input=task) as plan_span:
178                    plan = await self._plan(task)
179                    plan_span.set_output({"plan": plan})
180
181                # Phase 2: Execute subtasks
182                agent_outputs: Dict[str, AgentResult] = {}
183                subtask_list: List[Tuple[str, str, AgentResult]] = []
184                for assignment in plan:
185                    agent_name = assignment["agent"]
186                    subtask = assignment["subtask"]
187                    agent = self.agents.get(agent_name)
188                    if agent is None:
189                        self._logger.warning("Unknown agent in plan", agent=agent_name)
190                        continue
191
192                    self._log_agent_dispatch(agent_name, subtask)
193                    with trace.span(f"agent.{agent_name}", input=subtask) as aspan:
194                        result = await agent.execute(subtask, context=context)
195                        agent_outputs[agent_name] = result
196                        subtask_list.append((agent_name, subtask, result))
197                        aspan.set_output(result.output)
198
199                # Phase 3: Synthesize
200                with trace.span("supervisor.synthesize") as syn_span:
201                    final_output = await self._synthesize(task, subtask_list)
202                    syn_span.set_output(final_output)
203
204                self._log_finished(rounds=1, success=True)
205                trace.set_output(final_output)
206                return OrchestratorResult(
207                    final_output=final_output,
208                    agent_outputs=agent_outputs,
209                    subtask_outputs=subtask_list,
210                    rounds=1,
211                    success=True,
212                )
213
214            except Exception as exc:
215                self._logger.error("SupervisorOrchestrator failed", error=str(exc))
216                trace.set_error(exc)
217                return OrchestratorResult(
218                    final_output="",
219                    rounds=1,
220                    success=False,
221                    error=str(exc),
222                )

LLM-based supervisor that plans subtask assignment and synthesizes results.

Phase 1 — Plan: The LLM supervisor decomposes the task and assigns each subtask to the most appropriate worker agent; when a router is injected, the LLM only decomposes the task and the router assigns the agents.

Phase 2 — Execute: The assigned worker agents run one after another, in plan order.

Phase 3 — Synthesize: The supervisor LLM merges all results into a final answer.

Args: supervisor_gateway: The LLM gateway used by the supervisor. agents: Mapping of agent name → BaseAgent. agent_descriptions: Optional human-readable descriptions for the supervisor prompt. supervisor_model: LLM model for supervisor calls (optional). router: Optional BaseRouter; when provided, it assigns each subtask to an agent. logger, metrics, tracer: Observability (optional).

SupervisorOrchestrator( supervisor_gateway: Any, agents: Optional[Dict[str, gmf_forge_ai_orchestration.BaseAgent]] = None, agent_descriptions: Optional[Dict[str, str]] = None, supervisor_model: Optional[str] = None, router: Optional[gmf_forge_ai_orchestration.BaseRouter] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
72    def __init__(
73        self,
74        supervisor_gateway: Any,
75        agents: Optional[Dict[str, BaseAgent]] = None,
76        agent_descriptions: Optional[Dict[str, str]] = None,
77        supervisor_model: Optional[str] = None,
78        router: Optional[BaseRouter] = None,
79        logger: Optional[BasicLogger] = None,
80        metrics: Optional[BasicMetricsCollector] = None,
81        tracer: Optional[TracingProvider] = None,
82    ) -> None:
83        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
84        self.supervisor_gateway = supervisor_gateway
85        self.agents: Dict[str, BaseAgent] = agents or {}
86        self.agent_descriptions: Dict[str, str] = agent_descriptions or {}
87        self.supervisor_model = supervisor_model
supervisor_gateway
agent_descriptions: Dict[str, str]
supervisor_model
async def run( self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
169    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
170        self._log_start(task)
171
172        with self._tracer.trace(
173            "supervisor.run", input=task, metadata={"agent_count": len(self.agents)}
174        ) as trace:
175            try:
176                # Phase 1: Plan
177                with trace.span("supervisor.plan", input=task) as plan_span:
178                    plan = await self._plan(task)
179                    plan_span.set_output({"plan": plan})
180
181                # Phase 2: Execute subtasks
182                agent_outputs: Dict[str, AgentResult] = {}
183                subtask_list: List[Tuple[str, str, AgentResult]] = []
184                for assignment in plan:
185                    agent_name = assignment["agent"]
186                    subtask = assignment["subtask"]
187                    agent = self.agents.get(agent_name)
188                    if agent is None:
189                        self._logger.warning("Unknown agent in plan", agent=agent_name)
190                        continue
191
192                    self._log_agent_dispatch(agent_name, subtask)
193                    with trace.span(f"agent.{agent_name}", input=subtask) as aspan:
194                        result = await agent.execute(subtask, context=context)
195                        agent_outputs[agent_name] = result
196                        subtask_list.append((agent_name, subtask, result))
197                        aspan.set_output(result.output)
198
199                # Phase 3: Synthesize
200                with trace.span("supervisor.synthesize") as syn_span:
201                    final_output = await self._synthesize(task, subtask_list)
202                    syn_span.set_output(final_output)
203
204                self._log_finished(rounds=1, success=True)
205                trace.set_output(final_output)
206                return OrchestratorResult(
207                    final_output=final_output,
208                    agent_outputs=agent_outputs,
209                    subtask_outputs=subtask_list,
210                    rounds=1,
211                    success=True,
212                )
213
214            except Exception as exc:
215                self._logger.error("SupervisorOrchestrator failed", error=str(exc))
216                trace.set_error(exc)
217                return OrchestratorResult(
218                    final_output="",
219                    rounds=1,
220                    success=False,
221                    error=str(exc),
222                )

Execute the multi-agent orchestration and return the aggregated result.

class PipelineOrchestrator(gmf_forge_ai_orchestration.multi_agent.BaseOrchestrator):
 14class PipelineOrchestrator(BaseOrchestrator):
 15    """
 16    Runs a sequence of agents where the output of each becomes the input to the next.
 17
 18    The first agent in the pipeline receives the original ``task``. Each
 19    subsequent agent receives the previous agent's ``output`` as its task.
 20
 21    Args:
 22        agents: Ordered list of :class:`BaseAgent` instances.
 23        pass_context: If True, the full context dict (including all prior outputs)
 24            is also forwarded to each agent (default: True).
 25        logger, metrics, tracer: Observability (optional).
 26
 27    Example::
 28
 29        pipeline = PipelineOrchestrator(
 30            agents=[search_agent, summarise_agent, translate_agent]
 31        )
 32        result = await pipeline.run("Summarise the latest AI news in Spanish")
 33    """
 34
 35    def __init__(
 36        self,
 37        agents: Optional[List[BaseAgent]] = None,
 38        pass_context: bool = True,
 39        router: Optional[BaseRouter] = None,
 40        logger: Optional[BasicLogger] = None,
 41        metrics: Optional[BasicMetricsCollector] = None,
 42        tracer: Optional[TracingProvider] = None,
 43    ) -> None:
 44        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
 45        self.agents: List[BaseAgent] = agents or []
 46        self.pass_context = pass_context
 47
 48    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 49        self._log_start(task)
 50
 51        agent_outputs: Dict[str, AgentResult] = {}
 52        current_task = task
 53        # Merge caller-supplied context (user_assertion, etc.) so it flows to agents;
 54        # step outputs are accumulated on top under step_N_output keys.
 55        pipeline_context: Dict[str, Any] = {"original_task": task, **(context or {})}
 56
 57        with self._tracer.trace(
 58            "pipeline.run",
 59            input=task,
 60            metadata={"pipeline_length": len(self.agents)},
 61        ) as trace:
 62            try:
 63                for i, agent in enumerate(self.agents):
 64                    # If a router is provided, let it pick the best agent from
 65                    # the pool for the current task rather than using fixed order.
 66                    if self.router:
 67                        available = [a.agent_id for a in self.agents]
 68                        decision = await self.router.route(
 69                            RoutingRequest(input=current_task, available_agents=available)
 70                        )
 71                        agent = next(
 72                            (a for a in self.agents if a.agent_id == decision.target),
 73                            self.agents[i],  # fallback to positional agent
 74                        )
 75                        self._logger.info(
 76                            "Router selected pipeline agent",
 77                            step=i,
 78                            agent=agent.agent_id,
 79                            confidence=decision.confidence,
 80                        )
 81                    agent_id = agent.agent_id or f"agent_{i}"
 82                    self._log_agent_dispatch(agent_id, current_task)
 83
 84                    with trace.span(f"pipeline.step_{i}.{agent_id}", input=current_task) as span:
 85                        # pass_context=True: full accumulated pipeline context (step outputs included)
 86                        # pass_context=False: only caller's original context (no step outputs)
 87                        exec_context = pipeline_context if self.pass_context else (context or None)
 88                        result = await agent.execute(current_task, context=exec_context)
 89                        agent_outputs[f"step_{i}_{agent_id}"] = result
 90                        pipeline_context[f"step_{i}_output"] = result.output
 91                        span.set_output(result.output)
 92
 93                        if not result.success:
 94                            self._logger.warning(
 95                                "Pipeline step failed",
 96                                step=i,
 97                                agent=agent_id,
 98                                error=result.error,
 99                            )
100
101                    # Output of this step becomes the task for the next
102                    current_task = result.output
103
104                self._log_finished(rounds=len(self.agents), success=True)
105                trace.set_output(current_task)
106                return OrchestratorResult(
107                    final_output=current_task,
108                    agent_outputs=agent_outputs,
109                    rounds=len(self.agents),
110                    success=True,
111                )
112
113            except Exception as exc:
114                self._logger.error("PipelineOrchestrator failed", error=str(exc))
115                trace.set_error(exc)
116                return OrchestratorResult(
117                    final_output=current_task,
118                    agent_outputs=agent_outputs,
119                    rounds=len(agent_outputs),
120                    success=False,
121                    error=str(exc),
122                )

Runs a sequence of agents where the output of each becomes the input to the next.

The first agent in the pipeline receives the original task. Each subsequent agent receives the previous agent's output as its task.

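Args: agents: Ordered list of BaseAgent instances. pass_context: If True, the accumulated pipeline context (original task, caller context and all prior step outputs) is forwarded to each agent; if False, only the caller's context is forwarded (default: True). router: Optional BaseRouter; when provided, it picks the agent for each step from the pool, falling back to the positional agent. logger, metrics, tracer: Observability (optional).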

Example::

pipeline = PipelineOrchestrator(
    agents=[search_agent, summarise_agent, translate_agent]
)
result = await pipeline.run("Summarise the latest AI news in Spanish")
PipelineOrchestrator( agents: Optional[List[gmf_forge_ai_orchestration.BaseAgent]] = None, pass_context: bool = True, router: Optional[gmf_forge_ai_orchestration.BaseRouter] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
35    def __init__(
36        self,
37        agents: Optional[List[BaseAgent]] = None,
38        pass_context: bool = True,
39        router: Optional[BaseRouter] = None,
40        logger: Optional[BasicLogger] = None,
41        metrics: Optional[BasicMetricsCollector] = None,
42        tracer: Optional[TracingProvider] = None,
43    ) -> None:
44        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
45        self.agents: List[BaseAgent] = agents or []
46        self.pass_context = pass_context
pass_context
async def run( self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 48    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 49        self._log_start(task)
 50
 51        agent_outputs: Dict[str, AgentResult] = {}
 52        current_task = task
 53        # Merge caller-supplied context (user_assertion, etc.) so it flows to agents;
 54        # step outputs are accumulated on top under step_N_output keys.
 55        pipeline_context: Dict[str, Any] = {"original_task": task, **(context or {})}
 56
 57        with self._tracer.trace(
 58            "pipeline.run",
 59            input=task,
 60            metadata={"pipeline_length": len(self.agents)},
 61        ) as trace:
 62            try:
 63                for i, agent in enumerate(self.agents):
 64                    # If a router is provided, let it pick the best agent from
 65                    # the pool for the current task rather than using fixed order.
 66                    if self.router:
 67                        available = [a.agent_id for a in self.agents]
 68                        decision = await self.router.route(
 69                            RoutingRequest(input=current_task, available_agents=available)
 70                        )
 71                        agent = next(
 72                            (a for a in self.agents if a.agent_id == decision.target),
 73                            self.agents[i],  # fallback to positional agent
 74                        )
 75                        self._logger.info(
 76                            "Router selected pipeline agent",
 77                            step=i,
 78                            agent=agent.agent_id,
 79                            confidence=decision.confidence,
 80                        )
 81                    agent_id = agent.agent_id or f"agent_{i}"
 82                    self._log_agent_dispatch(agent_id, current_task)
 83
 84                    with trace.span(f"pipeline.step_{i}.{agent_id}", input=current_task) as span:
 85                        # pass_context=True: full accumulated pipeline context (step outputs included)
 86                        # pass_context=False: only caller's original context (no step outputs)
 87                        exec_context = pipeline_context if self.pass_context else (context or None)
 88                        result = await agent.execute(current_task, context=exec_context)
 89                        agent_outputs[f"step_{i}_{agent_id}"] = result
 90                        pipeline_context[f"step_{i}_output"] = result.output
 91                        span.set_output(result.output)
 92
 93                        if not result.success:
 94                            self._logger.warning(
 95                                "Pipeline step failed",
 96                                step=i,
 97                                agent=agent_id,
 98                                error=result.error,
 99                            )
100
101                    # Output of this step becomes the task for the next
102                    current_task = result.output
103
104                self._log_finished(rounds=len(self.agents), success=True)
105                trace.set_output(current_task)
106                return OrchestratorResult(
107                    final_output=current_task,
108                    agent_outputs=agent_outputs,
109                    rounds=len(self.agents),
110                    success=True,
111                )
112
113            except Exception as exc:
114                self._logger.error("PipelineOrchestrator failed", error=str(exc))
115                trace.set_error(exc)
116                return OrchestratorResult(
117                    final_output=current_task,
118                    agent_outputs=agent_outputs,
119                    rounds=len(agent_outputs),
120                    success=False,
121                    error=str(exc),
122                )

Execute the multi-agent orchestration and return the aggregated result.

class DebateOrchestrator(gmf_forge_ai_orchestration.multi_agent.BaseOrchestrator):
 42class DebateOrchestrator(BaseOrchestrator):
 43    """
 44    Runs structured multi-agent debate to improve answer quality.
 45
 46    Each agent generates an initial position. Then for ``debate_rounds``
 47    rounds, each agent critiques other agents' positions and refines its own.
 48    A synthesis LLM call (or the first agent's gateway) produces the final answer.
 49
 50    Args:
 51        agents: List of debating :class:`BaseAgent` instances.
 52        debate_rounds: Number of critique/refine cycles (default: 1).
 53        synthesis_gateway: Optional separate LLM gateway for the final synthesis.
 54            If None, uses the first agent's gateway.
 55        synthesis_model: Model for synthesis (optional).
 56        logger, metrics, tracer: Observability (optional).
 57
 58    Example::
 59
 60        debate = DebateOrchestrator(
 61            agents=[agent_a, agent_b, agent_c],
 62            debate_rounds=2,
 63        )
 64        result = await debate.run("What is the best AI architecture for RAG?")
 65    """
 66
 67    def __init__(
 68        self,
 69        agents: Optional[List[BaseAgent]] = None,
 70        debate_rounds: int = 1,
 71        synthesis_gateway: Optional[Any] = None,
 72        synthesis_model: Optional[str] = None,
 73        router: Optional[BaseRouter] = None,
 74        logger: Optional[BasicLogger] = None,
 75        metrics: Optional[BasicMetricsCollector] = None,
 76        tracer: Optional[TracingProvider] = None,
 77    ) -> None:
 78        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
 79        self.agents: List[BaseAgent] = agents or []
 80        self.debate_rounds = debate_rounds
 81        self._synthesis_gateway = synthesis_gateway
 82        self.synthesis_model = synthesis_model
 83
 84    def _synthesis_gw(self) -> Any:
 85        if self._synthesis_gateway:
 86            return self._synthesis_gateway
 87        if self.agents:
 88            return self.agents[0].llm_gateway
 89        raise ValueError("DebateOrchestrator: no agents or synthesis_gateway provided.")
 90
 91    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 92        self._log_start(task)
 93
 94        with self._tracer.trace(
 95            "debate.run",
 96            input=task,
 97            metadata={"agents": len(self.agents), "rounds": self.debate_rounds},
 98        ) as trace:
 99            try:
100                # Phase 1: Initial positions
101                positions: Dict[str, str] = {}
102                agent_outputs: Dict[str, AgentResult] = {}
103
104                with trace.span("debate.initial_positions") as init_span:
105                    for agent in self.agents:
106                        prompt = _DEBATE_INITIAL_PROMPT.format(task=task)
107                        result = await agent.execute(prompt, context=context)
108                        positions[agent.agent_id] = result.output
109                        agent_outputs[agent.agent_id] = result
110
111                    init_span.set_output({"agents": list(positions.keys())})
112
113                # Phase 2: Critique rounds
114                for round_num in range(self.debate_rounds):
115                    self._logger.info("Debate round", round=round_num + 1)
116                    if self._metrics:
117                        self._metrics.increment("orchestrator.debate_rounds")
118
119                    with trace.span(f"debate.round_{round_num + 1}") as round_span:
120                        new_positions: Dict[str, str] = {}
121                        for agent in self.agents:
122                            others = "\n\n".join(
123                                f"[{aid}]: {pos}"
124                                for aid, pos in positions.items()
125                                if aid != agent.agent_id
126                            )
127                            critique_prompt = _DEBATE_CRITIQUE_PROMPT.format(
128                                task=task,
129                                positions=others,
130                                own_position=positions.get(agent.agent_id, ""),
131                            )
132                            result = await agent.execute(critique_prompt, context=context)
133                            new_positions[agent.agent_id] = result.output
134                            agent_outputs[agent.agent_id] = result
135
136                        positions = new_positions
137                        round_span.set_output({"positions_updated": len(positions)})
138
139                # Phase 3: Synthesis
140                with trace.span("debate.synthesis") as syn_span:
141                    all_positions = "\n\n".join(
142                        f"[{aid}] (round {self.debate_rounds}):\n{pos}"
143                        for aid, pos in positions.items()
144                    )
145                    synthesis_prompt = _DEBATE_SYNTHESIS_PROMPT.format(
146                        task=task,
147                        rounds=self.debate_rounds,
148                        all_positions=all_positions,
149                    )
150                    synthesis_response = await self._synthesis_gw().complete(
151                        synthesis_prompt, model=self.synthesis_model, temperature=0.1
152                    )
153                    final_output = synthesis_response.content.strip()
154                    syn_span.set_output(final_output)
155
156                self._log_finished(rounds=self.debate_rounds, success=True)
157                trace.set_output(final_output)
158                return OrchestratorResult(
159                    final_output=final_output,
160                    agent_outputs=agent_outputs,
161                    rounds=self.debate_rounds,
162                    success=True,
163                )
164
165            except Exception as exc:
166                self._logger.error("DebateOrchestrator failed", error=str(exc))
167                trace.set_error(exc)
168                return OrchestratorResult(
169                    final_output="",
170                    rounds=self.debate_rounds,
171                    success=False,
172                    error=str(exc),
173                )

Runs structured multi-agent debate to improve answer quality.

Each agent generates an initial position. Then for debate_rounds rounds, each agent critiques the other agents' positions and refines its own. A final synthesis LLM call (made through synthesis_gateway or, when none is given, through the first agent's gateway) produces the final answer.

Args: agents: List of debating BaseAgent instances. debate_rounds: Number of critique/refine cycles (default: 1). synthesis_gateway: Optional separate LLM gateway for the final synthesis. If None, uses the first agent's gateway. synthesis_model: Model for synthesis (optional). logger, metrics, tracer: Observability (optional).

Example::

debate = DebateOrchestrator(
    agents=[agent_a, agent_b, agent_c],
    debate_rounds=2,
)
result = await debate.run("What is the best AI architecture for RAG?")
DebateOrchestrator( agents: Optional[List[gmf_forge_ai_orchestration.BaseAgent]] = None, debate_rounds: int = 1, synthesis_gateway: Optional[Any] = None, synthesis_model: Optional[str] = None, router: Optional[gmf_forge_ai_orchestration.BaseRouter] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
67    def __init__(
68        self,
69        agents: Optional[List[BaseAgent]] = None,
70        debate_rounds: int = 1,
71        synthesis_gateway: Optional[Any] = None,
72        synthesis_model: Optional[str] = None,
73        router: Optional[BaseRouter] = None,
74        logger: Optional[BasicLogger] = None,
75        metrics: Optional[BasicMetricsCollector] = None,
76        tracer: Optional[TracingProvider] = None,
77    ) -> None:
78        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
79        self.agents: List[BaseAgent] = agents or []
80        self.debate_rounds = debate_rounds
81        self._synthesis_gateway = synthesis_gateway
82        self.synthesis_model = synthesis_model
debate_rounds
synthesis_model
async def run( self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 91    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 92        self._log_start(task)
 93
 94        with self._tracer.trace(
 95            "debate.run",
 96            input=task,
 97            metadata={"agents": len(self.agents), "rounds": self.debate_rounds},
 98        ) as trace:
 99            try:
100                # Phase 1: Initial positions
101                positions: Dict[str, str] = {}
102                agent_outputs: Dict[str, AgentResult] = {}
103
104                with trace.span("debate.initial_positions") as init_span:
105                    for agent in self.agents:
106                        prompt = _DEBATE_INITIAL_PROMPT.format(task=task)
107                        result = await agent.execute(prompt, context=context)
108                        positions[agent.agent_id] = result.output
109                        agent_outputs[agent.agent_id] = result
110
111                    init_span.set_output({"agents": list(positions.keys())})
112
113                # Phase 2: Critique rounds
114                for round_num in range(self.debate_rounds):
115                    self._logger.info("Debate round", round=round_num + 1)
116                    if self._metrics:
117                        self._metrics.increment("orchestrator.debate_rounds")
118
119                    with trace.span(f"debate.round_{round_num + 1}") as round_span:
120                        new_positions: Dict[str, str] = {}
121                        for agent in self.agents:
122                            others = "\n\n".join(
123                                f"[{aid}]: {pos}"
124                                for aid, pos in positions.items()
125                                if aid != agent.agent_id
126                            )
127                            critique_prompt = _DEBATE_CRITIQUE_PROMPT.format(
128                                task=task,
129                                positions=others,
130                                own_position=positions.get(agent.agent_id, ""),
131                            )
132                            result = await agent.execute(critique_prompt, context=context)
133                            new_positions[agent.agent_id] = result.output
134                            agent_outputs[agent.agent_id] = result
135
136                        positions = new_positions
137                        round_span.set_output({"positions_updated": len(positions)})
138
139                # Phase 3: Synthesis
140                with trace.span("debate.synthesis") as syn_span:
141                    all_positions = "\n\n".join(
142                        f"[{aid}] (round {self.debate_rounds}):\n{pos}"
143                        for aid, pos in positions.items()
144                    )
145                    synthesis_prompt = _DEBATE_SYNTHESIS_PROMPT.format(
146                        task=task,
147                        rounds=self.debate_rounds,
148                        all_positions=all_positions,
149                    )
150                    synthesis_response = await self._synthesis_gw().complete(
151                        synthesis_prompt, model=self.synthesis_model, temperature=0.1
152                    )
153                    final_output = synthesis_response.content.strip()
154                    syn_span.set_output(final_output)
155
156                self._log_finished(rounds=self.debate_rounds, success=True)
157                trace.set_output(final_output)
158                return OrchestratorResult(
159                    final_output=final_output,
160                    agent_outputs=agent_outputs,
161                    rounds=self.debate_rounds,
162                    success=True,
163                )
164
165            except Exception as exc:
166                self._logger.error("DebateOrchestrator failed", error=str(exc))
167                trace.set_error(exc)
168                return OrchestratorResult(
169                    final_output="",
170                    rounds=self.debate_rounds,
171                    success=False,
172                    error=str(exc),
173                )

Execute the multi-agent orchestration and return the aggregated result.

class SwarmOrchestrator(gmf_forge_ai_orchestration.multi_agent.BaseOrchestrator):
 29class SwarmOrchestrator(BaseOrchestrator):
 30    """
 31    Dynamically routes work to the best available agent each round.
 32
 33    Each round:
 34    1. A coordinator LLM call decides whether the task is done or describes the
 35       next sub-task.
 36    2. The :class:`BaseRouter` selects the agent best suited for that sub-task.
 37    3. The selected agent executes the sub-task.
 38    4. Results accumulate until ``DONE`` is signalled or ``max_rounds`` is reached.
 39
 40    Args:
 41        coordinator_gateway: LLM gateway used by the coordinator.
 42        agents: Mapping of agent name → :class:`BaseAgent`.
 43        router: :class:`BaseRouter` that selects agents each round.
 44        max_rounds: Maximum dispatch rounds (default: 10).
 45        coordinator_model: Model for coordinator calls (optional).
 46        logger, metrics, tracer: Observability (optional).
 47    """
 48
 49    def __init__(
 50        self,
 51        coordinator_gateway: Any,
 52        agents: Optional[Dict[str, BaseAgent]] = None,
 53        router: Optional[BaseRouter] = None,
 54        max_rounds: int = 10,
 55        coordinator_model: Optional[str] = None,
 56        logger: Optional[BasicLogger] = None,
 57        metrics: Optional[BasicMetricsCollector] = None,
 58        tracer: Optional[TracingProvider] = None,
 59    ) -> None:
 60        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
 61        self.coordinator_gateway = coordinator_gateway
 62        self.agents: Dict[str, BaseAgent] = agents or {}
 63        self.max_rounds = max_rounds
 64        self.coordinator_model = coordinator_model
 65
 66    async def _next_step(self, task: str, history: List[str]) -> Optional[str]:
 67        """Returns None to signal DONE, otherwise the next sub-task description.
 68
 69        When a router is present the coordinator is asked to name an explicit agent
 70        using the structured ``AGENT: / TASK:`` format.  The router then matches on
 71        the agent name directly rather than performing keyword matching on free text.
 72        """
 73        history_text = "\n".join(
 74            f"Round {i+1}: {h}" for i, h in enumerate(history)
 75        ) or "Nothing yet."
 76        agents_list = ", ".join(self.agents.keys()) if self.agents else "(none)"
 77        prompt = _SWARM_CONTINUE_PROMPT.format(
 78            task=task, history=history_text, agents=agents_list
 79        )
 80        response = await self.coordinator_gateway.complete(
 81            prompt, model=self.coordinator_model, temperature=0.0
 82        )
 83        text = response.content.strip()
 84        if text.upper().startswith("DONE"):
 85            return None  # Task complete
 86        return text
 87
 88    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 89        self._log_start(task)
 90
 91        agent_outputs: Dict[str, AgentResult] = {}
 92        history: List[str] = []
 93        final_output = ""
 94        available = list(self.agents.keys())
 95
 96        with self._tracer.trace(
 97            "swarm.run", input=task, metadata={"max_rounds": self.max_rounds}
 98        ) as trace:
 99            try:
100                for round_num in range(self.max_rounds):
101                    with trace.span(f"swarm.round_{round_num + 1}") as round_span:
102                        # Coordinator decides next step
103                        next_task = await self._next_step(task, history)
104                        if next_task is None:
105                            # Extract final answer from DONE line
106                            if history:
107                                done_response = await self.coordinator_gateway.complete(
108                                    f"Summarise the work done:\n" + "\n".join(history),
109                                    model=self.coordinator_model,
110                                    temperature=0.1,
111                                )
112                                final_output = done_response.content.strip()
113                            round_span.set_output("DONE")
114                            break
115
116                        # Route to best agent
117                        routing_req = RoutingRequest(
118                            input=next_task, available_agents=available
119                        )
120                        if self.router:
121                            decision = await self.router.route(routing_req)
122                            agent_name = decision.target
123                        else:
124                            agent_name = available[round_num % len(available)]
125
126                        agent = self.agents.get(agent_name)
127                        if agent is None:
128                            self._logger.warning("Swarm: agent not found", agent=agent_name)
129                            continue
130
131                        # Extract just the task description from structured AGENT:/TASK: format
132                        agent_task = next_task
133                        for line in next_task.splitlines():
134                            stripped = line.strip()
135                            if stripped.upper().startswith("TASK:"):
136                                agent_task = stripped[5:].strip()
137                                break
138
139                        self._log_agent_dispatch(agent_name, agent_task)
140                        result = await agent.execute(agent_task, context=context)
141                        history.append(f"[{agent_name}]: {result.output[:300]}")
142                        agent_outputs[f"round_{round_num + 1}_{agent_name}"] = result
143                        final_output = result.output
144                        round_span.set_output(result.output[:200])
145
146                        if self._metrics:
147                            self._metrics.increment(
148                                "orchestrator.swarm_dispatches", agent=agent_name
149                            )
150
151                self._log_finished(rounds=len(history), success=True)
152                trace.set_output(final_output)
153                return OrchestratorResult(
154                    final_output=final_output,
155                    agent_outputs=agent_outputs,
156                    rounds=len(history),
157                    success=True,
158                )
159
160            except Exception as exc:
161                self._logger.error("SwarmOrchestrator failed", error=str(exc))
162                trace.set_error(exc)
163                return OrchestratorResult(
164                    final_output=final_output,
165                    agent_outputs=agent_outputs,
166                    rounds=len(history),
167                    success=False,
168                    error=str(exc),
169                )

Dynamically routes work to the best available agent each round.

Each round:

  1. A coordinator LLM call decides whether the task is done or describes the next sub-task.
  2. The BaseRouter selects the agent best suited for that sub-task; when no router is configured, agents are picked round-robin by round number.
  3. The selected agent executes the sub-task.
  4. Results accumulate until DONE is signalled or max_rounds is reached.

Args: coordinator_gateway: LLM gateway used by the coordinator. agents: Mapping of agent name → BaseAgent. router: BaseRouter that selects agents each round. max_rounds: Maximum dispatch rounds (default: 10). coordinator_model: Model for coordinator calls (optional). logger, metrics, tracer: Observability (optional).

SwarmOrchestrator( coordinator_gateway: Any, agents: Optional[Dict[str, gmf_forge_ai_orchestration.BaseAgent]] = None, router: Optional[gmf_forge_ai_orchestration.BaseRouter] = None, max_rounds: int = 10, coordinator_model: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
49    def __init__(
50        self,
51        coordinator_gateway: Any,
52        agents: Optional[Dict[str, BaseAgent]] = None,
53        router: Optional[BaseRouter] = None,
54        max_rounds: int = 10,
55        coordinator_model: Optional[str] = None,
56        logger: Optional[BasicLogger] = None,
57        metrics: Optional[BasicMetricsCollector] = None,
58        tracer: Optional[TracingProvider] = None,
59    ) -> None:
60        super().__init__(router=router, logger=logger, metrics=metrics, tracer=tracer)
61        self.coordinator_gateway = coordinator_gateway
62        self.agents: Dict[str, BaseAgent] = agents or {}
63        self.max_rounds = max_rounds
64        self.coordinator_model = coordinator_model
coordinator_gateway
max_rounds
coordinator_model
async def run( self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 88    async def run(self, task: str, context: Optional[Dict[str, Any]] = None) -> OrchestratorResult:
 89        self._log_start(task)
 90
 91        agent_outputs: Dict[str, AgentResult] = {}
 92        history: List[str] = []
 93        final_output = ""
 94        available = list(self.agents.keys())
 95
 96        with self._tracer.trace(
 97            "swarm.run", input=task, metadata={"max_rounds": self.max_rounds}
 98        ) as trace:
 99            try:
100                for round_num in range(self.max_rounds):
101                    with trace.span(f"swarm.round_{round_num + 1}") as round_span:
102                        # Coordinator decides next step
103                        next_task = await self._next_step(task, history)
104                        if next_task is None:
105                            # Extract final answer from DONE line
106                            if history:
107                                done_response = await self.coordinator_gateway.complete(
108                                    f"Summarise the work done:\n" + "\n".join(history),
109                                    model=self.coordinator_model,
110                                    temperature=0.1,
111                                )
112                                final_output = done_response.content.strip()
113                            round_span.set_output("DONE")
114                            break
115
116                        # Route to best agent
117                        routing_req = RoutingRequest(
118                            input=next_task, available_agents=available
119                        )
120                        if self.router:
121                            decision = await self.router.route(routing_req)
122                            agent_name = decision.target
123                        else:
124                            agent_name = available[round_num % len(available)]
125
126                        agent = self.agents.get(agent_name)
127                        if agent is None:
128                            self._logger.warning("Swarm: agent not found", agent=agent_name)
129                            continue
130
131                        # Extract just the task description from structured AGENT:/TASK: format
132                        agent_task = next_task
133                        for line in next_task.splitlines():
134                            stripped = line.strip()
135                            if stripped.upper().startswith("TASK:"):
136                                agent_task = stripped[5:].strip()
137                                break
138
139                        self._log_agent_dispatch(agent_name, agent_task)
140                        result = await agent.execute(agent_task, context=context)
141                        history.append(f"[{agent_name}]: {result.output[:300]}")
142                        agent_outputs[f"round_{round_num + 1}_{agent_name}"] = result
143                        final_output = result.output
144                        round_span.set_output(result.output[:200])
145
146                        if self._metrics:
147                            self._metrics.increment(
148                                "orchestrator.swarm_dispatches", agent=agent_name
149                            )
150
151                self._log_finished(rounds=len(history), success=True)
152                trace.set_output(final_output)
153                return OrchestratorResult(
154                    final_output=final_output,
155                    agent_outputs=agent_outputs,
156                    rounds=len(history),
157                    success=True,
158                )
159
160            except Exception as exc:
161                self._logger.error("SwarmOrchestrator failed", error=str(exc))
162                trace.set_error(exc)
163                return OrchestratorResult(
164                    final_output=final_output,
165                    agent_outputs=agent_outputs,
166                    rounds=len(history),
167                    success=False,
168                    error=str(exc),
169                )

Execute the multi-agent orchestration and return the aggregated result.