.env.example

# =============================================================================
# Task Agent Template — Environment Variables
# Copy this file to .env and fill in the values marked as REQUIRED.
# =============================================================================

# =============================================================================
# Agent identity  (REQUIRED)
# =============================================================================
AGENT_ID=policyhub-agent
AGENT_VERSION=0.1.0
AGENT_DESCRIPTION=Searches policy documents using keyword, vector, and hybrid search.

# =============================================================================
# Azure OpenAI  (REQUIRED)
# =============================================================================
AZURE_OPENAI_ENDPOINT=https://openai2-policychatbot.openai.azure.com/
AZURE_OPENAI_API_KEY=
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini-2policychatbot
AZURE_OPENAI_API_VERSION=2025-01-01-preview

# =============================================================================
# MCP Server  (REQUIRED)
# URL of the MCP sidecar that provides tools for this agent's backend system.
# For local testing, start the policyhub MCP server:
#   cd mcp-servers/policyhub-mcp-server/src && python main.py
# It binds to http://127.0.0.1:8000 by default.
# =============================================================================
MCP_SERVER_URL=http://127.0.0.1:8000

# =============================================================================
# ReAct agent tuning  (optional)
# =============================================================================
AGENT_MAX_STEPS=15
AGENT_TEMPERATURE=0.0

# =============================================================================
# State + checkpoints  (optional)
# "memory" for local dev, "redis" for shared/persistent checkpoints.
# =============================================================================
STATE_STORE_BACKEND=redis
REDIS_URL=redis://localhost:6379/0

# =============================================================================
# API server  (optional — defaults shown)
# =============================================================================
API_HOST=0.0.0.0
API_PORT=8080

# =============================================================================
# OBO token exchange  (optional — leave OBO_PROVIDER blank to skip)
# OBO_PROVIDER: entra | okta | (empty = no exchange, forward user_assertion as-is)
# =============================================================================
OBO_PROVIDER=

# Microsoft Entra ID OBO (OBO_PROVIDER=entra)
ENTRA_TENANT_ID=
ENTRA_CLIENT_ID=
ENTRA_CLIENT_SECRET=
ENTRA_SCOPES=

# Okta RFC 8693 token exchange (OBO_PROVIDER=okta)
OKTA_DOMAIN=
OKTA_CLIENT_ID=
OKTA_CLIENT_SECRET=
OKTA_SCOPES=
OKTA_AUTHORIZATION_SERVER_ID=default

# =============================================================================
# SSL  (optional — set for corporate networks with SSL inspection)
# =============================================================================
SSL_CERT_PATH=

Dockerfile

# syntax=docker/dockerfile:1
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Copy and install platform packages first (Docker layer cache)
# In production, these come from your private PyPI / Azure Artifacts feed.
# For local builds, mount them with --build-arg or use a requirements file.
COPY requirements*.txt ./
RUN pip install --no-cache-dir -r requirements.txt

# Copy application source
COPY src/ .

# Non-root user for production security
RUN adduser --disabled-password --gecos "" appuser
USER appuser

EXPOSE 8080

# Healthcheck — liveness probe for orchestrators
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"

CMD ["python", "app.py"]

README.md

# Task Agent Template

Reusable FastAPI application template for independently deployable remote task agents in the multi-agent system.

Each remote task agent exposes `GET /.well-known/agent.json` (A2A Agent Card) and `POST /rpc` (A2A JSON-RPC 2.0), matching the contract expected by `A2AClient` in the orchestration-layer, so that any `SupervisorOrchestrator` can discover and dispatch to it without modification.

## Architecture

```
SupervisorOrchestrator
    │  POST /rpc (JSON-RPC 2.0 tasks/send) + GET /.well-known/agent.json

Task Agent (this template)
    │  OBOTokenBehavior (optional — if agent does its own OBO exchange)
    │  ReActAgent — Thought / Action / Observation loop
    │  ToolRegistry — wired to MCP server sidecar

MCP Server sidecar  →  Backend API (ServiceNow, HR, Notifications, ...)
```

## Endpoints

| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/health` | Liveness probe — always returns 200 |
| `GET` | `/.well-known/agent.json` | A2A Agent Card — capability discovery |
| `POST` | `/rpc` | A2A JSON-RPC 2.0 — run the ReActAgent on a task |

### POST /rpc

**Request (JSON-RPC 2.0 tasks/send):**
```json
{
    "jsonrpc": "2.0",
    "id": "<request-uuid>",
    "method": "tasks/send",
    "params": {
        "id": "<task-uuid>",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": "Create a ServiceNow incident for the network outage"}],
            "metadata": {
                "user_assertion": "<user-access-token>",
                "obo_token": "<supervisor-pre-exchanged-token>",
                "locale": "en-US",
                "session_id": "<uuid>"
            }
        }
    }
}
```

**Response (A2A Task):**
```json
{
    "jsonrpc": "2.0",
    "id": "<request-uuid>",
    "result": {
        "id": "<task-uuid>",
        "status": {"state": "completed"},
        "artifacts": [{"name": "result", "parts": [{"type": "text", "text": "Incident INC0012345 created successfully."}]}],
        "metadata": {
            "steps": [
                {
                    "thought": "I need to create a ServiceNow incident...",
                    "action": "create_incident",
                    "action_input": {"short_description": "Network outage", "urgency": "1"},
                    "observation": "{\"sys_id\": \"abc123\", \"number\": \"INC0012345\"}",
                    "metadata": {}
                }
            ]
        }
    }
}
```

## Environment Variables

| Variable | Required | Description |
|----------|----------|-------------|
| `AGENT_ID` | ✅ | Stable agent name (e.g. `policyhub-agent`) |
| `AZURE_OPENAI_ENDPOINT` | ✅ | Azure OpenAI endpoint URL |
| `AZURE_OPENAI_API_KEY` | ✅ | Azure OpenAI API key |
| `AZURE_OPENAI_CHAT_DEPLOYMENT` | ✅ | Chat deployment name (e.g. `gpt-4o`) |
| `MCP_SERVER_URL` | ✅ | URL of the MCP server sidecar |
| `OBO_PROVIDER` | | `entra` or `okta` — if this agent does its own OBO exchange. Leave empty to use the supervisor's pre-exchanged `obo_token`. |
| `ENTRA_TENANT_ID` | | Required if `OBO_PROVIDER=entra` |
| `ENTRA_CLIENT_ID` | | Required if `OBO_PROVIDER=entra` |
| `ENTRA_CLIENT_SECRET` | | Required if `OBO_PROVIDER=entra` |
| `ENTRA_SCOPES` | | Comma-separated downstream scopes, e.g. `api://servicenow/.default` |
| `OKTA_DOMAIN` | | Required if `OBO_PROVIDER=okta`, e.g. `company.okta.com` |
| `OKTA_CLIENT_ID` | | Required if `OBO_PROVIDER=okta` |
| `OKTA_CLIENT_SECRET` | | Required if `OBO_PROVIDER=okta` |
| `OKTA_SCOPES` | | Comma-separated scopes, e.g. `servicenow.write` |
| `OKTA_AUTHORIZATION_SERVER_ID` | | Defaults to `default` |
| `AZURE_OPENAI_API_VERSION` | | Defaults to `2024-02-01` |
| `AGENT_MAX_STEPS` | | ReAct loop max steps. Defaults to `15` |
| `AGENT_TEMPERATURE` | | LLM sampling temperature. Defaults to `0.0` |
| `STATE_STORE_BACKEND` | | State backend for session + checkpoints: `memory` or `redis`. Defaults to `memory` |
| `REDIS_URL` | | Redis connection URL when `STATE_STORE_BACKEND=redis`. Defaults to `redis://localhost:6379/0` |
| `API_HOST` | | Server host. Defaults to `0.0.0.0` |
| `API_PORT` | | Server port. Defaults to `8080` |

## File Structure

```
task-agent-template/
├── src/
│   ├── app.py        FastAPI server with /health, /.well-known/agent.json, /rpc endpoints
│   ├── agent.py      ReActAgent factory — builds agent per request
│   ├── models.py     ExecuteRequest, ExecuteResponse, HealthResponse
│   ├── config.py     AppConfig loaded from environment variables
│   └── tools.py      MCP tool wrappers — register backend tools here
├── setup.py
├── Dockerfile
└── README.md
```

## How to Adapt This Template

1. **Copy** this directory to `apps/<your-agent-name>/`
2. **Replace** `example_tool` in `src/tools.py` with real MCP tool wrappers for your backend
3. **Set** `AGENT_ID` to a stable unique name for your agent (e.g. `policyhub-agent`)
4. **Set** `MCP_SERVER_URL` to point to your backend's MCP server sidecar
5. **Configure** `OBO_PROVIDER` if your agent needs to exchange tokens for its own backend scope
6. **Register** the agent with the supervisor's `LLMRouter` using `agent_id` + a description

## OBO Token Convention

The supervisor propagates two auth fields in `context`:

- `user_assertion` — the user's raw access token (JWT). Remote agents can do their own OBO exchange from this for their specific backend scope (recommended for multi-scope scenarios).
- `obo_token` — a pre-exchanged token from the supervisor. Agents can use this directly if the scope matches their backend.

**Recommended default:** set `OBO_PROVIDER=entra` (or `okta`) so each agent exchanges from `user_assertion` into its own backend scope. This handles token expiry and multi-scope scenarios cleanly.

## Local Development

```bash
cp .env.example .env  # fill in your values
pip install -e .
python src/app.py
```

## Docker

```bash
docker build -t my-task-agent .
docker run -p 8080:8080 --env-file .env my-task-agent
```

setup.py

"""Task Agent Application."""

from setuptools import setup, find_packages

setup(
    name="task-agent-template",
    version="0.1.0",
    packages=find_packages(where="src"),
    package_dir={"": "src"},
    python_requires=">=3.11",
    install_requires=[
        "gmf-forge-ai-shared-core>=1.0.0",
        "gmf-forge-ai-orchestration>=1.0.0",
        "fastapi>=0.111.0",
        "uvicorn[standard]>=0.29.0",
        "pydantic>=2.0.0",
        "python-dotenv>=1.0.0",
        "fastmcp>=0.1.0",
    ],
    entry_points={
        "console_scripts": [
            "task-agent=app:main",
        ],
    },
)

task-agent-template.postman_collection.json

{
  "info": {
    "name": "Task Agent Template",
    "description": "A2A-compliant task agent — tests for GET /health, GET /.well-known/agent.json, and POST /rpc (tasks/send).",
    "_postman_id": "task-agent-template-v1",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "variable": [
    {
      "key": "base_url",
      "value": "{{task_agent_url}}",
      "type": "string"
    }
  ],
  "item": [
    {
      "name": "Health",
      "item": [
        {
          "name": "GET /health",
          "request": {
            "method": "GET",
            "header": [],
            "url": {
              "raw": "{{task_agent_url}}/health",
              "host": ["{{task_agent_url}}"],
              "path": ["health"]
            },
            "description": "Liveness probe. Returns 200 with agent_id and version when the server is up."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('Status 200', () => pm.response.to.have.status(200));",
                  "pm.test('status is ok', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.status).to.eql('ok');",
                  "});",
                  "pm.test('agent_id present', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.agent_id).to.be.a('string').and.not.empty;",
                  "});"
                ]
              }
            }
          ]
        }
      ]
    },
    {
      "name": "Agent Card",
      "item": [
        {
          "name": "GET /.well-known/agent.json",
          "request": {
            "method": "GET",
            "header": [],
            "url": {
              "raw": "{{task_agent_url}}/.well-known/agent.json",
              "host": ["{{task_agent_url}}"],
              "path": [".well-known", "agent.json"]
            },
            "description": "A2A Agent Card — returns the agent's name, description, URL, version, and skills. The supervisor calls this at startup for discovery."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('Status 200', () => pm.response.to.have.status(200));",
                  "pm.test('has name', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.name).to.be.a('string').and.not.empty;",
                  "});",
                  "pm.test('has description', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.description).to.be.a('string');",
                  "});",
                  "pm.test('has url', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.url).to.be.a('string').and.not.empty;",
                  "});",
                  "pm.test('has version', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.version).to.be.a('string').and.not.empty;",
                  "});",
                  "pm.test('skills is array', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.skills).to.be.an('array');",
                  "});",
                  "// Save agent name for use in other requests",
                  "const body = pm.response.json();",
                  "pm.collectionVariables.set('agent_name', body.name);"
                ]
              }
            }
          ]
        }
      ]
    },
    {
      "name": "RPC — tasks/send",
      "item": [
        {
          "name": "tasks/send — happy path",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "url": {
              "raw": "{{task_agent_url}}/rpc",
              "host": ["{{task_agent_url}}"],
              "path": ["rpc"]
            },
            "body": {
              "mode": "raw",
              "raw": "{\n  \"jsonrpc\": \"2.0\",\n  \"id\": \"req-001\",\n  \"method\": \"tasks/send\",\n  \"params\": {\n    \"id\": \"task-001\",\n    \"message\": {\n      \"role\": \"user\",\n      \"parts\": [\n        {\n          \"type\": \"text\",\n          \"text\": \"Hello, what can you do?\"\n        }\n      ],\n      \"metadata\": {}\n    }\n  }\n}",
              "options": {
                "raw": {
                  "language": "json"
                }
              }
            },
            "description": "Standard A2A tasks/send call. Expect jsonrpc 2.0 result with artifacts and metadata.steps."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('Status 200', () => pm.response.to.have.status(200));",
                  "pm.test('jsonrpc 2.0', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.jsonrpc).to.eql('2.0');",
                  "});",
                  "pm.test('id echoed', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.id).to.eql('req-001');",
                  "});",
                  "pm.test('result present', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.result).to.be.an('object');",
                  "});",
                  "pm.test('result.status.state is completed or failed', () => {",
                  "    const state = pm.response.json().result.status.state;",
                  "    pm.expect(['completed', 'failed']).to.include(state);",
                  "});",
                  "pm.test('result.metadata.steps is array', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.result.metadata.steps).to.be.an('array');",
                  "});"
                ]
              }
            }
          ]
        },
        {
          "name": "tasks/send — with user_assertion",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "url": {
              "raw": "{{task_agent_url}}/rpc",
              "host": ["{{task_agent_url}}"],
              "path": ["rpc"]
            },
            "body": {
              "mode": "raw",
              "raw": "{\n  \"jsonrpc\": \"2.0\",\n  \"id\": \"req-002\",\n  \"method\": \"tasks/send\",\n  \"params\": {\n    \"id\": \"task-002\",\n    \"message\": {\n      \"role\": \"user\",\n      \"parts\": [\n        {\n          \"type\": \"text\",\n          \"text\": \"Create a ServiceNow incident for a network outage affecting the London office\"\n        }\n      ],\n      \"metadata\": {\n        \"user_assertion\": \"{{user_assertion_token}}\",\n        \"locale\": \"en-US\",\n        \"session_id\": \"session-001\"\n      }\n    }\n  }\n}",
              "options": {
                "raw": {
                  "language": "json"
                }
              }
            },
            "description": "Passes user_assertion in message.metadata so the agent can perform its own OBO token exchange if configured."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('Status 200', () => pm.response.to.have.status(200));",
                  "pm.test('no error field', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.error).to.be.undefined;",
                  "});"
                ]
              }
            }
          ]
        },
        {
          "name": "tasks/send — unknown method (expect -32601)",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "url": {
              "raw": "{{task_agent_url}}/rpc",
              "host": ["{{task_agent_url}}"],
              "path": ["rpc"]
            },
            "body": {
              "mode": "raw",
              "raw": "{\n  \"jsonrpc\": \"2.0\",\n  \"id\": \"req-003\",\n  \"method\": \"tasks/get\",\n  \"params\": {}\n}",
              "options": {
                "raw": {
                  "language": "json"
                }
              }
            },
            "description": "Sends an unknown JSON-RPC method. Expects error code -32601 (Method not found)."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('Status 200', () => pm.response.to.have.status(200));",
                  "pm.test('error code -32601', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.error).to.be.an('object');",
                  "    pm.expect(body.error.code).to.eql(-32601);",
                  "});"
                ]
              }
            }
          ]
        },
        {
          "name": "tasks/send — malformed JSON (expect -32700)",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "url": {
              "raw": "{{task_agent_url}}/rpc",
              "host": ["{{task_agent_url}}"],
              "path": ["rpc"]
            },
            "body": {
              "mode": "raw",
              "raw": "{ this is not valid json }",
              "options": {
                "raw": {
                  "language": "text"
                }
              }
            },
            "description": "Sends malformed JSON. Expects error code -32700 (Parse error)."
          },
          "response": [],
          "event": [
            {
              "listen": "test",
              "script": {
                "type": "text/javascript",
                "exec": [
                  "pm.test('error code -32700', () => {",
                  "    const body = pm.response.json();",
                  "    pm.expect(body.error.code).to.eql(-32700);",
                  "});"
                ]
              }
            }
          ]
        }
      ]
    }
  ]
}

src/agent.py

"""Agent factory — builds and configures the ReActAgent for this task agent.

Separates agent construction from the FastAPI app so it can be tested
independently and reused across different entry points (HTTP server, CLI).
"""

from typing import Any, Dict, List, Optional

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.registry.tool_registry import ToolRegistry
from llm_registry import llm_registry

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.agents.base import AgentResult
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from gmf_forge_ai_orchestration.state.base import BaseStateStore
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.factory import StateStoreFactory

from config import AgentConfig
from tools import register_tools


_state_store: BaseStateStore | None = None
_checkpoint_manager: CheckpointManager | None = None
_tool_registry: ToolRegistry | None = None


def _get_state_store(config: AgentConfig) -> BaseStateStore:
    global _state_store
    if _state_store is None:
        kwargs = {"url": config.redis_url} if config.state_store_backend == "redis" else {}
        _state_store = StateStoreFactory.create(config.state_store_backend, **kwargs)
    return _state_store


def _get_checkpoint_manager(config: AgentConfig) -> CheckpointManager:
    global _checkpoint_manager
    if _checkpoint_manager is None:
        _checkpoint_manager = CheckpointManager(_get_state_store(config), default_ttl=config.checkpoint_ttl)
    return _checkpoint_manager


def _get_tool_registry() -> ToolRegistry:
    global _tool_registry
    if _tool_registry is None:
        _tool_registry = ToolRegistry()
        register_tools(_tool_registry)
    return _tool_registry


def _build_obo_behavior(config: AgentConfig):
    """Build an OBOTokenBehavior if this agent is configured to do its own OBO exchange.

    There are two patterns for OBO token exchange in the platform:

    Pattern A — Supervisor exchanges (centralised):
        The supervisor holds credentials and attaches OBOTokenBehavior to itself.
        The exchanged ``obo_token`` is forwarded to task agents via
        ``A2AClient.execute(subtask, context={"obo_token": ...})``.
        Task agents read it directly from ``context["obo_token"]`` inside tools.
        Use when all task agents call the same downstream resource.

    Pattern B — Task agent exchanges (this pattern):
        Each task agent holds its own service-principal credentials and attaches
        OBOTokenBehavior to its local ReActAgent.  The supervisor forwards the
        raw ``user_assertion`` (Bearer token) via A2AClient message.metadata, and
        each task agent independently exchanges it for a resource-scoped token.
        Use when different task agents call different downstream resources.

    This function implements Pattern B.  Set the ``OBO_PROVIDER`` environment
    variable to ``entra`` or ``okta`` to enable it; leave it unset to disable.
    """
    if not config.obo_provider:
        return None

    from gmf_forge_ai_orchestration.behaviors.obo_token import (
        OBOTokenBehavior,
        EntraOBOProvider,
        OktaOBOProvider,
    )

    if config.obo_provider == "entra":
        if not all([config.entra_tenant_id, config.entra_client_id, config.entra_client_secret]):
            raise ValueError(
                "OBO_PROVIDER=entra requires ENTRA_TENANT_ID, ENTRA_CLIENT_ID, "
                "ENTRA_CLIENT_SECRET to be set."
            )
        provider = EntraOBOProvider(
            tenant_id=config.entra_tenant_id,
            client_id=config.entra_client_id,
            client_secret=config.entra_client_secret,
            scopes=config.entra_scopes,
        )
    elif config.obo_provider == "okta":
        if not all([config.okta_domain, config.okta_client_id, config.okta_client_secret]):
            raise ValueError(
                "OBO_PROVIDER=okta requires OKTA_DOMAIN, OKTA_CLIENT_ID, "
                "OKTA_CLIENT_SECRET to be set."
            )
        provider = OktaOBOProvider(
            domain=config.okta_domain,
            client_id=config.okta_client_id,
            client_secret=config.okta_client_secret,
            scopes=config.okta_scopes,
            authorization_server_id=config.okta_authorization_server_id,
        )
    else:
        raise ValueError(
            f"Unknown OBO_PROVIDER '{config.obo_provider}'. "
            "Expected 'entra' or 'okta'."
        )

    return OBOTokenBehavior(provider=provider)


_SYSTEM_PROMPT = """\
You are a policy document search assistant. You help users find information \
in corporate policy documents.

You have access to the following tools:
{tool_descriptions}

IMPORTANT: Your tools can ONLY search and retrieve policy documents. You \
cannot create tickets, send emails, make API calls to external systems, or \
perform any actions outside of document search. If the user asks you to \
perform an action you cannot do (e.g. create a ServiceNow incident, send a \
notification, call an API), immediately use Final Answer to explain what you \
can and cannot do — do NOT repeat searches.

Respond using EXACTLY this format for each step:
Thought: <your reasoning>
Action: <tool name or "Final Answer">
Action Input: <JSON object with tool arguments, or your final answer string>

When you have enough information, use:
Action: Final Answer
Action Input: {{"answer": "<your complete answer>"}}
"""


def build_llm_gateway(config: AgentConfig) -> UnifiedLLMGateway:
    """Build the LLM gateway from the provider registry."""
    return UnifiedLLMGateway(provider_registry=llm_registry)


async def build_agent(
    config: AgentConfig,
    context: Dict[str, Any],
    task_text: str = "",
    llm_gateway: Optional[UnifiedLLMGateway] = None,
):
    """Build and configure a ReActAgent for a single request.

    A new agent is built per request so that the OBO token and other
    context values are properly scoped to the current execution.

    NOTE: This function should be called within an open_mcp_session() context
    so that the tool wrappers can reuse the pooled MCP client. See app.py.

    Args:
        config: Application configuration.
        context: The execution context from the POST /rpc request.
            Must contain ``obo_token`` (or ``user_assertion`` if this agent
            does its own OBO exchange).
        llm_gateway: Optional pre-built gateway (for testing / reuse).

    Returns:
        The result of agent.execute() after the agent completes.
    """
    gateway = llm_gateway or build_llm_gateway(config)
    state_store = _get_state_store(config)
    checkpoint_manager = _get_checkpoint_manager(config)
    registry = _get_tool_registry()

    # Build behaviors
    behaviors = []
    obo_behavior = _build_obo_behavior(config)
    if obo_behavior:
        behaviors.append(obo_behavior)
    behaviors.append(RetryBehavior(max_retries=2))

    # Execute agent with pre-registered tools.
    # Tools reuse the MCP session set up by open_mcp_session() in app.py.
    result = await ReActAgent(
        llm_gateway=gateway,
        tool_registry=registry,
        behaviors=behaviors,
        state_store=state_store,
        checkpoint_manager=checkpoint_manager,
        agent_id=config.agent_id,
        max_steps=config.max_steps,
        temperature=config.temperature,
        system_prompt=_SYSTEM_PROMPT,
    ).execute(task_text, context=context)

    # Optional: inspect the OBO token after execution.
    if obo_behavior:
      print(obo_behavior.debug_token())            # full token
      print(obo_behavior.debug_token(mask=True))   # eyJhbGci...a1b2

    return result

src/app.py

"""Task Agent — FastAPI server (A2A protocol).

Endpoints:
    GET  /health                     liveness probe — no auth required
    GET  /.well-known/agent.json     A2A Agent Card — capability discovery
    POST /rpc                        A2A JSON-RPC 2.0 — tasks/send dispatcher

This app is the reusable template for any remote task agent in the multi-agent
system.  It implements the Agent2Agent (A2A) protocol so that any A2A-compliant
supervisor (including our SupervisorOrchestrator via A2AClient) can discover
and invoke this agent without modification.

Agent Card (GET /.well-known/agent.json):
    {
        "name": "policyhub-agent",
        "description": "Searches policy documents using keyword, vector, and hybrid search.",
        "url": "http://policyhub-agent:8080",
        "version": "0.1.0",
        "skills": []
    }

tasks/send request (POST /rpc):
    {
        "jsonrpc": "2.0",
        "id": "<request-uuid>",
        "method": "tasks/send",
        "params": {
            "id": "<task-uuid>",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Find network outage policy guidance"}],
                "metadata": {
                    "user_assertion": "<user-access-token>",
                    "obo_token": "<supervisor-pre-exchanged-token>",
                    "locale": "en-US",
                    "session_id": "<uuid>"
                }
            }
        }
    }

tasks/send response:
    {
        "jsonrpc": "2.0",
        "id": "<request-uuid>",
        "result": {
            "id": "<task-uuid>",
            "status": {"state": "completed"},
            "artifacts": [{"name": "result", "parts": [{"type": "text", "text": "INC0012345 created."}]}],
            "metadata": {"steps": [...], "a2a_task_id": "..."}
        }
    }
"""

import traceback
from contextlib import asynccontextmanager
from typing import Any

import uvicorn
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse

from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_orchestration.protocols.a2a.a2a_adapter import A2AAdapter
from gmf_forge_ai_orchestration.protocols.a2a import (
    AgentCard,
    JsonRpcError,
    JsonRpcErrorResponse,
)

from agent import build_agent
from config import AgentConfig, load_config
from models import HealthResponse

logger = BasicLogger(__name__)

# ── App-level singleton state ──────────────────────────────────────────────────

_config: AgentConfig | None = None
_adapter: A2AAdapter | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global _config, _adapter
    _config = load_config()
    _adapter = A2AAdapter(
        agent_id=_config.agent_id,
        description=_config.agent_description,
        url=f"http://{_config.api_host}:{_config.api_port}",
        version=_config.agent_version,
        logger=logger,
    )
    logger.info(
        "Task agent starting",
        agent_id=_config.agent_id,
        version=_config.agent_version,
        mcp_server_url=_config.mcp_server_url,
    )
    yield
    logger.info("Task agent stopped", agent_id=_config.agent_id if _config else "unknown")


app = FastAPI(
    title="Task Agent",
    description="A2A-compliant remote task agent — POST /rpc (tasks/send) to invoke",
    version="0.1.0",
    lifespan=lifespan,
)


# ── Exception handler ──────────────────────────────────────────────────────────


@app.exception_handler(Exception)
async def _unhandled(request: Request, exc: Exception) -> JSONResponse:
    logger.error("Unhandled exception", error=str(exc), traceback=traceback.format_exc())
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "Internal server error"},
    )


# ── Endpoints ──────────────────────────────────────────────────────────────────


@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    """Liveness probe — always returns 200 if the server is up."""
    cfg = _config
    return HealthResponse(
        status="ok",
        agent_id=cfg.agent_id if cfg else "unknown",
        version=cfg.agent_version if cfg else "unknown",
    )


@app.get("/.well-known/agent.json", response_model=AgentCard)
async def agent_card() -> AgentCard:
    """A2A Agent Card — describes this agent's identity and capabilities.

    The supervisor calls this at startup to discover what each agent can do
    and populate the LLMRouter without hardcoding descriptions centrally.
    Complies with GET /.well-known/agent.json in the A2A specification.
    """
    adapter = _adapter
    if adapter is None:
        return JSONResponse(  # type: ignore[return-value]
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"detail": "Agent not initialised"},
        )
    return adapter.agent_card()


@app.post("/rpc")
async def rpc(request: Request) -> JSONResponse:
    """A2A JSON-RPC 2.0 endpoint — dispatches tasks/send through the adapter.

    Accepts ``tasks/send`` and returns a completed A2A Task with the agent's
    output in ``artifacts[0].parts[0].text`` and ReAct steps in
    ``metadata["steps"]``.  All other JSON-RPC methods return -32601 (Method
    not found).  Parse errors return -32700.
    """
    cfg = _config
    adapter = _adapter
    if cfg is None or adapter is None:
        return JSONResponse(
            content=JsonRpcErrorResponse(
                error=JsonRpcError(code=-32603, message="Agent not initialised"),
            ).model_dump(),
        )

    # Parse JSON-RPC envelope
    try:
        body = await request.json()
    except Exception:
        return JSONResponse(
            content=JsonRpcErrorResponse(
                error=JsonRpcError(code=-32700, message="Parse error"),
            ).model_dump(),
        )

    async def _execute(task_text: str, context: dict[str, Any]):
        # context is the deserialized message.metadata from the A2A JSON-RPC
        # payload, forwarded verbatim from the supervisor's execution context.
        #
        # It contains whatever the upstream caller placed in their context dict
        # (see multi-agent-system/src/app.py for the canonical example), plus
        # any keys added by behaviors that ran on the supervisor side (e.g. an
        # obo_token pre-exchanged by a supervisor-level OBOTokenBehavior).
        #
        # Standard keys this agent understands:
        #   user_assertion — end-user Bearer token; consumed by OBOTokenBehavior
        #                    (when OBO_PROVIDER env var is set) to obtain a
        #                    downstream OBO token before the agent runs.
        #   obo_token      — pre-exchanged token written by OBOTokenBehavior;
        #                    tool implementations read this to call downstream APIs.
        #
        # To add custom context, extend ChatMessage.context in the API layer or
        # add keys directly to the context dict in supervisor.run() callers.
        return await build_agent(cfg, context=context, task_text=task_text)

    payload = await adapter.handle_rpc(body, execute=_execute)
    return JSONResponse(content=payload)


# ── Entry point ────────────────────────────────────────────────────────────────


def main() -> None:
    cfg = load_config()

    uvicorn.run(
        "app:app",
        host=cfg.api_host,
        port=cfg.api_port,
        reload=False,
    )


if __name__ == "__main__":
    main()

src/config.py

"""Configuration for the task agent — loaded once from environment variables.

Copy this file and the surrounding src/ directory to create a new task agent.
Replace placeholders (marked TODO) with the concrete backend's settings.
"""

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

from dotenv import load_dotenv

from gmf_forge_ai_shared_core.observability import BasicLogger

logger = BasicLogger(__name__)

_ENV_PATH = Path(__file__).parent.parent / ".env"


@dataclass
class AgentConfig:
    # ── Agent identity ─────────────────────────────────────────────────────────
    agent_id: str                  # Stable name used in logs and A2AClient registration
    agent_version: str = "0.1.0"

    # ── Azure OpenAI ───────────────────────────────────────────────────────────
    openai_endpoint: str = ""
    openai_api_key: str = ""
    chat_deployment: str = ""      # e.g. "gpt-4o"
    chat_api_version: str = "2024-02-01"

    # ── MCP server ────────────────────────────────────────────────────────────
    # URL of the MCP server sidecar providing tools for this agent's backend.
    # TODO: Replace with the concrete backend's MCP server URL.
    mcp_server_url: str = ""

    # ── OBO — own token exchange (optional) ───────────────────────────────────
    # Set these if THIS agent does its own OBO exchange from user_assertion
    # rather than relying on the supervisor's pre-exchanged obo_token.
    # OBO_PROVIDER: "entra" | "okta" | "" (empty = use supervisor's obo_token as-is)
    obo_provider: str = ""

    # Entra OBO provider settings (used when obo_provider="entra")
    entra_tenant_id: str = ""
    entra_client_id: str = ""
    entra_client_secret: str = ""
    # TODO: Replace with the concrete backend's downstream resource scopes.
    entra_scopes: List[str] = field(default_factory=list)

    # Okta OBO provider settings (used when obo_provider="okta")
    okta_domain: str = ""
    okta_client_id: str = ""
    okta_client_secret: str = ""
    okta_scopes: List[str] = field(default_factory=list)
    okta_authorization_server_id: str = "default"

    # ── Agent description — returned by GET /.well-known/agent.json (Agent Card) ──
    # The supervisor fetches this at startup to build the LLMRouter descriptions.
    # TODO: Replace with a meaningful description of this agent's capabilities.
    agent_description: str = "A remote task agent."

    # ── ReAct agent tuning ─────────────────────────────────────────────────────
    max_steps: int = 15
    temperature: float = 0.0

    # ── State + checkpoints ────────────────────────────────────────────────────
    # "memory" for local dev, "redis" for shared/persistent checkpoints.
    state_store_backend: str = "redis"
    redis_url: str = "redis://localhost:6379/0"
    # TTL for checkpoint data keys (seconds). None = no expiry.
    checkpoint_ttl: Optional[int] = 86400

    # ── API server ────────────────────────────────────────────────────────────
    api_host: str = "0.0.0.0"
    api_port: int = 8080

    # ── SSL ───────────────────────────────────────────────────────────────────
    ssl_cert_path: Optional[str] = None


def load_config() -> AgentConfig:
    """Load and validate AgentConfig from environment variables.

    Raises:
        ValueError: if any required environment variable is missing.
    """
    load_dotenv(_ENV_PATH)

    missing = []

    def _require(name: str) -> str:
        val = os.getenv(name, "").strip()
        if not val:
            missing.append(name)
        return val

    def _optional(name: str, default: str = "") -> str:
        return os.getenv(name, default).strip()

    def _optional_int(name: str, default: Optional[int]) -> Optional[int]:
        """Return int from env var, default if unset, None if set to '0' or '-1'."""
        raw = os.getenv(name)
        if raw is None:
            return default
        stripped = raw.strip()
        if not stripped or stripped in ("0", "-1"):
            return None
        return int(stripped)

    def _scopes(name: str) -> List[str]:
        raw = os.getenv(name, "").strip()
        return [s.strip() for s in raw.split(",") if s.strip()]

    def _optional_path(name: str) -> Optional[str]:
        raw = os.getenv(name, "").strip()
        if not raw:
            return None

        # Support common .env patterns such as ~/... and ${HOME}/...
        expanded = Path(os.path.expandvars(raw)).expanduser()
        if not expanded.exists():
            raise ValueError(
                f"Environment variable {name} points to a missing file: {expanded}"
            )
        return str(expanded)

    agent_id = _require("AGENT_ID")
    openai_endpoint = _require("AZURE_OPENAI_ENDPOINT")
    openai_api_key = _require("AZURE_OPENAI_API_KEY")
    chat_deployment = _require("AZURE_OPENAI_CHAT_DEPLOYMENT")
    mcp_server_url = _require("MCP_SERVER_URL")

    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    obo_provider = _optional("OBO_PROVIDER")

    return AgentConfig(
        agent_id=agent_id,
        agent_version=_optional("AGENT_VERSION", "0.1.0"),
        openai_endpoint=openai_endpoint,
        openai_api_key=openai_api_key,
        chat_deployment=chat_deployment,
        chat_api_version=_optional("AZURE_OPENAI_API_VERSION", "2024-02-01"),
        mcp_server_url=mcp_server_url,
        obo_provider=obo_provider,
        entra_tenant_id=_optional("ENTRA_TENANT_ID"),
        entra_client_id=_optional("ENTRA_CLIENT_ID"),
        entra_client_secret=_optional("ENTRA_CLIENT_SECRET"),
        entra_scopes=_scopes("ENTRA_SCOPES"),
        okta_domain=_optional("OKTA_DOMAIN"),
        okta_client_id=_optional("OKTA_CLIENT_ID"),
        okta_client_secret=_optional("OKTA_CLIENT_SECRET"),
        okta_scopes=_scopes("OKTA_SCOPES"),
        okta_authorization_server_id=_optional("OKTA_AUTHORIZATION_SERVER_ID", "default"),
        agent_description=_optional(
            "AGENT_DESCRIPTION", "A remote task agent."
        ),
        max_steps=int(_optional("AGENT_MAX_STEPS", "15")),
        temperature=float(_optional("AGENT_TEMPERATURE", "0.0")),
        state_store_backend=_optional("STATE_STORE_BACKEND", "redis"),
        redis_url=_optional("REDIS_URL", "redis://localhost:6379/0"),
        checkpoint_ttl=_optional_int("CHECKPOINT_TTL", 86400),
        api_host=_optional("API_HOST", "0.0.0.0"),
        api_port=int(_optional("API_PORT", "8080")),
        ssl_cert_path=_optional_path("SSL_CERT_PATH"),
    )

src/llm_registry.py

"""LLM Provider registry for the task agent.

Centralises provider registration so agent.py retrieves the gateway
via the registry rather than constructing it directly.
"""

from gmf_forge_ai_shared_core.registry import LLMProviderRegistry
from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider

from config import load_config

_config = load_config()

llm_registry = LLMProviderRegistry()

provider_kwargs = {
    "endpoint": _config.openai_endpoint,
    "api_key": _config.openai_api_key,
    "deployment_name": _config.chat_deployment,
    "api_version": _config.chat_api_version,
}
if _config.ssl_cert_path:
    provider_kwargs["ssl_cert_path"] = _config.ssl_cert_path

azure_openai_provider = AzureOpenAIProvider(**provider_kwargs)

llm_registry.register(
    name="openai",
    provider=azure_openai_provider,
)

src/models.py

"""App-specific models for the task agent.

A2A protocol wire types (AgentCard, JsonRpcRequest/Response, A2ATask, etc.)
live in :mod:`gmf_forge_ai_orchestration.protocols.a2a` — import them from there.
"""

from pydantic import BaseModel


class HealthResponse(BaseModel):
    """Response for GET /health."""

    status: str = "ok"
    agent_id: str
    version: str

src/tools.py

"""Tool wrappers for the task agent's backend MCP server.

This module bridges the agent's ToolRegistry with the MCP server that
provides backend-specific tools (e.g. PolicyHub document search).

Each tool call opens its own MCP connection.  If an OBO token is present
in the current async context (set automatically by OBOTokenBehavior after
exchange) the connection carries an ``Authorization: Bearer`` header so the
MCP server can authenticate on behalf of the user.
"""

import json
from typing import Any, Dict, Optional

from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.registry.tool_registry import ToolRegistry

logger = BasicLogger(__name__)


# ---------------------------------------------------------------------------
# Helper
# ---------------------------------------------------------------------------


async def _call_mcp_tool(tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Call an MCP tool on a fresh per-call connection.

    If an OBO token is present in the current async context (set automatically
    by OBOTokenBehavior after exchange) the connection carries an
    ``Authorization: Bearer`` header so the MCP server can authenticate on
    behalf of the user.

    Prefers raw JSON text from result.content, falls back to structured_content.
    """
    from fastmcp import Client
    from fastmcp.client.auth import BearerAuth
    from config import load_config
    from gmf_forge_ai_orchestration.behaviors.obo_token import current_obo_token

    config = load_config()
    token = current_obo_token.get()
    if token:
        logger.debug("_call_mcp_tool: attaching OBO token to MCP call", tool=tool_name)

    auth = BearerAuth(token) if token else None
    async with Client(config.mcp_server_url, auth=auth) as client:
        result = await client.call_tool(tool_name, arguments)

    # Prefer raw JSON text from content array
    if result.content:
        for item in result.content:
            text = getattr(item, "text", None)
            if text:
                try:
                    return json.loads(text)
                except (json.JSONDecodeError, TypeError):
                    return text

    if result.structured_content:
        return result.structured_content

    return result.data


# ---------------------------------------------------------------------------
# Tool wrappers — one per MCP server tool
# ---------------------------------------------------------------------------


async def keyword_search(query: str, top_k: int = 5) -> str:
    """Keyword search over policy documents.

    Args:
        query: The search query string.
        top_k: Maximum number of results to return (default 5).

    Returns:
        JSON-serialised list of matching policy document chunks.
    """
    result = await _call_mcp_tool(
        "keyword_search",
        {"query": query, "top_k": top_k},
    )
    if isinstance(result, (dict, list)):
        return json.dumps(result)
    return str(result)


async def vector_search(query: str, top_k: int = 5) -> str:
    """Semantic (vector) search over policy documents.

    Args:
        query: The search query string.
        top_k: Maximum number of results to return (default 5).

    Returns:
        JSON-serialised list of semantically similar policy document chunks.
    """
    result = await _call_mcp_tool(
        "vector_search",
        {"query": query, "top_k": top_k},
    )
    if isinstance(result, (dict, list)):
        return json.dumps(result)
    return str(result)


async def hybrid_search(query: str, top_k: int = 5) -> str:
    """Hybrid (keyword + vector) search over policy documents.

    Args:
        query: The search query string.
        top_k: Maximum number of results to return (default 5).

    Returns:
        JSON-serialised list of policy document chunks ranked by combined score.
    """
    result = await _call_mcp_tool(
        "hybrid_search",
        {"query": query, "top_k": top_k},
    )
    if isinstance(result, (dict, list)):
        return json.dumps(result)
    return str(result)


async def filter_search(
    query: str,
    top_k: int = 5,
    language: Optional[str] = None,
    locale: Optional[str] = None,
) -> str:
    """Hybrid search with optional language and locale filters.

    Args:
        query: The search query string.
        top_k: Maximum number of results to return (default 5).
        language: Filter by language — one of 'en-us', 'spanish', 'french', 'portuguese'. Omit to search all languages.
        locale: Filter by locale — one of 'US', 'CA', 'Global'. Omit to search all locales.

    Returns:
        JSON-serialised list of matching policy document chunks.
    """
    args: Dict[str, Any] = {"query": query, "top_k": top_k}
    if language:
        args["language"] = language
    if locale:
        args["locale"] = locale
    result = await _call_mcp_tool("filter_search", args)
    if isinstance(result, (dict, list)):
        return json.dumps(result)
    return str(result)


async def get_document(document_id: str) -> list:
    """Retrieve all chunks of a policy document by its documentId.

    Args:
        document_id: The 'documentId' field (long base64 string) from any search result.
            Do NOT use the short numeric 'document_id' field.

    Returns:
        List of document chunks in page order.
    """
    return await _call_mcp_tool("get_document", {"document_id": document_id})


def register_tools(registry: ToolRegistry) -> None:
    """Register all MCP tool wrappers with the agent's ToolRegistry.
    
    Called at module load time to pre-register tools so the ReActAgent
    can discover and use them. The actual MCP calls will reuse the
    session-level client set up by open_mcp_session().
    """
    registry.register(
        name="keyword_search",
        description="Keyword search over policy documents. Use when the query contains specific terms or phrases.",
        function=keyword_search,
    )
    registry.register(
        name="vector_search",
        description="Semantic vector search over policy documents. Use for conceptual or meaning-based queries.",
        function=vector_search,
    )
    registry.register(
        name="hybrid_search",
        description="Hybrid keyword + semantic search over policy documents. Best default choice for most queries.",
        function=hybrid_search,
    )
    registry.register(
        name="filter_search",
        description=(
            "Hybrid search with optional language and locale filters. "
            "Use when the user specifies a language ('en-us', 'spanish', 'french', 'portuguese') "
            "or locale ('US', 'CA', 'Global')."
        ),
        function=filter_search,
    )
    registry.register(
        name="get_document",
        description=(
            "Retrieve all chunks of a policy document by ID. "
            "The argument `document_id` must be the value of the `documentId` field "
            "(camelCase, the long base64 string starting with 'aHR0cH') from search results. "
            "Do NOT use the numeric `document_id` field from search results."
        ),
        function=get_document,
    )