.env.example
# Multi-Agent System — Environment Variables
# Copy this file to .env and fill in your values.
# ─── Azure OpenAI (Supervisor LLM) ────────────────────────────────────────────
AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com/
AZURE_OPENAI_API_KEY=<your-api-key>
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o
# Preferred key (legacy AZURE_OPENAI_API_VERSION is also accepted)
AZURE_OPENAI_CHAT_API_VERSION=2024-02-15-preview
# OPENAI_SSL_CERT_PATH=/certs/ca-bundle.crt
# ─── Remote Task Agent URLs ────────────────────────────────────────────────────
# JSON array of base URLs. The supervisor calls GET /.well-known/agent.json on each
# URL at startup, and again periodically while it is running, to discover each
# agent's name and capability description automatically.
AGENT_URLS=["http://servicenow-agent:8080","http://hr-agent:8080","http://notif-agent:8080"]
REMOTE_AGENT_TIMEOUT=60.0
# ─── State Store ───────────────────────────────────────────────────────────────
# "memory" for local dev, "redis" for production
STATE_STORE_BACKEND=redis
REDIS_URL=redis://localhost:6379/0
Dockerfile
# syntax=docker/dockerfile:1
FROM python:3.11-slim
WORKDIR /app
# Install the compiler toolchain (build-essential) and remove the apt package
# lists in the same layer so they do not add to the image size.
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Copy the requirements files and install dependencies before the application
# source, so this layer stays cached when only src/ changes.
# In production, these come from your private PyPI / Azure Artifacts feed.
COPY requirements*.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the contents of src/ straight into /app, so app.py sits in the working
# directory and is importable as "app" by the uvicorn command below.
COPY src/ .
# Create an unprivileged user and run the container as that user.
RUN adduser --disabled-password --gecos "" appuser
USER appuser
EXPOSE 8080
# Healthcheck — liveness probe for orchestrators. Requests /health on port 8080
# from inside the container; urlopen raises on a connection failure or an HTTP
# error status, which makes the probe command exit non-zero.
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"
# Serve the FastAPI object "app" from app.py on all interfaces, port 8080.
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8080"]
README.md
# Multi-Agent System Application
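A complete multi-agent system built with the GMF Forge AI packages: a FastAPI supervisor service that discovers remote task agents through their A2A Agent Cards and routes work to them.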
## Installation
```bash
pip install -e .
```
## Dependencies
- gmf-forge-ai-shared-core
- gmf-forge-ai-orchestration
- gmf-forge-ai-data
- gmf-forge-ai-ux
## Running
```bash
python src/app.py
```
## Features
- Coordinator agent for task decomposition
- Specialist agents for specific domains
- Inter-agent communication
- Shared context and state management
- Supervisor-level execution checkpoints per chat request
- Response metadata includes `execution_id` and `checkpoint_id` for traceability
multi-agent-system.postman_collection.json
{
"info": {
"name": "Multi-Agent System",
"description": "Supervisor API — tests for POST /chat (multi-turn conversation) and GET /health.",
"_postman_id": "multi-agent-system-v1",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{
"key": "base_url",
"value": "{{supervisor_url}}",
"type": "string"
}
],
"item": [
{
"name": "Health",
"item": [
{
"name": "GET /health",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{supervisor_url}}/health",
"host": ["{{supervisor_url}}"],
"path": ["health"]
},
"description": "Liveness probe. Returns the list of discovered agent names when the supervisor has started."
},
"response": [],
"event": [
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
"pm.test('Status 200', () => pm.response.to.have.status(200));",
"pm.test('status ok', () => {",
" const body = pm.response.json();",
" pm.expect(body.status).to.eql('ok');",
"});",
"pm.test('agents is array', () => {",
" const body = pm.response.json();",
" pm.expect(body.agents).to.be.an('array');",
"});"
]
}
}
]
}
]
},
{
"name": "Chat",
"item": [
{
"name": "POST /chat — first turn",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{supervisor_url}}/chat",
"host": ["{{supervisor_url}}"],
"path": ["chat"]
},
"body": {
"mode": "raw",
"raw": "{\n \"prompt\": \"Search for policies related to network outages and incident response procedures for the London office\",\n \"conversation_id\": \"session-001\",\n \"locale\": \"en-US\",\n \"language\": \"en-us\"\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"description": "First turn in a conversation. Supervisor decomposes the task, routes to the appropriate task agent via A2AClient (POST /rpc), synthesizes the result, and stores conversation history."
},
"response": [],
"event": [
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
"pm.test('Status 200', () => pm.response.to.have.status(200));",
"pm.test('message present', () => {",
" const body = pm.response.json();",
" pm.expect(body.message).to.be.a('string').and.not.empty;",
"});",
"pm.test('conversation_id echoed', () => {",
" const body = pm.response.json();",
" pm.expect(body.conversation_id).to.eql('session-001');",
"});",
"pm.test('subtasks is array', () => {",
" const body = pm.response.json();",
" pm.expect(body.subtasks).to.be.an('array');",
"});"
]
}
}
]
},
{
"name": "POST /chat — follow-up turn (same session)",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{supervisor_url}}/chat",
"host": ["{{supervisor_url}}"],
"path": ["chat"]
},
"body": {
"mode": "raw",
"raw": "{\n \"prompt\": \"What was the incident number you just created?\",\n \"conversation_id\": \"session-001\",\n \"locale\": \"en-US\",\n \"language\": \"en-us\"\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"description": "Second turn using the same conversation_id. The supervisor retrieves history from the state store so it can answer in context."
},
"response": [],
"event": [
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
"pm.test('Status 200', () => pm.response.to.have.status(200));",
"pm.test('conversation_id matches', () => {",
" const body = pm.response.json();",
" pm.expect(body.conversation_id).to.eql('session-001');",
"});"
]
}
}
]
},
{
"name": "POST /chat — with user_assertion",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{supervisor_url}}/chat",
"host": ["{{supervisor_url}}"],
"path": ["chat"]
},
"body": {
"mode": "raw",
"raw": "{\n \"prompt\": \"Search for all open P1 incidents and summarize them\",\n \"conversation_id\": \"session-002\",\n \"user_assertion\": \"{{user_assertion_token}}\",\n \"locale\": \"en-US\",\n \"language\": \"en-us\"\n}",
"options": {
"raw": {
"language": "json"
}
}
},
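"description": "Exercises forwarding of the end-user token to task agents in the A2A message metadata. Note: the supervisor reads the token from the 'Authorization: Bearer <token>' request header; the ChatMessage request model declares no user_assertion field, so the body field shown in this request is not what gets forwarded. The supervisor itself does NOT perform an OBO exchange — that is each task agent's responsibility."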
},
"response": [],
"event": [
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
"pm.test('Status 200', () => pm.response.to.have.status(200));",
"pm.test('message present', () => {",
" const body = pm.response.json();",
" pm.expect(body.message).to.be.a('string').and.not.empty;",
"});"
]
}
}
]
},
{
"name": "POST /chat — new session (no history)",
"request": {
"method": "POST",
"header": [
{
"key": "Content-Type",
"value": "application/json"
}
],
"url": {
"raw": "{{supervisor_url}}/chat",
"host": ["{{supervisor_url}}"],
"path": ["chat"]
},
"body": {
"mode": "raw",
"raw": "{\n \"prompt\": \"Hello, what agents are available to help me?\",\n \"conversation_id\": \"session-003\",\n \"locale\": \"en-US\",\n \"language\": \"en-us\"\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"description": "Fresh session — state store has no history for this conversation_id."
},
"response": [],
"event": [
{
"listen": "test",
"script": {
"type": "text/javascript",
"exec": [
"pm.test('Status 200', () => pm.response.to.have.status(200));",
"pm.test('subtasks populated', () => {",
" const body = pm.response.json();",
" pm.expect(body.subtasks).to.be.an('array');",
" body.subtasks.forEach(s => {",
" pm.expect(s.agent).to.be.a('string').and.not.empty;",
" pm.expect(typeof s.success).to.eql('boolean');",
" });",
"});"
]
}
}
]
}
]
}
]
}
setup.py
"""Multi-Agent System Application."""
from setuptools import setup, find_packages
# Package metadata for the supervisor service.
setup(
    name="multi-agent-system",
    version="0.1.0",
    packages=find_packages(where="src"),
    # src/ contains top-level modules rather than packages, so they must be
    # listed explicitly; find_packages() alone would not pick them up and the
    # "app:main" console script below would not be importable after a
    # regular (non-editable) install.
    py_modules=["agent_factory", "app", "config", "models"],
    package_dir={"": "src"},
    python_requires=">=3.11",
    install_requires=[
        "gmf-forge-ai-shared-core>=1.0.0",
        "gmf-forge-ai-orchestration>=1.0.0",
        "fastapi>=0.110.0",
        "uvicorn[standard]>=0.29.0",
        "pydantic-settings>=2.0.0",
        "httpx>=0.27.0",
    ],
    entry_points={
        "console_scripts": [
            # Runs main() from src/app.py.
            "multi-agent-system=app:main",
        ],
    },
)
src/agent_factory.py
"""Builds the SupervisorOrchestrator with all remote agents and behaviors wired up.
Agent descriptions are NOT hardcoded here. At startup the supervisor fetches
GET /.well-known/agent.json (A2A Agent Card) on each configured agent URL to
discover agent name and description dynamically. This means each team owns their
agent's capability description in their own codebase — the supervisor has zero
knowledge of what individual agents do.
"""
import os
from typing import Any, Dict, List, Tuple
from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import LLMProviderRegistry, PromptRegistry
from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector
from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator
from gmf_forge_ai_orchestration.multi_agent import supervisor as supervisor_module
from gmf_forge_ai_orchestration.protocols.a2a.a2a_client import A2AClient
from gmf_forge_ai_orchestration.protocols.a2a.models import AgentCard
from gmf_forge_ai_orchestration.routing.llm_router import LLMRouter
from gmf_forge_ai_orchestration.routing import llm_router as llm_router_module
from gmf_forge_ai_orchestration.behaviors.audit import AuditBehavior
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from config import settings
# Logger shared by every helper in this module and handed to the agents,
# router and supervisor that the factory builds.
_logger = BasicLogger("multi_agent_system.factory")

# Decomposition prompt. _configure_prompt_registry() registers it as
# "multi_agent.supervisor.decompose" and installs it over the supervisor
# module's _SUPERVISOR_DECOMPOSE_PROMPT. It carries a single {task} placeholder.
_POLICYHUB_DECOMPOSE_PROMPT = """You are a task decomposition expert for a policy-document assistant.
Task: {task}
Rules:
- This system can only search and summarize corporate policy documents.
- Generate ONLY policy-search subtasks that can be answered from policy docs.
- Never create action subtasks (no incident creation, no interviews, no emails, no API calls).
- If the user asks for actions outside policy search, convert them into policy questions (for example: required process, roles, escalation policy, compliance guidance).
- Keep decomposition minimal.
- If the request can be handled in one search, return exactly one subtask.
- Return at most 3 subtasks.
Respond with ONLY a JSON array of subtask strings:
["<subtask 1>", "<subtask 2>"]
"""

# Routing prompt. _configure_prompt_registry() registers it as
# "multi_agent.router.route" and installs it over the router module's
# _ROUTER_PROMPT. It carries {agent_descriptions} and {input} placeholders;
# the doubled braces in the last line are literal braces of the JSON example.
_POLICYHUB_ROUTER_PROMPT = """You are a routing assistant. Given a user request and a list of available
agents with descriptions, choose the single most appropriate agent.
Available agents:
{agent_descriptions}
User request: {input}
Routing rules:
- Prefer agents that can answer policy-document search and summarization questions.
- If the request asks for out-of-scope actions (incident creation, interviews, notifications),
still route to the best policy-search agent so it can provide policy/process guidance.
Respond with ONLY a JSON object:
{{"target": "<agent_name>", "confidence": <0.0-1.0>, "reasoning": "<brief reason>"}}
"""
# ---------------------------------------------------------------------------
# A2A Agent Card discovery
# ---------------------------------------------------------------------------
def _description_from_card(card: AgentCard) -> str:
    """Build a single routing description string from an A2A Agent Card.

    Combines the card's top-level description with its skill descriptions so
    the router has the richest possible signal. A skill without a description
    contributes its name instead; a skill with neither is skipped.

    Args:
        card: Agent Card exposing ``description`` and ``skills`` (each skill
            exposing ``description`` and ``name``).

    Returns:
        ``"<description> Skills: <skill>; <skill>"`` when skills are present,
        otherwise just the description. Always a string: a missing description
        is treated as empty rather than rendered as the text ``"None"``.
    """
    summary = card.description or ""
    skill_texts = [
        text
        for skill in (card.skills or [])
        if (text := skill.description or skill.name)
    ]
    if not skill_texts:
        return summary

    skills_part = "Skills: " + "; ".join(skill_texts)
    # Avoid a dangling leading space when the card has no description.
    return f"{summary} {skills_part}" if summary else skills_part
async def discover_agents(
    agent_urls: List[str],
    timeout: float,
) -> Tuple[Dict[str, A2AClient], Dict[str, str]]:
    """Fetch A2A Agent Cards and build the agent registry.

    Calls ``A2AClient.fetch_agent_card`` for every configured agent URL. The
    cards are fetched concurrently, so one slow or unreachable agent does not
    delay discovery of the others by a full timeout each. The Agent Card
    ``name`` is used as the agent identifier, and ``description`` combined with
    the skill descriptions becomes the routing description.

    Args:
        agent_urls: Base URLs, e.g. ``["http://servicenow-agent:8080", ...]``.
        timeout: HTTP timeout in seconds, used for the card fetch and passed
            to every ``A2AClient`` that is created.

    Returns:
        ``(agents, agent_descriptions)`` — both keyed by Agent Card ``name``,
        in the order of ``agent_urls``. Agents whose card could not be fetched
        are logged and left out. If two URLs report the same name, the later
        URL wins and a warning is logged.
    """
    import asyncio  # function-scope import: the module does not import asyncio

    urls = list(agent_urls)

    async def _fetch_card(url: str):
        """Return the Agent Card for *url*, or None when the fetch fails."""
        try:
            return await A2AClient.fetch_agent_card(url, timeout=timeout)
        except Exception as exc:
            _logger.warning(
                "Skipping unreachable agent at startup",
                url=url,
                error=str(exc),
            )
            return None

    # gather() returns results in argument order, so cards line up with urls.
    cards = await asyncio.gather(*(_fetch_card(url) for url in urls))

    agent_behaviors = [
        AuditBehavior(logger=_logger),
        RetryBehavior(max_retries=2, base_delay=1.0, logger=_logger),
    ]
    agents: Dict[str, A2AClient] = {}
    descriptions: Dict[str, str] = {}
    for url, card in zip(urls, cards):
        if card is None:
            continue
        agent_id: str = card.name
        if agent_id in agents:
            _logger.warning(
                "Duplicate agent name in Agent Cards; later URL replaces the earlier one",
                agent_id=agent_id,
                url=url,
            )
        _logger.info("Discovered agent via Agent Card", agent_id=agent_id, url=url)
        agents[agent_id] = A2AClient(
            endpoint_url=url,
            timeout=timeout,
            # Each client gets its own list; the behavior objects are shared.
            behaviors=list(agent_behaviors),
            agent_id=agent_id,
            logger=_logger,
        )
        descriptions[agent_id] = _description_from_card(card)

    if not agents:
        _logger.warning(
            "No task agents available at startup. "
            "Check AGENT_URLS and ensure task agents are running. "
            "Supervisor will start but with no agents available."
        )
    return agents, descriptions
# ---------------------------------------------------------------------------
# Gateway factory
# ---------------------------------------------------------------------------
def _build_gateway() -> UnifiedLLMGateway:
    """Create the LLM gateway backed by a single Azure OpenAI provider.

    Connection details come from ``settings``. The provider is registered in
    a fresh provider registry under the name ``"openai"``.

    Raises:
        FileNotFoundError: if an SSL certificate path is configured but the
            file does not exist.
    """
    cert_path = settings.openai_ssl_cert_path
    if not cert_path:
        # An unset or empty setting means "no custom certificate".
        cert_path = None
    elif not os.path.exists(cert_path):
        raise FileNotFoundError(f"SSL certificate not found at {cert_path}")

    azure_provider = AzureOpenAIProvider(
        endpoint=settings.azure_openai_endpoint,
        api_key=settings.azure_openai_api_key,
        deployment_name=settings.azure_openai_chat_deployment,
        api_version=settings.azure_openai_chat_api_version,
        ssl_cert_path=cert_path,
    )

    providers = LLMProviderRegistry()
    providers.register(name="openai", provider=azure_provider)
    return UnifiedLLMGateway(provider_registry=providers)
def _configure_prompt_registry() -> PromptRegistry:
    """Register the prompt templates and install them on the library modules.

    The plan and synthesis prompts are registered with the library's current
    values. The decomposition and routing prompts are registered with this
    module's PolicyHub templates. Every registered template is then written
    back to the matching module-level attribute of the supervisor or router
    module.

    Returns:
        The populated prompt registry.
    """
    prompts = PromptRegistry()

    # (registry name, template, description, module to patch, attribute name)
    bindings = (
        (
            "multi_agent.supervisor.plan",
            supervisor_module._SUPERVISOR_PLAN_PROMPT,
            "Supervisor task planning prompt",
            supervisor_module,
            "_SUPERVISOR_PLAN_PROMPT",
        ),
        (
            "multi_agent.supervisor.synthesis",
            supervisor_module._SUPERVISOR_SYNTHESIS_PROMPT,
            "Supervisor synthesis prompt",
            supervisor_module,
            "_SUPERVISOR_SYNTHESIS_PROMPT",
        ),
        (
            "multi_agent.supervisor.decompose",
            _POLICYHUB_DECOMPOSE_PROMPT,
            "Supervisor decomposition prompt",
            supervisor_module,
            "_SUPERVISOR_DECOMPOSE_PROMPT",
        ),
        (
            "multi_agent.router.route",
            _POLICYHUB_ROUTER_PROMPT,
            "LLM router selection prompt",
            llm_router_module,
            "_ROUTER_PROMPT",
        ),
    )

    # Register everything first, then patch the modules from the registry.
    for prompt_name, template, description, _module, _attribute in bindings:
        prompts.register(
            name=prompt_name,
            template=template,
            description=description,
        )
    for prompt_name, _template, _description, module, attribute in bindings:
        setattr(module, attribute, prompts.get(prompt_name).template)

    return prompts
# ---------------------------------------------------------------------------
# Main factory (async — called from FastAPI lifespan)
# ---------------------------------------------------------------------------
async def build_supervisor() -> SupervisorOrchestrator:
    """Assemble the supervisor: prompts, gateway, discovered agents, router.

    Agents are discovered from ``settings.agent_urls`` through their A2A
    Agent Cards; agents that cannot be reached are logged and skipped by
    ``discover_agents``.

    Returns:
        A supervisor wired with the gateway, the discovered agents and their
        descriptions, an LLM router, the module logger and a metrics collector.
    """
    _configure_prompt_registry()
    llm_gateway = _build_gateway()
    collector = BasicMetricsCollector()

    remote_agents, routing_descriptions = await discover_agents(
        agent_urls=settings.agent_urls,
        timeout=settings.remote_agent_timeout,
    )

    # The router falls back to the first discovered agent, if there is one.
    if remote_agents:
        fallback_agent = next(iter(remote_agents))
    else:
        fallback_agent = None

    agent_router = LLMRouter(
        llm_gateway=llm_gateway,
        agent_descriptions=routing_descriptions,
        fallback_target=fallback_agent,
        logger=_logger,
        metrics=collector,
    )
    return SupervisorOrchestrator(
        supervisor_gateway=llm_gateway,
        agents=remote_agents,
        agent_descriptions=routing_descriptions,
        router=agent_router,
        logger=_logger,
        metrics=collector,
    )
src/app.py
"""Multi-Agent System — FastAPI application entry point."""
from contextlib import asynccontextmanager
from typing import Any, Dict
from uuid import uuid4
from fastapi import FastAPI, Request
from gmf_forge_ai_orchestration.state.factory import StateStoreFactory
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator
from gmf_forge_ai_orchestration.behaviors import AgentDiscoveryBehavior
from gmf_forge_ai_shared_core.observability import BasicLogger
from config import settings
from agent_factory import build_supervisor, discover_agents
from models import ChatMessage, ChatResponse, SubtaskResult
# Module-level logger for the HTTP application layer.
_logger = BasicLogger("multi_agent_system.app")
# ---------------------------------------------------------------------------
# FastAPI lifespan — the supervisor is built asynchronously at startup so it
# can call GET /.well-known/agent.json on each agent URL to discover the
# agent's name and description.
# ---------------------------------------------------------------------------
# Both globals are populated by lifespan() on startup and reset to None on
# shutdown; the request handlers read _supervisor.
_supervisor: SupervisorOrchestrator | None = None
_agent_discovery: AgentDiscoveryBehavior | None = None
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Build the supervisor on startup and tear it down on shutdown.

    Startup builds the supervisor, then starts an ``AgentDiscoveryBehavior``
    that is given ``discover_agents`` as its discovery function and a
    30-second interval. Shutdown stops that behavior and clears both
    module-level globals.
    """
    global _supervisor, _agent_discovery

    def _rediscover():
        # Build a fresh discover_agents() call from the current settings.
        return discover_agents(
            agent_urls=settings.agent_urls,
            timeout=settings.remote_agent_timeout,
        )

    _logger.info("Supervisor starting — discovering agents via GET /.well-known/agent.json")
    _supervisor = await build_supervisor()
    _logger.info("Supervisor ready", agents=list(_supervisor.agents.keys()))

    _agent_discovery = AgentDiscoveryBehavior(
        discovery_fn=_rediscover,
        interval_seconds=30.0,
        logger=_logger,
    )
    await _agent_discovery.start(_supervisor)
    _logger.info("Periodic agent discovery started (interval: 30s)")

    try:
        yield
    finally:
        # Stop periodic discovery before dropping the supervisor reference.
        discovery = _agent_discovery
        if discovery is not None:
            await discovery.stop()
            _logger.info("Periodic agent discovery stopped")
        _supervisor, _agent_discovery = None, None
        _logger.info("Supervisor stopped")
# ASGI application; startup and shutdown are handled by lifespan().
app = FastAPI(title="Multi-Agent System", version="0.1.0", lifespan=lifespan)
# Only the "redis" backend is given a connection URL; any other backend is
# created without extra keyword arguments.
_store_kwargs = {"url": settings.redis_url} if settings.state_store_backend == "redis" else {}
# Store used by chat_endpoint for conversation history, keyed by conversation_id.
state_store = StateStoreFactory.create(settings.state_store_backend, **_store_kwargs)
# Saves one supervisor checkpoint per /chat request; settings.checkpoint_ttl is
# passed through unchanged as the default TTL.
checkpoint_manager = CheckpointManager(state_store, default_ttl=settings.checkpoint_ttl)
def _extract_subtask_text(agent_result, fallback: str) -> str:
    """Return the text of the subtask an agent executed, best effort.

    Looks first at ``agent_result.metadata["task"]``, then walks the result's
    steps in order and, within each step whose ``action_input`` is a dict,
    tries the ``"task"``, ``"query"`` and ``"raw"`` keys in that order. The
    first value that is a non-blank string is returned stripped. When nothing
    qualifies, ``fallback`` is returned.
    """
    metadata = agent_result.metadata
    if metadata:
        recorded = metadata.get("task")
        if isinstance(recorded, str) and recorded.strip():
            return recorded.strip()

    for step in agent_result.steps or []:
        payload = step.action_input or {}
        if not isinstance(payload, dict):
            continue
        for key in ("task", "query", "raw"):
            candidate = payload.get(key)
            if not isinstance(candidate, str):
                continue
            text = candidate.strip()
            if text:
                return text

    return fallback
def _extract_bearer_token(auth_header: str) -> str | None:
    """Return the credentials of a ``Bearer`` Authorization header, else None.

    The scheme is matched case-insensitively. Any other scheme (for example
    ``Basic``) yields None, so non-Bearer credentials are never mistaken for a
    user assertion.
    """
    scheme, _sep, credentials = auth_header.strip().partition(" ")
    if scheme.lower() != "bearer":
        return None
    return credentials.strip() or None


def _build_subtask_result(agent_name: str, task: str, agent_result) -> SubtaskResult:
    """Convert one agent result into the API's per-subtask response model."""
    return SubtaskResult(
        agent=agent_name,
        task=task,
        output=agent_result.output,
        success=agent_result.success,
        steps=[
            {
                "thought": step.thought,
                "action": step.action,
                "action_input": step.action_input,
                "observation": step.observation,
            }
            # Tolerate results that carry no step list at all.
            for step in agent_result.steps or []
        ],
        error=agent_result.error,
    )


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: Request, message: ChatMessage):
    """Run one chat turn through the supervisor and return its answer.

    Steps: load the conversation history, build the execution context, run
    the supervisor, persist the updated history, save a supervisor checkpoint
    and return the answer with per-subtask detail.

    Args:
        request: Incoming HTTP request; only its ``Authorization`` header is read.
        message: Parsed request body.

    Returns:
        ChatResponse with the answer, the conversation id, the subtask
        results, a fresh ``execution_id`` and the saved ``checkpoint_id``.

    Raises:
        HTTPException: 503 when the supervisor is not initialised or has no
            task agents available.
    """
    # Imported here because the module-level fastapi import does not include them.
    from fastapi import HTTPException, status

    if _supervisor is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Supervisor not initialised",
        )
    if not _supervisor.agents:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No task agents available yet. Waiting for periodic discovery to find active agents.",
        )

    session_id = message.conversation_id
    execution_id = str(uuid4())

    # Retrieve conversation history.
    # NOTE(review): the history is loaded and persisted here but is not passed
    # to _supervisor.run() — confirm whether the supervisor is meant to see it.
    history: list = await state_store.get(session_id) or []

    # ── Build execution context ────────────────────────────────────────────
    # This dict travels with every subtask through the entire call chain:
    #
    #   POST /chat
    #     → supervisor.run(task, context)
    #     → agent.execute(subtask, context)          # ReActAgent or A2AClient
    #     → A2AClient._call_remote(task, context)    # remote agent path
    #     → JSON-RPC message.metadata                # serialised over the wire
    #     → task-agent POST /rpc                     # deserialised by A2AAdapter
    #     → build_agent(cfg, context)                # available to behaviors
    #     → OBOTokenBehavior.before_execute          # reads user_assertion
    #
    # Keys already understood by the platform:
    #   user_assertion — end-user Bearer token; OBOTokenBehavior exchanges it
    #                    for a downstream OBO token before the agent executes
    #   obo_token      — pre-exchanged OBO token (set by OBOTokenBehavior)
    #
    # Keys developers can add (via ChatMessage.context in the request body):
    #   locale         — user locale for localised responses (e.g. "en-US")
    #   correlation_id — trace/request ID for cross-service debugging
    #   tenant_id      — multi-tenant routing hint for downstream services
    #   <anything>     — any extra key is forwarded without modification
    #
    # user_assertion is the one exception: it is dropped from the body context
    # because the token is only accepted through the Authorization header.
    run_context: Dict[str, Any] = {
        key: value
        for key, value in (message.context or {}).items()
        if key != "user_assertion"
    }

    # Extract the user's Bearer token from the Authorization header.
    # Tokens must not be sent in the request body — the header is the
    # correct transport so the token is never inadvertently logged.
    bearer_token = _extract_bearer_token(request.headers.get("Authorization", ""))
    if bearer_token:
        run_context["user_assertion"] = bearer_token

    result = await _supervisor.run(
        message.prompt,
        context=run_context or None,
    )

    if result.success:
        answer = result.final_output
    else:
        answer = (
            "I was unable to complete your request. "
            "Please try rephrasing or breaking it into smaller steps."
        )

    # Persist updated history.
    history.append({"role": "user", "content": message.prompt})
    history.append({"role": "assistant", "content": answer})
    await state_store.set(session_id, history)

    # Map each agent to the execution id recorded in its result metadata.
    agent_execution_ids = {
        name: (res.metadata.get("execution_id") if res.metadata else None)
        for name, res in result.agent_outputs.items()
    }
    checkpoint_id = await checkpoint_manager.save(
        agent_id="supervisor",
        execution_id=execution_id,
        state={
            "execution_id": execution_id,
            "session_id": session_id,
            "prompt": message.prompt,
            "answer": answer,
            "success": result.success,
            "agent_execution_ids": agent_execution_ids,
        },
        metadata={"conversation_id": session_id},
    )

    # Build per-subtask detail for callers that want to inspect agent outputs.
    # Prefer the explicit (agent, subtask, result) triples; otherwise fall back
    # to one entry per agent with a best-effort subtask text.
    if result.subtask_outputs:
        subtasks = [
            _build_subtask_result(agent_name, subtask_text, agent_result)
            for agent_name, subtask_text, agent_result in result.subtask_outputs
        ]
    else:
        subtasks = [
            _build_subtask_result(
                agent_name,
                _extract_subtask_text(agent_result, message.prompt),
                agent_result,
            )
            for agent_name, agent_result in result.agent_outputs.items()
        ]

    return ChatResponse(
        message=answer,
        conversation_id=session_id,
        subtasks=subtasks,
        execution_id=execution_id,
        checkpoint_id=checkpoint_id,
    )
@app.get("/health")
async def health():
    """Liveness probe: report ``ok`` plus the names of the known agents.

    The agent list is empty while no supervisor is available.
    """
    if _supervisor:
        discovered = list(_supervisor.agents.keys())
    else:
        discovered = []
    return {"status": "ok", "agents": discovered}
def main(host: str = "0.0.0.0", port: int = 8080, reload: bool = True) -> None:
    """Run the application with uvicorn.

    Called with no arguments by the ``multi-agent-system`` console script and
    by ``python src/app.py``; the defaults reproduce the previously hard-coded
    values.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        reload: Passed to uvicorn as its ``reload`` option.
    """
    import uvicorn

    uvicorn.run("app:app", host=host, port=port, reload=reload)


if __name__ == "__main__":
    main()
src/config.py
from pathlib import Path
from typing import List, Optional

from pydantic import AliasChoices, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# The .env file is looked up one directory above the one containing this module.
_ENV_PATH = Path(__file__).parent.parent / ".env"


class Settings(BaseSettings):
    """Supervisor configuration, loaded from the environment and the .env file."""

    model_config = SettingsConfigDict(
        env_file=str(_ENV_PATH),
        env_file_encoding="utf-8",
    )

    # Azure OpenAI — supervisor LLM
    azure_openai_endpoint: str = ""
    azure_openai_api_key: str = ""
    azure_openai_chat_deployment: str = "gpt-4o"
    # AZURE_OPENAI_CHAT_API_VERSION is listed first; the legacy
    # AZURE_OPENAI_API_VERSION name is accepted as an alternative.
    azure_openai_chat_api_version: str = Field(
        default="2024-02-15-preview",
        validation_alias=AliasChoices(
            "AZURE_OPENAI_CHAT_API_VERSION",
            "AZURE_OPENAI_API_VERSION",
        ),
    )
    openai_ssl_cert_path: Optional[str] = None

    # Remote task agent base URLs — JSON array of strings.
    # The supervisor calls GET /.well-known/agent.json on each URL at startup to discover
    # agent_id and description. No hardcoded agent names here.
    # Example: AGENT_URLS=["http://servicenow-agent:8080","http://hr-agent:8080"]
    agent_urls: List[str] = []

    # Remote agent HTTP timeout (seconds) — used for Agent Card discovery and
    # passed to every A2AClient created for a discovered agent.
    remote_agent_timeout: float = 60.0

    # State store backend: "memory" (local dev) or "redis" (production)
    state_store_backend: str = "redis"

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # TTL for checkpoint data keys (seconds). None = no expiry (default for supervisor).
    # Set via CHECKPOINT_TTL env var; use 0 or -1 to disable.
    checkpoint_ttl: Optional[int] = None

    @field_validator("checkpoint_ttl")
    @classmethod
    def normalise_checkpoint_ttl(cls, value: Optional[int]) -> Optional[int]:
        """Map a non-positive TTL to None so that 0 and -1 mean "no expiry"."""
        if value is not None and value <= 0:
            return None
        return value


# Process-wide settings instance imported by the other modules.
settings = Settings()
src/models.py
from pydantic import BaseModel
from typing import Any, Dict, List, Optional
class ChatMessage(BaseModel):
    # Request body of POST /chat.
    prompt: str
    conversation_id: str
    locale: str = "Global"
    language: str = "en-us"

    # Optional free-form context forwarded to every task agent.
    #
    # Any key placed here is merged into the execution context and travels
    # all the way to the remote task agents via A2AClient message.metadata.
    #
    # The user's access token (user_assertion) must not be placed here — it
    # must be supplied in the ``Authorization: Bearer <token>`` request header
    # so it is never logged in the request body.
    #
    # Example extra keys developers may include:
    #
    #     {
    #         "locale": "en-US",
    #         "tenant_id": "us-west",
    #         "correlation_id": "abc-123"
    #     }
    context: Optional[Dict[str, Any]] = None
class SubtaskResult(BaseModel):
    # Outcome of one subtask, as reported in ChatResponse.subtasks.
    # Name of the agent that produced this result.
    agent: str
    # Text of the subtask the agent was given.
    task: str
    # The agent's output, passed through as-is.
    output: Any
    # Whether the agent reported success.
    success: bool
    # One dict per agent step; the /chat handler fills each with the keys
    # "thought", "action", "action_input" and "observation".
    steps: List[Dict[str, Any]] = []
    # Error reported by the agent, if any.
    error: Optional[str] = None
class ChatResponse(BaseModel):
    # Response body of POST /chat.
    # The answer returned to the user.
    message: str
    # Echo of the request's conversation_id.
    conversation_id: str
    # Per-subtask detail for callers that want to inspect agent outputs.
    subtasks: List[SubtaskResult] = []
    # Identifier generated for this /chat request.
    execution_id: Optional[str] = None
    # Identifier of the supervisor checkpoint saved for this request.
    checkpoint_id: Optional[str] = None