gmf_forge_ai_orchestration.routing

Routing — LLM-based, semantic, rule-based, and load-balancing routers.

 1"""Routing — LLM-based, semantic, rule-based, and load-balancing routers."""
 2
 3from gmf_forge_ai_orchestration.routing.base import BaseRouter, RoutingDecision, RoutingRequest
 4from gmf_forge_ai_orchestration.routing.llm_router import LLMRouter
 5from gmf_forge_ai_orchestration.routing.semantic_router import SemanticRouter
 6from gmf_forge_ai_orchestration.routing.rule_router import RuleBasedRouter
 7from gmf_forge_ai_orchestration.routing.load_balance_router import LoadBalancingRouter
 8
 9__all__ = [
10    "BaseRouter",
11    "RoutingDecision",
12    "RoutingRequest",
13    "LLMRouter",
14    "SemanticRouter",
15    "RuleBasedRouter",
16    "LoadBalancingRouter",
17]
class BaseRouter(abc.ABC):
class BaseRouter(ABC):
    """
    Common base for routers, which pick the agent that should handle a request.

    Subclasses implement :meth:`route`; this class only wires up the shared
    observability helpers and offers :meth:`_record_decision`.

    Args:
        logger: Optional :class:`BasicLogger`. When omitted (or falsy) a logger
            named ``gmf_forge_ai.router.<ClassName>`` is created.
        metrics: Optional :class:`BasicMetricsCollector`.
        tracer: Optional :class:`TracingProvider`. Falls back to ``get_tracer()``.
    """

    def __init__(
        self,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
        tracer: Optional[TracingProvider] = None,
    ) -> None:
        if logger:
            self._logger = logger
        else:
            # Default logger is named after the concrete router class.
            self._logger = BasicLogger(f"gmf_forge_ai.router.{self.__class__.__name__}")
        self._metrics = metrics
        self._tracer = tracer if tracer else get_tracer()

    @abstractmethod
    async def route(self, request: RoutingRequest) -> RoutingDecision:
        """Select a target agent for the given request."""

    def _record_decision(self, decision: RoutingDecision) -> None:
        """Log ``decision`` and, when a metrics collector is set, count it."""
        log_fields = {
            "target": decision.target,
            "confidence": decision.confidence,
            "reasoning": decision.reasoning,
        }
        self._logger.info("Routing decision", **log_fields)

        if not self._metrics:
            return
        self._metrics.increment(
            "router.decisions",
            router=self.__class__.__name__,
            target=decision.target,
        )

Abstract router — selects the appropriate agent for a given request.

Args: logger: Optional BasicLogger. metrics: Optional BasicMetricsCollector. tracer: Optional TracingProvider. Falls back to get_tracer().

@abstractmethod
async def route( self, request: RoutingRequest) -> RoutingDecision:
55    @abstractmethod
56    async def route(self, request: RoutingRequest) -> RoutingDecision:
57        """Select a target agent for the given request."""

Select a target agent for the given request.

@dataclass
class RoutingDecision:
@dataclass
class RoutingDecision:
    """Outcome a router produces for a single routing request."""

    # Name of the agent the router selected.
    target: str
    # Score the router attaches to its choice.
    confidence: float
    # Human-readable explanation of the choice.
    reasoning: str
    # Extra router-specific details; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)

The result of a routing decision.

RoutingDecision( target: str, confidence: float, reasoning: str, metadata: Dict[str, Any] = <factory>)
target: str
confidence: float
reasoning: str
metadata: Dict[str, Any]
@dataclass
class RoutingRequest:
@dataclass
class RoutingRequest:
    """Everything a router receives when asked to choose an agent."""

    # Text the routers match against (prompted, embedded or tested by rules).
    input: str
    # Names of the agents the router may choose from.
    available_agents: List[str]
    # Optional free-form context; each instance gets its own dict.
    context: Dict[str, Any] = field(default_factory=dict)
    # Optional free-form metadata; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)

Encapsulates a routing decision request.

RoutingRequest( input: str, available_agents: List[str], context: Dict[str, Any] = <factory>, metadata: Dict[str, Any] = <factory>)
input: str
available_agents: List[str]
context: Dict[str, Any]
metadata: Dict[str, Any]
class LLMRouter(gmf_forge_ai_orchestration.routing.BaseRouter):
 26class LLMRouter(BaseRouter):
 27    """
 28    Uses the :class:`UnifiedLLMGateway` to select an agent based on descriptions.
 29
 30    Args:
 31        llm_gateway: The LLM gateway to use for routing.
 32        agent_descriptions: Mapping of agent name → description shown to the LLM.
 33        model: LLM model name (optional).
 34        fallback_target: Agent to use if the LLM response cannot be parsed.
 35        logger, metrics, tracer: Observability (optional).
 36
 37    Example::
 38
 39        router = LLMRouter(
 40            llm_gateway=gateway,
 41            agent_descriptions={
 42                "search_agent": "Searches the web for information.",
 43                "code_agent": "Writes and explains code.",
 44            },
 45            fallback_target="search_agent",
 46        )
 47    """
 48
 49    def __init__(
 50        self,
 51        llm_gateway: Any,
 52        agent_descriptions: Optional[Dict[str, str]] = None,
 53        model: Optional[str] = None,
 54        fallback_target: Optional[str] = None,
 55        logger: Optional[BasicLogger] = None,
 56        metrics: Optional[BasicMetricsCollector] = None,
 57        tracer: Optional[TracingProvider] = None,
 58    ) -> None:
 59        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
 60        self.llm_gateway = llm_gateway
 61        self.agent_descriptions: Dict[str, str] = agent_descriptions or {}
 62        self.model = model
 63        self.fallback_target = fallback_target
 64
 65    async def route(self, request: RoutingRequest) -> RoutingDecision:
 66        # Build description block from provided map + request.available_agents
 67        descriptions = {}
 68        for name in request.available_agents:
 69            descriptions[name] = self.agent_descriptions.get(name, "No description provided.")
 70
 71        desc_block = "\n".join(
 72            f"- {name}: {desc}" for name, desc in descriptions.items()
 73        )
 74        prompt = _ROUTER_PROMPT.format(agent_descriptions=desc_block, input=request.input)
 75
 76        with self._tracer.trace("llm_router.route", input=request.input) as trace:
 77            response = await self.llm_gateway.complete(prompt, model=self.model, temperature=0.0)
 78            raw = response.content.strip()
 79
 80            # Extract JSON even if wrapped in markdown
 81            match = re.search(r"\{.*\}", raw, re.DOTALL)
 82            decision: RoutingDecision
 83            if match:
 84                try:
 85                    data = json.loads(match.group())
 86                    target = data.get("target", "")
 87                    if target not in request.available_agents:
 88                        target = self._fallback(request)
 89                    decision = RoutingDecision(
 90                        target=target,
 91                        confidence=float(data.get("confidence", 0.5)),
 92                        reasoning=str(data.get("reasoning", "")),
 93                    )
 94                except (json.JSONDecodeError, ValueError):
 95                    decision = RoutingDecision(
 96                        target=self._fallback(request),
 97                        confidence=0.0,
 98                        reasoning="LLM response could not be parsed.",
 99                    )
100            else:
101                decision = RoutingDecision(
102                    target=self._fallback(request),
103                    confidence=0.0,
104                    reasoning="No JSON found in LLM response.",
105                )
106
107            self._record_decision(decision)
108            trace.set_output({"target": decision.target, "confidence": decision.confidence})
109        return decision
110
111    def _fallback(self, request: RoutingRequest) -> str:
112        if self.fallback_target and self.fallback_target in request.available_agents:
113            return self.fallback_target
114        return request.available_agents[0] if request.available_agents else ""

Uses the UnifiedLLMGateway to select an agent based on descriptions.

Args: llm_gateway: The LLM gateway to use for routing. agent_descriptions: Mapping of agent name → description shown to the LLM. model: LLM model name (optional). fallback_target: Agent to use if the LLM response cannot be parsed. logger, metrics, tracer: Observability (optional).

Example::

router = LLMRouter(
    llm_gateway=gateway,
    agent_descriptions={
        "search_agent": "Searches the web for information.",
        "code_agent": "Writes and explains code.",
    },
    fallback_target="search_agent",
)
LLMRouter( llm_gateway: Any, agent_descriptions: Optional[Dict[str, str]] = None, model: Optional[str] = None, fallback_target: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
49    def __init__(
50        self,
51        llm_gateway: Any,
52        agent_descriptions: Optional[Dict[str, str]] = None,
53        model: Optional[str] = None,
54        fallback_target: Optional[str] = None,
55        logger: Optional[BasicLogger] = None,
56        metrics: Optional[BasicMetricsCollector] = None,
57        tracer: Optional[TracingProvider] = None,
58    ) -> None:
59        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
60        self.llm_gateway = llm_gateway
61        self.agent_descriptions: Dict[str, str] = agent_descriptions or {}
62        self.model = model
63        self.fallback_target = fallback_target
llm_gateway
agent_descriptions: Dict[str, str]
model
fallback_target
async def route( self, request: RoutingRequest) -> RoutingDecision:
 65    async def route(self, request: RoutingRequest) -> RoutingDecision:
 66        # Build description block from provided map + request.available_agents
 67        descriptions = {}
 68        for name in request.available_agents:
 69            descriptions[name] = self.agent_descriptions.get(name, "No description provided.")
 70
 71        desc_block = "\n".join(
 72            f"- {name}: {desc}" for name, desc in descriptions.items()
 73        )
 74        prompt = _ROUTER_PROMPT.format(agent_descriptions=desc_block, input=request.input)
 75
 76        with self._tracer.trace("llm_router.route", input=request.input) as trace:
 77            response = await self.llm_gateway.complete(prompt, model=self.model, temperature=0.0)
 78            raw = response.content.strip()
 79
 80            # Extract JSON even if wrapped in markdown
 81            match = re.search(r"\{.*\}", raw, re.DOTALL)
 82            decision: RoutingDecision
 83            if match:
 84                try:
 85                    data = json.loads(match.group())
 86                    target = data.get("target", "")
 87                    if target not in request.available_agents:
 88                        target = self._fallback(request)
 89                    decision = RoutingDecision(
 90                        target=target,
 91                        confidence=float(data.get("confidence", 0.5)),
 92                        reasoning=str(data.get("reasoning", "")),
 93                    )
 94                except (json.JSONDecodeError, ValueError):
 95                    decision = RoutingDecision(
 96                        target=self._fallback(request),
 97                        confidence=0.0,
 98                        reasoning="LLM response could not be parsed.",
 99                    )
100            else:
101                decision = RoutingDecision(
102                    target=self._fallback(request),
103                    confidence=0.0,
104                    reasoning="No JSON found in LLM response.",
105                )
106
107            self._record_decision(decision)
108            trace.set_output({"target": decision.target, "confidence": decision.confidence})
109        return decision

Select a target agent for the given request.

class SemanticRouter(gmf_forge_ai_orchestration.routing.BaseRouter):
 28class SemanticRouter(BaseRouter):
 29    """
 30    Routes by measuring cosine similarity between the embedded input and
 31    pre-computed route descriptor embeddings.
 32
 33    The caller provides an ``embed_fn`` — any async function that accepts a
 34    string and returns ``List[float]``. This keeps the router independent of
 35    any specific embedding provider.
 36
 37    Args:
 38        embed_fn: ``async (text: str) -> List[float]``.
 39        route_embeddings: Mapping of agent name → pre-computed embedding vector.
 40            If not provided upfront, call :meth:`add_route` to register routes.
 41        route_descriptions: Optional text descriptions (stored for reference).
 42        fallback_target: Agent to use if similarity is below threshold.
 43        similarity_threshold: Minimum cosine similarity to accept a route (default 0.0).
 44        logger, metrics, tracer: Observability (optional).
 45
 46    Example::
 47
 48        router = SemanticRouter(embed_fn=my_embed)
 49        await router.add_route("search_agent", "web search and information retrieval")
 50        await router.add_route("code_agent", "code generation debugging and review")
 51        decision = await router.route(request)
 52    """
 53
 54    def __init__(
 55        self,
 56        embed_fn: Any,
 57        route_embeddings: Optional[Dict[str, List[float]]] = None,
 58        route_descriptions: Optional[Dict[str, str]] = None,
 59        fallback_target: Optional[str] = None,
 60        similarity_threshold: float = 0.0,
 61        logger: Optional[BasicLogger] = None,
 62        metrics: Optional[BasicMetricsCollector] = None,
 63        tracer: Optional[TracingProvider] = None,
 64    ) -> None:
 65        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
 66        self._embed_fn = embed_fn
 67        self._route_embeddings: Dict[str, List[float]] = route_embeddings or {}
 68        self._route_descriptions: Dict[str, str] = route_descriptions or {}
 69        self.fallback_target = fallback_target
 70        self.similarity_threshold = similarity_threshold
 71
 72    async def add_route(self, agent_name: str, description: str) -> None:
 73        """Embed ``description`` and store it as the routing vector for ``agent_name``."""
 74        embedding = await self._embed_fn(description)
 75        self._route_embeddings[agent_name] = embedding
 76        self._route_descriptions[agent_name] = description
 77
 78    async def route(self, request: RoutingRequest) -> RoutingDecision:
 79        candidates = [a for a in request.available_agents if a in self._route_embeddings]
 80
 81        with self._tracer.trace("semantic_router.route", input=request.input) as trace:
 82            if not candidates:
 83                fallback = self.fallback_target or (
 84                    request.available_agents[0] if request.available_agents else ""
 85                )
 86                decision = RoutingDecision(
 87                    target=fallback,
 88                    confidence=0.0,
 89                    reasoning="No route embeddings available for candidates.",
 90                )
 91                self._record_decision(decision)
 92                trace.set_output({"target": decision.target})
 93                return decision
 94
 95            input_embedding: List[float] = await self._embed_fn(request.input)
 96
 97            best_target = ""
 98            best_score = -1.0
 99            for agent_name in candidates:
100                route_emb = self._route_embeddings[agent_name]
101                score = _cosine(input_embedding, route_emb)
102                if score > best_score:
103                    best_score = score
104                    best_target = agent_name
105
106            if best_score < self.similarity_threshold:
107                best_target = self.fallback_target or candidates[0]
108
109            desc = self._route_descriptions.get(best_target, "")
110            decision = RoutingDecision(
111                target=best_target,
112                confidence=round(best_score, 4),
113                reasoning=f"Highest cosine similarity ({best_score:.4f}) to '{desc}'",
114            )
115            self._record_decision(decision)
116            trace.set_output({"target": decision.target, "confidence": decision.confidence})
117        return decision

Routes by measuring cosine similarity between the embedded input and pre-computed route descriptor embeddings.

The caller provides an embed_fn — any async function that accepts a string and returns List[float]. This keeps the router independent of any specific embedding provider.

Args: embed_fn: async (text: str) -> List[float]. route_embeddings: Mapping of agent name → pre-computed embedding vector. If not provided upfront, call add_route() to register routes. route_descriptions: Optional text descriptions (stored for reference). fallback_target: Agent to use if similarity is below threshold. similarity_threshold: Minimum cosine similarity to accept a route (default 0.0). logger, metrics, tracer: Observability (optional).

Example::

router = SemanticRouter(embed_fn=my_embed)
await router.add_route("search_agent", "web search and information retrieval")
await router.add_route("code_agent", "code generation debugging and review")
decision = await router.route(request)
SemanticRouter( embed_fn: Any, route_embeddings: Optional[Dict[str, List[float]]] = None, route_descriptions: Optional[Dict[str, str]] = None, fallback_target: Optional[str] = None, similarity_threshold: float = 0.0, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
54    def __init__(
55        self,
56        embed_fn: Any,
57        route_embeddings: Optional[Dict[str, List[float]]] = None,
58        route_descriptions: Optional[Dict[str, str]] = None,
59        fallback_target: Optional[str] = None,
60        similarity_threshold: float = 0.0,
61        logger: Optional[BasicLogger] = None,
62        metrics: Optional[BasicMetricsCollector] = None,
63        tracer: Optional[TracingProvider] = None,
64    ) -> None:
65        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
66        self._embed_fn = embed_fn
67        self._route_embeddings: Dict[str, List[float]] = route_embeddings or {}
68        self._route_descriptions: Dict[str, str] = route_descriptions or {}
69        self.fallback_target = fallback_target
70        self.similarity_threshold = similarity_threshold
fallback_target
similarity_threshold
async def add_route(self, agent_name: str, description: str) -> None:
72    async def add_route(self, agent_name: str, description: str) -> None:
73        """Embed ``description`` and store it as the routing vector for ``agent_name``."""
74        embedding = await self._embed_fn(description)
75        self._route_embeddings[agent_name] = embedding
76        self._route_descriptions[agent_name] = description

Embed description and store it as the routing vector for agent_name.

async def route( self, request: RoutingRequest) -> RoutingDecision:
 78    async def route(self, request: RoutingRequest) -> RoutingDecision:
 79        candidates = [a for a in request.available_agents if a in self._route_embeddings]
 80
 81        with self._tracer.trace("semantic_router.route", input=request.input) as trace:
 82            if not candidates:
 83                fallback = self.fallback_target or (
 84                    request.available_agents[0] if request.available_agents else ""
 85                )
 86                decision = RoutingDecision(
 87                    target=fallback,
 88                    confidence=0.0,
 89                    reasoning="No route embeddings available for candidates.",
 90                )
 91                self._record_decision(decision)
 92                trace.set_output({"target": decision.target})
 93                return decision
 94
 95            input_embedding: List[float] = await self._embed_fn(request.input)
 96
 97            best_target = ""
 98            best_score = -1.0
 99            for agent_name in candidates:
100                route_emb = self._route_embeddings[agent_name]
101                score = _cosine(input_embedding, route_emb)
102                if score > best_score:
103                    best_score = score
104                    best_target = agent_name
105
106            if best_score < self.similarity_threshold:
107                best_target = self.fallback_target or candidates[0]
108
109            desc = self._route_descriptions.get(best_target, "")
110            decision = RoutingDecision(
111                target=best_target,
112                confidence=round(best_score, 4),
113                reasoning=f"Highest cosine similarity ({best_score:.4f}) to '{desc}'",
114            )
115            self._record_decision(decision)
116            trace.set_output({"target": decision.target, "confidence": decision.confidence})
117        return decision

Select a target agent for the given request.

class RuleBasedRouter(gmf_forge_ai_orchestration.routing.BaseRouter):
class RuleBasedRouter(BaseRouter):
    """
    Routes based on an ordered list of ``(condition_fn, target_agent)`` rules.

    Rules are evaluated in order; the first rule whose condition returns ``True``
    wins. Falls back to ``fallback_target`` if no rule matches.

    Args:
        rules: Ordered list of ``(condition_fn, agent_name)`` tuples.
            Conditions may be sync or async callables accepting a
            :class:`RoutingRequest` and returning ``bool``.
        fallback_target: Agent to use if no rule matches.
        logger, metrics, tracer: Observability (optional).

    Example::

        router = RuleBasedRouter(
            rules=[
                (lambda r: "code" in r.input.lower(), "code_agent"),
                (lambda r: "search" in r.input.lower(), "search_agent"),
            ],
            fallback_target="general_agent",
        )
    """

    def __init__(
        self,
        rules: Optional[List[Tuple[ConditionFn, str]]] = None,
        fallback_target: Optional[str] = None,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
        tracer: Optional[TracingProvider] = None,
    ) -> None:
        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
        self._rules: List[Tuple[ConditionFn, str]] = rules or []
        self.fallback_target = fallback_target

    def add_rule(self, condition: ConditionFn, target: str) -> None:
        """Append a rule to the end of the list."""
        self._rules.append((condition, target))

    async def route(self, request: RoutingRequest) -> RoutingDecision:
        """
        Return the target of the first matching rule, or the fallback.

        Rules whose target is not in ``request.available_agents`` are skipped
        without evaluating their condition. A matching rule yields confidence
        ``1.0``; the fallback yields ``0.0``.

        Args:
            request: The routing request passed to each rule condition.

        Returns:
            The routing decision, already logged via ``_record_decision``.
        """
        # Function-scope import retained from the original; the unused
        # ``asyncio`` import was dropped.
        import inspect

        with self._tracer.trace("rule_router.route", input=request.input) as trace:
            for i, (condition, target) in enumerate(self._rules):
                if target not in request.available_agents:
                    continue
                result = condition(request)
                # Conditions may be sync or async; await only when needed.
                if inspect.isawaitable(result):
                    result = await result
                matched = bool(result)

                if matched:
                    decision = RoutingDecision(
                        target=target,
                        confidence=1.0,
                        reasoning=f"Rule {i} matched.",
                    )
                    self._record_decision(decision)
                    trace.set_output({"target": target, "rule_index": i})
                    return decision

            # No rule matched — use fallback
            fallback = self.fallback_target or (
                request.available_agents[0] if request.available_agents else ""
            )
            decision = RoutingDecision(
                target=fallback,
                confidence=0.0,
                reasoning="No rule matched; using fallback target.",
            )
            self._record_decision(decision)
            trace.set_output({"target": fallback, "fallback": True})
        return decision

Routes based on an ordered list of (condition_fn, target_agent) rules.

Rules are evaluated in order; the first rule whose condition returns True wins. Falls back to fallback_target if no rule matches.

Args: rules: Ordered list of (condition_fn, agent_name) tuples. Conditions may be sync or async callables accepting a RoutingRequest and returning bool. fallback_target: Agent to use if no rule matches. logger, metrics, tracer: Observability (optional).

Example::

router = RuleBasedRouter(
    rules=[
        (lambda r: "code" in r.input.lower(), "code_agent"),
        (lambda r: "search" in r.input.lower(), "search_agent"),
    ],
    fallback_target="general_agent",
)
RuleBasedRouter( rules: Optional[List[Tuple[Union[Callable[[RoutingRequest], bool], Callable[[RoutingRequest], Awaitable[bool]]], str]]] = None, fallback_target: Optional[str] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
44    def __init__(
45        self,
46        rules: Optional[List[Tuple[ConditionFn, str]]] = None,
47        fallback_target: Optional[str] = None,
48        logger: Optional[BasicLogger] = None,
49        metrics: Optional[BasicMetricsCollector] = None,
50        tracer: Optional[TracingProvider] = None,
51    ) -> None:
52        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
53        self._rules: List[Tuple[ConditionFn, str]] = rules or []
54        self.fallback_target = fallback_target
fallback_target
def add_rule( self, condition: Union[Callable[[RoutingRequest], bool], Callable[[RoutingRequest], Awaitable[bool]]], target: str) -> None:
56    def add_rule(self, condition: ConditionFn, target: str) -> None:
57        """Append a rule to the end of the list."""
58        self._rules.append((condition, target))

Append a rule to the end of the list.

async def route( self, request: RoutingRequest) -> RoutingDecision:
60    async def route(self, request: RoutingRequest) -> RoutingDecision:
61        import asyncio, inspect
62
63        with self._tracer.trace("rule_router.route", input=request.input) as trace:
64            for i, (condition, target) in enumerate(self._rules):
65                if target not in request.available_agents:
66                    continue
67                result = condition(request)
68                if inspect.isawaitable(result):
69                    matched: bool = await result
70                else:
71                    matched = bool(result)
72
73                if matched:
74                    decision = RoutingDecision(
75                        target=target,
76                        confidence=1.0,
77                        reasoning=f"Rule {i} matched.",
78                    )
79                    self._record_decision(decision)
80                    trace.set_output({"target": target, "rule_index": i})
81                    return decision
82
83            # No rule matched — use fallback
84            fallback = self.fallback_target or (
85                request.available_agents[0] if request.available_agents else ""
86            )
87            decision = RoutingDecision(
88                target=fallback,
89                confidence=0.0,
90                reasoning="No rule matched; using fallback target.",
91            )
92            self._record_decision(decision)
93            trace.set_output({"target": fallback, "fallback": True})
94        return decision

Select a target agent for the given request.

class LoadBalancingRouter(gmf_forge_ai_orchestration.routing.BaseRouter):
class LoadBalancingRouter(BaseRouter):
    """
    Routes in round-robin order across the available agent list.

    Call counts are tracked in an :class:`InMemoryStateStore` so the counter
    survives across multiple ``route()`` calls within the same process.

    Args:
        agent_weights: Optional mapping of agent name → relative weight for
            weighted round-robin. All weights default to 1 if not specified.
            An agent with weight ``0`` (or less) gets no slots; if that leaves
            no slots at all, plain round-robin over the agents is used.
        logger, metrics, tracer: Observability (optional).

    Example::

        router = LoadBalancingRouter()
        # or with weights:
        router = LoadBalancingRouter(agent_weights={"heavy_agent": 2, "light_agent": 1})
    """

    def __init__(
        self,
        agent_weights: Optional[Dict[str, int]] = None,
        logger: Optional[BasicLogger] = None,
        metrics: Optional[BasicMetricsCollector] = None,
        tracer: Optional[TracingProvider] = None,
    ) -> None:
        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
        self._weights = agent_weights or {}
        self._store = InMemoryStateStore()
        self._call_total: int = 0

    def _expand_agents(self, agents: List[str]) -> List[str]:
        """Expand agents by weight into a weighted list for round-robin."""
        expanded: List[str] = []
        for agent in agents:
            weight = self._weights.get(agent, 1)
            expanded.extend([agent] * weight)
        return expanded

    async def route(self, request: RoutingRequest) -> RoutingDecision:
        """
        Select the next agent in (weighted) round-robin order.

        With no available agents an empty-target decision is returned
        immediately, without tracing or recording.

        Args:
            request: The routing request; only ``available_agents`` and
                ``input`` (for the trace) are used.

        Returns:
            The routing decision; ``metadata["call_count"]`` holds the
            incremented counter.
        """
        agents = request.available_agents
        if not agents:
            return RoutingDecision(
                target="", confidence=0.0, reasoning="No agents available."
            )

        with self._tracer.trace("lb_router.route", input=request.input) as trace:
            # Load counter from store (survives if store is shared)
            counter: int = await self._store.get(_COUNTER_KEY) or 0
            # If every available agent has weight <= 0 the expansion is empty
            # and the modulo below would raise ZeroDivisionError; fall back
            # to the unweighted agent list in that case.
            expanded = self._expand_agents(agents) or list(agents)
            target = expanded[counter % len(expanded)]
            counter += 1
            await self._store.set(_COUNTER_KEY, counter)

            if self._metrics:
                self._metrics.increment("router.lb.calls", target=target)

            decision = RoutingDecision(
                target=target,
                confidence=1.0,
                reasoning=f"Round-robin selection (call #{counter}).",
                metadata={"call_count": counter},
            )
            self._record_decision(decision)
            trace.set_output({"target": target, "call_count": counter})
        return decision

Routes in round-robin order across the available agent list.

Call counts are tracked in an InMemoryStateStore so the counter survives across multiple route() calls within the same process.

Args: agent_weights: Optional mapping of agent name → relative weight for weighted round-robin. All weights default to 1 if not specified. logger, metrics, tracer: Observability (optional).

Example::

router = LoadBalancingRouter()
# or with weights:
router = LoadBalancingRouter(agent_weights={"heavy_agent": 2, "light_agent": 1})
LoadBalancingRouter( agent_weights: Optional[Dict[str, int]] = None, logger: Optional[gmf_forge_ai_shared_core.observability.BasicLogger] = None, metrics: Optional[gmf_forge_ai_shared_core.observability.BasicMetricsCollector] = None, tracer: Optional[gmf_forge_ai_shared_core.observability.TracingProvider] = None)
34    def __init__(
35        self,
36        agent_weights: Optional[Dict[str, int]] = None,
37        logger: Optional[BasicLogger] = None,
38        metrics: Optional[BasicMetricsCollector] = None,
39        tracer: Optional[TracingProvider] = None,
40    ) -> None:
41        super().__init__(logger=logger, metrics=metrics, tracer=tracer)
42        self._weights = agent_weights or {}
43        self._store = InMemoryStateStore()
44        self._call_total: int = 0
async def route( self, request: RoutingRequest) -> RoutingDecision:
54    async def route(self, request: RoutingRequest) -> RoutingDecision:
55        agents = request.available_agents
56        if not agents:
57            return RoutingDecision(
58                target="", confidence=0.0, reasoning="No agents available."
59            )
60
61        with self._tracer.trace("lb_router.route", input=request.input) as trace:
62            # Load counter from store (survives if store is shared)
63            counter: int = await self._store.get(_COUNTER_KEY) or 0
64            expanded = self._expand_agents(agents)
65            target = expanded[counter % len(expanded)]
66            counter += 1
67            await self._store.set(_COUNTER_KEY, counter)
68
69            if self._metrics:
70                self._metrics.increment("router.lb.calls", target=target)
71
72            decision = RoutingDecision(
73                target=target,
74                confidence=1.0,
75                reasoning=f"Round-robin selection (call #{counter}).",
76                metadata={"call_count": counter},
77            )
78            self._record_decision(decision)
79            trace.set_output({"target": target, "call_count": counter})
80        return decision

Select a target agent for the given request.