a2a_adapter_example.py

"""
A2AAdapter Example
==================

Demonstrates A2AAdapter — the **server-side** (receiver) half of the
Agent2Agent (A2A) protocol.  Use A2AAdapter in any agent service you deploy so
that other services (A2AClient, orchestrators) can call it over HTTP using the
standard A2A JSON-RPC 2.0 wire format.

Companion: a2a_client_example.py shows the **caller** side.

What A2AAdapter does
--------------------
- Constructs and serves the A2A Agent Card (``GET /.well-known/agent.json``)
- Parses incoming ``POST /rpc`` JSON-RPC 2.0 ``tasks/send`` requests
- Extracts the task text and context metadata from the request body
- Calls YOUR execution callback with ``(task_text, context)``
- Formats the ``AgentResult`` you return into a valid A2A JSON-RPC response
- Formats protocol-level errors (unknown method, bad payload, etc.)

What A2AAdapter does NOT do
---------------------------
- It does not know about tools, checkpoints, routing, or any other
  orchestration concern.  Those belong to the agent behind it.
- It does not start an HTTP server — you bind it to whatever HTTP framework
  you use (aiohttp, FastAPI, stdlib http.server, Django, etc.).

Wire protocol summary
---------------------

    Discovery::

        GET /.well-known/agent.json
        → {"name": "...", "description": "...", "url": "...",
           "version": "...", "skills": [...]}

    Invocation::

        POST /rpc
        Content-Type: application/json

        {"jsonrpc": "2.0", "id": "<uuid>", "method": "tasks/send",
         "params": {"id": "<task-uuid>",
                    "message": {"role": "user",
                                "parts": [{"type": "text", "text": "<task>"}],
                                "metadata": {"obo_token": "...", "locale": "en"}}}}

        Response:
        {"jsonrpc": "2.0", "id": "<uuid>",
         "result": {"id": "<task-uuid>",
                    "status": {"state": "completed"},
                    "artifacts": [{"parts": [{"type": "text", "text": "<output>"}]}],
                    "metadata": {"steps": [...]}}}

Examples in this file
---------------------
1. A2AAdapter in isolation — no HTTP server needed (test the protocol logic directly)
2. Building and serialising the Agent Card
3. Parsing ``tasks/send`` requests and building responses manually
4. Error handling — malformed requests, unknown methods, execution failures
5. Minimal stdlib HTTP server (no extra dependencies) using A2AAdapter
6. Integration test helper — simulate A2AClient ↔ A2AAdapter round-trip in memory

Prerequisites::

    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core

Usage::

    cd packages/orchestration-layer
    python examples/a2a_adapter_example.py
"""

import asyncio
import json
import os
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Dict
from uuid import uuid4

from dotenv import load_dotenv

from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector

from gmf_forge_ai_orchestration.agents.base import AgentResult, AgentStep
from gmf_forge_ai_orchestration.protocols.a2a.a2a_adapter import A2AAdapter, A2AAdapterError

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
logger = BasicLogger("a2a_adapter_example")
metrics = BasicMetricsCollector()

# Service identity — what the adapter advertises to callers
AGENT_SERVICE_URL = "http://localhost:8080"
AGENT_VERSION = "1.0.0"


# ---------------------------------------------------------------------------
# Shared execution callback
# ---------------------------------------------------------------------------

async def my_agent_execute(task: str, context: Dict[str, Any]) -> AgentResult:
    """
    The business-logic callback that A2AAdapter calls after parsing a request.

    In production this would call a ReActAgent, PlanExecuteAgent, or any other
    agent.  Here we simulate a simple echo + context-aware response.

    Args:
        task: The plain-text task extracted from the A2A request body.
        context: The ``message.metadata`` dict forwarded by the caller.
                 May contain obo_token, user_assertion, locale, session_id, etc.
    """
    logger.info(
        "Agent executing task",
        task_preview=task[:80],
        context_keys=list(context.keys()),
    )

    # Simulate work — in production: agent.execute(task, context)
    output = (
        f"Task received: '{task}'. "
        f"Processing with context keys: {list(context.keys())}. "
        "Result: analysis complete."
    )

    # Build a realistic AgentResult with steps
    steps = [
        AgentStep(
            thought="Analysing the incoming task and available context.",
            action="analyse_task",
            action_input={"task": task, "context_keys": list(context.keys())},
            observation="Task analysis complete. No tool calls required for this request.",
        ),
        AgentStep(
            thought="Composing the final answer.",
            action="Final Answer",
            action_input={"answer": output},
            observation=output,
        ),
    ]

    return AgentResult(
        output=output,
        steps=steps,
        success=True,
        metadata={
            "agent_id": "my-example-agent",
            "task_length": len(task),
        },
    )


# ---------------------------------------------------------------------------
# Example 1: A2AAdapter in isolation
#             — call handle_rpc directly, no HTTP server needed
# ---------------------------------------------------------------------------

async def example_adapter_in_isolation() -> None:
    """
    Test the A2AAdapter protocol logic without any HTTP server.

    This is the most useful pattern for unit tests — construct the JSON-RPC
    body yourself, call handle_rpc, and assert on the JSON-RPC response.
    No port binding, no network calls.
    """
    logger.info("=" * 60)
    logger.info("Example 1: A2AAdapter in isolation (no HTTP server)")
    logger.info("=" * 60)

    adapter = A2AAdapter(
        agent_id="example-agent",
        description="Example A2A agent for demonstration purposes.",
        url=AGENT_SERVICE_URL,
        version=AGENT_VERSION,
        skills=[
            {"id": "analysis", "name": "Document Analysis",
             "description": "Analyse documents and answer questions about them."},
            {"id": "summary", "name": "Summarisation",
             "description": "Produce concise summaries of long content."},
        ],
        logger=logger,
    )

    # Construct a valid tasks/send JSON-RPC request body
    request_body = {
        "jsonrpc": "2.0",
        "id": str(uuid4()),
        "method": "tasks/send",
        "params": {
            "id": str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Analyse the Q1 financial report."}],
                "metadata": {
                    "locale": "en-US",
                    "session_id": "sess-001",
                    "obo_token": "eyJhbGci...(truncated)",
                },
            },
        },
    }

    # handle_rpc parses the request, calls our execution callback, and
    # returns a complete JSON-RPC response dict ready to serialise.
    response = await adapter.handle_rpc(request_body, my_agent_execute)

    # The response is a plain dict — serialise it as JSON for HTTP
    response_json = json.dumps(response, indent=2)
    logger.info("Protocol response received", method="tasks/send")
    print("\n--- JSON-RPC response (Example 1) ---")
    print(response_json[:800])
    print("--- End response ---\n")

    # Validate the response structure
    assert response.get("jsonrpc") == "2.0", "jsonrpc version mismatch"
    assert "result" in response, "Expected 'result' key in successful response"
    task_result = response["result"]
    assert task_result["status"]["state"] in ("completed", "failed"), "Unexpected task state"
    logger.info("Response validation passed", state=task_result["status"]["state"])


# ---------------------------------------------------------------------------
# Example 2: Agent Card
#             — advertise capabilities to callers via discovery
# ---------------------------------------------------------------------------

async def example_agent_card() -> None:
    """
    Agent Card construction and serialisation.

    Callers (A2AClient, orchestrators, service registries) send
    GET /.well-known/agent.json to discover the agent's identity and skills.

    The AgentCard is a Pydantic model — call .model_dump() to get a plain dict
    ready for JSON serialisation or HTTP response bodies.
    """
    logger.info("=" * 60)
    logger.info("Example 2: Agent Card")
    logger.info("=" * 60)

    adapter = A2AAdapter(
        agent_id="policy-review-agent",
        description="Reviews corporate policies for compliance with EU AI Act and GDPR.",
        url="https://policy-review.internal.company.com",
        version="2.1.0",
        skills=[
            {
                "id": "eu_ai_act",
                "name": "EU AI Act compliance check",
                "description": "Assess policies against EU AI Act requirements.",
            },
            {
                "id": "gdpr_review",
                "name": "GDPR review",
                "description": "Identify GDPR obligations and data subject rights.",
            },
            {
                "id": "risk_classification",
                "name": "AI risk classification",
                "description": "Classify AI system risk level (minimal, limited, high, unacceptable).",
            },
        ],
        logger=logger,
    )

    # agent_card() returns a fully populated AgentCard Pydantic model.
    card = adapter.agent_card()
    card_dict = card.model_dump()

    logger.info("Agent Card built", agent_id=card.name, skills=len(card.skills))
    print("\n--- Agent Card (GET /.well-known/agent.json) ---")
    print(json.dumps(card_dict, indent=2))
    print("--- End agent card ---\n")

    # In an HTTP handler:
    #   response.body = json.dumps(card_dict)
    #   response.headers["Content-Type"] = "application/json"
    assert card.name == "policy-review-agent"
    assert len(card.skills) == 3
    logger.info("Agent card assertions passed")


# ---------------------------------------------------------------------------
# Example 3: Manual parse + build response
#             — fine-grained control over each step
# ---------------------------------------------------------------------------

async def example_manual_parse_and_build() -> None:
    """
    Manual request parsing and response construction.

    Use parse_tasks_send() + build_task_response() directly when you need
    to inspect the parsed values (e.g. log the task_id, check the context,
    apply request-level auth) before calling your agent.

    Equivalent to calling handle_rpc() in two steps.
    """
    logger.info("=" * 60)
    logger.info("Example 3: Manual parse and build")
    logger.info("=" * 60)

    adapter = A2AAdapter(
        agent_id="manual-example-agent",
        description="Demo adapter for manual parse/build example.",
        url=AGENT_SERVICE_URL,
        version=AGENT_VERSION,
        logger=logger,
    )

    request_body = {
        "jsonrpc": "2.0",
        "id": "req-123",
        "method": "tasks/send",
        "params": {
            "id": "task-456",
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Summarise the AML policy document."}],
                "metadata": {
                    "user_assertion": "bearer-token-xyz",
                    "locale": "en-GB",
                },
            },
        },
    }

    # Step 1 — Parse the JSON-RPC body.
    #   Returns: (rpc_id, task_id, task_text, context)
    rpc_id, task_id, task_text, context = adapter.parse_tasks_send(request_body)
    logger.info(
        "Parsed tasks/send",
        rpc_id=rpc_id,
        task_id=task_id,
        task_preview=task_text[:60],
        context_keys=list(context.keys()),
    )

    # Step 2 — Run the agent (or apply any pre-execution logic here).
    #   e.g. validate the user_assertion from context before executing:
    user_assertion = context.get("user_assertion", "")
    if not user_assertion:
        logger.warning("No user_assertion in context — proceeding anyway for demo")

    result = await my_agent_execute(task_text, context)

    # Step 3 — Build the JSON-RPC response dict.
    response = adapter.build_task_response(rpc_id, task_id, result)
    logger.info(
        "Response built",
        state=response["result"]["status"]["state"],
        artifact_count=len(response["result"].get("artifacts", [])),
    )

    assert response["result"]["id"] == task_id, "Task ID mismatch in response"
    logger.info("Manual parse/build assertions passed")


# ---------------------------------------------------------------------------
# Example 4: Error handling
#             — malformed requests, unknown methods, execution failures
# ---------------------------------------------------------------------------

async def example_error_handling() -> None:
    """
    A2AAdapter error handling.

    A2AAdapter converts protocol-level errors into well-formed JSON-RPC error
    responses rather than raising exceptions to the HTTP handler.

    Error codes follow JSON-RPC 2.0 conventions:
    - -32601 : Method not found
    - -32602 : Invalid params (bad payload shape)
    - -32603 : Internal error (agent execution failure)
    """
    logger.info("=" * 60)
    logger.info("Example 4: Error handling")
    logger.info("=" * 60)

    adapter = A2AAdapter(
        agent_id="error-demo-agent",
        description="Agent for error handling examples.",
        url=AGENT_SERVICE_URL,
        version=AGENT_VERSION,
        logger=logger,
    )

    # --- 4a: Unsupported method ---
    unsupported_method = {
        "jsonrpc": "2.0",
        "id": "req-unsupported",
        "method": "tasks/cancel",   # A2AAdapter only handles tasks/send
        "params": {},
    }
    response = await adapter.handle_rpc(unsupported_method, my_agent_execute)
    logger.info("4a — Unsupported method", error=response.get("error"))
    assert "error" in response, "Expected error response for unsupported method"
    assert response["error"]["code"] == -32601

    # --- 4b: Malformed request (params is not a dict/Mapping) ---
    # Missing params defaults to {}, which is valid. To trigger the params
    # validation branch, pass params as a non-Mapping value (e.g. a string).
    malformed = {
        "jsonrpc": "2.0",
        "id": "req-malformed",
        "method": "tasks/send",
        "params": "this-is-not-a-dict",  # must be a Mapping
    }
    response = await adapter.handle_rpc(malformed, my_agent_execute)
    logger.info("4b — Malformed request", error=response.get("error"))
    assert "error" in response, "Expected error response for malformed request"

    # --- 4c: Agent execution failure ---
    async def failing_execute(task: str, context: Dict[str, Any]) -> AgentResult:
        raise RuntimeError("Agent internal error: database connection lost")

    valid_request = {
        "jsonrpc": "2.0",
        "id": "req-exec-fail",
        "method": "tasks/send",
        "params": {
            "id": str(uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Any task"}],
                "metadata": {},
            },
        },
    }
    response = await adapter.handle_rpc(valid_request, failing_execute)
    logger.info("4c — Execution failure", error=response.get("error"))
    assert "error" in response, "Expected error response when agent raises"
    assert response["error"]["code"] == -32603

    # --- 4d: Manually build a JSON-RPC error response ---
    error_response = adapter.build_jsonrpc_error(
        rpc_id="req-custom-error",
        code=-32700,
        message="Parse error: request body was not valid JSON",
        data={"received_content_type": "text/plain"},
    )
    logger.info("4d — Custom error response built", error=error_response.get("error"))

    logger.info("All error handling assertions passed")


# ---------------------------------------------------------------------------
# Example 5: Minimal stdlib HTTP server
#             — wire A2AAdapter to Python's built-in http.server
# ---------------------------------------------------------------------------

def run_minimal_http_server(adapter: A2AAdapter, host: str = "localhost", port: int = 8080) -> HTTPServer:
    """
    Create and return a minimal HTTP server that handles the two A2A endpoints:

    - ``GET /.well-known/agent.json`` → serves the Agent Card
    - ``POST /rpc``                   → processes tasks/send JSON-RPC requests

    Returns the HTTPServer instance so the caller can start/stop it.

    In production use a production-grade ASGI framework (FastAPI, aiohttp,
    Starlette) instead of http.server.  This example exists purely to show the
    A2AAdapter integration without any extra dependencies.
    """
    loop = asyncio.new_event_loop()

    class A2ARequestHandler(BaseHTTPRequestHandler):
        def log_message(self, format: str, *args: Any) -> None:  # noqa: A002
            logger.info(f"HTTP {format % args}")

        def do_GET(self) -> None:  # noqa: N802
            if self.path == "/.well-known/agent.json":
                card = adapter.agent_card().model_dump()
                body = json.dumps(card).encode()
                self.send_response(200)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)
            else:
                self.send_response(404)
                self.end_headers()

        def do_POST(self) -> None:  # noqa: N802
            if self.path != "/rpc":
                self.send_response(404)
                self.end_headers()
                return

            content_length = int(self.headers.get("Content-Length", 0))
            raw_body = self.rfile.read(content_length)

            try:
                request_body = json.loads(raw_body)
            except json.JSONDecodeError:
                self.send_response(400)
                self.end_headers()
                return

            # Run the async handle_rpc in the event loop created for this server.
            response = loop.run_until_complete(
                adapter.handle_rpc(request_body, my_agent_execute)
            )

            body = json.dumps(response).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    return HTTPServer((host, port), A2ARequestHandler)


async def example_minimal_http_server() -> None:
    """
    Minimal stdlib HTTP server.

    Starts a temporary server on localhost:18080, fires a few A2A requests
    against it using httpx (same library used by A2AClient), then shuts down.

    The server is started in a background thread so the asyncio event loop
    can continue running the client calls on the main thread.
    """
    logger.info("=" * 60)
    logger.info("Example 5: Minimal stdlib HTTP server")
    logger.info("=" * 60)

    import httpx

    adapter = A2AAdapter(
        agent_id="http-demo-agent",
        description="Agent served via minimal stdlib HTTP server.",
        url="http://localhost:18080",
        version="1.0.0",
        skills=[{"id": "demo", "name": "Demo", "description": "Demo skill."}],
        logger=logger,
    )

    server = run_minimal_http_server(adapter, host="localhost", port=18080)
    server_thread = threading.Thread(target=server.serve_forever, daemon=True)
    server_thread.start()
    logger.info("HTTP server started on http://localhost:18080")

    # Small delay for server readiness
    await asyncio.sleep(0.1)

    async with httpx.AsyncClient() as client:
        # 5a — Agent card discovery
        resp = await client.get("http://localhost:18080/.well-known/agent.json")
        card = resp.json()
        logger.info("Agent card received", name=card.get("name"), url=card.get("url"))
        assert card["name"] == "http-demo-agent"

        # 5b — tasks/send
        payload = {
            "jsonrpc": "2.0",
            "id": str(uuid4()),
            "method": "tasks/send",
            "params": {
                "id": str(uuid4()),
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": "What is the weather today?"}],
                    "metadata": {"locale": "en"},
                },
            },
        }
        resp = await client.post(
            "http://localhost:18080/rpc",
            json=payload,
            headers={"Content-Type": "application/json"},
        )
        result = resp.json()
        logger.info(
            "tasks/send response",
            state=result["result"]["status"]["state"],
            output_preview=result["result"]["artifacts"][0]["parts"][0]["text"][:80],
        )
        assert result["result"]["status"]["state"] == "completed"

    server.shutdown()
    logger.info("HTTP server shut down cleanly")


# ---------------------------------------------------------------------------
# Example 6: In-memory A2AClient ↔ A2AAdapter round-trip
#             — integration test helper
# ---------------------------------------------------------------------------

async def example_in_memory_round_trip() -> None:
    """
    Full A2AClient ↔ A2AAdapter round-trip without a real HTTP server.

    This pattern is useful for integration tests: instead of starting a server,
    intercept the HTTP layer with ``respx`` (already in dev requirements) so you
    can verify that the full JSON-RPC serialisation/deserialisation cycle works
    end-to-end.

    This example shows the pattern using raw A2AAdapter calls to simulate what
    A2AClient would send and receive, proving protocol correctness without network I/O.
    """
    logger.info("=" * 60)
    logger.info("Example 6: In-memory round-trip simulation")
    logger.info("=" * 60)

    adapter = A2AAdapter(
        agent_id="roundtrip-agent",
        description="Agent for round-trip simulation.",
        url=AGENT_SERVICE_URL,
        version="1.0.0",
        logger=logger,
    )

    # Simulate what A2AClient._call_remote would POST as its JSON-RPC payload.
    # (A2AClient constructs this using JsonRpcRequest Pydantic model internally.)
    rpc_request_id = str(uuid4())
    a2a_task_id = str(uuid4())

    simulated_client_payload = {
        "jsonrpc": "2.0",
        "id": rpc_request_id,
        "method": "tasks/send",
        "params": {
            "id": a2a_task_id,
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": "Generate a risk summary for project Alpha."}],
                "metadata": {
                    "obo_token": "Bearer obo-token-placeholder",
                    "user_assertion": "original-access-token",
                    "locale": "en-US",
                },
            },
        },
    }

    # Pass through A2AAdapter — returns the JSON-RPC response dict
    response = await adapter.handle_rpc(simulated_client_payload, my_agent_execute)

    logger.info(
        "Round-trip complete",
        jsonrpc=response.get("jsonrpc"),
        response_id=response.get("id"),
        task_state=response["result"]["status"]["state"],
    )

    # Verify the response carries back the same rpc_id and task_id
    assert response["id"] == rpc_request_id, "RPC ID must be echoed back"
    assert response["result"]["id"] == a2a_task_id, "Task ID must be preserved"

    # Verify the output text is present in artifacts
    artifacts = response["result"].get("artifacts", [])
    assert len(artifacts) > 0, "Expected at least one artifact"
    assert artifacts[0]["parts"][0]["type"] == "text"
    output_text = artifacts[0]["parts"][0]["text"]
    logger.info("Output from adapter", output_preview=output_text[:80])

    # Verify steps are propagated in metadata
    steps = response["result"]["metadata"].get("steps", [])
    logger.info("Steps propagated", count=len(steps))
    assert len(steps) > 0, "Expected steps in metadata"

    logger.info("All round-trip assertions passed")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    await example_adapter_in_isolation()
    await example_agent_card()
    await example_manual_parse_and_build()
    await example_error_handling()
    await example_minimal_http_server()
    await example_in_memory_round_trip()

    logger.info("All A2AAdapter examples completed.")


if __name__ == "__main__":
    asyncio.run(main())

a2a_client_example.py

"""
A2AClient Example

Demonstrates how to use A2AClient to call separately-deployed A2A-compliant agent
services over HTTP, while mixing them transparently with local agents inside any
orchestrator.

1. Single A2AClient — execute a task against a remote A2A service
2. A2AClient with behaviors — retry + audit applied transparently
3. Mixed pipeline — local ReActAgent + A2AClient in PipelineOrchestrator
4. Error handling — connection errors and HTTP failures
7. Router + A2AClient — RuleBasedRouter selects which agent to call at runtime

A2AClient protocol (what the remote service must expose):

    GET /.well-known/agent.json
    → {"name": "...", "description": "...", "url": "...", "version": "...", "skills": [...]}

    POST /rpc
    Content-Type: application/json

    Request  — JSON-RPC 2.0 tasks/send:
        {"jsonrpc": "2.0", "id": "...", "method": "tasks/send",
         "params": {"id": "...", "message": {"role": "user",
                    "parts": [{"type": "text", "text": "..."}],
                    "metadata": {...}}}}
    Response — A2A Task:
        {"jsonrpc": "2.0", "id": "...",
         "result": {"id": "...", "status": {"state": "completed"},
                    "artifacts": [{"name": "result",
                                   "parts": [{"type": "text", "text": "..."}]}],
                    "metadata": {"steps": [...]}}}

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core
    # A running A2A agent service at the URL you configure (see example_single_a2a below)
    # OR run in demo mode — examples catch connection errors and explain what would happen

Usage::

    cd packages/orchestration-layer
    python examples/a2a_client_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import json
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.protocols.a2a.a2a_client import A2AClient, A2AClientError
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from gmf_forge_ai_orchestration.behaviors.audit import AuditBehavior
from gmf_forge_ai_orchestration.multi_agent.pipeline import PipelineOrchestrator
from gmf_forge_ai_orchestration.routing import RuleBasedRouter, RoutingRequest

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

# Remote service URLs — replace with real deployed A2A endpoints
SEARCH_AGENT_URL = os.getenv("SEARCH_AGENT_URL", "http://localhost:8081")
ANALYSIS_AGENT_URL = os.getenv("ANALYSIS_AGENT_URL", "http://localhost:8082")

logger = BasicLogger("a2a_client_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


def create_gateway() -> UnifiedLLMGateway:
    """Build real LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs: dict = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


# ---------------------------------------------------------------------------
# Example 1: Single A2AClient
# ---------------------------------------------------------------------------

async def example_single_a2a() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: Single A2AClient — basic A2A JSON-RPC delegation")
    logger.info("=" * 60)

    # A2AClient requires no llm_gateway — it delegates entirely over HTTP.
    # The remote service owns its own gateway and reasoning loop.
    agent = A2AClient(
        endpoint_url=SEARCH_AGENT_URL,
        agent_id="search_agent",
        timeout=30.0,
        logger=logger,
        metrics=metrics,
    )

    logger.info("Calling remote A2A agent", url=SEARCH_AGENT_URL)
    try:
        result = await agent.execute("Find the latest AI safety research papers from 2025")
        logger.info("Remote result", success=result.success, output=result.output[:200])
        logger.info("Steps returned", count=len(result.steps))
        for i, step in enumerate(result.steps):
            logger.info(f"  Step {i+1}", thought=step.thought[:60], action=step.action)
    except A2AClientError as e:
        logger.info(
            "Remote agent unreachable (expected in demo — deploy an A2A service at SEARCH_AGENT_URL)",
            error=str(e),
        )


# ---------------------------------------------------------------------------
# Example 2: A2AClient with behaviors
# ---------------------------------------------------------------------------

async def example_a2a_with_behaviors() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: A2AClient with RetryBehavior + AuditBehavior")
    logger.info("=" * 60)

    # Behaviors stack on A2AClient exactly like local agents.
    # RetryBehavior retries the HTTP call on transient errors.
    # AuditBehavior logs every invocation regardless of agent type.
    agent = A2AClient(
        endpoint_url=SEARCH_AGENT_URL,
        agent_id="search_agent_with_retry",
        timeout=10.0,
        behaviors=[
            RetryBehavior(max_retries=3, base_delay=0.5, backoff_factor=2.0),
            AuditBehavior(logger=logger, metrics=metrics),
        ],
        logger=logger,
        metrics=metrics,
    )

    logger.info("Calling A2A agent with retry + audit", url=SEARCH_AGENT_URL)
    try:
        result = await agent.execute("Summarise recent developments in large language models")
        logger.info("Result", success=result.success, output=result.output[:200])
    except A2AClientError as e:
        logger.info(
            "All retries exhausted after 3 attempts (expected in demo)",
            error=str(e),
        )


# ---------------------------------------------------------------------------
# Example 3: A2AClient with auth headers
# ---------------------------------------------------------------------------

async def example_a2a_with_auth() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: A2AClient with Authorization header")
    logger.info("=" * 60)

    api_token = os.getenv("AGENT_SERVICE_TOKEN", "demo-token")

    agent = A2AClient(
        endpoint_url=SEARCH_AGENT_URL,
        agent_id="authenticated_agent",
        timeout=30.0,
        headers={"Authorization": f"Bearer {api_token}"},
        logger=logger,
        metrics=metrics,
    )

    logger.info(
        "A2AClient configured with auth header",
        url=SEARCH_AGENT_URL,
        token_prefix=api_token[:8] + "...",
    )
    try:
        result = await agent.execute("What is the current status of AI regulation in the EU?")
        logger.info("Authenticated call result", success=result.success)
    except A2AClientError as e:
        logger.info("Auth call failed (expected in demo)", error=str(e))


# ---------------------------------------------------------------------------
# Example 4: Mixed pipeline — local + remote agents
# ---------------------------------------------------------------------------

async def example_mixed_pipeline() -> None:
    logger.info("=" * 60)
    logger.info("Example 4: PipelineOrchestrator — local + remote agents mixed")
    logger.info("=" * 60)

    # Deployment topology:
    #   search_agent  → deployed as a separate A2A service (A2AClient)
    #   analysis_agent → deployed as a separate A2A service (A2AClient)
    #   report_agent   → runs in-process (ReActAgent with local gateway)

    try:
        gateway = create_gateway()
    except EnvironmentError:
        logger.info("Skipping mixed pipeline — Azure credentials not set")
        return

    pipeline = PipelineOrchestrator(
        agents=[
            A2AClient(
                endpoint_url=SEARCH_AGENT_URL,
                agent_id="search_agent",
                timeout=30.0,
                logger=logger,
                metrics=metrics,
            ),
            A2AClient(
                endpoint_url=ANALYSIS_AGENT_URL,
                agent_id="analysis_agent",
                timeout=30.0,
                logger=logger,
                metrics=metrics,
            ),
            ReActAgent(
                llm_gateway=gateway,
                agent_id="report_agent",
                logger=logger,
                metrics=metrics,
            ),
        ],
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    logger.info(
        "Pipeline topology",
        step_1="search_agent (A2AClient → http://localhost:8081)",
        step_2="analysis_agent (A2AClient → http://localhost:8082)",
        step_3="report_agent (ReActAgent — in-process)",
    )

    # PipelineOrchestrator catches agent errors internally and returns a failed
    # OrchestratorResult (success=False) rather than propagating A2AClientError.
    # Check result.success to detect failure — do not rely on catching A2AClientError here.
    result = await pipeline.run(
        "Research the impact of transformer architectures on NLP benchmarks and write a report"
    )
    if result.success:
        logger.info("Pipeline result", success=result.success, rounds=result.rounds)
        logger.info("Final output", output=result.final_output[:300])
    else:
        logger.info(
            "Pipeline halted — remote agent unreachable (expected in demo — deploy A2A services)",
            success=result.success,
            rounds=result.rounds,
        )


# ---------------------------------------------------------------------------
# Mock server helpers — shared by Examples 5 and 6
#
# _run_mock_error_server  → used by Example 5 (error handling)
# _run_mock_a2a_server    → used by Example 6 (successful execution)
# ---------------------------------------------------------------------------

async def _run_mock_error_server(
    host: str, port: int, status: int, stop_event: asyncio.Event
) -> None:
    """Minimal HTTP server that always returns a given HTTP error status."""
    status_text = {503: "Service Unavailable", 500: "Internal Server Error"}.get(status, "Error")
    http_response = (
        f"HTTP/1.1 {status} {status_text}\r\n"
        f"Content-Length: 0\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode()

    async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        try:
            await reader.read(4096)
        except Exception:
            pass
        writer.write(http_response)
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    server = await asyncio.start_server(_handle, host, port)
    async with server:
        await stop_event.wait()


async def _run_mock_a2a_server(
    host: str, port: int, stop_event: asyncio.Event
) -> None:
    """Minimal HTTP server that returns a real-looking A2A JSON-RPC 2.0 response."""
    output_text = (
        "Transformer architectures revolutionised NLP by replacing recurrence with "
        "self-attention. Key milestones: BERT (2018) set new GLUE/SQuAD records; "
        "GPT-2/3 demonstrated few-shot generalisation; T5 unified NLP tasks under "
        "a text-to-text framework; and GPT-4 achieved human-level performance on "
        "several professional benchmarks."
    )
    body = json.dumps({
        "jsonrpc": "2.0",
        "id": "req-1",
        "result": {
            "id": "task-mock-1",
            "status": {"state": "completed"},
            "artifacts": [
                {
                    "name": "result",
                    "parts": [{"type": "text", "text": output_text}],
                }
            ],
            "metadata": {
                "steps": [
                    {
                        "thought": "I need to retrieve information about transformer impact on NLP benchmarks.",
                        "action": "search",
                        "action_input": {"query": "transformer architectures NLP benchmarks impact"},
                        "observation": "Found 3 relevant papers and 2 survey articles.",
                        "metadata": {"source": "mock_search"},
                    },
                    {
                        "thought": "I have enough information to answer the question.",
                        "action": "Final Answer",
                        "action_input": {"answer": "Transformers transformed NLP benchmarks."},
                        "observation": "",
                        "metadata": {},
                    },
                ]
            },
        },
    }).encode()

    http_response = (
        b"HTTP/1.1 200 OK\r\n"
        b"Content-Type: application/json\r\n"
        + f"Content-Length: {len(body)}\r\n".encode()
        + b"Connection: close\r\n"
        + b"\r\n"
        + body
    )

    async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        try:
            await reader.read(4096)   # consume request — we always respond with success
        except Exception:
            pass
        writer.write(http_response)
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    server = await asyncio.start_server(_handle, host, port)
    async with server:
        await stop_event.wait()

# ---------------------------------------------------------------------------
# Example 5: A2AClient error handling
#
# Shows the two main failure modes A2AClientError wraps:
#   1. Unreachable host (port 1 — nothing listening) → connection / timeout error
#   2. HTTP non-200 response → uses _run_mock_error_server to return HTTP 503
# ---------------------------------------------------------------------------

async def example_error_handling() -> None:
    logger.info("=" * 60)
    logger.info("Example 5: A2AClient error handling")
    logger.info("=" * 60)

    # Unreachable host — raises ConnectError or TimeoutException depending on OS/firewall
    agent = A2AClient(
        endpoint_url="http://127.0.0.1:1",   # nothing listening here
        agent_id="unreachable_agent",
        timeout=2.0,
        logger=logger,
        metrics=metrics,
    )
    try:
        await agent.execute("task")
    except A2AClientError as e:
        logger.info("Unreachable host caught as A2AClientError", error=str(e)[:80])

    # HTTP non-200 — in-process mock server that always returns 503
    host_err, port_err = "127.0.0.1", 18082
    stop_err = asyncio.Event()
    err_server_task = asyncio.create_task(
        _run_mock_error_server(host_err, port_err, 503, stop_err)
    )
    await asyncio.sleep(0.05)
    try:
        agent_error = A2AClient(
            endpoint_url=f"http://{host_err}:{port_err}",
            agent_id="error_agent",
            timeout=5.0,
            logger=logger,
            metrics=metrics,
        )
        await agent_error.execute("task")
    except A2AClientError as e:
        logger.info("HTTP 503 caught as A2AClientError", error=str(e)[:80])
    finally:
        stop_err.set()
        await err_server_task

# ---------------------------------------------------------------------------
# Example 6: Successful remote execution — in-process mock A2A server
#
# Spins up _run_mock_a2a_server in-process so Example 6 works with no
# external services. The mock returns a full A2A JSON-RPC 2.0 response
# including two reasoning steps, demonstrating that A2AClient parses them.
# ---------------------------------------------------------------------------

async def example_successful_execution() -> None:
    logger.info("=" * 60)
    logger.info("Example 6: Successful remote execution — in-process mock A2A server")
    logger.info("=" * 60)

    host, port = "127.0.0.1", 18081
    stop_event = asyncio.Event()

    # Start the mock A2A server in the background
    server_task = asyncio.create_task(_run_mock_a2a_server(host, port, stop_event))
    await asyncio.sleep(0.05)  # give it a moment to bind

    try:
        agent = A2AClient(
            endpoint_url=f"http://{host}:{port}",
            agent_id="mock_research_agent",
            timeout=5.0,
            logger=logger,
            metrics=metrics,
        )

        result = await agent.execute(
            "Explain the impact of transformer architectures on NLP benchmarks"
        )

        logger.info("Remote call succeeded", success=result.success)
        logger.info("Output preview", output=result.output[:120])
        logger.info("Steps returned by remote agent", count=len(result.steps))
        for i, step in enumerate(result.steps):
            logger.info(
                f"  Step {i + 1}",
                thought=step.thought[:70],
                action=step.action,
            )
    finally:
        stop_event.set()
        await server_task


# ---------------------------------------------------------------------------
# Example 7: Router + A2AClient — let the library decide which agent to call
# ---------------------------------------------------------------------------

async def example_routing_with_a2a() -> None:
    logger.info("=" * 60)
    logger.info("Example 7: RuleBasedRouter + A2AClient — library-driven routing")
    logger.info("=" * 60)

    # Two mock A2A servers represent two independently deployed task agents.
    summarizer_host, summarizer_port = "127.0.0.1", 18083
    search_host, search_port = "127.0.0.1", 18084

    def _make_mock_body(agent_name: str, output: str) -> bytes:
        body = json.dumps({
            "jsonrpc": "2.0",
            "id": "req-1",
            "result": {
                "id": f"task-{agent_name}-1",
                "status": {"state": "completed"},
                "artifacts": [{"name": "result", "parts": [{"type": "text", "text": output}]}],
                "metadata": {"steps": []},
            },
        }).encode()
        return (
            b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
            + f"Content-Length: {len(body)}\r\nConnection: close\r\n\r\n".encode()
            + body
        )

    async def _serve(host: str, port: int, response: bytes, stop: asyncio.Event) -> None:
        async def _h(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
            try:
                await r.read(4096)
            except Exception:
                pass
            w.write(response)
            await w.drain()
            w.close()
            await w.wait_closed()
        srv = await asyncio.start_server(_h, host, port)
        async with srv:
            await stop.wait()

    stop1, stop2 = asyncio.Event(), asyncio.Event()
    t1 = asyncio.create_task(
        _serve(
            summarizer_host, summarizer_port,
            _make_mock_body("summarizer", "Summary: Transformers use self-attention to process sequences in parallel."),
            stop1,
        )
    )
    t2 = asyncio.create_task(
        _serve(
            search_host, search_port,
            _make_mock_body("search", "Found 5 papers on transformer architectures."),
            stop2,
        )
    )
    await asyncio.sleep(0.05)

    # Registry mapping agent name → deployed URL
    agent_registry = {
        "summarizer-agent": f"http://{summarizer_host}:{summarizer_port}",
        "search-agent": f"http://{search_host}:{search_port}",
        "general-agent": f"http://{search_host}:{search_port}",  # reuse mock for demo
    }

    # RuleBasedRouter: developer declares intent as rules, not if/else chains.
    # Rules are evaluated in order; first match wins.
    # Other routers available: LLMRouter (LLM picks), SemanticRouter (embeddings),
    # LoadBalancingRouter (round-robin across replicas).
    router = RuleBasedRouter(
        rules=[
            (lambda r: "summarize" in r.input.lower() or "summary" in r.input.lower(), "summarizer-agent"),
            (lambda r: "search" in r.input.lower() or "find" in r.input.lower(), "search-agent"),
        ],
        fallback_target="general-agent",
    )

    tasks = [
        "Summarize the key ideas in the transformer paper",
        "Search for recent papers on attention mechanisms",
        "What is the capital of France?",  # no rule matches → fallback
    ]

    try:
        for task in tasks:
            decision = await router.route(
                RoutingRequest(
                    input=task,
                    available_agents=list(agent_registry.keys()),
                )
            )
            logger.info(
                "Router decision",
                task=task[:55],
                target=decision.target,
                confidence=decision.confidence,
                reasoning=decision.reasoning,
            )

            agent = A2AClient(
                endpoint_url=agent_registry[decision.target],
                agent_id=decision.target,
                timeout=5.0,
                logger=logger,
                metrics=metrics,
            )
            result = await agent.execute(task)
            logger.info("  → result", success=result.success, output=result.output[:80])
    finally:
        stop1.set()
        stop2.set()
        await asyncio.gather(t1, t2)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("A2AClient Example starting")
    metrics.increment("example.a2a_client.started")

    await example_single_a2a()
    await example_a2a_with_behaviors()
    await example_a2a_with_auth()
    await example_mixed_pipeline()
    await example_error_handling()
    await example_successful_execution()
    await example_routing_with_a2a()

    logger.info("A2AClient examples complete")
    metrics.increment("example.a2a_client.completed")


if __name__ == "__main__":
    asyncio.run(main())

behaviors_example.py

"""
Behaviors Example

Demonstrates all behavior types that can be attached to any agent:

1. RetryBehavior       — exponential backoff on transient failures
2. GuardrailBehavior   — block forbidden content in inputs/outputs
3. CircuitBreakerBehavior — stop calling a failing agent after threshold
4. RateLimitBehavior   — token bucket rate limiting
5. HumanInLoopBehavior — pause and require human approval
6. AuditBehavior       — log every invocation with full context
7. OBOTokenBehavior    — On-Behalf-Of token exchange (Entra ID + Okta)

Behaviors are composable — stack as many as needed on a single agent.

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

Usage::

    cd packages/orchestration-layer
    python examples/behaviors_example.py
"""

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from gmf_forge_ai_orchestration.behaviors.guardrail import GuardrailBehavior, GuardrailRule
from gmf_forge_ai_orchestration.behaviors.circuit_breaker import (
    CircuitBreakerBehavior,
    CircuitOpenError,
)
from gmf_forge_ai_orchestration.behaviors.rate_limit import RateLimitBehavior, RateLimitExceededError
from gmf_forge_ai_orchestration.behaviors.human_in_loop import (
    HumanInLoopBehavior,
    HumanApprovalRequired,
    PendingApproval,
)
from gmf_forge_ai_orchestration.behaviors.audit import AuditBehavior
from gmf_forge_ai_orchestration.behaviors.base import BehaviorContext
from gmf_forge_ai_orchestration.behaviors.obo_token import (
    OBOTokenBehavior,
    EntraOBOProvider,
    OktaOBOProvider,
    OBOTokenError,
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("behaviors_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()


def create_gateway() -> UnifiedLLMGateway:
    """Build real LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


def create_failing_gateway() -> UnifiedLLMGateway:
    """
    Build a gateway that always fails — points to an unreachable local address.
    Used to demonstrate RetryBehavior exhaustion and CircuitBreakerBehavior tripping open.
    """
    provider = AzureOpenAIProvider(
        endpoint="http://127.0.0.1:1",
        api_key="dummy",
        deployment_name="gpt-4",
        api_version="2024-02-15-preview",
    )
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


# ---------------------------------------------------------------------------
# Example 1: RetryBehavior — automatic retry with backoff
# ---------------------------------------------------------------------------

async def example_retry() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: RetryBehavior — exponential backoff")
    logger.info("=" * 60)

    # Gateway points to an unreachable endpoint — connection errors trigger retries.
    # In production, transient API errors (rate limits, timeouts) are retried the same way.
    agent = ReActAgent(
        llm_gateway=create_failing_gateway(),
        behaviors=[
            RetryBehavior(
                max_retries=2,
                base_delay=0.1,       # short for demo
                backoff_factor=2.0,
                logger=logger,
            )
        ],
        agent_id="retry-demo",
        logger=logger,
        metrics=metrics,
    )

    try:
        await agent.execute("Test retry behavior")
    except Exception as e:
        logger.info("Retries exhausted as expected", attempts=3, error=type(e).__name__)


# ---------------------------------------------------------------------------
# Example 2: GuardrailBehavior — content validation
# ---------------------------------------------------------------------------

async def example_guardrail() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: GuardrailBehavior — input/output validation")
    logger.info("=" * 60)

    agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[
            GuardrailBehavior(rules=[
                GuardrailRule(
                    name="no_pii_keywords",
                    blocked_words=["ssn", "social security number", "password"],
                    apply_to="both",
                ),
                GuardrailRule(
                    name="max_input_length",
                    max_length=500,
                    apply_to="input",
                ),
            ])
        ],
        agent_id="guardrail-demo",
        logger=logger,
        metrics=metrics,
    )

    # Clean input — passes
    result = await agent.execute("What is the weather in New York?")
    logger.info("Clean input passed", success=result.success)

    # Input with blocked word — raises GuardrailViolationError
    from gmf_forge_ai_orchestration.behaviors.guardrail import GuardrailViolationError
    try:
        await agent.execute("What is my ssn if my name is John?")
    except GuardrailViolationError as e:
        logger.info("Guardrail blocked input as expected", rule=e.rule_name)


# ---------------------------------------------------------------------------
# Example 3: CircuitBreakerBehavior — fail fast after threshold
# ---------------------------------------------------------------------------

async def example_circuit_breaker() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: CircuitBreakerBehavior — CLOSED -> OPEN -> HALF_OPEN")
    logger.info("=" * 60)

    # Gateway points to an unreachable endpoint — connection errors trip the circuit.
    always_failing = create_failing_gateway()

    cb = CircuitBreakerBehavior(
        failure_threshold=3,
        recovery_timeout=0.1,    # short for demo
        logger=logger,
        metrics=metrics,
    )

    agent = ReActAgent(
        llm_gateway=always_failing,
        behaviors=[cb],
        agent_id="cb-demo",
        logger=logger,
        metrics=metrics,
    )

    # Trip the circuit after threshold failures — each call raises, not returns failure
    for i in range(3):
        try:
            await agent.execute(f"Call {i+1}")
        except Exception as e:
            logger.info(
                f"Call {i+1} failed",
                error_type=type(e).__name__,
                circuit_state=cb._state.value,
            )

    # Circuit is OPEN — next call fails fast without hitting the LLM
    try:
        await agent.execute("This should be blocked by open circuit")
    except CircuitOpenError:
        logger.info("Circuit is OPEN — call blocked as expected")

    # Wait for recovery timeout, then a probe is allowed (HALF_OPEN)
    await asyncio.sleep(0.15)
    logger.info("Recovery timeout elapsed, circuit entering HALF_OPEN state")


# ---------------------------------------------------------------------------
# Example 4: RateLimitBehavior — token bucket
# ---------------------------------------------------------------------------

async def example_rate_limit() -> None:
    logger.info("=" * 60)
    logger.info("Example 4: RateLimitBehavior — token bucket rate limiting")
    logger.info("=" * 60)

    # calls_per_second=0.1 means 1 token every 10s — bucket refills very slowly.
    # burst=2 means the first 2 calls are allowed immediately from the burst budget.
    # By the time Request 3 is attempted, even after the LLM call (~1-2s), only
    # 0.1–0.2 tokens have refilled — not enough for a full call → rate limited.
    agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[
            RateLimitBehavior(
                calls_per_second=0.1,
                burst=2,
                wait_on_limit=False,   # raise immediately if no tokens
            )
        ],
        agent_id="rate-limit-demo",
        logger=logger,
        metrics=metrics,
    )

    # First 2 calls: consumed from burst budget
    for i in range(2):
        result = await agent.execute(f"Request {i+1}")
        logger.info(f"Request {i+1} allowed", success=result.success)

    # 3rd immediate call: bucket empty, raises RateLimitExceededError
    try:
        await agent.execute("Request 3 — should be rate limited")
    except RateLimitExceededError:
        logger.info("Request 3 rate-limited as expected")


# ---------------------------------------------------------------------------
# Example 5: HumanInLoopBehavior — approval gate (sync and async/offline modes)
# ---------------------------------------------------------------------------

async def example_human_in_loop() -> None:
    logger.info("=" * 60)
    logger.info("Example 5a: HumanInLoopBehavior — sync mode (inline reviewer)")
    logger.info("=" * 60)

    # ------------------------------------------------------------------
    # Sync mode: review_callback blocks until the human decides.
    # Suitable for short-lived approvals (interactive CLI, quick UI).
    # ------------------------------------------------------------------
    approvals = {"high_risk": False, "low_risk": True}

    async def review_callback(ctx: BehaviorContext, result) -> bool:
        """Simulate a human reviewing the result before it is returned."""
        is_high_risk = "high risk" in ctx.task.lower()
        approved = approvals.get("high_risk" if is_high_risk else "low_risk", True)
        logger.info("Human review (sync)", task=ctx.task[:60], approved=approved)
        return approved

    sync_agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[HumanInLoopBehavior(review_callback=review_callback)],
        agent_id="hitl-sync-demo",
        logger=logger,
        metrics=metrics,
    )

    # Low-risk task — approved by inline reviewer
    result = await sync_agent.execute("Summarise the quarterly report")
    logger.info("Low-risk result approved", output=result.output)

    # High-risk task — rejected by inline reviewer
    try:
        await sync_agent.execute("Perform a high risk transfer of $1,000,000")
    except HumanApprovalRequired:
        logger.info("High-risk action rejected by human reviewer as expected")

    # ------------------------------------------------------------------
    # Async / offline mode: submit_callback fires the request and returns
    # an approval_id immediately — the agent does NOT block.
    #
    # Pattern:
    #   1. Agent finishes and calls submit_callback → gets approval_id
    #   2. PendingApproval is raised — caller stores (approval_id, result)
    #   3. Human visits the approval page at their own time and decides
    #   4. Your webhook endpoint receives the decision, looks up stored
    #      result by approval_id, and delivers or discards it
    # ------------------------------------------------------------------
    logger.info("=" * 60)
    logger.info("Example 5b: HumanInLoopBehavior — async/offline mode")
    logger.info("=" * 60)

    # Simulated in-memory approval store (in production: a database)
    pending_store: dict = {}

    async def submit_callback(ctx: BehaviorContext, result) -> str:
        """
        Post the approval request to your REST API and return the approval_id.
        The agent terminates immediately after this — no blocking.

        In production this would be something like:
            resp = await http_client.post("/approvals", json={...})
            return resp.json()["approval_id"]
        """
        approval_id = f"appr-{ctx.agent_id}-{id(result)}"
        logger.info(
            "Approval request submitted (async)",
            approval_id=approval_id,
            task=ctx.task[:60],
        )
        # Simulate what your REST API would persist
        pending_store[approval_id] = {"task": ctx.task, "status": "pending"}
        return approval_id

    async_agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[HumanInLoopBehavior(submit_callback=submit_callback)],
        agent_id="hitl-async-demo",
        logger=logger,
        metrics=metrics,
    )

    try:
        await async_agent.execute("Generate the monthly compliance report")
    except PendingApproval as pa:
        # Caller persists the result — agent has already returned
        pending_store[pa.approval_id]["result"] = pa.result
        logger.info(
            "Agent terminated — approval pending",
            approval_id=pa.approval_id,
            task=pa.task[:60],
        )

    # --- Simulate human approving via webhook at a later time ---
    # In production: your FastAPI/Flask webhook endpoint does this
    for approval_id, record in pending_store.items():
        record["status"] = "approved"               # human clicked Approve
        approved_result = record["result"]
        logger.info(
            "Webhook received: human approved",
            approval_id=approval_id,
            output=approved_result.output,
        )


# ---------------------------------------------------------------------------
# Example 6: AuditBehavior — automatic audit logging
# ---------------------------------------------------------------------------

async def example_audit() -> None:
    logger.info("=" * 60)
    logger.info("Example 6: AuditBehavior — automatic audit trail")
    logger.info("=" * 60)

    agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[AuditBehavior(logger=logger, metrics=metrics)],
        agent_id="audit-demo",
        logger=logger,
        metrics=metrics,
    )

    result = await agent.execute("Process customer request #12345")
    logger.info("Audited result", output=result.output)
    # AuditBehavior emits two structured log lines per call:
    #   "Agent task started"   — agent_id, task, attempt (look for it above)
    #   "Agent task completed" — agent_id, task, duration_ms, success, output_preview
    # These are distinct from the tracer ([trace]/[generation]/[span] lines) and the
    # agent's own execution logs. AuditBehavior targets compliance/audit log streams
    # and also increments metrics: behavior.audit.invocations, behavior.audit.duration_ms


# ---------------------------------------------------------------------------
# Example 7: OBOTokenBehavior — On-Behalf-Of token exchange (Entra ID + Okta)
# ---------------------------------------------------------------------------

# Required env vars for 7a (Entra):
#   ENTRA_TENANT_ID, ENTRA_CLIENT_ID, ENTRA_CLIENT_SECRET
#   ENTRA_SCOPES          — comma-separated, e.g. "api://servicenow/.default"
#   USER_ASSERTION_TOKEN  — a valid user access token to exchange
#
# Required env vars for 7b (Okta):
#   OKTA_DOMAIN, OKTA_CLIENT_ID, OKTA_CLIENT_SECRET
#   OKTA_SCOPES                  — comma-separated, e.g. "servicenow.write"
#   OKTA_AUTHORIZATION_SERVER_ID — defaults to "default"
#   USER_ASSERTION_TOKEN         — a valid user access token to exchange

def _require_obo_env(keys: list[str]) -> dict[str, str]:
    """Return a dict of env var values; raise EnvironmentError if any are missing."""
    missing = [k for k in keys if not os.getenv(k, "").strip()]
    if missing:
        raise EnvironmentError(
            f"Set these env vars to run this example: {', '.join(missing)}"
        )
    return {k: os.environ[k].strip() for k in keys}


async def example_obo_token() -> None:
    """
    Three sub-examples:
      7a: Microsoft Entra ID OBO — MSAL acquire_token_on_behalf_of
      7b: Okta OBO               — RFC 8693 token-exchange grant
      7c: Missing user_assertion — graceful ValueError

    Requires real credentials in .env (see variable list above).
    Sub-examples 7a and 7b are skipped with a clear message when their
    required env vars are absent so the rest of the script keeps running.
    """
    # ── 7a: Microsoft Entra ID ────────────────────────────────────────────
    logger.info("=" * 60)
    logger.info("Example 7a: OBOTokenBehavior — Microsoft Entra ID (MSAL OBO)")
    logger.info("=" * 60)

    try:
        entra_env = _require_obo_env([
            "ENTRA_TENANT_ID", "ENTRA_CLIENT_ID", "ENTRA_CLIENT_SECRET",
            "ENTRA_SCOPES", "USER_ASSERTION_TOKEN",
        ])
    except EnvironmentError as exc:
        logger.info(f"Skipping 7a — {exc}")
    else:
        entra_behavior = OBOTokenBehavior(
            provider=EntraOBOProvider(
                tenant_id=entra_env["ENTRA_TENANT_ID"],
                client_id=entra_env["ENTRA_CLIENT_ID"],
                client_secret=entra_env["ENTRA_CLIENT_SECRET"],
                scopes=[s.strip() for s in entra_env["ENTRA_SCOPES"].split(",")],
            )
        )

        entra_agent = ReActAgent(
            llm_gateway=create_gateway(),
            behaviors=[entra_behavior],
            agent_id="entra-obo-demo",
            logger=logger,
            metrics=metrics,
        )

        # user_assertion comes from the FastAPI Authorization header,
        # forwarded through the supervisor context unchanged.
        result = await entra_agent.execute(
            "Fetch open incidents from ServiceNow on behalf of the user",
            context={"user_assertion": entra_env["USER_ASSERTION_TOKEN"]},
        )

        # The exchanged OBO token is stored in context.metadata['obo_token'].
        # Agent tools retrieve it with: context.get('obo_token')
        logger.info("Entra OBO exchange completed", agent_success=result.success)

    # ── 7b: Okta RFC 8693 ─────────────────────────────────────────────────
    logger.info("=" * 60)
    logger.info("Example 7b: OBOTokenBehavior — Okta (RFC 8693 token-exchange)")
    logger.info("=" * 60)

    try:
        okta_env = _require_obo_env([
            "OKTA_DOMAIN", "OKTA_CLIENT_ID", "OKTA_CLIENT_SECRET",
            "OKTA_SCOPES", "USER_ASSERTION_TOKEN",
        ])
    except EnvironmentError as exc:
        logger.info(f"Skipping 7b — {exc}")
    else:
        okta_behavior = OBOTokenBehavior(
            provider=OktaOBOProvider(
                domain=okta_env["OKTA_DOMAIN"],
                client_id=okta_env["OKTA_CLIENT_ID"],
                client_secret=okta_env["OKTA_CLIENT_SECRET"],
                scopes=[s.strip() for s in okta_env["OKTA_SCOPES"].split(",")],
                authorization_server_id=os.getenv(
                    "OKTA_AUTHORIZATION_SERVER_ID", "default"
                ),
            )
        )

        okta_agent = ReActAgent(
            llm_gateway=create_gateway(),
            behaviors=[okta_behavior],
            agent_id="okta-obo-demo",
            logger=logger,
            metrics=metrics,
        )

        result = await okta_agent.execute(
            "Update the HR leave record on behalf of the user",
            context={"user_assertion": okta_env["USER_ASSERTION_TOKEN"]},
        )

        logger.info("Okta OBO exchange completed", agent_success=result.success)

    # ── 7c: Missing user_assertion — detected before any LLM call ─────────
    logger.info("=" * 60)
    logger.info("Example 7c: OBOTokenBehavior — missing user_assertion")
    logger.info("=" * 60)

    # Build a minimal agent with any OBO provider — the missing assertion is
    # caught in before_execute regardless of which provider is configured.
    sentinel_behavior = OBOTokenBehavior(
        provider=EntraOBOProvider(
            tenant_id="t", client_id="c", client_secret="s",
            scopes=["api://example/.default"],
        )
    )
    sentinel_agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[sentinel_behavior],
        agent_id="obo-missing-assertion-demo",
        logger=logger,
        metrics=metrics,
    )

    try:
        # No context supplied — OBOTokenBehavior raises ValueError in before_execute,
        # before the agent makes any LLM call.
        await sentinel_agent.execute("Task without a forwarded user token")
    except ValueError as exc:
        logger.info("Missing assertion caught as expected", error=str(exc))


# ---------------------------------------------------------------------------
# Example 8: Composing multiple behaviors on one agent
# ---------------------------------------------------------------------------

async def example_composed_behaviors() -> None:
    logger.info("=" * 60)
    logger.info("Example 8: All behaviors composed on a single agent")
    logger.info("=" * 60)

    # Real gateway — retry fires on transient API errors; all other behaviors stack normally
    agent = ReActAgent(
        llm_gateway=create_gateway(),
        behaviors=[
            RetryBehavior(max_retries=2, base_delay=0.05, logger=logger),
            GuardrailBehavior(rules=[
                GuardrailRule(name="no_secrets", blocked_words=["api_key", "secret"], apply_to="both"),
            ]),
            RateLimitBehavior(calls_per_second=5.0, burst=5, wait_on_limit=True),
            AuditBehavior(logger=logger, metrics=metrics),
        ],
        agent_id="composed-demo",
        logger=logger,
        metrics=metrics,
    )

    result = await agent.execute("Analyze the dataset and provide a summary")
    logger.info("Composed behavior result", success=result.success, output=result.output[:100])


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Behaviors Example starting")
    metrics.increment("example.behaviors.started")

    await example_retry()
    await example_guardrail()
    await example_circuit_breaker()
    await example_rate_limit()
    await example_human_in_loop()
    await example_audit()
    await example_obo_token()
    await example_composed_behaviors()

    logger.info("All behavior examples complete")
    metrics.increment("example.behaviors.completed")


if __name__ == "__main__":
    asyncio.run(main())

chain_of_thought_example.py

"""
Chain-of-Thought Agent Example

Demonstrates ChainOfThoughtAgent which:
- Instructs the LLM to reason inside <thinking>...</thinking> tags
- Strips the scratchpad from the final output
- Exposes the full reasoning in result.metadata["thinking"]

Useful when you want to see *why* the model reached a conclusion,
or when you need to log the reasoning chain for audit purposes.

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

Usage::

    cd packages/orchestration-layer
    python examples/chain_of_thought_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.chain_of_thought_agent import ChainOfThoughtAgent
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("chain_of_thought_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


def create_gateway() -> UnifiedLLMGateway:
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


# ---------------------------------------------------------------------------
# Example 1: Reasoning visible in metadata
# ---------------------------------------------------------------------------

async def example_visible_reasoning(agent: ChainOfThoughtAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 1: Visible reasoning scratchpad")
    logger.info("=" * 60)

    result = await agent.execute(
        "A company has 3 departments. Department A has 12 employees, "
        "Department B has 8, Department C has 15. The company gives a "
        "10% bonus to the largest department and 5% to the others. "
        "If the average salary is $80,000, what is the total bonus payout?"
    )

    logger.info("Final answer", output=result.output)

    # The <thinking> block is accessible for audit/debugging
    thinking = result.metadata.get("thinking", "")
    if thinking:
        logger.info("Scratchpad captured", length=len(thinking))
        print("\n--- Reasoning scratchpad (Example 1) ---")
        print(thinking)
        print("--- End scratchpad ---\n")
    else:
        logger.info("No thinking block in response (model did not use <thinking> tags)")

    execution_id = result.metadata.get("execution_id")
    if execution_id and agent.checkpoint_manager:
        resumed = await agent.resume_from(str(execution_id))
        logger.info(
            "Resumed from checkpoint",
            execution_id=execution_id,
            resumed_output=resumed.output,
        )


# ---------------------------------------------------------------------------
# Example 2: Multi-step logical reasoning
# ---------------------------------------------------------------------------

async def example_logical_reasoning(agent: ChainOfThoughtAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 2: Multi-step logical reasoning")
    logger.info("=" * 60)

    result = await agent.execute(
        "All policies requiring manager approval must be reviewed annually. "
        "Policy X requires manager approval. Policy Y does not require approval. "
        "Policy Z was last reviewed 18 months ago and requires manager approval. "
        "Which policies must be reviewed this year?"
    )

    logger.info("Final answer", output=result.output)
    thinking = result.metadata.get("thinking", "")
    if thinking:
        logger.info("Reasoning chain captured", length=len(thinking))
        print("\n--- Reasoning chain (Example 2) ---")
        print(thinking)
        print("--- End reasoning chain ---\n")


# ---------------------------------------------------------------------------
# Example 3: Policy decision with audit trail
# ---------------------------------------------------------------------------

async def example_policy_decision(agent: ChainOfThoughtAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 3: Policy decision with full audit trail")
    logger.info("=" * 60)

    result = await agent.execute(
        "A customer requests a credit limit increase from $5,000 to $25,000. "
        "Their credit score is 720, they have been a customer for 3 years, "
        "and their income is $95,000/year with no missed payments. "
        "Based on standard lending guidelines, should this be approved, "
        "approved with conditions, or declined?"
    )

    logger.info("Decision", output=result.output, success=result.success)

    # In production, log the full reasoning for compliance/audit
    thinking = result.metadata.get("thinking", "")
    if thinking:
        logger.info(
            "Audit trail — model reasoning",
            decision=result.output[:100],
            reasoning_length=len(thinking),
        )
        # In a real app: store thinking in an audit log / database
        print("\n--- Full reasoning chain (for audit) ---")
        print(thinking)
        print("--- End reasoning chain ---\n")


# ---------------------------------------------------------------------------
# Example 4: Custom system_prompt — developer-defined persona
# ---------------------------------------------------------------------------

async def example_custom_prompt(
    gateway: UnifiedLLMGateway,
    checkpoint_manager: CheckpointManager,
) -> None:
    logger.info("=" * 60)
    logger.info("Example 4: ChainOfThoughtAgent — custom system_prompt")
    logger.info("=" * 60)

    # Developers can inspect the library default before customising:
    #   print(ChainOfThoughtAgent.DEFAULT_SYSTEM_PROMPT)
    #
    # Here we replace it entirely with a domain-specific persona.
    # The {task} placeholder is required — the agent injects the task there.
    compliance_prompt = """\
You are a senior compliance officer at a regulated financial institution.
You must evaluate every request against AML, KYC, and data-privacy regulations.

Before answering, reason through the applicable AML, KYC, and data-privacy rules, \
relevant risk factors, and any mitigants inside <thinking> tags.
Then state your compliance ruling and recommended action outside the tags.

Task: {task}
"""

    agent = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="cot-compliance-demo",
        system_prompt=compliance_prompt,   # <-- developer override
        checkpoint_manager=checkpoint_manager,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
        temperature=0.0,
    )

    result = await agent.execute(
        "A customer wants to wire $50,000 to an account in a high-risk jurisdiction "
        "with no supporting documentation. Should we proceed?"
    )

    logger.info("Compliance ruling", output=result.output)
    thinking = result.metadata.get("thinking", "")
    if thinking:
        logger.info("Regulatory analysis captured", length=len(thinking))
        print("\n--- Compliance reasoning ---")
        print(thinking)
        print("--- End ---\n")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Chain-of-Thought Agent Example starting")
    metrics.increment("example.cot.started")

    gateway = create_gateway()
    state_store = InMemoryStateStore()
    checkpoint_manager = CheckpointManager(state_store)

    # Inspect the library's default prompt at any time without instantiating:
    logger.info(
        "Library default prompt (truncated)",
        preview=ChainOfThoughtAgent.DEFAULT_SYSTEM_PROMPT[:120].replace("\n", " "),
    )

    # Examples 1-3 use the default prompt — no system_prompt arg needed.
    agent = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="cot-demo",
        checkpoint_manager=checkpoint_manager,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
        temperature=0.0,
    )

    logger.info("ChainOfThoughtAgent created", agent_id="cot-demo")

    await example_visible_reasoning(agent)
    await example_logical_reasoning(agent)
    await example_policy_decision(agent)

    # Example 4 shows a developer overriding the prompt with a custom persona.
    await example_custom_prompt(gateway, checkpoint_manager)

    logger.info("Example complete")
    metrics.increment("example.cot.completed")


if __name__ == "__main__":
    asyncio.run(main())

complete_workflow.py

"""
Complete Workflow — End-to-End Orchestration

Demonstrates a production-ready integration of all orchestration-layer
components working together:

Architecture:
    config.py        → Environment configuration
    agent_setup.py   → Agent and behavior wiring        (inline below)
    complete_workflow.py → Orchestration platform

Pipeline:
    1. User task arrives
    2. RuleBasedRouter selects the right agent for the task
    3. Selected agent runs with RetryBehavior + GuardrailBehavior + AuditBehavior
    4. For complex research tasks: SupervisorOrchestrator decomposes and assigns
    5. Agent state and conversation history persisted in state store
    6. Checkpoints saved so long tasks can be resumed
    7. Results posted to shared Blackboard for downstream consumers
    8. Full observability: logging, metrics, performance monitoring, tracing

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core

Usage::

    cd packages/orchestration-layer
    python examples/complete_workflow.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
import time
from datetime import datetime, timezone
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import ToolRegistry
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.agents.plan_execute_agent import PlanExecuteAgent
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from gmf_forge_ai_orchestration.behaviors.guardrail import GuardrailBehavior, GuardrailRule
from gmf_forge_ai_orchestration.behaviors.audit import AuditBehavior
from gmf_forge_ai_orchestration.behaviors.circuit_breaker import CircuitBreakerBehavior
from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator
from gmf_forge_ai_orchestration.routing.rule_router import RuleBasedRouter
from gmf_forge_ai_orchestration.routing.base import RoutingRequest
from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
from gmf_forge_ai_orchestration.state.factory import StateStoreFactory
from gmf_forge_ai_orchestration.state.base import ConversationState
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.blackboard import Blackboard

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

# Module-level observability
logger = BasicLogger("complete_workflow")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


# ---------------------------------------------------------------------------
# Setup helpers
# ---------------------------------------------------------------------------

def create_gateway() -> UnifiedLLMGateway:
    """Build LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)
        logger.debug("Using corporate SSL certificate")

    provider = AzureOpenAIProvider(**provider_kwargs)
    gateway = UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)
    logger.info("LLM gateway created", deployment=deployment)
    return gateway


def register_tools(registry: ToolRegistry) -> None:
    """Register tools available to agents."""

    async def search(query: str) -> str:
        logger.info("Tool: search", query=query)
        return f"Search results for '{query}': top 3 results retrieved."

    async def get_policy(policy_id: str) -> str:
        logger.info("Tool: get_policy", policy_id=policy_id)
        return f"Policy {policy_id}: Standard approval required. Last reviewed 2025-12-01."

    async def calculate(expression: str) -> str:
        logger.info("Tool: calculate", expression=expression)
        allowed = set("0123456789+-*/(). ")
        if not all(c in allowed for c in expression):
            return "Error: only basic arithmetic expressions are supported."
        try:
            result = eval(expression, {"__builtins__": {}})  # nosec
            return str(result)
        except Exception as exc:
            return f"Error: {exc}"

    registry.register("search", search, "Search for information. Input: query string.")
    registry.register("get_policy", get_policy, "Retrieve a policy by ID. Input: policy_id string.")
    registry.register("calculate", calculate, "Evaluate arithmetic expression. Input: expression string.")
    logger.info("Tools registered", count=3)


def build_standard_behaviors(logger_: BasicLogger, metrics_: BasicMetricsCollector) -> list:
    """Standard behavior stack applied to all production agents."""
    return [
        RetryBehavior(max_retries=3, base_delay=1.0, backoff_factor=2.0),
        GuardrailBehavior(rules=[
            GuardrailRule(name="no_secrets", blocked_words=["api_key", "password", "secret"], apply_to="both"),
            GuardrailRule(name="input_length", max_length=4000, apply_to="input"),
        ]),
        CircuitBreakerBehavior(failure_threshold=5, recovery_timeout=60.0, logger=logger_, metrics=metrics_),
        AuditBehavior(logger=logger_, metrics=metrics_),
    ]


# ---------------------------------------------------------------------------
# Platform class
# ---------------------------------------------------------------------------

class OrchestrationPlatform:
    """
    Production-ready orchestration platform.

    Wires together:
    - Multiple specialised agents (ReAct, PlanExecute)
    - Standard behavior stack on every agent
    - RuleBasedRouter to direct tasks to the right agent
    - SupervisorOrchestrator for complex multi-step tasks
    - State store for conversation history and cache
    - CheckpointManager for long-running task resumability
    - Blackboard for sharing results between agents/services
    - Full observability throughout
    """

    def __init__(self, gateway: UnifiedLLMGateway, tool_registry: ToolRegistry) -> None:
        logger.info("Initializing OrchestrationPlatform")
        metrics.increment("platform.init")

        self.gateway = gateway
        self.tool_registry = tool_registry

        # State backend (memory or redis based on env var)
        backend = os.getenv("STATE_BACKEND", "memory")
        if backend == "redis":
            redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
            self.state_store = StateStoreFactory.create("redis", url=redis_url, key_prefix="platform:")
        else:
            self.state_store = StateStoreFactory.create("memory")
        logger.info("State store created", backend=backend)

        # Shared infrastructure
        # CheckpointManager with TTL configuration
        # TTL is applied to all three checkpoint key types:
        #   - __ckpt_data__<id>: actual checkpoint payload
        #   - __ckpt_index__<agent>: agent checkpoint catalog
        #   - __ckpt_exec__<execution_id>: execution tracking catalog
        # All three expire together, preventing stale references after data expiry.
        # Default TTL: 86400 seconds (24 hours). Override with CHECKPOINT_TTL env var.
        checkpoint_ttl = int(os.getenv("CHECKPOINT_TTL", "86400"))
        self.checkpoint_manager = CheckpointManager(self.state_store, default_ttl=checkpoint_ttl)
        self.blackboard = Blackboard(self.state_store, namespace="platform-run")

        # Build agents
        behaviors = build_standard_behaviors(logger, metrics)

        self.react_agent = ReActAgent(
            llm_gateway=gateway,
            tool_registry=tool_registry,
            behaviors=behaviors,
            state_store=self.state_store,
            checkpoint_manager=self.checkpoint_manager,
            agent_id="react-agent",
            logger=logger,
            metrics=metrics,
            performance_monitor=monitor,
            tracer=tracer,
            max_steps=10,
            temperature=0.0,
        )

        self.research_agent = PlanExecuteAgent(
            llm_gateway=gateway,
            tool_registry=tool_registry,
            behaviors=behaviors,
            state_store=self.state_store,
            checkpoint_manager=self.checkpoint_manager,
            agent_id="research-agent",
            logger=logger,
            metrics=metrics,
            performance_monitor=monitor,
            tracer=tracer,
        )

        # Supervisor for complex multi-agent tasks
        self.supervisor = SupervisorOrchestrator(
            supervisor_gateway=gateway,
            agents={
                "react-agent": self.react_agent,
                "research-agent": self.research_agent,
            },
            agent_descriptions={
                "react-agent": "General-purpose agent with tools for search, policy lookup, and calculation.",
                "research-agent": "Multi-step research agent that plans and executes structured investigations.",
            },
            logger=logger,
            metrics=metrics,
            tracer=tracer,
        )

        # Router selects between direct agent or supervisor
        self.router = RuleBasedRouter(
            rules=[
                (lambda r: any(w in r.input.lower() for w in ["research", "investigate", "analyse", "compare"]),
                 "supervisor"),
                (lambda r: any(w in r.input.lower() for w in ["search", "find", "policy", "calculate"]),
                 "react-agent"),
            ],
            fallback_target="react-agent",
            logger=logger,
            metrics=metrics,
            tracer=tracer,
        )

        logger.info("OrchestrationPlatform ready", agents=2, behaviors_per_agent=4)

    async def process(self, task: str, session_id: str = "default") -> dict:
        """
        Process a task end-to-end:
          1. Route to the appropriate agent/orchestrator
          2. Execute with full behavior stack
          3. Persist conversation history
          4. Save checkpoint
          5. Post result to blackboard
        """
        request_id = f"req-{int(datetime.now(timezone.utc).timestamp() * 1000)}"
        start_time = time.monotonic()

        logger.info("Processing request", request_id=request_id, session_id=session_id, task=task[:80])
        metrics.increment("platform.requests.total")

        with tracer.trace(
            "platform.process",
            input=task,
            metadata={"request_id": request_id, "session_id": session_id},
        ) as trace:
            try:
                execution_id = None
                # Step 1: Route
                routing_decision = await self.router.route(
                    RoutingRequest(input=task, available_agents=["react-agent", "research-agent", "supervisor"])
                )
                target = routing_decision.target
                logger.info("Routed", target=target, confidence=routing_decision.confidence)
                trace.set_output({"route": target})

                # Step 2: Execute
                with trace.span("execution", input=task) as span:
                    if target == "supervisor":
                        # Forward execution context so all sub-agents receive
                        # user_assertion and any other caller-supplied keys.
                        # In a real FastAPI handler, build this dict from the
                        # Authorization header and ChatMessage.context body field.
                        result = await self.supervisor.run(
                            task,
                            context={"session_id": session_id},
                        )
                        output = result.final_output
                        success = result.success
                    else:
                        agent = self.react_agent
                        agent_result = await agent.execute(task, context={"session_id": session_id})
                        output = agent_result.output
                        success = agent_result.success
                        execution_id = agent_result.metadata.get("execution_id")
                    span.set_output(output)

                # Step 3: Persist conversation history
                conv_key = f"conv:{session_id}"
                raw_conv = await self.state_store.get(conv_key)
                conv = ConversationState.from_dict(raw_conv) if raw_conv else ConversationState(session_id=session_id)
                conv.add_message(role="user", content=task)
                conv.add_message(role="assistant", content=output)
                await self.state_store.set(conv_key, conv.to_dict())
                logger.info("Conversation history updated", session_id=session_id, total_messages=len(conv.messages))

                # Step 4: Checkpoint
                ckpt_id = await self.checkpoint_manager.save(
                    agent_id=target,
                    state={"task": task, "output": output, "success": success},
                    execution_id=str(execution_id or request_id),
                    metadata={"request_id": request_id, "session_id": session_id},
                )
                logger.info("Checkpoint saved", checkpoint_id=ckpt_id)

                # Step 5: Post to blackboard for downstream consumers
                await self.blackboard.write(
                    key=f"result:{request_id}",
                    value={"output": output, "success": success, "agent": target},
                    author=target,
                )

                duration_ms = int((time.monotonic() - start_time) * 1000)
                metrics.histogram("platform.request.duration_ms", duration_ms)
                metrics.increment("platform.requests.success" if success else "platform.requests.failed")

                logger.info(
                    "Request complete",
                    request_id=request_id,
                    success=success,
                    duration_ms=duration_ms,
                )

                return {
                    "request_id": request_id,
                    "output": output,
                    "success": success,
                    "agent": target,
                    "checkpoint_id": ckpt_id,
                    "duration_ms": duration_ms,
                }

            except Exception as exc:
                duration_ms = int((time.monotonic() - start_time) * 1000)
                metrics.increment("platform.requests.error")
                logger.error("Request failed", request_id=request_id, error=str(exc), duration_ms=duration_ms)
                raise


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Complete Workflow Example starting")
    metrics.increment("example.complete_workflow.started")

    # Bootstrap
    gateway = create_gateway()
    tool_registry = ToolRegistry()
    register_tools(tool_registry)
    platform = OrchestrationPlatform(gateway=gateway, tool_registry=tool_registry)

    # Run a variety of tasks through the platform
    tasks = [
        ("session-alice", "Search for the latest policy on remote work expenses"),
        ("session-alice", "What is 1250 * 0.085?"),
        ("session-bob", "Research and analyse how other companies handle AI governance policies"),
    ]

    results = []
    for session_id, task in tasks:
        logger.info("-" * 50)
        response = await platform.process(task=task, session_id=session_id)
        results.append(response)
        logger.info(
            "Response",
            output=response["output"][:120],
            agent=response["agent"],
            duration_ms=response["duration_ms"],
        )

    # Print blackboard summary
    logger.info("=" * 60)
    logger.info("Blackboard summary — results posted by agents:")
    all_entries = await platform.blackboard.list_entries()
    for entry in all_entries.values():
        logger.info(
            f"  [{entry.author}] {entry.key}",
            written_at=entry.written_at.isoformat(),
        )

    # Print performance summary
    logger.info("=" * 60)
    summary = monitor.get_stats()
    logger.info(
        "Performance summary",
        total_requests=summary.get("total_requests", 0),
        avg_latency_ms=summary.get("avg_latency_ms", 0),
    )

    logger.info("Complete workflow example finished")
    metrics.increment("example.complete_workflow.completed")


if __name__ == "__main__":
    asyncio.run(main())

multi_agent_example.py

"""
Multi-Agent Orchestration Example

Demonstrates all five multi-agent orchestration patterns:

1. PipelineOrchestrator              — agents run sequentially; each output feeds the next
2. SupervisorOrchestrator + LLMRouter — supervisor decomposes; LLMRouter assigns agents
3. DebateOrchestrator                — agents argue positions; LLM synthesises conclusions
4. SwarmOrchestrator                 — coordinator LLM routes each round to the best agent
5. SupervisorOrchestrator + RuleBasedRouter — router replaces LLM agent-assignment

Examples 2 and 5 both use SupervisorOrchestrator but with different routers:
  Example 2 — LLMRouter: a separate LLM call semantically assigns each subtask.
  Example 5 — RuleBasedRouter: keyword rules assign subtasks (no LLM cost per subtask).

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

Usage::

    cd packages/orchestration-layer
    python examples/multi_agent_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.multi_agent.pipeline import PipelineOrchestrator
from gmf_forge_ai_orchestration.multi_agent.supervisor import SupervisorOrchestrator
from gmf_forge_ai_orchestration.multi_agent.debate import DebateOrchestrator
from gmf_forge_ai_orchestration.multi_agent.swarm import SwarmOrchestrator
from gmf_forge_ai_orchestration.routing.rule_router import RuleBasedRouter
from gmf_forge_ai_orchestration.routing.llm_router import LLMRouter
from gmf_forge_ai_orchestration.routing.base import RoutingRequest

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("multi_agent_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


def make_agent(name: str) -> ReActAgent:
    """Create an agent backed by a real LLM gateway."""
    return ReActAgent(
        llm_gateway=create_gateway(),
        agent_id=name,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
    )


def create_gateway() -> UnifiedLLMGateway:
    """
    Create a real LLM gateway from environment variables.
    Used for orchestrators that need a supervisor/coordinator LLM.
    """
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


# ---------------------------------------------------------------------------
# Example 1: PipelineOrchestrator — sequential ETL pipeline
# ---------------------------------------------------------------------------

# Specialised system prompts keep each stage on-task when it receives the
# prior stage's output as input.  Without these, a generic agent treats the
# previous answer as its new task and the context degrades through the chain.
_ETL_ROLES = {
    "extract_agent": (
        "You are a data extraction specialist. "
        "When given an ETL task, describe the data you would extract: the source "
        "system, schema, and approximate record count. "
        "Produce a concise EXTRACTION SUMMARY as your final answer."
    ),
    "transform_agent": (
        "You are a data transformation specialist. "
        "You receive the extraction summary from the previous pipeline step. "
        "Describe the transformations you applied: cleaning, deduplication, "
        "type casting, enrichment, etc. "
        "Produce a concise TRANSFORMATION SUMMARY as your final answer."
    ),
    "load_agent": (
        "You are a data loading specialist. "
        "You receive the transformation summary from the previous pipeline step. "
        "Describe loading the transformed data into the target data warehouse: "
        "destination, record count loaded, and any errors encountered. "
        "Produce a concise LOAD CONFIRMATION as your final answer."
    ),
    "report_agent": (
        "You are a pipeline reporting specialist. "
        "You receive only the final load confirmation from the pipeline. "
        "Generate a concise ETL PIPELINE REPORT based solely on this load information: "
        "cover the destination, records loaded, any errors, and overall pipeline status. "
        "Do NOT use placeholder text — only report what is evident from the load confirmation."
    ),
}


def make_etl_agent(name: str) -> ReActAgent:
    """Create a pipeline agent with a stage-specific system prompt."""
    return ReActAgent(
        llm_gateway=create_gateway(),
        agent_id=name,
        system_prompt=_ETL_ROLES[name],
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
    )


async def example_pipeline() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: PipelineOrchestrator — sequential pipeline")
    logger.info("=" * 60)

    # Classic ETL pipeline: extract → transform → load → report.
    # Each agent has a role-specific system_prompt so it processes the prior
    # stage's output rather than treating it as a new task from scratch.
    # router=None: PipelineOrchestrator uses fixed sequential order (default).
    pipeline = PipelineOrchestrator(
        agents=[
            make_etl_agent("extract_agent"),
            make_etl_agent("transform_agent"),
            make_etl_agent("load_agent"),
            make_etl_agent("report_agent"),
        ],
        router=None,   # default: fixed sequential order
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = await pipeline.run("Run the daily ETL pipeline for the transactions dataset")

    logger.info("Pipeline result", success=result.success, rounds=result.rounds)
    logger.info("Final output", output=result.final_output)
    logger.info("Step outputs:")
    for key, agent_result in result.agent_outputs.items():
        logger.info(f"  {key}", output=agent_result.output[:80])


# ---------------------------------------------------------------------------
# Example 2 & 5: shared agent roles for regulation/fintech domain
# ---------------------------------------------------------------------------

# Generic agents (no system_prompt) ask for clarification when given a subtask
# like "Summarize the key provisions" without knowing which regulations are in
# scope.  Domain-anchored system prompts prevent these context-free responses.
_REGULATION_ROLES = {
    "research_agent": (
        "You are a research specialist focused on AI regulation, fintech, and related policy. "
        "Provide authoritative, knowledge-based findings on any AI or fintech regulatory topic. "
        "Never ask for clarification — always deliver substantive content from your knowledge."
    ),
    "legal_agent": (
        "You are a legal analyst specialising in AI regulation, compliance law, and fintech. "
        "Analyse legal requirements and implications using your expert knowledge. "
        "Never ask for clarification — always provide your best expert legal analysis."
    ),
    "summary_agent": (
        "You are a synthesis specialist in AI regulation and fintech compliance. "
        "Synthesise findings into concise, actionable summaries. "
        "Never ask for clarification — always produce a comprehensive summary from your knowledge."
    ),
}


def make_regulation_agent(name: str) -> ReActAgent:
    """Agent with a domain system_prompt that prevents context-free clarification requests."""
    return ReActAgent(
        llm_gateway=create_gateway(),
        agent_id=name,
        system_prompt=_REGULATION_ROLES[name],
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
    )


# ---------------------------------------------------------------------------
# Example 2: SupervisorOrchestrator — task decomposition
# ---------------------------------------------------------------------------

async def example_supervisor() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: SupervisorOrchestrator + LLMRouter")
    logger.info("=" * 60)

    # SupervisorOrchestrator with an LLMRouter injected:
    #   Phase 1 — The supervisor LLM *only* decomposes the task into subtask strings.
    #   Phase 2 — The LLMRouter asks a separate LLM call to pick the best agent for
    #             each subtask, using the agent_descriptions as semantic guidance.
    #   Phase 3 — Synthesis LLM merges all worker results as normal.
    #
    # Contrast with Example 5 which uses a RuleBasedRouter (no LLM cost per subtask).
    # Phase-based descriptions: differentiate on WHAT THE AGENT DOES (gather / analyse /
    # synthesise) rather than domain knowledge.  Domain-based descriptions fail in a
    # regulatory topic because the LLM's own knowledge maps everything to 'legal'.
    _AGENT_DESCRIPTIONS = {
        "research_agent": (
            "Phase 1 — Information gathering only. Use for any subtask that collects, "
            "identifies, or retrieves raw information (what regulations exist, what they say, "
            "who they affect). Does NOT interpret or analyse."
        ),
        "legal_agent": (
            "Phase 2 — Analysis and interpretation only. Use after information has been gathered. "
            "Interprets requirements, assesses compliance obligations, evaluates risk and impact. "
            "Does NOT gather raw information."
        ),
        "summary_agent": (
            "Phase 3 — Synthesis and reporting only. Use for the final subtask that compiles "
            "completed research and analysis into an executive summary or recommendation."
        ),
    }

    router = LLMRouter(
        llm_gateway=create_gateway(),
        agent_descriptions=_AGENT_DESCRIPTIONS,
        fallback_target="research_agent",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    supervisor = SupervisorOrchestrator(
        supervisor_gateway=create_gateway(),
        agents={
            "research_agent": make_regulation_agent("research_agent"),
            "legal_agent": make_regulation_agent("legal_agent"),
            "summary_agent": make_regulation_agent("summary_agent"),
        },
        router=router,   # LLMRouter assigns agents; supervisor LLM only decomposes
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    # Context is forwarded verbatim to every sub-agent via agent.execute().
    # user_assertion — end-user Bearer token from the HTTP Authorization header;
    #   task agents with OBOTokenBehavior exchange it for a downstream OBO token.
    # Add any extra keys your task agents need (locale, tenant_id, etc.).
    result = await supervisor.run(
        "Analyse the latest AI regulations and their impact on our fintech operations",
        context={
            "user_assertion": os.getenv("USER_ASSERTION_TOKEN", ""),
            "correlation_id": "example-supervisor-llm-router",
        },
    )

    logger.info("Supervisor+LLMRouter result", success=result.success, rounds=result.rounds)
    logger.info("Final synthesis", output=result.final_output[:200])
    logger.info("Worker outputs:")
    for agent_name, agent_result in result.agent_outputs.items():
        logger.info(f"  {agent_name}", output=agent_result.output[:80])


# ---------------------------------------------------------------------------
# Example 3: DebateOrchestrator — pros/cons deliberation
# ---------------------------------------------------------------------------

async def example_debate() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: DebateOrchestrator — multi-agent debate")
    logger.info("=" * 60)

    _POSITIONS = {
        "microservices_advocate": (
            "You are a software architect who strongly advocates for microservices. "
            "Always argue in favour of microservices and counter any arguments for monoliths."
        ),
        "monolith_advocate": (
            "You are a software architect who strongly advocates for monolithic architecture. "
            "Always argue in favour of a monolith and counter any arguments for microservices."
        ),
    }

    def make_debate_agent(name: str) -> ReActAgent:
        """Agent backed by a real LLM that argues its assigned position."""
        return ReActAgent(
            llm_gateway=create_gateway(),
            agent_id=name,
            system_prompt=_POSITIONS[name],
            logger=logger,
            metrics=metrics,
            performance_monitor=monitor,
        )

    debate = DebateOrchestrator(
        agents=[
            make_debate_agent("microservices_advocate"),
            make_debate_agent("monolith_advocate"),
        ],
        synthesis_gateway=create_gateway(),
        debate_rounds=1,
        router=None,   # default: all agents participate every round
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    # context flows to every agent participating in each debate round.
    result = await debate.run(
        "Should we adopt microservices or a monolith for our new platform?",
        context={
            "user_assertion": os.getenv("USER_ASSERTION_TOKEN", ""),
            "correlation_id": "example-debate",
        },
    )

    logger.info("Debate result", success=result.success, rounds=result.rounds)
    logger.info("Final synthesis", output=result.final_output[:300])


# ---------------------------------------------------------------------------
# Example 4: SwarmOrchestrator — dynamic routing per round
# ---------------------------------------------------------------------------

# Specialised prompts for swarm agents: they must deliver content, not plans.
# Without these, agents produce "The next steps are…" descriptions, which the
# coordinator interprets as "not done" and the swarm loops to max_rounds.
_SWARM_ROLES = {
    "search_agent": (
        "You are a research analyst specialising in AI and technology. "
        "Provide substantive, knowledge-based findings. "
        "Do NOT describe steps to take — deliver actual facts and findings now."
    ),
    "analysis_agent": (
        "You are a strategic analysis specialist. "
        "Given research findings, provide deep analytical insights, implications, "
        "and conclusions. "
        "Do NOT describe steps to take — deliver the analysis now."
    ),
}


async def example_swarm() -> None:
    logger.info("=" * 60)
    logger.info("Example 4: SwarmOrchestrator — coordinator-driven swarm")
    logger.info("=" * 60)

    # Coordinator LLM decides which agent to call each round based on its output.
    # The coordinator now uses a structured format:
    #   AGENT: <agent_name>
    #   TASK: <specific task>
    # so the router can match the explicit agent name rather than doing fuzzy
    # keyword matching on free-form planning text (which always mixes both phases).
    router = RuleBasedRouter(
        rules=[
            (
                lambda r: "search_agent" in r.input.lower(),
                "search_agent",
            ),
            (
                lambda r: "analysis_agent" in r.input.lower(),
                "analysis_agent",
            ),
        ],
        fallback_target="analysis_agent",
        logger=logger,
    )

    swarm = SwarmOrchestrator(
        coordinator_gateway=create_gateway(),
        agents={
            "search_agent": ReActAgent(
                llm_gateway=create_gateway(),
                agent_id="search_agent",
                system_prompt=_SWARM_ROLES["search_agent"],
                logger=logger,
                metrics=metrics,
                performance_monitor=monitor,
            ),
            "analysis_agent": ReActAgent(
                llm_gateway=create_gateway(),
                agent_id="analysis_agent",
                system_prompt=_SWARM_ROLES["analysis_agent"],
                logger=logger,
                metrics=metrics,
                performance_monitor=monitor,
            ),
        },
        router=router,   # RuleBasedRouter dispatches each swarm round
        max_rounds=5,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    # context is forwarded to the agent selected each swarm round.
    result = await swarm.run(
        "Research and analyse the impact of GenAI on software engineering roles.",
        context={
            "user_assertion": os.getenv("USER_ASSERTION_TOKEN", ""),
            "correlation_id": "example-swarm",
        },
    )

    logger.info("Swarm result", success=result.success, rounds=result.rounds)
    logger.info("Final output", output=result.final_output[:200])


# ---------------------------------------------------------------------------
# Example 5: SupervisorOrchestrator + RuleBasedRouter
# ---------------------------------------------------------------------------

async def example_supervisor_with_router() -> None:
    logger.info("=" * 60)
    logger.info("Example 5: SupervisorOrchestrator + RuleBasedRouter")
    logger.info("=" * 60)

    # When a router is injected into SupervisorOrchestrator:
    #   Phase 1 — The LLM *only* decomposes the task into subtask strings.
    #   Phase 2 — The RuleBasedRouter assigns each subtask to an agent
    #             (no LLM cost for agent selection).
    #   Phase 3 — Synthesis LLM merges results as normal.
    #
    # Swap RuleBasedRouter for LLMRouter or SemanticRouter without changing
    # any other orchestrator code.

    router = RuleBasedRouter(
        rules=[
            (
                lambda r: any(
                    w in r.input.lower()
                    for w in ["research", "find", "search", "latest", "identify",
                               "gather", "investigate", "review", "timeline", "compile"]
                ),
                "research_agent",
            ),
            (
                lambda r: any(
                    w in r.input.lower()
                    for w in ["legal", "regulation", "compliance", "law", "policy",
                               "enforcement", "penalty", "liability", "challenge",
                               "obligation", "analyse", "analyze", "implications",
                               "assess", "evaluate", "examine", "privacy", "transparency"]
                ),
                "legal_agent",
            ),
        ],
        fallback_target="summary_agent",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    supervisor = SupervisorOrchestrator(
        supervisor_gateway=create_gateway(),
        agents={
            "research_agent": make_regulation_agent("research_agent"),
            "legal_agent": make_regulation_agent("legal_agent"),
            "summary_agent": make_regulation_agent("summary_agent"),
        },
        router=router,   # RuleBasedRouter assigns agents; LLM only decomposes
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    # context flows through to every task agent assigned by the router.
    result = await supervisor.run(
        "Research the latest EU AI Act requirements and analyse their legal implications for our products",
        context={
            "user_assertion": os.getenv("USER_ASSERTION_TOKEN", ""),
            "correlation_id": "example-supervisor-rule-router",
        },
    )

    logger.info("Supervisor+Router result", success=result.success, rounds=result.rounds)
    logger.info("Final synthesis", output=result.final_output[:200])
    logger.info("Worker outputs:")
    for agent_name, agent_result in result.agent_outputs.items():
        logger.info(f"  {agent_name}", output=agent_result.output[:80])


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Multi-Agent Orchestration Example starting")
    metrics.increment("example.multi_agent.started")

    await example_pipeline()
    await example_supervisor()
    await example_debate()
    await example_swarm()
    await example_supervisor_with_router()

    logger.info("Multi-agent examples complete")
    metrics.increment("example.multi_agent.completed")


if __name__ == "__main__":
    asyncio.run(main())

plan_execute_example.py

"""
Plan-Execute Agent Example

Demonstrates PlanExecuteAgent which separates planning from execution:
  Phase 1 — Plan:    LLM decomposes the task into ordered steps (JSON array)
  Phase 2 — Execute: Each step is run sequentially; output feeds the next

Also shows ReflexionAgent which wraps any agent with a self-critique loop:
  Execute → Critique (is the answer good enough?) → Reflect & retry → Final

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

Usage::

    cd packages/orchestration-layer
    python examples/plan_execute_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import ToolRegistry
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.plan_execute_agent import PlanExecuteAgent
from gmf_forge_ai_orchestration.agents.reflexion_agent import ReflexionAgent
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("plan_execute_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


def create_gateway() -> UnifiedLLMGateway:
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


def register_research_tools(registry: ToolRegistry) -> None:
    """Register tools useful for research and planning tasks."""

    async def search_papers(topic: str) -> str:
        logger.info("Tool called", tool="search_papers", topic=topic)
        return (
            f"Found 3 papers on '{topic}': "
            "1) 'Advances in {topic} 2025', "
            "2) 'Benchmarking {topic} systems', "
            "3) 'Future directions in {topic}'."
        ).format(topic=topic)

    async def summarise_paper(title: str) -> str:
        logger.info("Tool called", tool="summarise_paper", title=title)
        return (
            f"Summary of '{title}': This paper presents novel approaches to the topic, "
            "demonstrating significant improvements over prior baselines with strong empirical results."
        )

    async def write_report_section(section: str, content: str) -> str:
        logger.info("Tool called", tool="write_report_section", section=section)
        return f"[Section: {section}]\n{content}\n"

    registry.register(
        "search_papers",
        search_papers,
        "Search for research papers on a topic. Input: topic string.",
    )
    registry.register(
        "summarise_paper",
        summarise_paper,
        "Summarise a research paper given its title. Input: title string.",
    )
    registry.register(
        "write_report_section",
        write_report_section,
        "Write a named section of a report. Inputs: section name, content.",
    )
    logger.info("Research tools registered", count=3)


# ---------------------------------------------------------------------------
# Example 1: PlanExecuteAgent — multi-step research task
# ---------------------------------------------------------------------------

async def example_plan_execute(
    gateway: UnifiedLLMGateway,
    tool_registry: ToolRegistry,
    checkpoint_manager: CheckpointManager,
) -> None:
    logger.info("=" * 60)
    logger.info("Example 1: PlanExecuteAgent — research report task")
    logger.info("=" * 60)

    agent = PlanExecuteAgent(
        llm_gateway=gateway,
        tool_registry=tool_registry,
        state_store=InMemoryStateStore(),
        checkpoint_manager=checkpoint_manager,
        agent_id="plan-execute-demo",
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
    )

    result = await agent.execute(
        "Write a brief research report on large language models: "
        "search for papers, summarise the top result, and write an introduction section."
    )

    logger.info("Plan-Execute result", success=result.success, output=result.output[:200])

    # The plan is stored in metadata
    plan = result.metadata.get("plan", [])
    logger.info("Plan generated", steps=len(plan))
    for i, step in enumerate(plan):
        logger.info(f"  Plan step {i+1}", step=step)

    execution_id = result.metadata.get("execution_id")
    if execution_id and agent.checkpoint_manager:
        resumed = await agent.resume_from(str(execution_id))
        logger.info(
            "Resumed PlanExecute from checkpoint",
            execution_id=execution_id,
            resumed_output=resumed.output,
        )


# ---------------------------------------------------------------------------
# Example 2: ReflexionAgent wrapping PlanExecuteAgent
# ---------------------------------------------------------------------------

async def example_reflexion(
    gateway: UnifiedLLMGateway,
    tool_registry: ToolRegistry,
    checkpoint_manager: CheckpointManager,
) -> None:
    logger.info("=" * 60)
    logger.info("Example 2: ReflexionAgent — self-critiquing wrapper")
    logger.info("=" * 60)

    inner_agent = PlanExecuteAgent(
        llm_gateway=gateway,
        tool_registry=tool_registry,
        checkpoint_manager=checkpoint_manager,
        agent_id="inner-plan-execute",
        logger=logger,
        metrics=metrics,
    )

    # ReflexionAgent will run inner_agent, critique the output,
    # and retry up to max_reflections times if unsatisfied.
    reflexion_agent = ReflexionAgent(
        inner_agent=inner_agent,
        max_reflections=2,
        checkpoint_manager=checkpoint_manager,
        agent_id="reflexion-demo",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = await reflexion_agent.execute(
        "Explain the difference between RAG and fine-tuning for LLMs in 2-3 sentences."
    )

    logger.info(
        "Reflexion result",
        success=result.success,
        output=result.output,
        reflections=result.metadata.get("reflections_used", 0),
    )


# ---------------------------------------------------------------------------
# Example 3: PlanExecuteAgent — custom plan and execute prompts
# ---------------------------------------------------------------------------

async def example_custom_prompts(
    gateway: UnifiedLLMGateway,
    checkpoint_manager: CheckpointManager,
) -> None:
    logger.info("=" * 60)
    logger.info("Example 3: PlanExecuteAgent — custom plan_prompt and execute_prompt")
    logger.info("=" * 60)

    # Developers can inspect the library defaults before customising:
    #   print(PlanExecuteAgent.DEFAULT_PLAN_PROMPT)
    #   print(PlanExecuteAgent.DEFAULT_EXECUTE_PROMPT)
    #
    # Here we supply domain-specific overrides for an IT incident workflow.
    # {task} is required in plan_prompt; execute_prompt requires {task},
    # {plan}, {previous_results}, {step_num}, and {current_step}.
    custom_plan_prompt = """\
You are an IT incident response coordinator.
Given the incident description below, produce a numbered remediation plan.
Each step must be a single concrete action that an engineer can execute immediately.

Incident: {task}

Respond with ONLY a JSON array of step strings:
["step 1", "step 2", ...]
"""

    custom_execute_prompt = """\
You are executing remediation step {step_num} of an IT incident response.

Incident: {task}
Full remediation plan:
{plan}
Completed steps so far:
{previous_results}

Now execute: {current_step}

Provide the exact commands or actions taken and their outcome. Be concise.
"""

    agent = PlanExecuteAgent(
        llm_gateway=gateway,
        agent_id="plan-execute-custom-demo",
        plan_prompt=custom_plan_prompt,       # <-- developer override
        execute_prompt=custom_execute_prompt, # <-- developer override
        checkpoint_manager=checkpoint_manager,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
    )

    result = await agent.execute(
        "Web server returning 502 errors — upstream service unreachable since 14:30 UTC."
    )

    logger.info("Incident response result", success=result.success, output=result.output[:300])
    plan = result.metadata.get("plan", [])
    logger.info("Remediation plan generated", steps=len(plan))
    for i, step in enumerate(plan):
        logger.info(f"  Step {i+1}", step=step)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Plan-Execute / Reflexion Agent Example starting")
    metrics.increment("example.plan_execute.started")

    gateway = create_gateway()
    tool_registry = ToolRegistry()
    register_research_tools(tool_registry)
    checkpoint_manager = CheckpointManager(InMemoryStateStore())

    # Inspect library default prompts at any time without instantiating:
    logger.info(
        "Library default plan prompt (truncated)",
        preview=PlanExecuteAgent.DEFAULT_PLAN_PROMPT[:120].replace("\n", " "),
    )

    # Examples 1-2 use the library defaults — no prompt args needed.
    await example_plan_execute(gateway, tool_registry, checkpoint_manager)
    await example_reflexion(gateway, tool_registry, checkpoint_manager)

    # Example 3 shows a developer overriding both prompts for a specific domain.
    await example_custom_prompts(gateway, checkpoint_manager)

    logger.info("Example complete")
    metrics.increment("example.plan_execute.completed")


if __name__ == "__main__":
    asyncio.run(main())

react_agent_example.py

"""
ReAct Agent Example

Demonstrates how to use ReActAgent with:
1. Tool registration (search, calculator, weather tools)
2. Behavior composition (retry + guardrail)
3. State persistence across runs
4. Full observability (logging, metrics, tracing)
5. Custom system_prompt — replacing the library default with a developer-defined persona

The ReAct loop:
  Thought -> Action (tool call or Final Answer) -> Observation -> repeat

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core

Usage::

    cd packages/orchestration-layer
    python examples/react_agent_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# If you get "SSL: CERTIFICATE_VERIFY_FAILED" errors, uncomment:
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import ToolRegistry
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.behaviors.retry import RetryBehavior
from gmf_forge_ai_orchestration.behaviors.guardrail import GuardrailBehavior, GuardrailRule
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("react_agent_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


# ---------------------------------------------------------------------------
# Tool definitions
# ---------------------------------------------------------------------------

def register_tools(registry: ToolRegistry) -> None:
    """Register tools the ReAct agent can call."""

    async def search(query: str = "", raw: str = "", **kwargs) -> str:
        """Simulate a web search."""
        if not query and raw:
            query = raw.strip('"\'')
        logger.info("Tool called", tool="search", query=query)
        results = {
            "AI 2026 trends": "Key AI trends in 2026: multimodal agents, autonomous coding, edge inference.",
            "Python async": "Python async/await enables non-blocking I/O with asyncio event loop.",
        }
        for key, value in results.items():
            if key.lower() in query.lower():
                return value
        return f"Search results for '{query}': No cached results. In production this calls a real search API."

    async def calculator(expression: str = "", raw: str = "", **kwargs) -> str:
        """Evaluate a simple arithmetic expression safely."""
        if not expression and raw:
            expression = raw.strip('"\'')
        logger.info("Tool called", tool="calculator", expression=expression)
        try:
            allowed = set("0123456789+-*/(). ")
            if not all(c in allowed for c in expression):
                return "Error: only basic arithmetic is supported."
            result = eval(expression, {"__builtins__": {}})  # nosec - restricted eval
            return str(result)
        except Exception as exc:
            return f"Error: {exc}"

    async def get_weather(city: str = "", raw: str = "", **kwargs) -> str:
        """Return simulated weather for a city."""
        if not city and raw:
            city = raw.strip('"\'')
        logger.info("Tool called", tool="get_weather", city=city)
        return f"Weather in {city}: 22°C, partly cloudy. (simulated)"

    registry.register(
        "search",
        search,
        "Search the web for information. Input: query string.",
    )
    registry.register(
        "calculator",
        calculator,
        "Evaluate arithmetic expressions. Input: expression string e.g. '2 + 3 * 4'.",
    )
    registry.register(
        "get_weather",
        get_weather,
        "Get current weather for a city. Input: city name.",
    )
    logger.info("Tools registered", count=3)


# ---------------------------------------------------------------------------
# Gateway setup
# ---------------------------------------------------------------------------

def create_gateway() -> UnifiedLLMGateway:
    """Create the LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)
        logger.debug("Using corporate SSL certificate")

    provider = AzureOpenAIProvider(**provider_kwargs)
    gateway = UnifiedLLMGateway(
        default_provider=provider,
        performance_monitor=monitor,
    )
    logger.info("LLM gateway created", deployment=deployment)
    return gateway


# ---------------------------------------------------------------------------
# Example 1: Simple single-step task (no tools needed)
# ---------------------------------------------------------------------------

async def example_simple_task(agent: ReActAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 1: Simple task (no tool call)")
    logger.info("=" * 60)

    result = await agent.execute("What is the capital of France?")

    logger.info("Agent result", success=result.success, output=result.output)
    logger.info("Steps taken", count=len(result.steps))
    for i, step in enumerate(result.steps):
        logger.info(f"  Step {i+1}", thought=step.thought, action=step.action)

    execution_id = result.metadata.get("execution_id")
    if execution_id and agent.checkpoint_manager:
        resumed = await agent.resume_from(str(execution_id))
        logger.info(
            "Resumed from checkpoint",
            execution_id=execution_id,
            resumed_output=resumed.output,
        )


# ---------------------------------------------------------------------------
# Example 2: Multi-step task with tool calls
# ---------------------------------------------------------------------------

async def example_tool_use(agent: ReActAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 2: Multi-step task using tools")
    logger.info("=" * 60)

    result = await agent.execute(
        "Search for AI trends in 2026, then calculate 42 * 7, "
        "and finally tell me the weather in London."
    )

    logger.info("Agent result", success=result.success, output=result.output)
    logger.info("Steps taken", count=len(result.steps))
    for i, step in enumerate(result.steps):
        logger.info(
            f"  Step {i+1}",
            thought=step.thought[:80],
            action=step.action,
            observation=(step.observation or "")[:80],
        )


# ---------------------------------------------------------------------------
# Example 3: State persistence — resume context across calls
# ---------------------------------------------------------------------------

async def example_state_persistence(agent: ReActAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 3: State persistence across calls")
    logger.info("=" * 60)

    # First call
    result1 = await agent.execute("What is 15 * 8?", context={"session_id": "demo-session"})
    logger.info("First call complete", output=result1.output)

    # Second call — explicitly reference the prior result so the LLM can act on it
    result2 = await agent.execute(
        f"Now add 20 to {result1.output}.",
        context={"session_id": "demo-session"},
    )
    logger.info("Second call complete", output=result2.output)


# ---------------------------------------------------------------------------
# Example 4: Streaming output
# ---------------------------------------------------------------------------

async def example_streaming(agent: ReActAgent) -> None:
    logger.info("=" * 60)
    logger.info("Example 4: Streaming agent output")
    logger.info("=" * 60)

    print("Streaming tokens: ", end="", flush=True)
    async for token in agent.stream_execute("Explain what a ReAct agent is in one sentence."):
        print(token, end="", flush=True)
    print()  # newline after streaming
    logger.info("Streaming complete")


# Example 5: Custom system_prompt — developer-defined persona
# ---------------------------------------------------------------------------

async def example_custom_system_prompt(
    gateway,
    tool_registry,
    checkpoint_manager: CheckpointManager,
) -> None:
    logger.info("=" * 60)
    logger.info("Example 5: ReActAgent — custom system_prompt")
    logger.info("=" * 60)

    # Inspect the library default before customising:
    #   print(ReActAgent.DEFAULT_SYSTEM_PROMPT)
    #
    # The default prompt uses {tool_descriptions} — keep that placeholder in
    # your override if you want tool listings injected automatically.
    # The task is always appended separately, so no {task} placeholder is needed.
    policy_analyst_prompt = """\
You are an expert insurance policy analyst.
You review policy documents, coverage terms, and claims data.
Always reason through the policy language carefully before reaching conclusions.

You have access to the following tools:
{tool_descriptions}

Respond using EXACTLY this format for each step:
Thought: <your policy reasoning>
Action: <tool name or "Final Answer">
Action Input: <JSON object with tool arguments, or your final answer string>

When you have enough information, use:
Action: Final Answer
Action Input: {{"answer": "<your complete policy analysis>"}}
"""

    agent = ReActAgent(
        llm_gateway=gateway,
        tool_registry=tool_registry,
        agent_id="react-policy-analyst",
        system_prompt=policy_analyst_prompt,   # <-- developer override
        checkpoint_manager=checkpoint_manager,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
        temperature=0.0,
    )

    result = await agent.execute(
        "What is the current weather in New York and does a standard home insurance "
        "policy typically cover weather-related roof damage?"
    )
    logger.info("Policy analyst result", success=result.success, output=result.output)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("ReAct Agent Example starting")
    metrics.increment("example.react_agent.started")

    # Set up components
    gateway = create_gateway()

    tool_registry = ToolRegistry()
    register_tools(tool_registry)

    state_store = InMemoryStateStore()
    checkpoint_manager = CheckpointManager(state_store)

    # Behaviors: retry up to 2 times + block profanity in input/output
    behaviors = [
        RetryBehavior(max_retries=2, base_delay=0.5),
        GuardrailBehavior(rules=[
            GuardrailRule(
                name="no_profanity",
                blocked_words=["example_blocked_word"],
                apply_to="both",
            ),
            GuardrailRule(name="max_input_length", max_length=2000, apply_to="input"),
        ]),
    ]

    # Inspect the library default prompt at any time without instantiating:
    logger.info(
        "Library default system prompt (truncated)",
        preview=ReActAgent.DEFAULT_SYSTEM_PROMPT[:120].replace("\n", " "),
    )

    # Create agent with full observability (Examples 1-4 use library default prompt)
    agent = ReActAgent(
        llm_gateway=gateway,
        tool_registry=tool_registry,
        behaviors=behaviors,
        state_store=state_store,
        checkpoint_manager=checkpoint_manager,
        agent_id="react-demo-agent",
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
        max_steps=8,
        temperature=0.0,
    )

    logger.info("ReActAgent created", agent_id="react-demo-agent", behaviors=2, tools=3)

    # Run all examples
    await example_simple_task(agent)
    await example_tool_use(agent)
    await example_state_persistence(agent)
    await example_streaming(agent)

    # Example 5: developer overrides the system prompt with a custom persona
    await example_custom_system_prompt(gateway, tool_registry, checkpoint_manager)

    # Print metrics summary
    logger.info("=" * 60)
    logger.info("Example complete")
    metrics.increment("example.react_agent.completed")
    logger.info(
        "Performance summary",
        total_requests=monitor.get_stats().get("total_requests", 0),
    )


if __name__ == "__main__":
    asyncio.run(main())

reflexion_agent_example.py

"""
ReflexionAgent Example
======================

Demonstrates ReflexionAgent — a self-critiquing wrapper that runs any inner
agent, asks a separate LLM call to evaluate the output ("is this good enough?"),
and automatically retries with an improved prompt if not satisfied.

Architecture::

    ReflexionAgent.execute(task)

        ├─── inner_agent.execute(task)        ← first attempt

        ├─── _critique(task, output)          ← separate LLM call: YES / NO: <reason>
        │         │
        │         └─ if NO ──► _reflect_and_retry(task, output, critique)
        │                           │
        │                           └─ inner_agent.execute(improved_task)

        └─── repeat up to max_reflections times, then return best result

Key design points
-----------------
- ReflexionAgent is a **wrapper**: it does not execute tasks itself — it
  delegates to whatever inner_agent you give it (ReActAgent, PlanExecuteAgent,
  ChainOfThoughtAgent, or another ReflexionAgent).
- The critique call is always a **separate LLM call** on the same gateway —
  keep this in mind for cost accounting.
- The inner agent retains its own identity, behaviors, and checkpoints.  Only
  the reflexion metadata (round, critique) is saved on the outer agent.

When to use ReflexionAgent
--------------------------
- You need a quality gate on LLM output without writing custom evaluation code.
- The task is complex or open-ended enough that the first attempt is often
  insufficient (report writing, policy analysis, code generation reviews).

When NOT to use ReflexionAgent
-------------------------------
- Real-time or latency-sensitive paths — each reflection adds at least one
  extra LLM round-trip.
- Simple factual lookups — the extra critiquing overhead is not worth it.

Examples in this file
---------------------
1. Wrap ChainOfThoughtAgent (pure reasoning, no tools)
2. Wrap ReActAgent (tool-using agent, critique evaluates factual accuracy)
3. Custom critique + reflection prompts for domain-specific quality gates
4. max_reflections tuning — stop early if satisfied, cap total cost
5. Checkpoint and resume (pick up from a specific reflection round)
6. Access reflection metadata: round count, critique text, inner-agent steps

Prerequisites::

    pip install python-dotenv
    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core

Usage::

    cd packages/orchestration-layer
    python examples/reflexion_agent_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# If you get "SSL: CERTIFICATE_VERIFY_FAILED" errors, uncomment:
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path

from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import ToolRegistry
from gmf_forge_ai_shared_core.observability import (
    BasicLogger,
    BasicMetricsCollector,
    BasicPerformanceMonitor,
)
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.chain_of_thought_agent import ChainOfThoughtAgent
from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.agents.reflexion_agent import ReflexionAgent
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("reflexion_agent_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


# ---------------------------------------------------------------------------
# Gateway / infrastructure helpers
# ---------------------------------------------------------------------------

def create_gateway() -> UnifiedLLMGateway:
    """Build the LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)
        logger.debug("Using corporate SSL certificate")

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


def create_checkpoint_manager(store: InMemoryStateStore) -> CheckpointManager:
    return CheckpointManager(store, default_ttl=3600)


def register_tools(registry: ToolRegistry) -> None:
    """Register tools the inner ReActAgent can use when wrapped by ReflexionAgent."""

    async def search(query: str = "", raw: str = "", **kwargs) -> str:
        """Simulate a document search."""
        if not query and raw:
            query = raw.strip("\"'")
        logger.info("Tool called", tool="search", query=query)
        return (
            f"Search results for '{query}': "
            "Found 3 relevant documents covering the topic in depth."
        )

    async def get_regulation(regulation_id: str = "", raw: str = "", **kwargs) -> str:
        """Fetch a regulation text by ID."""
        if not regulation_id and raw:
            regulation_id = raw.strip("\"'")
        logger.info("Tool called", tool="get_regulation", regulation_id=regulation_id)
        return (
            f"Regulation {regulation_id}: Applies to all financial institutions. "
            "Requires annual compliance certification and board-level sign-off. "
            "Non-compliance penalty: up to $5M or 10% of annual revenue."
        )

    registry.register(
        "search",
        search,
        "Search for documents or information. Input: query string.",
    )
    registry.register(
        "get_regulation",
        get_regulation,
        "Retrieve full text of a regulation by its ID. Input: regulation_id string.",
    )
    logger.info("Tools registered", count=2)


# ---------------------------------------------------------------------------
# Example 1: ReflexionAgent wrapping ChainOfThoughtAgent
#             — pure reasoning, no tools
# ---------------------------------------------------------------------------

async def example_reflexion_on_cot(
    gateway: UnifiedLLMGateway,
    checkpoint_manager: CheckpointManager,
) -> None:
    """
    ReflexionAgent + ChainOfThoughtAgent.

    Use case: the task is a structured reasoning problem.  ChainOfThoughtAgent
    produces a first draft; ReflexionAgent critiques it and requests a more
    thorough answer if the reasoning is incomplete.

    The inner ChainOfThoughtAgent costs one LLM call per attempt.
    The critique costs one additional LLM call per round.
    Total cost (max): (1 + 1) × (max_reflections + 1) LLM calls.
    """
    logger.info("=" * 60)
    logger.info("Example 1: ReflexionAgent wrapping ChainOfThoughtAgent")
    logger.info("=" * 60)

    # Step 1 — Build the inner agent.
    #   ChainOfThoughtAgent uses a <thinking> scratchpad — ideal for analysis tasks.
    inner_cot = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="inner-cot",
        temperature=0.1,
        logger=logger,
        metrics=metrics,
    )

    # Step 2 — Wrap it with ReflexionAgent.
    #   max_reflections=2 means: initial attempt + up to 2 reflection retries.
    #   After 2 unsatisfactory critiques the best available answer is returned.
    reflexion = ReflexionAgent(
        inner_cot,                         # the agent to wrap
        checkpoint_manager=checkpoint_manager,
        max_reflections=2,
        agent_id="reflexion-cot-demo",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    task = (
        "A company is considering adopting a generative AI system for contract review. "
        "Analyse the key risks (legal, operational, reputational) and provide a structured "
        "risk assessment with recommended mitigations for each risk category."
    )

    # Step 3 — Execute the task.
    #   ReflexionAgent:
    #   a) calls inner_cot.execute(task)
    #   b) calls _critique(task, output) — LLM responds YES / NO: <reason>
    #   c) if NO: calls _reflect_and_retry(task, output, critique)
    #   d) repeats up to max_reflections times
    result = await reflexion.execute(task)

    logger.info(
        "Reflexion result",
        success=result.success,
        output_length=len(result.output),
    )

    # The number of actual reflection rounds is in metadata.
    # 0 means the first attempt was accepted; 2 means two retries were needed.
    reflection_rounds = result.metadata.get("reflection_rounds", 0)
    logger.info("Reflection rounds used", rounds=reflection_rounds)
    print(f"\nFinal answer ({reflection_rounds} reflection(s) used):\n{result.output}\n")


# ---------------------------------------------------------------------------
# Example 2: ReflexionAgent wrapping ReActAgent (tool-using)
#             — factual tasks with live tool calls
# ---------------------------------------------------------------------------

async def example_reflexion_on_react(
    gateway: UnifiedLLMGateway,
    tool_registry: ToolRegistry,
    checkpoint_manager: CheckpointManager,
) -> None:
    """
    ReflexionAgent + ReActAgent with tools.

    Use case: a compliance research task where the answer must cite specific
    regulations. The default critique checks completeness; if the answer does
    not reference enough sources, the reflection prompt asks for a more cited answer.

    The inner ReActAgent may make multiple tool calls per attempt.
    Each reflection cycle adds one LLM critique call plus a full new ReActAgent run.
    """
    logger.info("=" * 60)
    logger.info("Example 2: ReflexionAgent wrapping ReActAgent (with tools)")
    logger.info("=" * 60)

    inner_react = ReActAgent(
        llm_gateway=gateway,
        tool_registry=tool_registry,
        agent_id="inner-react",
        max_steps=6,
        temperature=0.0,
        logger=logger,
        metrics=metrics,
    )

    reflexion = ReflexionAgent(
        inner_react,
        checkpoint_manager=checkpoint_manager,
        max_reflections=2,
        agent_id="reflexion-react-demo",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    task = (
        "Research GDPR Article 17 (right to erasure) and Basel III capital requirements. "
        "Summarise each regulation and explain any cross-domain compliance implications "
        "for a fintech company operating in the EU."
    )

    result = await reflexion.execute(task)

    logger.info("Result", success=result.success)
    logger.info(
        "Inner agent steps across all attempts",
        step_count=len(result.steps),
    )

    # The steps list contains ALL steps from ALL inner-agent attempts combined.
    for i, step in enumerate(result.steps):
        logger.info(
            f"  Step {i+1}",
            thought=step.thought[:80] if step.thought else "",
            action=step.action,
        )

    print(f"\nFinal answer:\n{result.output[:600]}\n")

    # Checkpoint: the ReflexionAgent saves one checkpoint per reflection round.
    # Use the execution_id to resume from any saved reflection round.
    execution_id = result.metadata.get("execution_id")
    if execution_id:
        logger.info("Checkpoint saved", execution_id=execution_id)
        logger.info(
            "To resume: await reflexion.resume_from(execution_id)"
            " or resume_from(execution_id, from_reflection=1)"
        )


# ---------------------------------------------------------------------------
# Example 3: Custom critique + reflection prompts
#             — domain-specific quality gate
# ---------------------------------------------------------------------------

async def example_custom_prompts(
    gateway: UnifiedLLMGateway,
    checkpoint_manager: CheckpointManager,
) -> None:
    """
    Custom critique and reflection prompts.

    The default critique prompt checks general quality.  In production you
    almost always want a domain-specific critique — e.g. checking that a
    compliance answer cites the correct regulation IDs, that a code review
    mentions security issues, or that a financial analysis includes numbers.

    Print the library defaults first to use them as a starting point:
        print(ReflexionAgent.DEFAULT_CRITIQUE_PROMPT)
        print(ReflexionAgent.DEFAULT_REFLECT_PROMPT)
    """
    logger.info("=" * 60)
    logger.info("Example 3: Custom critique + reflection prompts")
    logger.info("=" * 60)

    # Inspect the library defaults before overriding — good practice.
    logger.info("Default critique prompt (first 100 chars)",
                prompt=ReflexionAgent.DEFAULT_CRITIQUE_PROMPT[:100])
    logger.info("Default reflection prompt (first 100 chars)",
                prompt=ReflexionAgent.DEFAULT_REFLECT_PROMPT[:100])

    # Custom critique: strict quality check for compliance answers.
    # Must return "YES" or "NO: <reason>" — the ReflexionAgent parses this format.
    COMPLIANCE_CRITIQUE = """\
You are a senior compliance auditor evaluating an AI-generated compliance analysis.

Task that was given: {task}
AI response to evaluate: {response}

Evaluate the response against these criteria:
1. Does it cite specific regulation names or article numbers?
2. Does it address both the legal requirement AND the business impact?
3. Is the language precise (no vague terms like "may be required")?
4. Does it include actionable next steps or recommendations?

Reply with ONLY:
- "YES" if ALL four criteria are met.
- "NO: <brief list of failed criteria>" if any criterion is not met.
"""

    # Custom reflection prompt: tell the model exactly what to improve.
    # Must contain {task}, {previous_output}, and {critique}.
    COMPLIANCE_REFLECT = """\
You are a compliance analyst revising your previous analysis.

Original task: {task}

Your previous answer was rejected for the following reason:
{critique}

Previous answer (do NOT repeat this verbatim — improve it):
{previous_output}

Write a revised, improved analysis that addresses every point in the critique.
Include specific regulation citations, clear business impact, and concrete next steps.
"""

    inner = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="inner-cot-compliance",
        logger=logger,
    )

    reflexion = ReflexionAgent(
        inner,
        checkpoint_manager=checkpoint_manager,
        max_reflections=2,
        critique_prompt=COMPLIANCE_CRITIQUE,    # ← custom critique prompt
        reflect_prompt=COMPLIANCE_REFLECT,       # ← custom reflection prompt
        agent_id="reflexion-custom-prompts",
        logger=logger,
        metrics=metrics,
    )

    task = (
        "Explain the compliance obligations of an EU-based payments company "
        "under PSD2 and GDPR when handling a customer data access request."
    )

    result = await reflexion.execute(task)
    logger.info("Custom-prompt reflexion result", success=result.success)
    print(f"\nCompliance analysis:\n{result.output[:600]}\n")


# ---------------------------------------------------------------------------
# Example 4: max_reflections tuning
#             — balancing quality vs. latency/cost
# ---------------------------------------------------------------------------

async def example_max_reflections_tuning(
    gateway: UnifiedLLMGateway,
) -> None:
    """
    max_reflections tuning.

    max_reflections=0 : Run the inner agent exactly once; skip all critiquing.
                        Equivalent to calling inner_agent.execute() directly.
                        Use when you want the Reflexion wrapper for structural
                        consistency but don't need quality gates.

    max_reflections=1 : One chance to improve — low latency overhead, often
                        enough for most tasks.

    max_reflections=3+: Higher quality ceiling but significant latency and cost.
                        Reserved for high-stakes, complex tasks.

    ReflexionAgent stops EARLY when the critique returns YES — you only pay for
    the reflections that are actually needed.
    """
    logger.info("=" * 60)
    logger.info("Example 4: max_reflections tuning")
    logger.info("=" * 60)

    task = "Summarise the key principles of responsible AI in 3 concise bullet points."

    for max_r in [0, 1, 2]:
        inner = ChainOfThoughtAgent(
            llm_gateway=gateway,
            agent_id=f"inner-cot-max{max_r}",
            logger=logger,
        )
        reflexion = ReflexionAgent(
            inner,
            max_reflections=max_r,
            agent_id=f"reflexion-max{max_r}",
            logger=logger,
            metrics=metrics,
        )

        result = await reflexion.execute(task)
        reflection_rounds = result.metadata.get("reflection_rounds", 0)
        logger.info(
            f"max_reflections={max_r}",
            reflection_rounds_used=reflection_rounds,
            output_length=len(result.output),
        )

    logger.info(
        "Tip: reflection_rounds in result.metadata tells you how many retries "
        "were actually needed (may be less than max_reflections if satisfied early)."
    )


# ---------------------------------------------------------------------------
# Example 5: Checkpoint and resume
#             — resume from a specific reflection round
# ---------------------------------------------------------------------------

async def example_checkpoint_resume(
    gateway: UnifiedLLMGateway,
    checkpoint_manager: CheckpointManager,
) -> None:
    """
    Checkpoint and resume.

    ReflexionAgent saves one checkpoint per reflection round so long-running
    reflexion chains can be resumed without re-running completed rounds.

    resume_from(execution_id)
        → resumes from the LATEST checkpoint (continues from where it stopped).

    resume_from(execution_id, from_reflection=1)
        → resumes from exactly reflection round 1 (re-runs rounds 2+ onward).
    """
    logger.info("=" * 60)
    logger.info("Example 5: Checkpoint and resume")
    logger.info("=" * 60)

    inner = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="inner-cot-ckpt",
        logger=logger,
    )
    reflexion = ReflexionAgent(
        inner,
        checkpoint_manager=checkpoint_manager,
        max_reflections=2,
        agent_id="reflexion-ckpt-demo",
        logger=logger,
        metrics=metrics,
    )

    task = (
        "Design a high-level architecture for a GDPR-compliant AI-powered "
        "document processing system for a law firm."
    )
    result = await reflexion.execute(task)
    execution_id = result.metadata.get("execution_id")
    logger.info("Initial run complete", execution_id=execution_id,
                reflection_rounds=result.metadata.get("reflection_rounds", 0))

    if execution_id and reflexion.checkpoint_manager:
        # Resume from the latest checkpoint — continues from where it left off.
        resumed = await reflexion.resume_from(str(execution_id))
        logger.info("Resumed from latest checkpoint", output_length=len(resumed.output))

        # Resume from a specific reflection round.
        # This re-runs from round 1 onward (useful for partial re-evaluation).
        try:
            resumed_from_r1 = await reflexion.resume_from(str(execution_id), from_reflection=1)
            logger.info(
                "Resumed from reflection round 1",
                output_length=len(resumed_from_r1.output),
            )
        except ValueError as e:
            logger.info("No round-1 checkpoint available (task was accepted early)", error=str(e))


# ---------------------------------------------------------------------------
# Example 6: Accessing reflection metadata
#             — round count, critique text, inner-agent steps
# ---------------------------------------------------------------------------

async def example_reflection_metadata(
    gateway: UnifiedLLMGateway,
) -> None:
    """
    Reflection metadata.

    result.metadata contains:
    - "execution_id"       : UUID for this reflexion run (used for checkpoints)
    - "reflection_rounds"  : number of reflection cycles actually performed
    - "critiques"          : list of critique strings, one per reflection round
    - "inner_agent_id"     : agent_id of the inner agent that was wrapped

    result.steps contains ALL AgentSteps from ALL inner-agent attempts combined.
    Each step has: thought, action, action_input, observation, metadata.
    """
    logger.info("=" * 60)
    logger.info("Example 6: Accessing reflection metadata")
    logger.info("=" * 60)

    inner = ChainOfThoughtAgent(
        llm_gateway=gateway,
        agent_id="inner-cot-meta",
        logger=logger,
    )
    reflexion = ReflexionAgent(
        inner,
        max_reflections=2,
        agent_id="reflexion-meta-demo",
        logger=logger,
        metrics=metrics,
    )

    result = await reflexion.execute(
        "List five concrete ways AI can reduce administrative overhead in legal work."
    )

    # Execution metadata
    meta = result.metadata
    logger.info("Execution ID", execution_id=meta.get("execution_id"))
    logger.info("Reflection rounds used", rounds=meta.get("reflection_rounds", 0))

    # Critique texts — one per reflection round that was triggered.
    critiques = meta.get("critiques", [])
    for i, critique in enumerate(critiques, start=1):
        logger.info(f"  Critique {i}", text=critique[:120])

    # Inner-agent steps across all attempts.
    logger.info("Total inner-agent steps", count=len(result.steps))
    for i, step in enumerate(result.steps):
        logger.info(
            f"  Step {i+1}",
            action=step.action,
            thought=(step.thought or "")[:60],
        )

    # Reflexion success is True only if the inner agent's last attempt succeeded.
    logger.info("Final result", success=result.success, output=result.output[:200])


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    gateway = create_gateway()
    store = InMemoryStateStore()
    checkpoint_manager = create_checkpoint_manager(store)

    tool_registry = ToolRegistry()
    register_tools(tool_registry)

    await example_reflexion_on_cot(gateway, checkpoint_manager)
    await example_reflexion_on_react(gateway, tool_registry, checkpoint_manager)
    await example_custom_prompts(gateway, checkpoint_manager)
    await example_max_reflections_tuning(gateway)
    await example_checkpoint_resume(gateway, checkpoint_manager)
    await example_reflection_metadata(gateway)

    logger.info("All ReflexionAgent examples completed.")


if __name__ == "__main__":
    asyncio.run(main())

routing_example.py

"""
Routing Example

Demonstrates all four routing strategies for directing tasks to the right agent:

1. RuleBasedRouter    — explicit sync/async conditions evaluated in order
2. LoadBalancingRouter — weighted round-robin across agent pool
3. LLMRouter          — LLM reads agent descriptions and picks the best match
4. SemanticRouter     — cosine similarity between embedded input and route descriptions

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer
    pip install -e packages/shared-core

Usage::

    cd packages/orchestration-layer
    python examples/routing_example.py
"""

# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
# ============================================================================

import asyncio
import os
from pathlib import Path
from typing import List
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.routing.base import RoutingRequest
from gmf_forge_ai_orchestration.routing.rule_router import RuleBasedRouter
from gmf_forge_ai_orchestration.routing.load_balance_router import LoadBalancingRouter
from gmf_forge_ai_orchestration.routing.llm_router import LLMRouter
from gmf_forge_ai_orchestration.routing.semantic_router import SemanticRouter

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("routing_example")
metrics = BasicMetricsCollector()
tracer = get_tracer()

# Agent pool used across all examples
AGENTS = ["search_agent", "code_agent", "data_agent", "policy_agent"]


def create_gateway() -> UnifiedLLMGateway:
    """Build real LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs: dict = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider)


def make_request(task: str, agents: List[str] = None) -> RoutingRequest:
    return RoutingRequest(
        input=task,
        available_agents=agents or AGENTS,
        context={"source": "routing_example"},
    )


# ---------------------------------------------------------------------------
# Example 1: RuleBasedRouter — explicit keyword/condition rules
# ---------------------------------------------------------------------------

async def example_rule_router() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: RuleBasedRouter — keyword-based routing")
    logger.info("=" * 60)

    # Async condition demonstrating that rules can be async callables
    async def is_policy_query(req: RoutingRequest) -> bool:
        keywords = ["policy", "compliance", "regulation", "guideline", "rule"]
        return any(kw in req.input.lower() for kw in keywords)

    router = RuleBasedRouter(
        rules=[
            (lambda r: any(w in r.input.lower() for w in ["search", "find", "look up", "web"]), "search_agent"),
            (lambda r: any(w in r.input.lower() for w in ["code", "function", "debug", "python", "script"]), "code_agent"),
            (is_policy_query, "policy_agent"),  # must precede data rule: "data retention policy" contains "data"
            (lambda r: any(w in r.input.lower() for w in ["data", "analyse", "csv", "sql", "database"]), "data_agent"),
        ],
        fallback_target="search_agent",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    tasks = [
        "Search the web for AI trends in 2026",
        "Write a Python function to parse JSON",
        "Analyse this CSV file for outliers",
        "What is the compliance policy for data retention?",
        "Summarise the meeting notes",          # falls back to search_agent
    ]

    for task in tasks:
        decision = await router.route(make_request(task))
        logger.info(
            "Rule routing decision",
            task=task[:55],
            target=decision.target,
            confidence=decision.confidence,
        )


# ---------------------------------------------------------------------------
# Example 2: LoadBalancingRouter — weighted round-robin
# ---------------------------------------------------------------------------

async def example_load_balancer() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: LoadBalancingRouter — weighted round-robin")
    logger.info("=" * 60)

    # search_agent handles 2x the traffic of others (heavier capacity)
    router = LoadBalancingRouter(
        agent_weights={"search_agent": 2, "code_agent": 1, "data_agent": 1},
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    agents = ["search_agent", "code_agent", "data_agent"]
    targets = []

    for i in range(8):
        decision = await router.route(make_request(f"task {i}", agents=agents))
        targets.append(decision.target)

    logger.info("Round-robin sequence", targets=targets)
    # Expected pattern: search_agent appears twice per cycle
    from collections import Counter
    counts = Counter(targets)
    logger.info("Distribution", counts=dict(counts))


# ---------------------------------------------------------------------------
# Example 3: LLMRouter — LLM selects best agent from descriptions
# ---------------------------------------------------------------------------

async def example_llm_router() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: LLMRouter — LLM-based intelligent routing")
    logger.info("=" * 60)

    # The real LLM selects the best matching agent from the descriptions provided.
    router = LLMRouter(
        llm_gateway=create_gateway(),
        agent_descriptions={
            "search_agent": "Searches the web and retrieves information from external sources.",
            "code_agent": "Writes, reviews, and debugs code in Python, JavaScript, and SQL.",
            "data_agent": "Analyses structured data, runs queries, and generates reports.",
            "policy_agent": "Handles compliance, policy lookup, and regulatory questions.",
        },
        fallback_target="search_agent",
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    tasks = [
        "Find the latest news on autonomous vehicles",
        "Refactor this Python class to use dataclasses",
        "Is our data retention policy compliant with GDPR?",
    ]

    for task in tasks:
        decision = await router.route(make_request(task))
        logger.info(
            "LLM routing decision",
            task=task[:60],
            target=decision.target,
            reasoning=decision.reasoning[:80],
        )


# ---------------------------------------------------------------------------
# Example 4: SemanticRouter — cosine similarity routing
# ---------------------------------------------------------------------------

async def example_semantic_router() -> None:
    logger.info("=" * 60)
    logger.info("Example 4: SemanticRouter — embedding similarity routing")
    logger.info("=" * 60)

    # In production, provide a real embedding function:
    #   async def embed(text: str) -> List[float]:
    #       response = await gateway.embed(text)
    #       return response.embedding
    #
    # For this demo we use a deterministic fake embedding based on word hashing.
    async def fake_embed(text: str) -> List[float]:
        """Domain-aware deterministic embedding for routing demo.

        Each of the 4 dimensions maps to one agent domain.
        Words from a domain push the vector toward that domain's axis,
        so cosine similarity correctly identifies the best-matching route.
        """
        DOMAIN_KEYWORDS = [
            # dim 0 — search / retrieval
            {"web", "search", "news", "find", "article", "retrieve",
             "information", "electric", "vehicles", "autonomous", "latest"},
            # dim 1 — code / programming
            {"code", "programming", "python", "function", "class", "method",
             "debug", "script", "sort", "write", "refactor", "dataclass"},
            # dim 2 — data / analytics
            {"data", "analysis", "sql", "csv", "database", "statistics",
             "report", "chart", "query", "duplicate", "records", "analyse"},
            # dim 3 — policy / compliance
            {"policy", "compliance", "regulation", "legal", "guideline",
             "rule", "gdpr", "requirements", "compliant"},
        ]
        vec = [0.0] * 4
        for raw_word in text.lower().split():
            word = raw_word.strip("?.,!:;")
            for dim, keywords in enumerate(DOMAIN_KEYWORDS):
                if word in keywords:
                    vec[dim] += 1.0
        norm = sum(x * x for x in vec) ** 0.5
        if norm == 0:
            return [0.25, 0.25, 0.25, 0.25]  # neutral — no keyword match
        return [x / norm for x in vec]

    router = SemanticRouter(
        embed_fn=fake_embed,
        fallback_target="search_agent",
        similarity_threshold=0.0,
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    # Register routes with descriptions — embeddings computed at registration time
    await router.add_route("search_agent", "web search information retrieval news articles")
    await router.add_route("code_agent", "code programming python function class method debug")
    await router.add_route("data_agent", "data analysis sql csv database statistics report chart")
    await router.add_route("policy_agent", "policy compliance regulation legal guideline rule")

    logger.info("Routes registered", count=4)

    tasks = [
        "Find news about electric vehicles",
        "Write a Python function to sort a list",
        "Create a SQL query to find duplicate records",
        "What are the GDPR compliance requirements?",
    ]

    for task in tasks:
        decision = await router.route(make_request(task, agents=list(router._route_embeddings.keys())))
        logger.info(
            "Semantic routing decision",
            task=task[:60],
            target=decision.target,
            confidence=round(decision.confidence, 3),
        )


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Routing Example starting")
    metrics.increment("example.routing.started")

    await example_rule_router()
    await example_load_balancer()
    await example_llm_router()
    await example_semantic_router()

    logger.info("Routing examples complete")
    metrics.increment("example.routing.completed")


if __name__ == "__main__":
    asyncio.run(main())

state_management_example.py

"""
State Management Example

Demonstrates all state management primitives:

1. InMemoryStateStore  — fast, in-process key/value store with TTL
2. RedisStateStore     — distributed, persistent store (requires Redis)
3. StateStoreFactory   — backend switching via config
4. ConversationState   — structured per-session message history
5. CheckpointManager   — save/restore agent execution snapshots
6. Blackboard          — shared read/write board for multi-agent coordination

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

    # For Redis examples:
    docker run -p 6379:6379 redis:latest   (or set REDIS_URL in .env)

Usage::

    cd packages/orchestration-layer
    python examples/state_management_example.py
"""

import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector

from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
from gmf_forge_ai_orchestration.state.factory import StateStoreFactory
from gmf_forge_ai_orchestration.state.base import ConversationState
from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
from gmf_forge_ai_orchestration.state.blackboard import Blackboard

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

logger = BasicLogger("state_management_example")
metrics = BasicMetricsCollector()

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
STATE_BACKEND = os.getenv("STATE_BACKEND", "memory")


# ---------------------------------------------------------------------------
# Example 1: InMemoryStateStore — basic CRUD and TTL
# ---------------------------------------------------------------------------

async def example_in_memory_store() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: InMemoryStateStore — CRUD and TTL")
    logger.info("=" * 60)

    store = InMemoryStateStore()

    # Basic set/get
    await store.set("user:alice", {"name": "Alice", "role": "analyst"})
    user = await store.get("user:alice")
    logger.info("Stored and retrieved user", user=user)

    # Check existence
    exists = await store.exists("user:alice")
    missing = await store.exists("user:nobody")
    logger.info("Existence checks", alice_exists=exists, nobody_exists=missing)

    # TTL — key expires after 1 second
    await store.set("temp:token", "abc123", ttl=1)
    token_before = await store.get("temp:token")
    logger.info("Token before expiry", token=token_before)

    await asyncio.sleep(1.1)
    token_after = await store.get("temp:token")
    logger.info("Token after expiry", token=token_after)   # None

    # Delete
    await store.delete("user:alice")
    after_delete = await store.get("user:alice")
    logger.info("After delete", value=after_delete)   # None

    # Concurrent writes — safe under asyncio.Lock
    async def write(key: str, value: int) -> None:
        await store.set(key, value)

    await asyncio.gather(*[write(f"concurrent:{i}", i) for i in range(10)])
    logger.info("Concurrent writes complete", keys_written=10)

    metrics.increment("example.state.memory_store_demonstrated")


# ---------------------------------------------------------------------------
# Example 2: RedisStateStore — distributed persistent store
# ---------------------------------------------------------------------------

async def example_redis_store() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: RedisStateStore — distributed persistent store")
    logger.info("=" * 60)

    try:
        store = RedisStateStore(url=REDIS_URL, key_prefix="demo:")
    except ImportError:
        logger.warning("redis package not installed — skipping Redis example")
        return

    try:
        # Test connectivity with a simple set/get
        await store.set("ping", "pong")
        pong = await store.get("ping")
        if pong != "pong":
            logger.warning("Redis connection check failed — skipping Redis example")
            return
    except Exception as exc:
        logger.warning("Redis not reachable — skipping Redis example", error=str(exc))
        return

    # Set with TTL
    await store.set("session:xyz", {"user": "bob", "authenticated": True}, ttl=3600)
    session = await store.get("session:xyz")
    logger.info("Session stored in Redis", session=session)

    # Exists and delete
    await store.set("cache:result", {"computed": True})
    logger.info("Cache key exists", exists=await store.exists("cache:result"))
    await store.delete("cache:result")
    logger.info("Cache key after delete", exists=await store.exists("cache:result"))

    # Clear all keys with the demo: prefix
    await store.clear()
    logger.info("Redis store cleared")
    await store.close()

    metrics.increment("example.state.redis_store_demonstrated")


# ---------------------------------------------------------------------------
# Example 3: StateStoreFactory — switch backend via config
# ---------------------------------------------------------------------------

async def example_factory() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: StateStoreFactory — backend switching")
    logger.info("=" * 60)

    # Switch backend by changing STATE_BACKEND env var (memory | redis)
    backend = STATE_BACKEND
    logger.info("Creating store", backend=backend)

    if backend == "redis":
        store = StateStoreFactory.create("redis", url=REDIS_URL, key_prefix="factory:")
    else:
        store = StateStoreFactory.create("memory")

    await store.set("factory:test", {"created_by": "factory", "backend": backend})
    value = await store.get("factory:test")
    logger.info("Factory-created store works", value=value, backend=backend)

    metrics.increment("example.state.factory_demonstrated", backend=backend)


# ---------------------------------------------------------------------------
# Example 4: ConversationState — structured message history
# ---------------------------------------------------------------------------

async def example_conversation_state() -> None:
    logger.info("=" * 60)
    logger.info("Example 4: ConversationState — structured message history")
    logger.info("=" * 60)

    store = InMemoryStateStore()
    session_id = "session-demo-001"

    # Create a new conversation
    conv = ConversationState(session_id=session_id)
    conv.add_message(role="user", content="What is the refund policy?")
    conv.add_message(role="assistant", content="You may request a refund within 30 days.")
    conv.add_message(role="user", content="What if the product was damaged?")

    # Persist to store
    await store.set(f"conv:{session_id}", conv.to_dict())
    logger.info("Conversation persisted", session_id=session_id, messages=len(conv.messages))

    # Retrieve and restore
    raw = await store.get(f"conv:{session_id}")
    restored = ConversationState.from_dict(raw)
    logger.info("Conversation restored", messages=len(restored.messages))

    for msg in restored.messages:
        logger.info(f"  [{msg.role}]", content=msg.content[:80])

    metrics.increment("example.state.conversation_demonstrated")


# ---------------------------------------------------------------------------
# Example 5: CheckpointManager — save/restore agent snapshots with TTL
# ---------------------------------------------------------------------------

async def example_checkpoints() -> None:
    logger.info("=" * 60)
    logger.info("Example 5: CheckpointManager — agent execution snapshots with TTL")
    logger.info("=" * 60)

    store = InMemoryStateStore()
    
    # -----------------------------------------------------------------------
    # 5a: Default TTL Configuration (24 hours)
    # -----------------------------------------------------------------------
    logger.info("5a: Default TTL Configuration")
    
    manager = CheckpointManager(store)  # Uses default TTL of 86400 seconds (24 hours)
    agent_id = "research-agent-001"

    # Simulate agent saving progress checkpoints during a long task
    # All checkpoint keys (data, index, exec) are written with TTL applied
    ckpt1 = await manager.save(
        agent_id,
        state={"step": 1, "results": ["paper_A"]},
        metadata={"task": "literature review", "step_name": "search"},
    )
    logger.info("Checkpoint 1 saved with default TTL", checkpoint_id=ckpt1)

    ckpt2 = await manager.save(
        agent_id,
        state={"step": 2, "results": ["paper_A", "paper_B", "paper_C"]},
        metadata={"task": "literature review", "step_name": "expand"},
    )
    logger.info("Checkpoint 2 saved with default TTL", checkpoint_id=ckpt2)

    # List all checkpoints for this agent
    # The index key (__ckpt_index__research-agent-001) stores this list and has TTL applied
    history = await manager.list(agent_id)
    logger.info("Checkpoint history", count=len(history))

    # Restore the latest checkpoint
    latest = await manager.load(ckpt2)
    logger.info(
        "Restored checkpoint",
        step=latest.state["step"],
        results=latest.state["results"],
        metadata=latest.metadata,
    )

    # Delete a checkpoint
    await manager.delete(ckpt1, agent_id)
    history_after = await manager.list(agent_id)
    logger.info("After delete", remaining_checkpoints=len(history_after))

    # -----------------------------------------------------------------------
    # 5b: Custom TTL Configuration (1 hour for short-lived tasks)
    # -----------------------------------------------------------------------
    logger.info("5b: Custom TTL Configuration")
    
    custom_ttl = 3600  # 1 hour for short-lived tasks
    manager_short = CheckpointManager(store, default_ttl=custom_ttl)
    agent_id_short = "short-task-agent"
    
    ckpt_short = await manager_short.save(
        agent_id_short,
        state={"phase": "processing", "progress": 50},
    )
    logger.info("Short-lived checkpoint saved", checkpoint_id=ckpt_short, ttl_seconds=custom_ttl)
    
    ckpt_restored = await manager_short.load(ckpt_short)
    logger.info("Short-lived checkpoint restored", phase=ckpt_restored.state["phase"])

    # -----------------------------------------------------------------------
    # 5c: Execution Context — Checkpoints grouped by execution ID
    # -----------------------------------------------------------------------
    logger.info("5c: Execution Context with Tracking")
    
    exec_id = "research-exec-2026-001"
    manager_exec = CheckpointManager(store, default_ttl=7200)  # 2 hours for this execution
    
    # Save multiple checkpoints for one execution
    ckpt_exec1 = await manager_exec.save(
        "research-agent",
        {"stage": "initial_research"},
        execution_id=exec_id,
    )
    logger.info("Execution checkpoint 1 saved", execution_id=exec_id, checkpoint_id=ckpt_exec1)
    
    ckpt_exec2 = await manager_exec.save(
        "research-agent",
        {"stage": "analysis", "papers_reviewed": 5},
        execution_id=exec_id,
    )
    logger.info("Execution checkpoint 2 saved", execution_id=exec_id, checkpoint_id=ckpt_exec2)
    
    # Retrieve all checkpoints for this execution
    # The exec key (__ckpt_exec__research-exec-2026-001) catalogs all checkpoint IDs
    exec_history = await manager_exec.list_by_execution(exec_id)
    logger.info("Checkpoints for execution", count=len(exec_history), execution_id=exec_id)
    
    # Load the latest checkpoint for resuming the execution
    latest_exec = await manager_exec.load_latest_for_execution(exec_id)
    logger.info(
        "Latest checkpoint for execution",
        execution_id=exec_id,
        stage=latest_exec.state["stage"],
        papers_reviewed=latest_exec.state.get("papers_reviewed", 0),
    )

    # -----------------------------------------------------------------------
    # 5d: TTL Architecture (All three key types co-expire)
    # -----------------------------------------------------------------------
    logger.info("5d: TTL Architecture Overview")
    logger.info("CheckpointManager maintains three key types per checkpoint:")
    logger.info("  __ckpt_data__<id>:    Actual checkpoint payload (expires with TTL)")
    logger.info("  __ckpt_index__<agent>: Agent catalog of checkpoint IDs (expires with TTL)")
    logger.info("  __ckpt_exec__<exec>:  Execution catalog (expires with TTL)")
    logger.info("  → All three expire together, preventing stale references")

    metrics.increment("example.state.checkpoints_demonstrated")


# ---------------------------------------------------------------------------
# Example 6: Blackboard — shared multi-agent communication board
# ---------------------------------------------------------------------------

async def example_blackboard() -> None:
    logger.info("=" * 60)
    logger.info("Example 6: Blackboard — shared multi-agent communication")
    logger.info("=" * 60)

    store = InMemoryStateStore()

    # Two agents share one blackboard (same store, same namespace)
    board_agent_a = Blackboard(store, namespace="research-run-001")
    board_agent_b = Blackboard(store, namespace="research-run-001")

    # Agent A writes its findings
    await board_agent_a.write("search_results", ["result_1", "result_2", "result_3"], author="search-agent")
    await board_agent_a.write("query", "AI agent architectures 2026", author="search-agent")
    logger.info("Agent A wrote to blackboard")

    # Agent B reads Agent A's findings
    results = await board_agent_b.read("search_results")
    query = await board_agent_b.read("query")
    logger.info("Agent B read from blackboard", results=results, query=query)

    # Agent B writes its synthesis
    await board_agent_b.write(
        "summary",
        "Agent architectures in 2026 focus on multi-modal reasoning and autonomy.",
        author="summarise-agent",
    )

    # List all entries on the board
    all_entries = await board_agent_a.list_entries()
    logger.info("All blackboard entries", count=len(all_entries))
    for entry in all_entries.values():
        logger.info(f"  [{entry.author}] {entry.key}", written_at=entry.written_at.isoformat())

    # Namespace isolation — separate run has its own board
    board_other_run = Blackboard(store, namespace="research-run-002")
    await board_other_run.write("search_results", ["other_result"], author="other-agent")

    run1_keys = await board_agent_a.keys()
    run2_keys = await board_other_run.keys()
    logger.info("Namespace isolation", run1_keys=run1_keys, run2_keys=run2_keys)

    metrics.increment("example.state.blackboard_demonstrated")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("State Management Example starting")
    metrics.increment("example.state_management.started")

    await example_in_memory_store()
    await example_redis_store()
    await example_factory()
    await example_conversation_state()
    await example_checkpoints()
    await example_blackboard()

    logger.info("State management examples complete")
    metrics.increment("example.state_management.completed")


if __name__ == "__main__":
    asyncio.run(main())

workflow_example.py

"""
Workflow Example

Demonstrates the three workflow engines:

1. DAGWorkflow          — nodes with dependencies; independent nodes run in parallel
2. StateMachineWorkflow — state + transition table; conditional branching
3. EventDrivenWorkflow  — asyncio.Queue-based event bus; event chaining

Prerequisites:
    pip install python-dotenv
    pip install -e packages/orchestration-layer

Usage::

    cd packages/orchestration-layer
    python examples/workflow_example.py
"""

import asyncio
import os
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector, BasicPerformanceMonitor
from gmf_forge_ai_shared_core.observability.tracing import get_tracer

from gmf_forge_ai_orchestration.agents.base import AgentResult
from gmf_forge_ai_orchestration.agents.react_agent import ReActAgent
from gmf_forge_ai_orchestration.workflows.dag_workflow import DAGWorkflow
from gmf_forge_ai_orchestration.workflows.base import WorkflowEdge, WorkflowNode
from gmf_forge_ai_orchestration.workflows.state_machine_workflow import StateMachineWorkflow
from gmf_forge_ai_orchestration.workflows.event_driven_workflow import (
    EventDrivenWorkflow,
    WorkflowEvent,
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv(Path(__file__).parent / ".env")

WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"

logger = BasicLogger("workflow_example")
metrics = BasicMetricsCollector()
monitor = BasicPerformanceMonitor()
tracer = get_tracer()


def create_gateway() -> UnifiedLLMGateway:
    """Build real LLM gateway from environment variables."""
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    api_key = os.getenv("AZURE_OPENAI_API_KEY")
    deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    api_version = os.getenv("AZURE_OPENAI_API_VERSION", "2024-02-15-preview")

    if not endpoint or not api_key:
        raise EnvironmentError(
            "AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_API_KEY must be set in .env"
        )

    provider_kwargs = {
        "endpoint": endpoint,
        "api_key": api_key,
        "deployment_name": deployment,
        "api_version": api_version,
    }
    if CORPORATE_CERT.exists():
        provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)

    provider = AzureOpenAIProvider(**provider_kwargs)
    return UnifiedLLMGateway(default_provider=provider, performance_monitor=monitor)


def make_agent(name: str) -> ReActAgent:
    """Create a ReActAgent backed by the real LLM gateway."""
    return ReActAgent(
        llm_gateway=create_gateway(),
        agent_id=name,
        logger=logger,
        metrics=metrics,
        performance_monitor=monitor,
        tracer=tracer,
    )


# ---------------------------------------------------------------------------
# Example 1: DAGWorkflow — dependency-driven parallel execution
# ---------------------------------------------------------------------------

async def example_dag_workflow() -> None:
    logger.info("=" * 60)
    logger.info("Example 1: DAGWorkflow — parallel and sequential execution")
    logger.info("=" * 60)

    # Workflow: search + fetch_data run in parallel, then summarise, then report
    #
    #   search ──┐
    #              ├──► summarise ──► report
    #   fetch  ──┘

    search_agent = make_agent("search")
    fetch_agent = make_agent("fetch")
    summarise_agent = make_agent("summarise")
    report_agent = make_agent("report")

    wf = DAGWorkflow(
        nodes=[
            WorkflowNode(node_id="search", agent=search_agent),
            WorkflowNode(node_id="fetch", agent=fetch_agent),
            WorkflowNode(
                node_id="summarise",
                agent=summarise_agent,
                # Map outputs from upstream nodes into this node's input
                inputs_map={"search_output": "search", "data_output": "fetch"},
            ),
            WorkflowNode(
                node_id="report",
                agent=report_agent,
                inputs_map={"summary": "summarise"},
            ),
        ],
        edges=[
            WorkflowEdge(source="search", target="summarise"),
            WorkflowEdge(source="fetch", target="summarise"),
            WorkflowEdge(source="summarise", target="report"),
        ],
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = await wf.run({"query": "Analyse market trends for Q1 2026"})

    logger.info("DAG workflow complete", success=result.success)
    logger.info("Final output", output=result.final_output)
    for node_id, agent_result in result.outputs.items():
        logger.info(f"  Node '{node_id}' output", output=agent_result.output[:80])


# ---------------------------------------------------------------------------
# Example 2: StateMachineWorkflow — conditional branching
# ---------------------------------------------------------------------------

async def example_state_machine() -> None:
    logger.info("=" * 60)
    logger.info("Example 2: StateMachineWorkflow — state + conditional transitions")
    logger.info("=" * 60)

    # Workflow for document processing:
    #   validate → (if valid) enrich → summarise → END
    #              (if invalid) reject → END

    validate_agent = make_agent("validate")
    enrich_agent = make_agent("enrich")
    summarise_agent = make_agent("summarise")
    reject_agent = make_agent("reject")

    def is_valid(result: AgentResult) -> bool:
        return "VALID" in result.output

    def is_invalid(result: AgentResult) -> bool:
        return "INVALID" in result.output

    wf = StateMachineWorkflow(
        states={
            "validate": validate_agent,
            "enrich": enrich_agent,
            "summarise": summarise_agent,
            "reject": reject_agent,
        },
        transitions={
            "validate": [
                (is_valid, "enrich"),
                (None, "reject"),        # default if not valid
            ],
            "enrich": [(None, "summarise")],
            "summarise": [(None, "END")],
            "reject": [(None, "END")],
        },
        initial_state="validate",
        terminal_states={"END"},
        logger=logger,
        metrics=metrics,
        tracer=tracer,
    )

    result = await wf.run({"document": "invoice_2026_Q1.pdf"})

    logger.info(
        "State machine workflow complete",
        success=result.success,
        final_output=result.final_output,
        states_visited=list(result.outputs.keys()),
    )


# ---------------------------------------------------------------------------
# Example 3: EventDrivenWorkflow — event bus with chaining
# ---------------------------------------------------------------------------

async def example_event_driven() -> None:
    logger.info("=" * 60)
    logger.info("Example 3: EventDrivenWorkflow — event bus and chaining")
    logger.info("=" * 60)

    wf = EventDrivenWorkflow(terminal_event="processing_complete", logger=logger, metrics=metrics)
    processing_results = []

    # Handler: document received → validate it, emit validated event
    async def on_document_received(event: WorkflowEvent) -> Optional[WorkflowEvent]:
        doc = event.payload.get("document", "unknown")
        logger.info("Handling event", event_name=event.name, document=doc)
        processing_results.append(f"received: {doc}")

        # Chain: emit next event
        return WorkflowEvent(
            name="document_validated",
            payload={"document": doc, "valid": True},
            source="validation_handler",
        )

    # Handler: document validated → enrich it, emit enriched event
    async def on_document_validated(event: WorkflowEvent) -> Optional[WorkflowEvent]:
        doc = event.payload.get("document")
        logger.info("Handling event", event_name=event.name, document=doc)
        processing_results.append(f"validated: {doc}")

        return WorkflowEvent(
            name="document_enriched",
            payload={"document": doc, "tags": ["finance", "Q1", "2026"]},
            source="enrichment_handler",
        )

    # Handler: document enriched → store it, signal completion
    async def on_document_enriched(event: WorkflowEvent) -> Optional[WorkflowEvent]:
        doc = event.payload.get("document")
        tags = event.payload.get("tags", [])
        logger.info("Handling event", event_name=event.name, document=doc, tags=tags)
        processing_results.append(f"enriched: {doc}{tags}")

        # Emit terminal event to stop the workflow
        return WorkflowEvent(name="processing_complete", payload={"document": doc})

    async def on_processing_complete(event: WorkflowEvent) -> None:
        doc = event.payload.get("document")
        logger.info("Processing complete", document=doc)
        processing_results.append(f"completed: {doc}")
        return None   # terminal — no further chaining

    # Register handlers
    wf.on("document_received", on_document_received)
    wf.on("document_validated", on_document_validated)
    wf.on("document_enriched", on_document_enriched)
    wf.on("processing_complete", on_processing_complete)

    # Seed the initial event
    await wf.emit(WorkflowEvent(
        name="document_received",
        payload={"document": "contract_2026.pdf"},
        source="upload_api",
    ))

    result = await wf.run({})

    logger.info("Event-driven workflow complete", success=result.success)
    logger.info("Processing pipeline", steps=processing_results)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

async def main() -> None:
    logger.info("Workflow Example starting")
    metrics.increment("example.workflows.started")

    await example_dag_workflow()
    await example_state_machine()
    await example_event_driven()

    logger.info("Workflow examples complete")
    metrics.increment("example.workflows.completed")


if __name__ == "__main__":
    asyncio.run(main())

.env.example

# Azure OpenAI Configuration
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-azure-openai-api-key
AZURE_OPENAI_DEPLOYMENT=gpt-4
AZURE_OPENAI_API_VERSION=2024-02-15-preview

# State Backend (memory | redis)
STATE_BACKEND=memory

# Redis (only required when STATE_BACKEND=redis)
REDIS_URL=redis://localhost:6379

# Checkpoint TTL Configuration (seconds)
# Controls how long checkpoint data persists in the state store.
# Default: 86400 (24 hours)
# All three checkpoint key types (data, index, exec) share this TTL and co-expire.
CHECKPOINT_TTL=86400

# =============================================================================
# Tracing  (gmf_forge_ai_shared_core.observability.tracing)
# Set TRACING_PROVIDER to activate a backend. Defaults to "null" (no-op).
# Supported values: null, splunk
# =============================================================================
TRACING_PROVIDER=null

# Splunk Observability Cloud (TRACING_PROVIDER=splunk)
SPLUNK_OTLP_ENDPOINT=https://ingest.<realm>.signalfx.com/v2/trace/otlp
SPLUNK_ACCESS_TOKEN=your-splunk-access-token
SPLUNK_SERVICE_NAME=gmf-forge-ai-orchestration

# =============================================================================
# OBO Token Behavior  (behaviors_example.py — Example 7)
# Provide credentials for whichever identity provider you use.
# USER_ASSERTION_TOKEN is the user's existing access token to exchange.
# =============================================================================

# Microsoft Entra ID OBO (Example 7a)
ENTRA_TENANT_ID=your-azure-ad-tenant-id
ENTRA_CLIENT_ID=your-agent-service-principal-app-id
ENTRA_CLIENT_SECRET=your-agent-service-principal-secret
# Comma-separated downstream scopes, e.g. api://servicenow/.default
ENTRA_SCOPES=api://servicenow/.default

# Okta RFC 8693 token-exchange (Example 7b)
OKTA_DOMAIN=company.okta.com
OKTA_CLIENT_ID=your-okta-client-id
OKTA_CLIENT_SECRET=your-okta-client-secret
# Comma-separated downstream scopes, e.g. servicenow.write
OKTA_SCOPES=servicenow.write
# Okta authorization server ID — "default" uses the org authorization server
OKTA_AUTHORIZATION_SERVER_ID=default

# The user's access token to exchange (required for both 7a and 7b)
#
# For local testing only — in case of Entra, acquire a token via the Azure CLI against your API's scope:
# az account get-access-token --resource api://<your-api-app-id> --query accessToken -o tsv
#
# For local testing only - in case of Okta use the below curl command to acquire a token (replace values in <>):
# curl -X POST https://<OKTA_DOMAIN>/oauth2/default/v1/token \
#  -H "Content-Type: application/x-www-form-urlencoded" \
#  -d "grant_type=password" \
#  -d "username=user@company.com" \
#  -d "password=yourpassword" \
#  -d "scope=openid" \
#  -d "client_id=<your-spa-client-id>"
USER_ASSERTION_TOKEN=eyJ...