README.md

# basic-rag-app

Retrieval-Augmented Generation over PolicyHub corporate policy documents.
Exposes a FastAPI server and a CLI for offline ingestion and ad-hoc querying.

---

## Architecture

```
[STEP 1 - POST /createindex  or  python app.py createindex]
    AzureAISearchIndexBuilder       (gmf_forge_ai_data.indexing)
        document_type=PolicyHubChunkDocument   ← full field schema
        .create_or_replace_index()

[STEP 2 - POST /ingest  or  python app.py ingest]
    connector/PolicyHubAdminConnector   (SoapConnector → SOAP Admin API)
        → RecursiveChunker              (gmf_forge_ai_data.chunkers)
        → BatchEmbeddings               (gmf_forge_ai_data.embeddings / Azure OpenAI)
        → AzureAISearchVectorStore      (gmf_forge_ai_data.vector_stores)

[QUERY - POST /query  or  python app.py query "…"]
    query/QueryPipeline
        QueryRewriter                   (gmf_forge_ai_data.query / LLM)
        → AzureOpenAIEmbeddings         (gmf_forge_ai_data.embeddings)
        → HybridRetriever               (gmf_forge_ai_data.retrieval)
        → RelevanceFilter               (gmf_forge_ai_data.context)
        → ContextWindowManager          (gmf_forge_ai_data.context)
    query/GenerationPipeline
        → UnifiedLLMGateway             (gmf_forge_ai_shared_core.llm_gateway)
        ← structured XML response
```

---

## Source layout

```
apps/basic-rag-app/
├── .env                        # credentials and tuning (not committed)
├── postman-environment.json    # Postman environment; import to test the API
├── setup.py
└── src/
    ├── app.py                  # FastAPI server + CLI entry point
    ├── config.py               # AppConfig dataclass, loaded from .env once
    ├── connector/
    │   ├── __init__.py
    │   └── policyhub.py        # PolicyHubAdminConnector (SoapConnector subclass)
    ├── ingestion/
    │   ├── __init__.py
    │   ├── models.py           # PolicyHubChunkDocument, PolicyHubIngestionResult
    │   └── pipeline.py         # PolicyHubIngestionPipeline (load→chunk→embed→index)
    └── query/
        ├── __init__.py
        ├── pipeline.py         # QueryPipeline (rewrite→embed→retrieve→filter→window)
        └── generation.py       # GenerationPipeline (prompt→LLM→parse XML response)
```

### Index schema

`PolicyHubChunkDocument` (in `ingestion/models.py`) extends the base `Document`
class with PolicyHub-specific fields. When passed as `document_type` to
`AzureAISearchIndexBuilder`, each field becomes a dedicated, queryable Azure AI
Search field rather than a value buried in a JSON blob.

| Field | Type | Notes |
|---|---|---|
| `id` | String (key) | chunk identifier |
| `content` | String | chunk text, full-text searchable |
| `embedding` | Collection(Single) | 1536-dim HNSW vector |
| `document_id` | String | PolicyHub document ID |
| `document_name` | String | document title, searchable |
| `documentlink` | String | URL to document in PolicyHub |
| `language` | String | facetable |
| `locale` | String | facetable |
| `revisionid` | String | filterable |
| `source` | String | WSDL endpoint, facetable |
| `upload_date` | String | filterable, sortable |
| `version` | String | filterable |
| `folder_id` | String | filterable |
| `folder_path` | String | filterable |
| `author` | String | filterable |
| `description` | String | filterable |
| `mime_type` | String | filterable |
| `parent_doc_id` | String | ID of the unchunked source document |
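
Because fields such as `revisionid`, `version`, and `folder_id` are filterable, a search can be narrowed with an OData filter expression. The helper below is a minimal sketch of building one from a dict of exact-match conditions. `build_odata_filter` is a hypothetical name, not part of `gmf_forge_ai_data`, and whether `HybridRetriever` accepts a filter string is an assumption this README does not confirm.

```python
def build_odata_filter(equals: dict[str, str]) -> str:
    """Join field/value pairs into an OData filter for string fields.

    Hypothetical helper for illustration only. Single quotes inside a
    value are doubled, which is how OData escapes them in string literals.
    """
    clauses = []
    for field, value in equals.items():
        escaped = value.replace("'", "''")
        clauses.append(f"{field} eq '{escaped}'")
    return " and ".join(clauses)


if __name__ == "__main__":
    # Restrict results to one folder and one document version.
    print(build_odata_filter({"folder_id": "42", "version": "3"}))
```

Only fields marked filterable in the table above can appear in such an expression; a condition on a non-filterable field is rejected by the search service.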

### Response format

Every `/query` call returns three parsed sections alongside the raw answer:

```xml
<SummarizedContent>
  LLM-generated answer based solely on retrieved policy documents
</SummarizedContent>
<Citations>
  [Document Name] "verbatim excerpt that supports the answer"
</Citations>
<References>
  <Item1>https://…/PolicyHub/Document/465</Item1>
  <Item2>https://…/PolicyHub/Document/1498</Item2>
</References>
```
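
The three sections can be pulled out of the raw answer with a few regular expressions. This is a sketch of one way to do it, not the code in `query/generation.py`; `parse_sections` is an illustrative name, and the `<ItemN>` numbering is assumed to follow the sample above.

```python
import re


def parse_sections(answer: str) -> dict:
    """Split a raw answer into summary, citations, and reference URLs."""

    def section(tag: str) -> str:
        # Non-greedy match so each tag pair is captured independently.
        match = re.search(rf"<{tag}>(.*?)</{tag}>", answer, re.DOTALL)
        return match.group(1).strip() if match else ""

    # References are wrapped in numbered <Item1>, <Item2>, ... elements.
    items = re.findall(r"<Item\d+>(.*?)</Item\d+>", section("References"), re.DOTALL)
    return {
        "summarized_content": section("SummarizedContent"),
        "citations": section("Citations"),
        "references": [item.strip() for item in items],
    }
```

Regular expressions are used here instead of an XML parser because the answer has three sibling top-level elements and may contain unescaped characters produced by the LLM, both of which a strict parser would reject.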

---

## Prerequisites

- Python 3.11+
- Access to the PolicyHub SOAP Admin API (`AdminApi.svc?wsdl`)
- Azure OpenAI resource with embedding + chat models deployed
- Azure AI Search service (Basic tier or higher, `2024-07-01` API or newer)

---

## Installation

```bash
# From the monorepo root: install in editable mode
pip install -e packages/shared-core
pip install -e packages/data-layer
pip install -e apps/basic-rag-app
```

---

## Configuration

Place `.env` at `apps/basic-rag-app/.env` and fill in your values:

```ini
# ── Auth ──────────────────────────────────────────────────────────────────────
APP_API_KEY=your-secret-api-key

# ── Azure OpenAI ──────────────────────────────────────────────────────────────
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_EMBEDDING_MODEL=text-embedding-ada-002
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini
AZURE_OPENAI_CHAT_API_VERSION=2025-01-01-preview

# ── Azure AI Search ───────────────────────────────────────────────────────────
AZURE_SEARCH_ENDPOINT=https://your-search.search.windows.net
AZURE_SEARCH_API_KEY=your-search-api-key
AZURE_SEARCH_INDEX=policyhub-docs

# ── PolicyHub SOAP ────────────────────────────────────────────────────────────
MITRATECH_ADMIN_WSDL_URL=https://your-instance.policyhub.com/PolicyHubAPI/AdminApi.svc?wsdl
MITRATECH_API_USERNAME=your-username
MITRATECH_API_PASSWORD=your-password

# ── TLS (optional; only needed on corporate networks) ─────────────────────────
SSL_CERT_PATH=../../certs/gmf_and_public_cas.pem

# ── RAG tuning (optional; defaults shown) ─────────────────────────────────────
CHUNK_SIZE=512
CHUNK_OVERLAP=64
TOP_K=5
MIN_RELEVANCE_SCORE=0.01
MAX_CONTEXT_TOKENS=4000
EMBEDDING_DIMENSION=1536

# ── API server (optional; defaults shown) ─────────────────────────────────────
API_HOST=0.0.0.0
API_PORT=8000
```
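
`MIN_RELEVANCE_SCORE` defaults to a value far below 1 because hybrid retrieval scores come from reciprocal rank fusion (RRF), not from cosine similarity. The sketch below computes RRF scores for two ranked lists to show the scale involved. The constant `k = 60` is an assumption based on the commonly cited RRF formulation, and `rrf_scores` is an illustrative function, not the retriever's actual code.

```python
from collections import defaultdict


def rrf_scores(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    """Fuse ranked ID lists: each list contributes 1 / (k + rank), rank from 1."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


if __name__ == "__main__":
    keyword_hits = ["a", "b", "c"]   # order returned by full-text search
    vector_hits = ["b", "a", "d"]    # order returned by vector search
    fused = rrf_scores([keyword_hits, vector_hits])
    for doc_id, score in sorted(fused.items(), key=lambda kv: kv[1], reverse=True):
        print(doc_id, round(score, 4))
```

Under these assumptions a document ranked first in both lists scores 2/61, roughly 0.033, so a threshold tuned for cosine similarity (for example 0.75) would discard every result.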

---

## Usage

All commands are run from `apps/basic-rag-app/`:

```bash
cd apps/basic-rag-app
```

### 1. Create the index (run once before first ingest)

Provisions the full Azure AI Search index schema, including all `PolicyHubChunkDocument`
fields. **If the index already exists, it is dropped and recreated, and all indexed
documents are lost.** Run this once, or again only when the schema must change.

```bash
python src/app.py createindex
```

### 2. Ingest policy documents

```bash
# Ingest all documents
python src/app.py ingest

# Ingest a limited sample (development / testing)
python src/app.py ingest --max-docs 50
```
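
During ingestion each document is split into chunks sized by `CHUNK_SIZE`, with neighbouring chunks sharing `CHUNK_OVERLAP` units. The sketch below shows the overlap mechanics with a plain fixed-size window. It is a simplified stand-in, not the `RecursiveChunker` algorithm, and it treats each list item as one unit (whether the real chunker counts tokens or characters is not stated here).

```python
def chunk_with_overlap(items: list, size: int, overlap: int) -> list[list]:
    """Split items into windows of `size` that share `overlap` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    if not 0 <= overlap < size:
        raise ValueError("overlap must satisfy 0 <= overlap < size")

    step = size - overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(items), step):
        chunks.append(items[start:start + size])
        if start + size >= len(items):
            break  # the window already reached the end of the input
    return chunks


if __name__ == "__main__":
    words = "all corporate data must be classified before it is shared".split()
    for chunk in chunk_with_overlap(words, size=4, overlap=1):
        print(" ".join(chunk))
```

The overlap keeps a sentence that straddles a chunk boundary retrievable from at least one chunk, at the cost of indexing some text twice.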

### 3. Start the API server

```bash
python src/app.py serve
```

- API: `http://localhost:8000`
- Interactive docs: `http://localhost:8000/docs`

### 4. Query from the CLI

```bash
python src/app.py query "What is the cybersecurity incident response policy?"
```

---

## API Reference

All protected endpoints require the header `X-API-Key: <APP_API_KEY>`.
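
A client can attach the header with nothing but the standard library. The sketch below builds the request object and leaves sending to a separate function; `build_request` and `send` are illustrative names, and the base URL and key are placeholders.

```python
import json
import urllib.request
from typing import Optional


def build_request(base_url: str, path: str, api_key: str,
                  payload: Optional[dict] = None) -> urllib.request.Request:
    """Build a GET (no payload) or JSON POST request carrying X-API-Key."""
    headers = {"X-API-Key": api_key}
    data = None
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=data,
        headers=headers,
        method="POST" if payload is not None else "GET",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and decode the JSON body (needs a running server)."""
    # urlopen raises urllib.error.HTTPError for 4xx/5xx, e.g. a rejected key.
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, `send(build_request("http://localhost:8000", "/query", "<APP_API_KEY>", {"query": "What is the data classification policy?"}))` would post a query once the server is running.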

### `GET /health` - no auth

```json
{ "status": "ok", "index": "policyhub-docs", "index_status": "ready", "doc_count": 4231 }
```

`index_status` is `"not_created"` when the index does not yet exist (server stays healthy).

### `GET /stats` - auth required

```json
{
  "index": "policyhub-docs",
  "index_status": "ready",
  "doc_count": 4231,
  "chunk_size": 512,
  "chunk_overlap": 64,
  "top_k": 5,
  "min_relevance_score": 0.01,
  "max_context_tokens": 4000
}
```

### `POST /createindex` - auth required

No request body. Provisions the index, replacing it if it already exists (all indexed documents are lost).

```json
{
  "index_name": "policyhub-docs",
  "existed": false,
  "replaced": false,
  "message": "Index 'policyhub-docs' created successfully."
}
```

### `POST /ingest` - auth required

```json
{ "max_docs": 100 }
```

Response:

```json
{
  "documents_loaded": 100,
  "chunks_created": 631,
  "chunks_indexed": 631,
  "duration_ms": 142300
}
```

### `POST /query` - auth required

```json
{ "query": "What is the data classification policy?" }
```

Response:

```json
{
  "query": "What is the data classification policy?",
  "rewritten_query": "data classification policy requirements tiers",
  "answer": "<SummarizedContent>…</SummarizedContent><Citations>…</Citations><References>…</References>",
  "summarized_content": "All corporate data must be classified into one of four tiers…",
  "citations": "[Data Classification Policy] \"All data must be classified…\"",
  "references": [
    "https://your-instance.policyhub.com/PolicyHub/Document/312",
    "https://your-instance.policyhub.com/PolicyHub/Document/874"
  ],
  "sources": [
    {
      "document_name": "Data Classification Policy",
      "documentlink": "https://…/PolicyHub/Document/312",
      "score": "0.9142",
      "rank": "0"
    }
  ],
  "duration_ms": 1843
}
```

---

## Logging

Set `LOG_LEVEL=DEBUG` in `.env` for verbose SOAP and retrieval traces.
Default (`INFO`) shows pipeline milestones and warnings only.

policyhub-rag-api.postman_collection.json

{
  "info": {
    "name": "PolicyHub RAG API",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "variable": [
    { "key": "base_url", "value": "http://localhost:8000" },
    { "key": "api_key",  "value": "your-secret-api-key-here" }
  ],
  "item": [
    {
      "name": "Health",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/health"
      }
    },
    {
      "name": "Stats",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/stats",
        "header": [{ "key": "X-API-Key", "value": "{{api_key}}" }]
      }
    },
    {
      "name": "Create Index",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/createindex",
        "header": [{ "key": "X-API-Key", "value": "{{api_key}}" }]
      }
    },
    {
      "name": "Ingest - all documents",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/ingest",
        "header": [
          { "key": "X-API-Key",      "value": "{{api_key}}" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": { "mode": "raw", "raw": "{}" }
      }
    },
    {
      "name": "Ingest - sample 50 docs",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/ingest",
        "header": [
          { "key": "X-API-Key",      "value": "{{api_key}}" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": { "mode": "raw", "raw": "{\"max_docs\": 50}" }
      }
    },
    {
      "name": "Query - cybersecurity policy",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/query",
        "header": [
          { "key": "X-API-Key",      "value": "{{api_key}}" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": {
          "mode": "raw",
          "raw": "{\"query\": \"What is the cybersecurity incident response policy?\"}"
        }
      }
    },
    {
      "name": "Query - data classification",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/query",
        "header": [
          { "key": "X-API-Key",      "value": "{{api_key}}" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": {
          "mode": "raw",
          "raw": "{\"query\": \"What are the data classification requirements?\"}"
        }
      }
    },
    {
      "name": "Query - password requirements",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/query",
        "header": [
          { "key": "X-API-Key",      "value": "{{api_key}}" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": {
          "mode": "raw",
          "raw": "{\"query\": \"What are the password requirements?\"}"
        }
      }
    },
    {
      "name": "Query - auth failure (401)",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/query",
        "header": [
          { "key": "X-API-Key",      "value": "wrong-key" },
          { "key": "Content-Type",   "value": "application/json" }
        ],
        "body": {
          "mode": "raw",
          "raw": "{\"query\": \"test\"}"
        }
      }
    }
  ]
}

policyhub-rag-api.postman_environment.json

{
  "id": "policyhub-rag-api-env",
  "name": "PolicyHub RAG API - Local",
  "values": [
    {
      "key": "base_url",
      "value": "http://localhost:8000",
      "type": "default",
      "enabled": true
    },
    {
      "key": "api_key",
      "value": "<some_api_key>",
      "type": "secret",
      "enabled": true
    }
  ],
  "_postman_variable_scope": "environment",
  "_postman_exported_at": "2026-05-26T00:00:00.000Z",
  "_postman_exported_using": "Postman/11.0"
}

setup.py

"""Basic RAG Application."""

from setuptools import setup, find_packages

setup(
    name="basic-rag-app",
    version="0.1.0",
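    packages=find_packages(where="src"),
    package_dir={"": "src"},
    # app.py and config.py are top-level modules, not packages, so
    # find_packages() does not pick them up; list them explicitly.
    py_modules=["app", "config"],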
    python_requires=">=3.11",
    install_requires=[
        "gmf-forge-ai-shared-core>=1.0.0",
        "gmf-forge-ai-data>=1.0.0",
        "fastapi>=0.111.0",
        "uvicorn[standard]>=0.29.0",
        "pydantic>=2.0.0",
        "python-dotenv>=1.0.0",
    ],
    entry_points={
        "console_scripts": [
            "basic-rag-app=app:main",
        ],
    },
)

src/__init__.py

src/app.py

"""basic-rag-app: FastAPI server + CLI entry point.

Endpoints:
    GET  /health          liveness probe, no auth required
    GET  /stats           index statistics (X-API-Key required)
    POST /createindex     create or replace the search index (X-API-Key required)
    POST /ingest          run ingestion pipeline (X-API-Key required)
    POST /query           run query + generation pipeline (X-API-Key required)

CLI:
    python app.py serve                       # start FastAPI server
    python app.py createindex                 # create or replace the index then exit
    python app.py ingest [--max-docs N]       # offline ingest then exit
    python app.py query "your question"       # single query then exit

Auth:
    All protected endpoints require:  X-API-Key: <APP_API_KEY>

Query response format:
    <SummarizedContent> … </SummarizedContent>
    <Citations>         … </Citations>
    <References>        … </References>
"""

import argparse
import asyncio
import secrets
from contextlib import asynccontextmanager
from typing import Optional

import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from pydantic import BaseModel

from azure.core.exceptions import ResourceNotFoundError
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_data.indexing import AzureAISearchIndexBuilder
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore

from config import AppConfig, load_config
from ingestion import PolicyHubIngestionPipeline, PolicyHubChunkDocument, PolicyHubIngestionResult
from query import QueryPipeline, GenerationPipeline

logger = BasicLogger(__name__)


# ── Auth ───────────────────────────────────────────────────────────────────────

_API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=True)


def _make_auth_dep(config: AppConfig):
    """Return a FastAPI dependency that validates the X-API-Key header."""
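    def _dep(key: str = Security(_API_KEY_HEADER)) -> None:
        # Compare as bytes: compare_digest raises TypeError for non-ASCII str input.
        if not secrets.compare_digest(key.encode("utf-8"), config.app_api_key.encode("utf-8")):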
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key",
            )
    return _dep


# ── Request / response models ──────────────────────────────────────────────────

class CreateIndexResponse(BaseModel):
    index_name: str
    existed: bool
    replaced: bool
    message: str


class IngestRequest(BaseModel):
    max_docs: Optional[int] = None


class IngestResponse(BaseModel):
    documents_loaded: int
    chunks_created: int
    chunks_indexed: int
    duration_ms: int


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    summarized_content: str
    citations: str
    references: list[str]


# ── Application factory ────────────────────────────────────────────────────────

def create_app(config: Optional[AppConfig] = None) -> FastAPI:
    cfg = config or load_config()
    auth = _make_auth_dep(cfg)

    ingestion = PolicyHubIngestionPipeline(cfg)
    query_pipeline = QueryPipeline(cfg)
    generation = GenerationPipeline(cfg)

    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        logger.info(
            "basic-rag-app started",
            index=cfg.search_index,
            host=cfg.api_host,
            port=cfg.api_port,
        )
        yield
        logger.info("basic-rag-app shutting down")

    app = FastAPI(
        title="PolicyHub RAG API",
        description=(
            "Retrieval-Augmented Generation over PolicyHub corporate policy documents. "
            "Ingest documents via POST /ingest, then query via POST /query."
        ),
        version="0.1.0",
        lifespan=lifespan,
    )

    def _store() -> AzureAISearchVectorStore:
        return AzureAISearchVectorStore(
            endpoint=cfg.search_endpoint,
            index_name=cfg.search_index,
            api_key=cfg.search_api_key,
            embedding_dimension=cfg.embedding_dimension,
        )

    # ── Routes ─────────────────────────────────────────────────────────────────

    @app.get("/health")
    async def health():
        """Liveness probe; no auth required."""
        try:
            doc_count = _store().count()
            index_status = "ready"
        except ResourceNotFoundError:
            doc_count = 0
            index_status = "not_created"
        return {"status": "ok", "index": cfg.search_index, "index_status": index_status, "doc_count": doc_count}

    @app.get("/stats", dependencies=[Depends(auth)])
    async def stats():
        """Index and RAG configuration statistics."""
        try:
            doc_count = _store().count()
            index_status = "ready"
        except ResourceNotFoundError:
            doc_count = 0
            index_status = "not_created"
        return {
            "index": cfg.search_index,
            "index_status": index_status,
            "doc_count": doc_count,
            "chunk_size": cfg.chunk_size,
            "chunk_overlap": cfg.chunk_overlap,
            "top_k": cfg.top_k,
            "min_relevance_score": cfg.min_relevance_score,
            "max_context_tokens": cfg.max_context_tokens,
        }

    @app.post("/createindex", response_model=CreateIndexResponse, dependencies=[Depends(auth)])
    async def create_index():
        """
        Create or replace the Azure AI Search index using AzureAISearchIndexBuilder.

        **Warning:** if the index already exists, it is dropped and recreated, and
        all indexed documents are lost.  Call this once before the first ingest,
        or when the schema must change (e.g. different embedding dimension).
        Do NOT call before every ingest run.
        """
        def _run():
            builder = AzureAISearchIndexBuilder(
                endpoint=cfg.search_endpoint,
                api_key=cfg.search_api_key,
                index_name=cfg.search_index,
                embedding_dimension=cfg.embedding_dimension,
                document_type=PolicyHubChunkDocument,
                ssl_cert_path=cfg.ssl_cert_path,
                semantic_config={
                    "name": "policyhub-semantic-config",
                    "title_field": "document_name",
                    "content_fields": ["content"],
                    "keyword_fields": ["language", "locale", "source"],
                },
            )
            existed = builder.index_exists()
            builder.create_or_replace_index()
            return existed

        # get_running_loop() is the supported way to reach the loop from a coroutine.
        existed = await asyncio.get_running_loop().run_in_executor(None, _run)
        action = "replaced" if existed else "created"
        return CreateIndexResponse(
            index_name=cfg.search_index,
            existed=existed,
            replaced=existed,
            message=f"Index '{cfg.search_index}' {action} successfully.",
        )

    @app.post("/ingest", response_model=IngestResponse, dependencies=[Depends(auth)])
    async def ingest(body: IngestRequest):
        """
        Run the full ingestion pipeline. Replaces the existing index.
        Pass ``max_docs`` to cap documents loaded (useful for testing).
        """
        logger.info("Ingestion started", max_docs=body.max_docs)
        result: PolicyHubIngestionResult = await asyncio.get_running_loop().run_in_executor(
            None, lambda: ingestion.run(max_docs=body.max_docs)
        )
        logger.info(
            "Ingestion complete",
            documents=result.documents_loaded,
            chunks=result.chunks_indexed,
            duration_ms=result.duration_ms,
        )
        return IngestResponse(
            documents_loaded=result.documents_loaded,
            chunks_created=result.chunks_created,
            chunks_indexed=result.chunks_indexed,
            duration_ms=result.duration_ms,
        )

    @app.post("/query", response_model=QueryResponse, dependencies=[Depends(auth)])
    async def query(body: QueryRequest):
        """
        Query the indexed policy documents.

        Returns a structured response:
            <SummarizedContent> LLM answer        </SummarizedContent>
            <Citations>         Source excerpts   </Citations>
            <References>        Source URLs/names </References>
        """
        if not body.query.strip():
            raise HTTPException(status_code=400, detail="Query must not be empty")

        query_result = await query_pipeline.run(body.query)
        gen_result = await generation.generate(body.query, query_result.context)

        return QueryResponse(
            summarized_content=gen_result.summarized_content,
            citations=gen_result.citations,
            references=gen_result.references,
        )

    return app


# ── CLI ────────────────────────────────────────────────────────────────────────

def _cli_ingest(cfg: AppConfig, max_docs: Optional[int]) -> None:
    result = PolicyHubIngestionPipeline(cfg).run(max_docs=max_docs)
    logger.info(
        "Ingestion complete",
        documents=result.documents_loaded,
        chunks=result.chunks_indexed,
        duration_ms=result.duration_ms,
    )


async def _cli_query(cfg: AppConfig, query_text: str) -> None:
    qp = QueryPipeline(cfg)
    gp = GenerationPipeline(cfg)
    query_result = await qp.run(query_text)
    gen_result = await gp.generate(query_text, query_result.context)
    print(gen_result.answer)


def main() -> None:
    parser = argparse.ArgumentParser(prog="basic-rag-app")
    sub = parser.add_subparsers(dest="command", required=True)

    sub.add_parser("serve", help="Start the FastAPI server")

    sub.add_parser("createindex", help="Create or replace the Azure AI Search index and exit")

    p_ingest = sub.add_parser("ingest", help="Run ingestion pipeline and exit")
    p_ingest.add_argument("--max-docs", type=int, default=None,
                          help="Cap the number of documents loaded from PolicyHub")

    p_query = sub.add_parser("query", help="Run a single query and exit")
    p_query.add_argument("text", help="Query text")

    args = parser.parse_args()
    cfg = load_config()

    if args.command == "serve":
        uvicorn.run(create_app(cfg), host=cfg.api_host, port=cfg.api_port)
    elif args.command == "createindex":
        builder = AzureAISearchIndexBuilder(
            endpoint=cfg.search_endpoint,
            api_key=cfg.search_api_key,
            index_name=cfg.search_index,
            embedding_dimension=cfg.embedding_dimension,
            document_type=PolicyHubChunkDocument,
            ssl_cert_path=cfg.ssl_cert_path,
            semantic_config={
                "name": "policyhub-semantic-config",
                "title_field": "document_name",
                "content_fields": ["content"],
                "keyword_fields": ["language", "locale", "source"],
            },
        )
        existed = builder.index_exists()
        builder.create_or_replace_index()
        action = "replaced" if existed else "created"
        logger.info(f"Index {action}", index=cfg.search_index)
    elif args.command == "ingest":
        _cli_ingest(cfg, max_docs=args.max_docs)
    elif args.command == "query":
        asyncio.run(_cli_query(cfg, args.text))


if __name__ == "__main__":
    main()

src/config.py

"""Application configuration β€” loaded once from environment variables."""

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

from dotenv import load_dotenv

from gmf_forge_ai_shared_core.observability import BasicLogger

logger = BasicLogger(__name__)

_ENV_PATH = Path(__file__).parent.parent / ".env"


@dataclass
class AppConfig:
    # ── Auth ──────────────────────────────────────────────────────────────────
    app_api_key: str              # X-API-Key header value expected on protected endpoints

    # ── PolicyHub SOAP source ─────────────────────────────────────────────────
    admin_wsdl_url: str
    api_username: str
    api_password: str

    # ── Azure OpenAI ───────────────────────────────────────────────────────────
    openai_endpoint: str
    openai_api_key: str
    embedding_deployment: str     # e.g. text-embedding-ada-002-2policychatbot
    chat_deployment: str          # e.g. gpt-4o
    chat_api_version: str

    # ── Azure AI Search ───────────────────────────────────────────────────────
    search_endpoint: str
    search_api_key: str

    # ── Optional infrastructure ───────────────────────────────────────────────
    ssl_cert_path: Optional[str] = None

    # ── Index / RAG tuning ────────────────────────────────────────────────────
    search_index: str = "policyhub-docs"
    embedding_dimension: int = 1536
    chunk_size: int = 512
    chunk_overlap: int = 64
    top_k: int = 5
    min_relevance_score: float = 0.01  # HybridRetriever (Azure RRF) scores max at ~0.05
    max_context_tokens: int = 4000

    # ── API server ────────────────────────────────────────────────────────────
    api_host: str = "0.0.0.0"
    api_port: int = 8000


def load_config() -> AppConfig:
    """Load and validate AppConfig from environment variables.

    Raises:
        ValueError: if any required environment variable is missing.
    """
    load_dotenv(_ENV_PATH)

    missing: list[str] = []

    def _require(name: str) -> str:
        val = os.getenv(name, "").strip()
        if not val:
            missing.append(name)
        return val

    def _optional(name: str, default: str = "") -> str:
        return os.getenv(name, default).strip() or default

    config = AppConfig(
        app_api_key=_require("APP_API_KEY"),
        admin_wsdl_url=_require("MITRATECH_ADMIN_WSDL_URL"),
        api_username=_require("MITRATECH_API_USERNAME"),
        api_password=_require("MITRATECH_API_PASSWORD"),
        openai_endpoint=_require("AZURE_OPENAI_ENDPOINT"),
        openai_api_key=_require("AZURE_OPENAI_API_KEY"),
        embedding_deployment=_require("AZURE_OPENAI_EMBEDDING_MODEL"),
        chat_deployment=_require("AZURE_OPENAI_CHAT_DEPLOYMENT"),
        search_endpoint=_require("AZURE_SEARCH_ENDPOINT"),
        search_api_key=_require("AZURE_SEARCH_API_KEY"),
        chat_api_version=_optional("AZURE_OPENAI_CHAT_API_VERSION", "2024-02-15-preview"),
        ssl_cert_path=_optional("SSL_CERT_PATH") or None,
        search_index=_optional("AZURE_SEARCH_INDEX", "policyhub-docs"),
        embedding_dimension=int(_optional("EMBEDDING_DIMENSION", "1536")),
        chunk_size=int(_optional("CHUNK_SIZE", "512")),
        chunk_overlap=int(_optional("CHUNK_OVERLAP", "64")),
        top_k=int(_optional("TOP_K", "5")),
        # Default must match AppConfig: RRF scores stay far below 0.75.
        min_relevance_score=float(_optional("MIN_RELEVANCE_SCORE", "0.01")),
        max_context_tokens=int(_optional("MAX_CONTEXT_TOKENS", "4000")),
        api_host=_optional("API_HOST", "0.0.0.0"),
        api_port=int(_optional("API_PORT", "8000")),
    )

    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")

    logger.info(
        "Configuration loaded",
        index=config.search_index,
        top_k=config.top_k,
        chunk_size=config.chunk_size,
    )
    return config

src/connector/__init__.py

from .policyhub import PolicyHubAdminConnector

__all__ = ["PolicyHubAdminConnector"]

src/connector/policyhub.py

"""PolicyHub Admin API SOAP connector, local to the basic-rag-app.

Traverses the full PolicyHub folder tree, fetches document content, and
returns ``Document`` objects ready for chunking and indexing.

The developer example version (with dry-run mode and MyLibrary support)
lives in packages/data-layer/examples/soap_connector_example.py.
"""

from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional

from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_data.connectors import SoapConnector
from gmf_forge_ai_data.vector_stores import Document

logger = BasicLogger(__name__)

_CREDENTIALS_QNAME = (
    "{http://schemas.datacontract.org/2004/07/"
    "HitecLabs.PolicyHub.Api.Wcf.Security}Credentials"
)


class PolicyHubAdminConnector(SoapConnector):
    """
    Connects to the PolicyHub Admin API via SOAP and loads documents.

    Usage::

        with PolicyHubAdminConnector(wsdl_url=..., username=..., password=...) as conn:
            docs = conn.load(max_docs=50)
    """

    def __init__(self, folder_ids: Optional[List[str]] = None, **kwargs):
        super().__init__(credentials_type_qname=_CREDENTIALS_QNAME, **kwargs)
        self.folder_ids = folder_ids

    # ── Raw API calls ──────────────────────────────────────────────────────────

    def get_library(self) -> Dict[str, Any]:
        return self.call("GetLibrary")

    def get_documents_by_folder(self, folder_id: str) -> Dict[str, Any]:
        return self.call("GetDocumentsByFolder", folderId=folder_id)

    def get_document_data(self, document_id: str) -> Dict[str, Any]:
        return self.call("GetDocumentData", documentId=document_id)

    # ── High-level loader ──────────────────────────────────────────────────────

    def load(self, max_docs: Optional[int] = None) -> List[Document]:
        """Traverse the library tree and return Document objects.

        Args:
            max_docs: Cap the total returned.  ``None`` = entire library.
        """
        return list(self.stream(max_docs=max_docs))

    def stream(self, max_docs: Optional[int] = None) -> Iterator[Document]:
        """Yield Documents one at a time without accumulating the full list.

        Suitable for batch-processing pipelines that want to chunk/embed/index
        each document (or small group of documents) without holding the entire
        corpus in memory.

        Args:
            max_docs: Stop after this many documents.  ``None`` = entire library.
        """
        library = self.get_library()
        folders = list(self._traverse_folders(library))
        if self.folder_ids:
            folders = [f for f in folders if f["FolderId"] in self.folder_ids]

        count = 0
        base_url = self.wsdl_url.split("/PolicyHubAPI/")[0]

        for folder in folders:
            if max_docs is not None and count >= max_docs:
                return

            folder_id = folder["FolderId"]
            folder_path = folder.get("LibraryPath", folder.get("Name", ""))

            try:
                result = self.get_documents_by_folder(folder_id)
            except Exception as exc:
                logger.debug("Skipping folder", folder_id=folder_id, error=str(exc))
                continue

            raw_docs = _extract_list(result, "PolicyHubDocument")
            if not raw_docs:
                continue

            for raw in raw_docs:
                if max_docs is not None and count >= max_docs:
                    return
                doc = self._build_document(raw, folder_id, folder_path, base_url)
                if doc:
                    count += 1
                    yield doc

        logger.info("PolicyHub stream complete", total_documents=count)

    def _build_document(
        self,
        raw: Dict[str, Any],
        folder_id: str,
        folder_path: str,
        base_url: str,
    ) -> Optional[Document]:
        doc_id = raw.get("DocumentId")
        doc_title = raw.get("Name", "Untitled")

        try:
            data = self.get_document_data(doc_id)
        except Exception as exc:
            logger.warning("Could not fetch document data",
                           document_id=doc_id, title=doc_title, error=str(exc))
            return None

        mime_type   = (data.get("MimeType") or "") if data else ""
        revision_id = str(data.get("RevisionId", "")) if data else ""
        version     = str(data.get("Version", "")) if data else ""
        language    = (data.get("Language") or "") if data else ""
        locale      = (data.get("Locale") or "") if data else ""
        raw_bytes   = data.get("Data") if data else None

        ext     = _mime_to_ext(mime_type)
        content = self.extract_text(raw_bytes or b"", f"{doc_title}{ext}")

        if not content.strip():
            logger.debug("Skipping empty document", document_id=doc_id, title=doc_title)
            return None

        return Document(
            id=self._make_doc_id(str(doc_id)),
            content=content.strip(),
            timestamp=_parse_date(raw.get("LastModifiedDateUtc")),
            metadata={
                "document_id":   str(doc_id),
                "document_name": doc_title,
                "documentlink":  f"{base_url}/PolicyHub/Document/{doc_id}",
                "language":      language,
                "locale":        locale,
                "revisionid":    revision_id,
                "source":        self.wsdl_url,
                "upload_date":   str(raw.get("PublishedDateUtc", "")),
                "version":       version,
                "folder_id":     str(folder_id),
                "folder_path":   folder_path,
                "description":   raw.get("Description", ""),
                "author":        raw.get("AuthorName", ""),
                "mime_type":     mime_type,
                "file_name":     f"{doc_title}{ext}",
                "created_at":    str(raw.get("CreationDateUtc", "")),
                "modified_at":   str(raw.get("LastModifiedDateUtc", "")),
            },
        )

    def _traverse_folders(self, node: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        yield node
        children = (node.get("Children") or {}).get("PolicyHubFolder", [])
        if not isinstance(children, list):
            children = [children]
        for child in children:
            if isinstance(child, dict):
                yield from self._traverse_folders(child)


# ── Helpers ────────────────────────────────────────────────────────────────────

def _extract_list(result: Any, key: str) -> List[Dict[str, Any]]:
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        val = result.get(key)
        if val is None:
            return []
        return val if isinstance(val, list) else [val]
    return []


def _mime_to_ext(mime_type: str) -> str:
    mime = (mime_type or "").lower()
    if "pdf" in mime:
        return ".pdf"
    # "officedocument" already contains "document", so one check covers both.
    if "word" in mime or "document" in mime:
        return ".docx"
    if "text" in mime:
        return ".txt"
    return ".bin"


def _parse_date(value: Any) -> Optional[datetime]:
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
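    try:
        # fromisoformat() only accepts a trailing "Z" from Python 3.11 onwards,
        # so normalise it to an explicit UTC offset first.
        return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except ValueError:
        return None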
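
The folder traversal and list normalisation above can be exercised without a SOAP endpoint. The sketch below re-implements the two helpers as standalone functions (`traverse_folders` and `extract_list` are illustrative names, and the sample `library` dict is made up) to show why a lone child that arrives as a dict rather than a list is handled explicitly.

```python
from typing import Any, Dict, Iterator, List


def traverse_folders(node: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Depth-first, pre-order walk over a nested folder dict."""
    yield node
    children = (node.get("Children") or {}).get("PolicyHubFolder", [])
    if not isinstance(children, list):
        # A single child is assumed to arrive as a bare dict, not a list.
        children = [children]
    for child in children:
        if isinstance(child, dict):
            yield from traverse_folders(child)


def extract_list(result: Any, key: str) -> List[Dict[str, Any]]:
    """Normalise 'list', 'dict holding one item', and 'nothing' to a list."""
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        val = result.get(key)
        if val is None:
            return []
        return val if isinstance(val, list) else [val]
    return []


# Hypothetical library tree: "hr" has one child (a dict), "it" has none.
library = {
    "FolderId": "root",
    "Children": {
        "PolicyHubFolder": [
            {
                "FolderId": "hr",
                "Children": {"PolicyHubFolder": {"FolderId": "hr-leave", "Children": None}},
            },
            {"FolderId": "it"},
        ]
    },
}

folder_ids = [f["FolderId"] for f in traverse_folders(library)]
single_doc = extract_list({"PolicyHubDocument": {"DocumentId": 1}}, "PolicyHubDocument")
print(folder_ids)
print(single_doc)
```

Parents are yielded before their children, so `folder_ids` lists each folder ahead of its sub-folders.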

src/ingestion/__init__.py

from .models import PolicyHubChunkDocument, PolicyHubIngestionResult
from .pipeline import PolicyHubIngestionPipeline

__all__ = [
    "PolicyHubChunkDocument",
    "PolicyHubIngestionResult",
    "PolicyHubIngestionPipeline",
]

src/ingestion/models.py

"""Domain models for the basic-rag-app.

PolicyHubChunkDocument   - Document subclass with PolicyHub metadata as typed
                           indexed fields.  Passed to AzureAISearchIndexBuilder
                           so every field becomes a dedicated index field
                           (not a value buried in the document_data blob).

PolicyHubIngestionResult - plain result dataclass returned by the ingestion
                           pipeline after a run completes.
"""

from dataclasses import dataclass, field
from typing import Optional

from gmf_forge_ai_data.vector_stores import Document


@dataclass
class PolicyHubChunkDocument(Document):
    """
    A chunked PolicyHub document with all metadata as first-class index fields.

    Pass this class as ``document_type`` to ``AzureAISearchIndexBuilder`` to
    automatically provision one Azure AI Search field per attribute below.
    ``AzureAISearchVectorStore`` will write each populated attribute as an
    indexed field on every upload.
    """
    document_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    document_name: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": False, "sortable": True})
    documentlink: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    language: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    locale: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    revisionid: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    source: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    upload_date: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": True})
    version: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    folder_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    folder_path: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    author: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    description: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": False, "facetable": False, "sortable": False})
    mime_type: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    parent_doc_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})


@dataclass
class PolicyHubIngestionResult:
    """Counts and timing returned after a completed ingestion run."""
    documents_loaded: int
    chunks_created: int
    chunks_indexed: int
    duration_ms: int
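
How `AzureAISearchIndexBuilder` consumes the per-field `metadata` flags is not shown in this repository. As a rough illustration only, the standard library exposes those flags through `dataclasses.fields`, so a schema builder could read them along these lines (`ExampleChunk` and `index_field_options` are hypothetical names, not part of `gmf_forge_ai_data`):

```python
from dataclasses import dataclass, field, fields
from typing import Dict, Optional


@dataclass
class ExampleChunk:
    # Stand-in for PolicyHubChunkDocument with two annotated fields.
    document_name: Optional[str] = field(
        default=None,
        metadata={"searchable": True, "filterable": True, "sortable": True},
    )
    folder_id: Optional[str] = field(default=None, metadata={"filterable": True})
    content: str = ""  # no metadata, so it is skipped below


def index_field_options(cls) -> Dict[str, Dict[str, bool]]:
    """Collect the metadata flags of every annotated dataclass field."""
    return {f.name: dict(f.metadata) for f in fields(cls) if f.metadata}


options = index_field_options(ExampleChunk)
print(options)
```

Fields declared without `metadata` carry an empty mapping, which is why `content` does not appear in `options`.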

src/ingestion/pipeline.py

"""Ingestion pipeline: chunks, embeds, and indexes PolicyHub documents.

Pipeline (per document batch):
    PolicyHubAdminConnector  (connector/)    -> stream raw Documents
        -> RecursiveChunker                  -> split into overlapping chunks
        -> BatchEmbeddings                   -> embed each chunk via Azure OpenAI
        -> AzureAISearchVectorStore          -> write batch to Azure AI Search

Documents are processed in batches of _DOC_BATCH_SIZE to keep memory bounded.
Index uploads are further split into sub-batches of _CHUNK_UPLOAD_BATCH_SIZE
chunks to avoid HTTP 413 errors on the index upload step.

The index schema must be provisioned first via POST /createindex or
``python app.py createindex`` (uses AzureAISearchIndexBuilder with
``document_type=PolicyHubChunkDocument``).  This pipeline never touches
the index schema.
"""

import time
from typing import Iterator, List, Optional

from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.chunkers import RecursiveChunker
from gmf_forge_ai_data.embeddings import AzureOpenAIEmbeddings, BatchEmbeddings
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore

from config import AppConfig
from connector import PolicyHubAdminConnector
from .models import PolicyHubChunkDocument, PolicyHubIngestionResult

logger = BasicLogger(__name__)

# Number of source documents to chunk/embed/index per iteration.
# Tune this to balance memory usage vs. number of API round-trips.
_DOC_BATCH_SIZE = 50

# Maximum chunks per Azure AI Search upload call.
# Azure AI Search enforces a 16 MB per-request limit.  Each chunk carries
# ~6 KB of embedding (1536 × float32) plus text and metadata, so 200 chunks
# ≈ 3–4 MB, comfortably under the limit.  This also avoids the SDK bug
# where its internal 413-retry-split passes error_map twice (TypeError).
_CHUNK_UPLOAD_BATCH_SIZE = 200


def _batched(it: Iterator, size: int):
    """Yield successive lists of `size` items from iterator `it`."""
    batch = []
    for item in it:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch


class PolicyHubIngestionPipeline:
    """Stream -> chunk -> embed -> index, one document batch at a time."""

    def __init__(self, config: AppConfig):
        self._cfg = config
        self._tracer = get_tracer()

    def run(self, max_docs: Optional[int] = None) -> PolicyHubIngestionResult:
        started = time.monotonic()
        cfg = self._cfg

        embedder = AzureOpenAIEmbeddings(
            endpoint=cfg.openai_endpoint,
            api_key=cfg.openai_api_key,
            deployment_name=cfg.embedding_deployment,
            ssl_cert_path=cfg.ssl_cert_path,
        )
        chunker = RecursiveChunker(
            chunk_size=cfg.chunk_size,
            chunk_overlap=cfg.chunk_overlap,
        )
        store = AzureAISearchVectorStore(
            endpoint=cfg.search_endpoint,
            index_name=cfg.search_index,
            api_key=cfg.search_api_key,
            embedding_dimension=cfg.embedding_dimension,
            document_type=PolicyHubChunkDocument,
        )

        total_docs = 0
        total_chunks_created = 0
        total_chunks_indexed = 0
        batch_num = 0

        with self._tracer.trace("policyhub_ingestion", input={"max_docs": max_docs}) as trace:
            connector = PolicyHubAdminConnector(
                wsdl_url=cfg.admin_wsdl_url,
                username=cfg.api_username,
                password=cfg.api_password,
                ssl_verify=cfg.ssl_cert_path is not None,
                ssl_cert_path=cfg.ssl_cert_path,
            )
            with connector:
                doc_stream = connector.stream(max_docs=max_docs)

                for doc_batch in _batched(doc_stream, _DOC_BATCH_SIZE):
                    batch_num += 1
                    total_docs += len(doc_batch)
                    batch_label = f"batch_{batch_num}"

                    with trace.span(batch_label, input={"docs": len(doc_batch), "total_so_far": total_docs}) as batch_span:

                        # --- Chunk ---
                        with batch_span.span("chunk", input={"docs": len(doc_batch)}) as span:
                            chunks: List[PolicyHubChunkDocument] = []
                            for doc in doc_batch:
                                meta = doc.metadata
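                                # Number chunks per document so chunk IDs do not depend on
                                # ingestion order or on how many chunks earlier documents produced.
                                for chunk_idx, chunk in enumerate(chunker.chunk(doc.content, metadata=meta)):
                                    chunks.append(PolicyHubChunkDocument(
                                        id=f"{doc.id}_c{chunk_idx}",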
                                        content=chunk.text,
                                        metadata=chunk.metadata,
                                        document_id=meta.get("document_id"),
                                        document_name=meta.get("document_name"),
                                        documentlink=meta.get("documentlink"),
                                        language=meta.get("language") or None,
                                        locale=meta.get("locale") or None,
                                        revisionid=meta.get("revisionid") or None,
                                        source=meta.get("source"),
                                        upload_date=meta.get("upload_date") or None,
                                        version=meta.get("version") or None,
                                        folder_id=meta.get("folder_id"),
                                        folder_path=meta.get("folder_path"),
                                        author=meta.get("author") or None,
                                        description=meta.get("description") or None,
                                        mime_type=meta.get("mime_type") or None,
                                        parent_doc_id=doc.id,
                                    ))
                            total_chunks_created += len(chunks)
                            span.set_output({"chunks": len(chunks)})

                        # --- Embed ---
                        with batch_span.span("embed", input={"chunks": len(chunks)}) as span:
                            vectors = BatchEmbeddings(
                                provider=embedder, show_progress=False
                            ).embed_batch([c.content for c in chunks])
                            for chunk, vector in zip(chunks, vectors):
                                chunk.embedding = vector
                            span.set_output({"embedded": len(vectors)})

                        # --- Index ---
                        # Split into sub-batches to stay under Azure AI Search's
                        # 16 MB per-request limit and avoid the SDK's broken
                        # 413-retry-split (error_map duplicate kwarg bug).
                        with batch_span.span("index", input={"chunks": len(chunks)}) as span:
                            for chunk_sub in _batched(iter(chunks), _CHUNK_UPLOAD_BATCH_SIZE):
                                store.add_documents(chunk_sub)
                            total_chunks_indexed += len(chunks)
                            span.set_output({"indexed": len(chunks)})

                        batch_span.set_output({
                            "chunks_in_batch": len(chunks),
                            "total_docs": total_docs,
                            "total_indexed": total_chunks_indexed,
                        })

                    logger.info(
                        "Batch complete",
                        batch=batch_num,
                        docs_in_batch=len(doc_batch),
                        chunks_in_batch=len(chunks),
                        total_docs=total_docs,
                        total_indexed=total_chunks_indexed,
                    )

            duration_ms = int((time.monotonic() - started) * 1000)
            trace.set_output({
                "documents": total_docs,
                "chunks": total_chunks_indexed,
                "duration_ms": duration_ms,
            })

        logger.info(
            "Ingestion complete",
            documents=total_docs,
            chunks_created=total_chunks_created,
            chunks_indexed=total_chunks_indexed,
            duration_ms=duration_ms,
        )
        return PolicyHubIngestionResult(
            documents_loaded=total_docs,
            chunks_created=total_chunks_created,
            chunks_indexed=total_chunks_indexed,
            duration_ms=duration_ms,
        )
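
The two batching levels in `run()` (document batches, then upload sub-batches) can be seen in isolation with plain strings. This is a minimal sketch: `batched` mirrors `_batched`, the batch sizes are shrunk for readability, and the assumption that every document yields exactly two chunks is invented for the example.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch  # final, possibly shorter batch


docs = (f"doc-{i}" for i in range(7))  # a generator, like connector.stream()
upload_sizes: List[int] = []

for doc_batch in batched(docs, 3):  # stands in for _DOC_BATCH_SIZE
    # Pretend each document splits into two chunks.
    chunks = [f"{d}#c{n}" for d in doc_batch for n in range(2)]
    for sub_batch in batched(chunks, 4):  # stands in for _CHUNK_UPLOAD_BATCH_SIZE
        upload_sizes.append(len(sub_batch))

print(upload_sizes)
```

Only one document batch and its chunks are held in memory at a time, and no single upload exceeds the sub-batch size.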

src/query/__init__.py

from .pipeline import QueryPipeline, QueryResult
from .generation import GenerationPipeline, GenerationResult

__all__ = [
    "QueryPipeline",
    "QueryResult",
    "GenerationPipeline",
    "GenerationResult",
]

src/query/generation.py

"""Generation pipeline: assembles context into a structured LLM prompt and
returns a templated response.

Response template (LLM is instructed to produce exactly this):

    <SummarizedContent>
      LLM-generated answer to the user's question
    </SummarizedContent>
    <Citations>
      Verbatim excerpts from source documents that support the answer
    </Citations>
    <References>
      <Item1>url or document name</Item1>
      <Item2>url or document name</Item2>
    </References>
"""

import re
import time
from dataclasses import dataclass
from typing import Dict, List

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.context import WindowedContext

from config import AppConfig
from registry import prompt_registry
from registry.llm_registry import llm_registry

logger = BasicLogger(__name__)


@dataclass
class GenerationResult:
    query: str
    answer: str                      # full templated LLM response
    summarized_content: str          # extracted <SummarizedContent> text
    citations: str                   # extracted <Citations> text
    references: List[str]            # extracted <Item*> values from <References>
    sources: List[Dict[str, str]]    # [{document_name, documentlink, score, rank}]
    duration_ms: int


class GenerationPipeline:
    """
    Assembles windowed context into a structured prompt and calls the LLM.

    The LLM is instructed to respond in a fixed XML-like template.  The
    template is parsed so each section (summary, citations, references) is
    returned separately for independent rendering in the API response or UI.
    """

    def __init__(self, config: AppConfig):
        self._tracer = get_tracer()
        # Pass all registered LLMs to UnifiedLLMGateway
        self._llm_gateway = UnifiedLLMGateway(
            provider_registry=llm_registry
        )
        self._deployment = config.chat_deployment

    async def generate(self, query: str, context: WindowedContext) -> GenerationResult:
        started = time.monotonic()

        context_blocks: List[str] = []
        sources: List[Dict[str, str]] = []
        # Maps source number (1-based) -> link or name, used to derive references
        # from whichever sources the LLM actually cited.
        source_map: Dict[int, str] = {}

        for i, result in enumerate(context.results, 1):
            doc = result.document
            meta = doc.metadata
            # Prefer typed fields (PolicyHubChunkDocument); fall back to metadata dict
            name = (
                getattr(doc, "document_name", None)
                or meta.get("document_name", meta.get("file_name", f"Document {i}"))
            )
            link = (
                getattr(doc, "documentlink", None)
                or meta.get("documentlink", meta.get("source", ""))
            )

            context_blocks.append(f"[Source {i}: {name}]\n{result.document.content}")
            sources.append({
                "document_name": name,
                "documentlink": link,
                "score": f"{result.score:.4f}",
                "rank": str(result.rank),
            })
            source_map[i] = link or name

        prompt = (
            prompt_registry.format("policyhub_system", context_blocks="\n\n".join(context_blocks))
            + "\n\n"
            + prompt_registry.format("policyhub_user", query=query)
        )

        with self._tracer.trace("policyhub_generation", input=query) as trace:
            with trace.generation("llm_response", model=self._deployment) as span:
                # Use UnifiedLLMGateway to make the call
                response = await self._llm_gateway.complete(prompt=prompt, temperature=0.2)
                answer = response.content if hasattr(response, "content") else str(response)
                span.set_output({"chars": len(answer), "sources": len(sources)})

        duration_ms = int((time.monotonic() - started) * 1000)
        summarized, citations, _ = _parse_template(answer)
        references = _cited_references(citations, source_map)
        logger.info("Generation complete", query=query, chars=len(answer), sources=len(sources))

        return GenerationResult(
            query=query,
            answer=answer,
            summarized_content=summarized,
            citations=citations,
            references=references,
            sources=sources,
            duration_ms=duration_ms,
        )


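def _parse_template(text: str):
    """Extract <SummarizedContent> and <Citations> from the LLM template response.

    The third tuple slot is a placeholder: <References> is not parsed here,
    because references are derived from the citations by ``_cited_references``.
    """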

    def _extract(tag: str) -> str:
        m = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        return m.group(1).strip() if m else ""

    summarized = _extract("SummarizedContent")
    citations = _extract("Citations")
    return summarized, citations, None


def _cited_references(citations: str, source_map: Dict[int, str]) -> List[str]:
    """Return deduplicated links/names for only the sources cited by the LLM.

    Parses '[Source N:' patterns from the Citations block and maps each N back
    to the link or document name stored in source_map.
    """
    cited_nums = [int(m) for m in re.findall(r"\[Source\s+(\d+):", citations)]
    seen: set[str] = set()
    refs: List[str] = []
    for n in cited_nums:
        ref = source_map.get(n)
        if ref and ref not in seen:
            seen.add(ref)
            refs.append(ref)
    return refs
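
The parsing helpers only recover references when each citation line carries a `[Source N: ...]` label. The standalone sketch below (`extract_section` and `cited_references` are illustrative copies of the helpers; the response text and URLs are invented) shows the expected shape and what happens when the label is missing.

```python
import re
from typing import Dict, List


def extract_section(text: str, tag: str) -> str:
    """Return the stripped body of <tag>...</tag>, or '' if absent."""
    m = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
    return m.group(1).strip() if m else ""


def cited_references(citations: str, source_map: Dict[int, str]) -> List[str]:
    """Map '[Source N:' labels back to links, deduplicated in citation order."""
    refs: List[str] = []
    for n in (int(m) for m in re.findall(r"\[Source\s+(\d+):", citations)):
        ref = source_map.get(n)
        if ref and ref not in refs:
            refs.append(ref)
    return refs


# Invented LLM response and source map, for illustration only.
response = """<SummarizedContent>
Leave accrues monthly and unused leave carries over.
</SummarizedContent>
<Citations>
[Source 2: Leave Policy] "Leave accrues monthly."
[Source 2: Leave Policy] "Unused leave carries over."
[Source 1: HR Handbook] "See the Leave Policy for accrual rules."
</Citations>"""

source_map = {
    1: "https://example.invalid/doc/1",
    2: "https://example.invalid/doc/2",
    3: "https://example.invalid/doc/3",
}

citations = extract_section(response, "Citations")
refs = cited_references(citations, source_map)
unlabelled = cited_references('[Leave Policy] "Leave accrues monthly."', source_map)
print(refs)
print(unlabelled)
```

Source 3 was retrieved but never cited, so it is left out of `refs`. A citation without the numbered label matches nothing, which leaves the reference list empty.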

src/query/pipeline.py

"""Online query pipeline: rewrites, embeds, retrieves, filters, and windows context.

Pipeline:
    QueryRewriter               (LLM-powered query improvement)
        -> AzureOpenAIEmbeddings (embed the rewritten query)
        -> HybridRetriever       (vector + keyword search against Azure AI Search)
        -> RelevanceFilter       (drop results below the score threshold)
        -> ContextWindowManager  (fit remaining results into the token budget)
"""

import time
from dataclasses import dataclass

from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.context import ContextWindowManager, RelevanceFilter, WindowedContext
from gmf_forge_ai_data.embeddings import AzureOpenAIEmbeddings
from gmf_forge_ai_data.query import QueryRewriter
from gmf_forge_ai_data.retrieval import HybridRetriever, RetrievalQuery
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore

from config import AppConfig
from ingestion import PolicyHubChunkDocument
from registry.llm_registry import llm_registry

logger = BasicLogger(__name__)


@dataclass
class QueryResult:
    original_query: str
    rewritten_query: str
    context: WindowedContext
    duration_ms: int


class QueryPipeline:
    """
    Online query pipeline, run once per user request.

        QueryRewriter
            -> AzureOpenAIEmbeddings
            -> HybridRetriever
            -> RelevanceFilter
            -> ContextWindowManager
    """

    def __init__(self, config: AppConfig):
        cfg = config
        self._tracer = get_tracer()
        self._top_k = cfg.top_k

        self._embedder = AzureOpenAIEmbeddings(
            endpoint=cfg.openai_endpoint,
            api_key=cfg.openai_api_key,
            deployment_name=cfg.embedding_deployment,
            ssl_cert_path=cfg.ssl_cert_path,
        )
        self._retriever = HybridRetriever(
            AzureAISearchVectorStore(
                endpoint=cfg.search_endpoint,
                index_name=cfg.search_index,
                api_key=cfg.search_api_key,
                embedding_dimension=cfg.embedding_dimension,
                document_type=PolicyHubChunkDocument,
            )
        )
        self._filter = RelevanceFilter(min_score=cfg.min_relevance_score)
        self._window = ContextWindowManager(max_tokens=cfg.max_context_tokens)

        llm = UnifiedLLMGateway(
            provider_registry=llm_registry
        )
        self._rewriter = QueryRewriter(llm_gateway=llm)

    async def run(self, query_text: str) -> QueryResult:
        started = time.monotonic()

        with self._tracer.trace("policyhub_query", input=query_text) as trace:

            # 1. Rewrite
            with trace.span("rewrite", input=query_text) as span:
                rewritten = await self._rewriter.rewrite(query_text)
                span.set_output({"rewritten": rewritten.rewritten})
            logger.info("Query rewritten", original=query_text, rewritten=rewritten.rewritten)

            # 2. Embed
            with trace.span("embed", input=rewritten.rewritten) as span:
                embedding = self._embedder.embed_text(rewritten.rewritten)
                span.set_output({"dimension": len(embedding)})

            # 3. Retrieve
            with trace.span("retrieve", input={"query": rewritten.rewritten, "top_k": self._top_k}) as span:
                results = self._retriever.retrieve(
                    RetrievalQuery(text=rewritten.rewritten, embedding=embedding, top_k=self._top_k)
                )
                span.set_output({"results": len(results)})
            logger.info("Retrieved chunks", count=len(results))

            # 4. Filter
            with trace.span("filter", input={"results": len(results)}) as span:
                filtered = self._filter.filter(results)
                span.set_output({"kept": len(filtered), "dropped": len(results) - len(filtered)})
            logger.info("Filtered chunks", kept=len(filtered), dropped=len(results) - len(filtered))

            # 5. Window
            with trace.span("window", input={"results": len(filtered)}) as span:
                context = self._window.fit(filtered)
                span.set_output({"tokens": context.total_tokens, "docs": len(context.results)})

            duration_ms = int((time.monotonic() - started) * 1000)
            trace.set_output({"rewritten": rewritten.rewritten, "context_docs": len(context.results)})

        return QueryResult(
            original_query=query_text,
            rewritten_query=rewritten.rewritten,
            context=context,
            duration_ms=duration_ms,
        )

src/registry/__init__.py

from .prompt_registry import prompt_registry
from .llm_registry import llm_registry

__all__ = ["prompt_registry", "llm_registry"]

src/registry/llm_registry.py

"""LLM Provider registry for managing and registering LLM configurations.

This file centralizes the registration of LLM providers, allowing the application
components to retrieve LLM instances dynamically based on configuration.
"""

from gmf_forge_ai_shared_core.registry import LLMProviderRegistry
from config import load_config
from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider
import os

llm_registry = LLMProviderRegistry()

# Load configuration from AppConfig
config = load_config()

# ssl_cert_path comes from AppConfig. It may be unset, so validate it only when present.
ssl_cert_path = config.ssl_cert_path

if ssl_cert_path and not os.path.exists(ssl_cert_path):
    raise FileNotFoundError(f"SSL certificate not found at {ssl_cert_path}")

# Create an instance of AzureOpenAIProvider
azure_openai_provider = AzureOpenAIProvider(
    endpoint=config.openai_endpoint,
    api_key=config.openai_api_key,
    deployment_name=config.chat_deployment,
    api_version=config.chat_api_version,
    ssl_cert_path=ssl_cert_path,
)

# Register the provider
llm_registry.register(
    name="openai",
    provider=azure_openai_provider
)

src/registry/prompt_registry.py

"""Prompt registry for the PolicyHub generation pipeline.

All LLM prompts are versioned and registered here. To iterate on a prompt,
add a new registration with a bumped version; the pipeline always picks up
the latest version automatically via PromptRegistry.get().
"""

from gmf_forge_ai_shared_core.registry import PromptRegistry

prompt_registry = PromptRegistry()

prompt_registry.register(
    name="policyhub_system",
    version="1.0",
    variables=["context_blocks"],
    description="System prompt for the PolicyHub corporate policy assistant.",
    template="""\
You are a corporate policy assistant helping employees understand company policies \
and compliance requirements.

Answer the user's question using ONLY the provided policy document excerpts below.
If the answer cannot be determined from the provided context, state that clearly.

You MUST format your response using EXACTLY this structure; do not add any text \
outside these tags:

<SummarizedContent>
Write a clear, concise answer to the user's question based solely on the policy \
documents. Use plain language. Use bullet points if listing multiple items.
</SummarizedContent>
<Citations>
For each document excerpt that directly supports your answer, quote it verbatim \
on its own line, prefixed with that excerpt's source label, using this format:
[Source N: Document Name] "exact quoted text"
</Citations>
<References>
List the URL (or document name if no URL) for each source you actually cited above,
one entry per unique document using <Item1>, <Item2>, ... format.
Omit any sources you did not cite. If nothing was cited, leave this section empty.
</References>

--- Policy Document Excerpts ---

{context_blocks}""",
)

prompt_registry.register(
    name="policyhub_user",
    version="1.0",
    variables=["query"],
    description="User turn prompt wrapping the employee's question.",
    template="Question: {query}",
)