README.md
# basic-rag-app
Retrieval-Augmented Generation over PolicyHub corporate policy documents.
Exposes a FastAPI server and a CLI for offline ingestion and ad-hoc querying.
---
## Architecture
```
[STEP 1: POST /createindex  or  python app.py createindex]
  AzureAISearchIndexBuilder  (gmf_forge_ai_data.indexing)
    document_type=PolicyHubChunkDocument  → full field schema
    .create_or_replace_index()

[STEP 2: POST /ingest  or  python app.py ingest]
  connector/PolicyHubAdminConnector  (SoapConnector → SOAP Admin API)
    → RecursiveChunker          (gmf_forge_ai_data.chunkers)
    → BatchEmbeddings           (gmf_forge_ai_data.embeddings / Azure OpenAI)
    → AzureAISearchVectorStore  (gmf_forge_ai_data.vector_stores)

[QUERY: POST /query  or  python app.py query "…"]
  query/QueryPipeline
    QueryRewriter               (gmf_forge_ai_data.query / LLM)
    → AzureOpenAIEmbeddings     (gmf_forge_ai_data.embeddings)
    → HybridRetriever           (gmf_forge_ai_data.retrieval)
    → RelevanceFilter           (gmf_forge_ai_data.context)
    → ContextWindowManager      (gmf_forge_ai_data.context)
  query/GenerationPipeline
    → UnifiedLLMGateway         (gmf_forge_ai_shared_core.llm_gateway)
    → structured XML response
```
---
## Source layout
```
apps/basic-rag-app/
├── .env                        # credentials and tuning (not committed)
├── postman-environment.json    # Postman environment; import to test the API
├── setup.py
└── src/
    ├── app.py                  # FastAPI server + CLI entry point
    ├── config.py               # AppConfig dataclass, loaded from .env once
    ├── connector/
    │   ├── __init__.py
    │   └── policyhub.py        # PolicyHubAdminConnector (SoapConnector subclass)
    ├── ingestion/
    │   ├── __init__.py
    │   ├── models.py           # PolicyHubChunkDocument, PolicyHubIngestionResult
    │   └── pipeline.py         # PolicyHubIngestionPipeline (load→chunk→embed→index)
    └── query/
        ├── __init__.py
        ├── pipeline.py         # QueryPipeline (rewrite→embed→retrieve→filter→window)
        └── generation.py       # GenerationPipeline (prompt→LLM→parse XML response)
```
### Index schema
`PolicyHubChunkDocument` (in `ingestion/models.py`) extends the base `Document`
class with PolicyHub-specific fields. When passed as `document_type` to
`AzureAISearchIndexBuilder`, each field becomes a dedicated, queryable Azure AI
Search field rather than a value buried in a JSON blob.
| Field | Type | Notes |
|---|---|---|
| `id` | String (key) | chunk identifier |
| `content` | String | chunk text, full-text searchable |
| `embedding` | Collection(Single) | 1536-dim HNSW vector |
| `document_id` | String | PolicyHub document ID |
| `document_name` | String | document title, searchable |
| `documentlink` | String | URL to document in PolicyHub |
| `language` | String | facetable |
| `locale` | String | facetable |
| `revisionid` | String | filterable |
| `source` | String | WSDL endpoint, facetable |
| `upload_date` | String | filterable, sortable |
| `version` | String | filterable |
| `folder_id` | String | filterable |
| `folder_path` | String | filterable |
| `author` | String | filterable |
| `description` | String | filterable |
| `mime_type` | String | filterable |
| `parent_doc_id` | String | ID of the unchunked source document |
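
As a rough illustration of how per-field attributes can be read off a dataclass, the sketch below inspects `dataclasses.field` metadata. `ExampleChunk` and `describe_fields` are hypothetical names used only here; how `AzureAISearchIndexBuilder` actually derives the schema is not shown in this README, and its defaults for unset flags may differ.

```python
from dataclasses import dataclass, field, fields
from typing import Optional


@dataclass
class ExampleChunk:
    """Hypothetical stand-in for PolicyHubChunkDocument with two fields."""
    document_name: Optional[str] = field(
        default=None, metadata={"searchable": True, "sortable": True}
    )
    version: Optional[str] = field(
        default=None, metadata={"filterable": True}
    )


def describe_fields(cls) -> dict[str, dict[str, bool]]:
    """Map each dataclass field name to its index attribute flags.

    Flags missing from the metadata are treated as False in this sketch.
    """
    flags = ("searchable", "filterable", "facetable", "sortable")
    return {
        f.name: {flag: bool(f.metadata.get(flag, False)) for flag in flags}
        for f in fields(cls)
    }


schema = describe_fields(ExampleChunk)
print(schema["document_name"])
```

Keeping the flags next to the field definition means the index schema and the document model cannot drift apart silently.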
### Response format
Every `/query` call returns three parsed sections alongside the raw answer:
```xml
<SummarizedContent>
LLM-generated answer based solely on retrieved policy documents
</SummarizedContent>
<Citations>
[Document Name] "verbatim excerpt that supports the answer"
</Citations>
<References>
<Item1>https://…/PolicyHub/Document/465</Item1>
<Item2>https://…/PolicyHub/Document/1498</Item2>
</References>
```
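
A client that only has the raw answer string could split it into the three sections along these lines. This is a minimal sketch, not the parser used by `GenerationPipeline`; `extract_section` and `parse_answer` are hypothetical helpers. It uses regular expressions rather than an XML parser because the three sections have no single root element and LLM output is not guaranteed to be well-formed XML.

```python
import re


def extract_section(answer: str, tag: str) -> str:
    """Return the stripped text inside <tag>...</tag>, or "" if the tag is absent."""
    match = re.search(rf"<{tag}>(.*?)</{tag}>", answer, flags=re.DOTALL)
    return match.group(1).strip() if match else ""


def parse_answer(answer: str) -> dict:
    """Split a raw answer into summary, citations, and a list of reference URLs."""
    references_block = extract_section(answer, "References")
    # Each reference sits in its own numbered <ItemN> element.
    references = [
        item.strip()
        for item in re.findall(
            r"<Item\d+>(.*?)</Item\d+>", references_block, flags=re.DOTALL
        )
    ]
    return {
        "summarized_content": extract_section(answer, "SummarizedContent"),
        "citations": extract_section(answer, "Citations"),
        "references": references,
    }


sample = (
    "<SummarizedContent>Incidents must be reported promptly.</SummarizedContent>"
    '<Citations>[Incident Policy] "Report incidents promptly."</Citations>'
    "<References><Item1>https://example.invalid/PolicyHub/Document/1</Item1></References>"
)
parsed = parse_answer(sample)
print(parsed["references"])
```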
---
## Prerequisites
- Python 3.11+
- Access to the PolicyHub SOAP Admin API (`AdminApi.svc?wsdl`)
- Azure OpenAI resource with embedding + chat models deployed
- Azure AI Search service (Basic tier or higher, `2024-07-01` API or newer)
---
## Installation
```bash
# From the monorepo root: install in editable mode
pip install -e packages/shared-core
pip install -e packages/data-layer
pip install -e apps/basic-rag-app
```
---
## Configuration
Place `.env` at `apps/basic-rag-app/.env` and fill in your values:
```ini
# ── Auth ──────────────────────────────────────────────────────────────────
APP_API_KEY=your-secret-api-key
# ── Azure OpenAI ──────────────────────────────────────────────────────────
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_EMBEDDING_MODEL=text-embedding-ada-002
AZURE_OPENAI_CHAT_DEPLOYMENT=gpt-4o-mini
AZURE_OPENAI_CHAT_API_VERSION=2025-01-01-preview
# ── Azure AI Search ───────────────────────────────────────────────────────
AZURE_SEARCH_ENDPOINT=https://your-search.search.windows.net
AZURE_SEARCH_API_KEY=your-search-api-key
AZURE_SEARCH_INDEX=policyhub-docs
# ── PolicyHub SOAP ────────────────────────────────────────────────────────
MITRATECH_ADMIN_WSDL_URL=https://your-instance.policyhub.com/PolicyHubAPI/AdminApi.svc?wsdl
MITRATECH_API_USERNAME=your-username
MITRATECH_API_PASSWORD=your-password
# ── TLS (optional, only needed on corporate networks) ─────────────────────
SSL_CERT_PATH=../../certs/gmf_and_public_cas.pem
# ── RAG tuning (optional, defaults shown) ─────────────────────────────────
CHUNK_SIZE=512
CHUNK_OVERLAP=64
TOP_K=5
MIN_RELEVANCE_SCORE=0.01
MAX_CONTEXT_TOKENS=4000
EMBEDDING_DIMENSION=1536
# ── API server (optional, defaults shown) ─────────────────────────────────
API_HOST=0.0.0.0
API_PORT=8000
```
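
The small `MIN_RELEVANCE_SCORE` default makes sense once the scale of Reciprocal Rank Fusion (RRF) scores is clear. The sketch below assumes the standard RRF formula with constant `k = 60` and two fused rankings (keyword and vector); the exact scoring inside `HybridRetriever` is not shown in this repository, so treat the numbers as illustrative. `rrf_score` is a hypothetical helper.

```python
def rrf_score(ranks: list[int], k: int = 60) -> float:
    """Reciprocal Rank Fusion: sum 1 / (k + rank) over every ranking
    in which the document appears. Ranks are 1-based."""
    return sum(1.0 / (k + rank) for rank in ranks)


# A document ranked first by both the keyword and the vector search.
best = rrf_score([1, 1])
# A document that appears only once, at rank 50.
weak = rrf_score([50])

print(f"best={best:.4f} weak={weak:.4f}")
```

Under these assumptions even a perfect match scores well below 0.1, so a cosine-style threshold such as 0.75 would filter out every result, while 0.01 drops only the weakest matches.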
---
## Usage
All commands are run from `apps/basic-rag-app/`:
```bash
cd apps/basic-rag-app
```
### 1. Create the index (run once before first ingest)
Provisions the full Azure AI Search index schema, including all `PolicyHubChunkDocument`
fields. **If the index already exists, it is dropped and recreated, and all documents are
lost.** Run once, or when the schema must change.
```bash
python src/app.py createindex
```
### 2. Ingest policy documents
```bash
# Ingest all documents
python src/app.py ingest
# Ingest a limited sample (development / testing)
python src/app.py ingest --max-docs 50
```
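
During ingestion, `CHUNK_SIZE` and `CHUNK_OVERLAP` control how each document is split. The sketch below shows the overlap idea with a plain sliding window measured in characters. It is not `RecursiveChunker`, which may split on separators and may count tokens instead; `sliding_chunks` is a hypothetical helper.

```python
def sliding_chunks(text: str, size: int = 512, overlap: int = 64) -> list[str]:
    """Split text into windows of `size` characters, each sharing
    `overlap` characters with the previous window."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


text = "".join(chr(97 + i % 26) for i in range(1000))
chunks = sliding_chunks(text)
print([len(c) for c in chunks])
```

The overlap keeps a sentence that straddles a boundary retrievable from at least one chunk, at the cost of indexing some text twice.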
### 3. Start the API server
```bash
python src/app.py serve
```
API: `http://localhost:8000`
Interactive docs: `http://localhost:8000/docs`
### 4. Query from the CLI
```bash
python src/app.py query "What is the cybersecurity incident response policy?"
```
---
## API Reference
All protected endpoints require the header `X-API-Key: <APP_API_KEY>`.
### `GET /health` (no auth)
```json
{ "status": "ok", "index": "policyhub-docs", "index_status": "ready", "doc_count": 4231 }
```
`index_status` is `"not_created"` when the index does not yet exist (server stays healthy).
### `GET /stats` (auth required)
```json
{
"index": "policyhub-docs",
"index_status": "ready",
"doc_count": 4231,
"chunk_size": 512,
"chunk_overlap": 64,
"top_k": 5,
"min_relevance_score": 0.75,
"max_context_tokens": 4000
}
```
### `POST /createindex` (auth required)
No request body. Provisions the index; replaces if it already exists.
```json
{
"index_name": "policyhub-docs",
"existed": false,
"replaced": false,
"message": "Index 'policyhub-docs' created successfully."
}
```
### `POST /ingest` (auth required)
```json
{ "max_docs": 100 }
```
Response:
```json
{
"documents_loaded": 100,
"chunks_created": 631,
"chunks_indexed": 631,
"duration_ms": 142300
}
```
### `POST /query` (auth required)
```json
{ "query": "What is the data classification policy?" }
```
Response:
```json
{
"query": "What is the data classification policy?",
"rewritten_query": "data classification policy requirements tiers",
"answer": "<SummarizedContent>β¦</SummarizedContent><Citations>β¦</Citations><References>β¦</References>",
"summarized_content": "All corporate data must be classified into one of four tiersβ¦",
"citations": "[Data Classification Policy] \"All data must be classifiedβ¦\"",
"references": [
"https://your-instance.policyhub.com/PolicyHub/Document/312",
"https://your-instance.policyhub.com/PolicyHub/Document/874"
],
"sources": [
{
"document_name": "Data Classification Policy",
"documentlink": "https://β¦/PolicyHub/Document/312",
"score": "0.9142",
"rank": "0"
}
],
"duration_ms": 1843
}
```
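
For scripted access outside Postman, a request to `POST /query` can be assembled with the standard library alone. This is a minimal sketch: `build_query_request` is a hypothetical helper, and the base URL and key are the placeholder values used elsewhere in this README. The request is only constructed here; sending it requires a running server.

```python
import json
import urllib.request


def build_query_request(base_url: str, api_key: str, query: str) -> urllib.request.Request:
    """Build (but do not send) an authenticated POST /query request."""
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/query",
        data=body,
        method="POST",
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
    )


req = build_query_request(
    "http://localhost:8000",
    "your-secret-api-key",
    "What is the data classification policy?",
)

# To send it against a running server:
# with urllib.request.urlopen(req) as resp:
#     payload = json.load(resp)
```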
---
## Logging
Set `LOG_LEVEL=DEBUG` in `.env` for verbose SOAP and retrieval traces.
Default (`INFO`) shows pipeline milestones and warnings only.
policyhub-rag-api.postman_collection.json
{
"info": {
"name": "PolicyHub RAG API",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"variable": [
{ "key": "base_url", "value": "http://localhost:8000" },
{ "key": "api_key", "value": "your-secret-api-key-here" }
],
"item": [
{
"name": "Health",
"request": {
"method": "GET",
"url": "{{base_url}}/health"
}
},
{
"name": "Stats",
"request": {
"method": "GET",
"url": "{{base_url}}/stats",
"header": [{ "key": "X-API-Key", "value": "{{api_key}}" }]
}
},
{
"name": "Create Index",
"request": {
"method": "POST",
"url": "{{base_url}}/createindex",
"header": [{ "key": "X-API-Key", "value": "{{api_key}}" }]
}
},
{
"name": "Ingest β all documents",
"request": {
"method": "POST",
"url": "{{base_url}}/ingest",
"header": [
{ "key": "X-API-Key", "value": "{{api_key}}" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": { "mode": "raw", "raw": "{}" }
}
},
{
"name": "Ingest β sample 50 docs",
"request": {
"method": "POST",
"url": "{{base_url}}/ingest",
"header": [
{ "key": "X-API-Key", "value": "{{api_key}}" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": { "mode": "raw", "raw": "{\"max_docs\": 50}" }
}
},
{
"name": "Query β cybersecurity policy",
"request": {
"method": "POST",
"url": "{{base_url}}/query",
"header": [
{ "key": "X-API-Key", "value": "{{api_key}}" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": {
"mode": "raw",
"raw": "{\"query\": \"What is the cybersecurity incident response policy?\"}"
}
}
},
{
"name": "Query β data classification",
"request": {
"method": "POST",
"url": "{{base_url}}/query",
"header": [
{ "key": "X-API-Key", "value": "{{api_key}}" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": {
"mode": "raw",
"raw": "{\"query\": \"What are the data classification requirements?\"}"
}
}
},
{
"name": "Query β password requirements",
"request": {
"method": "POST",
"url": "{{base_url}}/query",
"header": [
{ "key": "X-API-Key", "value": "{{api_key}}" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": {
"mode": "raw",
"raw": "{\"query\": \"What are the password requirements?\"}"
}
}
},
{
"name": "Query β auth failure (401)",
"request": {
"method": "POST",
"url": "{{base_url}}/query",
"header": [
{ "key": "X-API-Key", "value": "wrong-key" },
{ "key": "Content-Type", "value": "application/json" }
],
"body": {
"mode": "raw",
"raw": "{\"query\": \"test\"}"
}
}
}
]
}
policyhub-rag-api.postman_environment.json
{
"id": "policyhub-rag-api-env",
"name": "PolicyHub RAG API β Local",
"values": [
{
"key": "base_url",
"value": "http://localhost:8000",
"type": "default",
"enabled": true
},
{
"key": "api_key",
"value": "<some_api_key>",
"type": "secret",
"enabled": true
}
],
"_postman_variable_scope": "environment",
"_postman_exported_at": "2026-05-26T00:00:00.000Z",
"_postman_exported_using": "Postman/11.0"
}
setup.py
"""Basic RAG Application."""
from setuptools import setup, find_packages

setup(
    name="basic-rag-app",
    version="0.1.0",
    packages=find_packages(where="src"),
    # app.py and config.py are top-level modules, which find_packages() does
    # not discover; list them so the "app:main" entry point resolves.
    py_modules=["app", "config"],
    package_dir={"": "src"},
    python_requires=">=3.11",
    install_requires=[
        "gmf-forge-ai-shared-core>=1.0.0",
        "gmf-forge-ai-data>=1.0.0",
        "fastapi>=0.111.0",
        "uvicorn[standard]>=0.29.0",
        "pydantic>=2.0.0",
        "python-dotenv>=1.0.0",
    ],
    entry_points={
        "console_scripts": [
            "basic-rag-app=app:main",
        ],
    },
)
src/app.py
"""basic-rag-app β FastAPI server + CLI entry point.
Endpoints:
GET /health liveness probe β no auth required
GET /stats index statistics (X-API-Key required)
POST /ingest run ingestion pipeline (X-API-Key required)
POST /query run query + generation pipeline (X-API-Key required)
CLI:
python app.py serve # start FastAPI server
python app.py ingest [--max-docs N] # offline ingest then exit
python app.py query "your question" # single query then exit
Auth:
All protected endpoints require: X-API-Key: <APP_API_KEY>
Query response format:
<SummarizedContent> β¦ </SummarizedContent>
<Citations> β¦ </Citations>
<References> β¦ </References>
"""
import argparse
import asyncio
import secrets
from contextlib import asynccontextmanager
from typing import Optional
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from pydantic import BaseModel
from azure.core.exceptions import ResourceNotFoundError
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_data.indexing import AzureAISearchIndexBuilder
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore
from config import AppConfig, load_config
from ingestion import PolicyHubIngestionPipeline, PolicyHubChunkDocument, PolicyHubIngestionResult
from query import QueryPipeline, GenerationPipeline
logger = BasicLogger(__name__)
# ── Auth ───────────────────────────────────────────────────────────────────

_API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=True)


def _make_auth_dep(config: AppConfig):
    """Return a FastAPI dependency that validates the X-API-Key header."""
    def _dep(key: str = Security(_API_KEY_HEADER)) -> None:
        # Compare as bytes: compare_digest raises TypeError for non-ASCII str.
        if not secrets.compare_digest(
            key.encode("utf-8"), config.app_api_key.encode("utf-8")
        ):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key",
            )
    return _dep
# ── Request / response models ──────────────────────────────────────────────

class CreateIndexResponse(BaseModel):
    index_name: str
    existed: bool
    replaced: bool
    message: str


class IngestRequest(BaseModel):
    max_docs: Optional[int] = None


class IngestResponse(BaseModel):
    documents_loaded: int
    chunks_created: int
    chunks_indexed: int
    duration_ms: int


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    summarized_content: str
    citations: str
    references: list[str]
# ── Application factory ────────────────────────────────────────────────────

def create_app(config: Optional[AppConfig] = None) -> FastAPI:
    cfg = config or load_config()
    auth = _make_auth_dep(cfg)
    ingestion = PolicyHubIngestionPipeline(cfg)
    query_pipeline = QueryPipeline(cfg)
    generation = GenerationPipeline(cfg)

    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        logger.info(
            "basic-rag-app started",
            index=cfg.search_index,
            host=cfg.api_host,
            port=cfg.api_port,
        )
        yield
        logger.info("basic-rag-app shutting down")

    app = FastAPI(
        title="PolicyHub RAG API",
        description=(
            "Retrieval-Augmented Generation over PolicyHub corporate policy documents. "
            "Ingest documents via POST /ingest, then query via POST /query."
        ),
        version="0.1.0",
        lifespan=lifespan,
    )

    def _store() -> AzureAISearchVectorStore:
        return AzureAISearchVectorStore(
            endpoint=cfg.search_endpoint,
            index_name=cfg.search_index,
            api_key=cfg.search_api_key,
            embedding_dimension=cfg.embedding_dimension,
        )
    # ── Routes ─────────────────────────────────────────────────────────────

    @app.get("/health")
    async def health():
        """Liveness probe; no auth required."""
        try:
            doc_count = _store().count()
            index_status = "ready"
        except ResourceNotFoundError:
            doc_count = 0
            index_status = "not_created"
        return {
            "status": "ok",
            "index": cfg.search_index,
            "index_status": index_status,
            "doc_count": doc_count,
        }

    @app.get("/stats", dependencies=[Depends(auth)])
    async def stats():
        """Index and RAG configuration statistics."""
        try:
            doc_count = _store().count()
            index_status = "ready"
        except ResourceNotFoundError:
            doc_count = 0
            index_status = "not_created"
        return {
            "index": cfg.search_index,
            "index_status": index_status,
            "doc_count": doc_count,
            "chunk_size": cfg.chunk_size,
            "chunk_overlap": cfg.chunk_overlap,
            "top_k": cfg.top_k,
            "min_relevance_score": cfg.min_relevance_score,
            "max_context_tokens": cfg.max_context_tokens,
        }
    @app.post("/createindex", response_model=CreateIndexResponse, dependencies=[Depends(auth)])
    async def create_index():
        """
        Create or replace the Azure AI Search index using AzureAISearchIndexBuilder.

        **Warning:** if the index already exists it is dropped and recreated, and
        all indexed documents are lost. Call this once before the first ingest,
        or when the schema must change (e.g. different embedding dimension).
        Do NOT call before every ingest run.
        """
        def _run():
            builder = AzureAISearchIndexBuilder(
                endpoint=cfg.search_endpoint,
                api_key=cfg.search_api_key,
                index_name=cfg.search_index,
                embedding_dimension=cfg.embedding_dimension,
                document_type=PolicyHubChunkDocument,
                ssl_cert_path=cfg.ssl_cert_path,
                semantic_config={
                    "name": "policyhub-semantic-config",
                    "title_field": "document_name",
                    "content_fields": ["content"],
                    "keyword_fields": ["language", "locale", "source"],
                },
            )
            existed = builder.index_exists()
            builder.create_or_replace_index()
            return existed

        # Run the blocking SDK calls in a worker thread so the event loop stays free.
        existed = await asyncio.get_running_loop().run_in_executor(None, _run)
        action = "replaced" if existed else "created"
        return CreateIndexResponse(
            index_name=cfg.search_index,
            existed=existed,
            replaced=existed,
            message=f"Index '{cfg.search_index}' {action} successfully.",
        )
    @app.post("/ingest", response_model=IngestResponse, dependencies=[Depends(auth)])
    async def ingest(body: IngestRequest):
        """
        Run the full ingestion pipeline against the existing index.

        The index schema is never created or replaced here; provision it
        first via POST /createindex.
        Pass ``max_docs`` to cap documents loaded (useful for testing).
        """
        logger.info("Ingestion started", max_docs=body.max_docs)
        # The pipeline is synchronous, so run it in a worker thread.
        result: PolicyHubIngestionResult = await asyncio.get_running_loop().run_in_executor(
            None, lambda: ingestion.run(max_docs=body.max_docs)
        )
        logger.info(
            "Ingestion complete",
            documents=result.documents_loaded,
            chunks=result.chunks_indexed,
            duration_ms=result.duration_ms,
        )
        return IngestResponse(
            documents_loaded=result.documents_loaded,
            chunks_created=result.chunks_created,
            chunks_indexed=result.chunks_indexed,
            duration_ms=result.duration_ms,
        )
    @app.post("/query", response_model=QueryResponse, dependencies=[Depends(auth)])
    async def query(body: QueryRequest):
        """
        Query the indexed policy documents.

        Returns a structured response:
            <SummarizedContent> LLM answer        </SummarizedContent>
            <Citations>         Source excerpts   </Citations>
            <References>        Source URLs/names </References>
        """
        if not body.query.strip():
            raise HTTPException(status_code=400, detail="Query must not be empty")
        query_result = await query_pipeline.run(body.query)
        gen_result = await generation.generate(body.query, query_result.context)
        return QueryResponse(
            summarized_content=gen_result.summarized_content,
            citations=gen_result.citations,
            references=gen_result.references,
        )

    return app
# ── CLI ────────────────────────────────────────────────────────────────────

def _cli_ingest(cfg: AppConfig, max_docs: Optional[int]) -> None:
    result = PolicyHubIngestionPipeline(cfg).run(max_docs=max_docs)
    logger.info(
        "Ingestion complete",
        documents=result.documents_loaded,
        chunks=result.chunks_indexed,
        duration_ms=result.duration_ms,
    )


async def _cli_query(cfg: AppConfig, query_text: str) -> None:
    qp = QueryPipeline(cfg)
    gp = GenerationPipeline(cfg)
    query_result = await qp.run(query_text)
    gen_result = await gp.generate(query_text, query_result.context)
    print(gen_result.answer)


def main() -> None:
    parser = argparse.ArgumentParser(prog="basic-rag-app")
    sub = parser.add_subparsers(dest="command", required=True)
    sub.add_parser("serve", help="Start the FastAPI server")
    sub.add_parser("createindex", help="Create or replace the Azure AI Search index and exit")
    p_ingest = sub.add_parser("ingest", help="Run ingestion pipeline and exit")
    p_ingest.add_argument("--max-docs", type=int, default=None,
                          help="Cap the number of documents loaded from PolicyHub")
    p_query = sub.add_parser("query", help="Run a single query and exit")
    p_query.add_argument("text", help="Query text")
    args = parser.parse_args()
    cfg = load_config()

    if args.command == "serve":
        uvicorn.run(create_app(cfg), host=cfg.api_host, port=cfg.api_port)
    elif args.command == "createindex":
        builder = AzureAISearchIndexBuilder(
            endpoint=cfg.search_endpoint,
            api_key=cfg.search_api_key,
            index_name=cfg.search_index,
            embedding_dimension=cfg.embedding_dimension,
            document_type=PolicyHubChunkDocument,
            ssl_cert_path=cfg.ssl_cert_path,
            semantic_config={
                "name": "policyhub-semantic-config",
                "title_field": "document_name",
                "content_fields": ["content"],
                "keyword_fields": ["language", "locale", "source"],
            },
        )
        existed = builder.index_exists()
        builder.create_or_replace_index()
        action = "replaced" if existed else "created"
        logger.info(f"Index {action}", index=cfg.search_index)
    elif args.command == "ingest":
        _cli_ingest(cfg, max_docs=args.max_docs)
    elif args.command == "query":
        asyncio.run(_cli_query(cfg, args.text))


if __name__ == "__main__":
    main()
src/config.py
"""Application configuration β loaded once from environment variables."""
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from dotenv import load_dotenv
from gmf_forge_ai_shared_core.observability import BasicLogger
logger = BasicLogger(__name__)
_ENV_PATH = Path(__file__).parent.parent / ".env"
@dataclass
class AppConfig:
    # ── Auth ───────────────────────────────────────────────────────────────
    app_api_key: str  # X-API-Key header value expected on all protected endpoints
    # ── PolicyHub SOAP source ──────────────────────────────────────────────
    admin_wsdl_url: str
    api_username: str
    api_password: str
    # ── Azure OpenAI ───────────────────────────────────────────────────────
    openai_endpoint: str
    openai_api_key: str
    embedding_deployment: str  # e.g. text-embedding-ada-002-2policychatbot
    chat_deployment: str       # e.g. gpt-4o
    chat_api_version: str
    # ── Azure AI Search ────────────────────────────────────────────────────
    search_endpoint: str
    search_api_key: str
    # ── Optional infrastructure ────────────────────────────────────────────
    ssl_cert_path: Optional[str] = None
    # ── Index / RAG tuning ─────────────────────────────────────────────────
    search_index: str = "policyhub-docs"
    embedding_dimension: int = 1536
    chunk_size: int = 512
    chunk_overlap: int = 64
    top_k: int = 5
    min_relevance_score: float = 0.01  # HybridRetriever (Azure RRF) scores max at ~0.05
    max_context_tokens: int = 4000
    # ── API server ─────────────────────────────────────────────────────────
    api_host: str = "0.0.0.0"
    api_port: int = 8000
def load_config() -> AppConfig:
    """Load and validate AppConfig from environment variables.

    Raises:
        ValueError: if any required environment variable is missing.
    """
    load_dotenv(_ENV_PATH)
    missing: list[str] = []

    def _require(name: str) -> str:
        val = os.getenv(name, "").strip()
        if not val:
            missing.append(name)
        return val

    def _optional(name: str, default: str = "") -> str:
        return os.getenv(name, default).strip() or default

    config = AppConfig(
        app_api_key=_require("APP_API_KEY"),
        admin_wsdl_url=_require("MITRATECH_ADMIN_WSDL_URL"),
        api_username=_require("MITRATECH_API_USERNAME"),
        api_password=_require("MITRATECH_API_PASSWORD"),
        openai_endpoint=_require("AZURE_OPENAI_ENDPOINT"),
        openai_api_key=_require("AZURE_OPENAI_API_KEY"),
        embedding_deployment=_require("AZURE_OPENAI_EMBEDDING_MODEL"),
        chat_deployment=_require("AZURE_OPENAI_CHAT_DEPLOYMENT"),
        search_endpoint=_require("AZURE_SEARCH_ENDPOINT"),
        search_api_key=_require("AZURE_SEARCH_API_KEY"),
        chat_api_version=_optional("AZURE_OPENAI_CHAT_API_VERSION", "2024-02-15-preview"),
        ssl_cert_path=_optional("SSL_CERT_PATH") or None,
        search_index=_optional("AZURE_SEARCH_INDEX", "policyhub-docs"),
        embedding_dimension=int(_optional("EMBEDDING_DIMENSION", "1536")),
        chunk_size=int(_optional("CHUNK_SIZE", "512")),
        chunk_overlap=int(_optional("CHUNK_OVERLAP", "64")),
        top_k=int(_optional("TOP_K", "5")),
        # Must match the AppConfig default: RRF scores max at ~0.05, so a
        # fallback of 0.75 would filter out every retrieved chunk.
        min_relevance_score=float(_optional("MIN_RELEVANCE_SCORE", "0.01")),
        max_context_tokens=int(_optional("MAX_CONTEXT_TOKENS", "4000")),
        api_host=_optional("API_HOST", "0.0.0.0"),
        api_port=int(_optional("API_PORT", "8000")),
    )
    if missing:
        raise ValueError(f"Missing required environment variables: {', '.join(missing)}")
    logger.info(
        "Configuration loaded",
        index=config.search_index,
        top_k=config.top_k,
        chunk_size=config.chunk_size,
    )
    return config
src/connector/__init__.py
from .policyhub import PolicyHubAdminConnector
__all__ = ["PolicyHubAdminConnector"]
src/connector/policyhub.py
"""PolicyHub Admin API SOAP connector β local to the basic-rag-app.
Traverses the full PolicyHub folder tree, fetches document content, and
returns ``Document`` objects ready for chunking and indexing.
The developer example version (with dry-run mode and MyLibrary support)
lives in packages/data-layer/examples/soap_connector_example.py.
"""
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_data.connectors import SoapConnector
from gmf_forge_ai_data.vector_stores import Document
logger = BasicLogger(__name__)
_CREDENTIALS_QNAME = (
"{http://schemas.datacontract.org/2004/07/"
"HitecLabs.PolicyHub.Api.Wcf.Security}Credentials"
)
class PolicyHubAdminConnector(SoapConnector):
    """
    Connects to the PolicyHub Admin API via SOAP and loads documents.

    Usage::

        with PolicyHubAdminConnector(wsdl_url=..., username=..., password=...) as conn:
            docs = conn.load(max_docs=50)
    """

    def __init__(self, folder_ids: Optional[List[str]] = None, **kwargs):
        super().__init__(credentials_type_qname=_CREDENTIALS_QNAME, **kwargs)
        self.folder_ids = folder_ids

    # ── Raw API calls ──────────────────────────────────────────────────────

    def get_library(self) -> Dict[str, Any]:
        return self.call("GetLibrary")

    def get_documents_by_folder(self, folder_id: str) -> Dict[str, Any]:
        return self.call("GetDocumentsByFolder", folderId=folder_id)

    def get_document_data(self, document_id: str) -> Dict[str, Any]:
        return self.call("GetDocumentData", documentId=document_id)

    # ── High-level loader ──────────────────────────────────────────────────

    def load(self, max_docs: Optional[int] = None) -> List[Document]:
        """Traverse the library tree and return Document objects.

        Args:
            max_docs: Cap the total returned. ``None`` = entire library.
        """
        return list(self.stream(max_docs=max_docs))
    def stream(self, max_docs: Optional[int] = None) -> Iterator[Document]:
        """Yield Documents one at a time without accumulating the full list.

        Suitable for batch-processing pipelines that want to chunk/embed/index
        each document (or small group of documents) without holding the entire
        corpus in memory.

        Args:
            max_docs: Stop after this many documents. ``None`` = entire library.
        """
        library = self.get_library()
        folders = list(self._traverse_folders(library))
        if self.folder_ids:
            folders = [f for f in folders if f["FolderId"] in self.folder_ids]
        count = 0
        base_url = self.wsdl_url.split("/PolicyHubAPI/")[0]
        for folder in folders:
            if max_docs is not None and count >= max_docs:
                return
            folder_id = folder["FolderId"]
            folder_path = folder.get("LibraryPath", folder.get("Name", ""))
            try:
                result = self.get_documents_by_folder(folder_id)
            except Exception as exc:
                logger.debug("Skipping folder", folder_id=folder_id, error=str(exc))
                continue
            raw_docs = _extract_list(result, "PolicyHubDocument")
            if not raw_docs:
                continue
            for raw in raw_docs:
                if max_docs is not None and count >= max_docs:
                    return
                doc = self._build_document(raw, folder_id, folder_path, base_url)
                if doc:
                    count += 1
                    yield doc
        logger.info("PolicyHub stream complete", total_documents=count)
    def _build_document(
        self,
        raw: Dict[str, Any],
        folder_id: str,
        folder_path: str,
        base_url: str,
    ) -> Optional[Document]:
        doc_id = raw.get("DocumentId")
        doc_title = raw.get("Name", "Untitled")
        try:
            data = self.get_document_data(doc_id)
        except Exception as exc:
            logger.warning("Could not fetch document data",
                           document_id=doc_id, title=doc_title, error=str(exc))
            return None
        mime_type = (data.get("MimeType") or "") if data else ""
        revision_id = str(data.get("RevisionId", "")) if data else ""
        version = str(data.get("Version", "")) if data else ""
        language = (data.get("Language") or "") if data else ""
        locale = (data.get("Locale") or "") if data else ""
        raw_bytes = data.get("Data") if data else None
        ext = _mime_to_ext(mime_type)
        content = self.extract_text(raw_bytes or b"", f"{doc_title}{ext}")
        if not content.strip():
            logger.debug("Skipping empty document", document_id=doc_id, title=doc_title)
            return None
        return Document(
            id=self._make_doc_id(str(doc_id)),
            content=content.strip(),
            timestamp=_parse_date(raw.get("LastModifiedDateUtc")),
            metadata={
                "document_id": str(doc_id),
                "document_name": doc_title,
                "documentlink": f"{base_url}/PolicyHub/Document/{doc_id}",
                "language": language,
                "locale": locale,
                "revisionid": revision_id,
                "source": self.wsdl_url,
                "upload_date": str(raw.get("PublishedDateUtc", "")),
                "version": version,
                "folder_id": str(folder_id),
                "folder_path": folder_path,
                "description": raw.get("Description", ""),
                "author": raw.get("AuthorName", ""),
                "mime_type": mime_type,
                "file_name": f"{doc_title}{ext}",
                "created_at": str(raw.get("CreationDateUtc", "")),
                "modified_at": str(raw.get("LastModifiedDateUtc", "")),
            },
        )
    def _traverse_folders(self, node: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
        # Depth-first walk: yield this folder, then recurse into its children.
        yield node
        children = (node.get("Children") or {}).get("PolicyHubFolder", [])
        if not isinstance(children, list):
            children = [children]
        for child in children:
            if isinstance(child, dict):
                yield from self._traverse_folders(child)
# ── Helpers ────────────────────────────────────────────────────────────────

def _extract_list(result: Any, key: str) -> List[Dict[str, Any]]:
    # SOAP responses may hold a list, a single item, or nothing under `key`.
    if isinstance(result, list):
        return result
    if isinstance(result, dict):
        val = result.get(key)
        if val is None:
            return []
        return val if isinstance(val, list) else [val]
    return []


def _mime_to_ext(mime_type: str) -> str:
    mime = (mime_type or "").lower()
    if "pdf" in mime:
        return ".pdf"
    if "word" in mime or "document" in mime or "officedocument" in mime:
        return ".docx"
    if "text" in mime:
        return ".txt"
    return ".bin"


def _parse_date(value: Any) -> Optional[datetime]:
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    try:
        return datetime.fromisoformat(str(value))
    except Exception:
        return None
src/ingestion/__init__.py
from .models import PolicyHubChunkDocument, PolicyHubIngestionResult
from .pipeline import PolicyHubIngestionPipeline
__all__ = [
"PolicyHubChunkDocument",
"PolicyHubIngestionResult",
"PolicyHubIngestionPipeline",
]
src/ingestion/models.py
"""Domain models for the basic-rag-app.
PolicyHubChunkDocument β Document subclass with PolicyHub metadata as typed
indexed fields. Passed to AzureAISearchIndexBuilder
so every field becomes a dedicated index field
(not a value buried in the document_data blob).
PolicyHubIngestionResult β plain result dataclass returned by the ingestion
pipeline after a run completes.
"""
from dataclasses import dataclass, field
from typing import Optional
from gmf_forge_ai_data.vector_stores import Document
@dataclass
class PolicyHubChunkDocument(Document):
    """
    A chunked PolicyHub document with all metadata as first-class index fields.

    Pass this class as ``document_type`` to ``AzureAISearchIndexBuilder`` to
    automatically provision one Azure AI Search field per attribute below.
    ``AzureAISearchVectorStore`` will write each populated attribute as an
    indexed field on every upload.
    """
    document_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    document_name: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": False, "sortable": True})
    documentlink: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    language: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    locale: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    revisionid: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    source: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": True, "facetable": True, "sortable": False})
    upload_date: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": True})
    version: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    folder_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    folder_path: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})
    author: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    description: Optional[str] = field(default=None, metadata={"searchable": True, "filterable": False, "facetable": False, "sortable": False})
    mime_type: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": True, "sortable": False})
    parent_doc_id: Optional[str] = field(default=None, metadata={"filterable": True, "facetable": False, "sortable": False})


@dataclass
class PolicyHubIngestionResult:
    """Counts and timing returned after a completed ingestion run."""
    documents_loaded: int
    chunks_created: int
    chunks_indexed: int
    duration_ms: int
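# Illustrative sketch (not part of the app): how per-field ``metadata`` flags
# like the ones above can be read back with ``dataclasses.fields``. How
# AzureAISearchIndexBuilder actually consumes them is not shown in this repo,
# so ``BaseDoc``, ``DemoChunk`` and ``index_flags`` are hypothetical names used
# only to demonstrate the standard-library mechanism.

```python
from dataclasses import dataclass, field, fields
from typing import Dict, Optional


@dataclass
class BaseDoc:
    """Hypothetical stand-in for gmf_forge_ai_data.vector_stores.Document."""
    id: str = ""
    content: str = ""


@dataclass
class DemoChunk(BaseDoc):
    document_name: Optional[str] = field(
        default=None,
        metadata={"searchable": True, "filterable": True, "facetable": False, "sortable": True},
    )
    upload_date: Optional[str] = field(
        default=None,
        metadata={"filterable": True, "facetable": False, "sortable": True},
    )


def index_flags(doc_type: type) -> Dict[str, Dict[str, bool]]:
    """Collect the flags of every field that declares metadata."""
    # Fields declared without metadata (id, content) carry an empty mapping
    # and are skipped here.
    return {f.name: dict(f.metadata) for f in fields(doc_type) if f.metadata}


flags = index_flags(DemoChunk)
print(flags["upload_date"])
```

# A field that omits a flag (``upload_date`` has no "searchable" key) leaves the
# default to whatever component reads the metadata.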
src/ingestion/pipeline.py
"""Ingestion pipeline: chunks, embeds, and indexes PolicyHub documents.

Pipeline (per document batch):
    PolicyHubAdminConnector (connector/)  -> stream raw Documents
        -> RecursiveChunker               -> split into overlapping chunks
        -> BatchEmbeddings                -> embed each chunk via Azure OpenAI
        -> AzureAISearchVectorStore       -> write batch to Azure AI Search

Documents are processed in batches of _DOC_BATCH_SIZE to keep memory bounded.
Each batch's chunks are then uploaded in sub-batches of
_CHUNK_UPLOAD_BATCH_SIZE to avoid HTTP 413 errors on the index upload step.

The index schema must be provisioned first via POST /createindex or
``python app.py createindex`` (uses AzureAISearchIndexBuilder with
``document_type=PolicyHubChunkDocument``). This pipeline never touches
the index schema.
"""
import time
from typing import Iterator, List, Optional
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.chunkers import RecursiveChunker
from gmf_forge_ai_data.embeddings import AzureOpenAIEmbeddings, BatchEmbeddings
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore
from config import AppConfig
from connector import PolicyHubAdminConnector
from .models import PolicyHubChunkDocument, PolicyHubIngestionResult
logger = BasicLogger(__name__)
# Number of source documents to chunk/embed/index per iteration.
# Tune this to balance memory usage vs. number of API round-trips.
_DOC_BATCH_SIZE = 50
# Maximum chunks per Azure AI Search upload call.
# Azure AI Search enforces a 16 MB per-request limit. Each chunk carries
# ~6 KB of embedding (1536 × float32) plus text and metadata, so 200 chunks
# ≈ 3-4 MB, comfortably under the limit. This also avoids the SDK bug
# where its internal 413-retry-split passes error_map twice (TypeError).
_CHUNK_UPLOAD_BATCH_SIZE = 200
def _batched(it: Iterator, size: int):
    """Yield successive lists of at most `size` items from iterator `it`."""
    batch = []
    for item in it:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    # Emit the final, possibly short, batch.
    if batch:
        yield batch
class PolicyHubIngestionPipeline:
    """Stream -> chunk -> embed -> index, one document batch at a time."""

    def __init__(self, config: AppConfig):
        self._cfg = config
        self._tracer = get_tracer()

    def run(self, max_docs: Optional[int] = None) -> PolicyHubIngestionResult:
        started = time.monotonic()
        cfg = self._cfg
        embedder = AzureOpenAIEmbeddings(
            endpoint=cfg.openai_endpoint,
            api_key=cfg.openai_api_key,
            deployment_name=cfg.embedding_deployment,
            ssl_cert_path=cfg.ssl_cert_path,
        )
        chunker = RecursiveChunker(
            chunk_size=cfg.chunk_size,
            chunk_overlap=cfg.chunk_overlap,
        )
        store = AzureAISearchVectorStore(
            endpoint=cfg.search_endpoint,
            index_name=cfg.search_index,
            api_key=cfg.search_api_key,
            embedding_dimension=cfg.embedding_dimension,
            document_type=PolicyHubChunkDocument,
        )
        total_docs = 0
        total_chunks_created = 0
        total_chunks_indexed = 0
        batch_num = 0
        with self._tracer.trace("policyhub_ingestion", input={"max_docs": max_docs}) as trace:
            connector = PolicyHubAdminConnector(
                wsdl_url=cfg.admin_wsdl_url,
                username=cfg.api_username,
                password=cfg.api_password,
                ssl_verify=cfg.ssl_cert_path is not None,
                ssl_cert_path=cfg.ssl_cert_path,
            )
            with connector:
                doc_stream = connector.stream(max_docs=max_docs)
                for doc_batch in _batched(doc_stream, _DOC_BATCH_SIZE):
                    batch_num += 1
                    total_docs += len(doc_batch)
                    batch_label = f"batch_{batch_num}"
                    with trace.span(batch_label, input={"docs": len(doc_batch), "total_so_far": total_docs}) as batch_span:
                        # --- Chunk ---
                        with batch_span.span("chunk", input={"docs": len(doc_batch)}) as span:
                            chunks: List[PolicyHubChunkDocument] = []
                            for doc in doc_batch:
                                meta = doc.metadata
                                for chunk in chunker.chunk(doc.content, metadata=meta):
                                    chunks.append(PolicyHubChunkDocument(
                                        id=f"{doc.id}_c{total_chunks_created + len(chunks)}",
                                        content=chunk.text,
                                        metadata=chunk.metadata,
                                        document_id=meta.get("document_id"),
                                        document_name=meta.get("document_name"),
                                        documentlink=meta.get("documentlink"),
                                        language=meta.get("language") or None,
                                        locale=meta.get("locale") or None,
                                        revisionid=meta.get("revisionid") or None,
                                        source=meta.get("source"),
                                        upload_date=meta.get("upload_date") or None,
                                        version=meta.get("version") or None,
                                        folder_id=meta.get("folder_id"),
                                        folder_path=meta.get("folder_path"),
                                        author=meta.get("author") or None,
                                        description=meta.get("description") or None,
                                        mime_type=meta.get("mime_type") or None,
                                        parent_doc_id=doc.id,
                                    ))
                            total_chunks_created += len(chunks)
                            span.set_output({"chunks": len(chunks)})
                        # --- Embed ---
                        with batch_span.span("embed", input={"chunks": len(chunks)}) as span:
                            vectors = BatchEmbeddings(
                                provider=embedder, show_progress=False
                            ).embed_batch([c.content for c in chunks])
                            for chunk, vector in zip(chunks, vectors):
                                chunk.embedding = vector
                            span.set_output({"embedded": len(vectors)})
                        # --- Index ---
                        # Split into sub-batches to stay under Azure AI Search's
                        # 16 MB per-request limit and avoid the SDK's broken
                        # 413-retry-split (error_map duplicate kwarg bug).
                        with batch_span.span("index", input={"chunks": len(chunks)}) as span:
                            for chunk_sub in _batched(iter(chunks), _CHUNK_UPLOAD_BATCH_SIZE):
                                store.add_documents(chunk_sub)
                            total_chunks_indexed += len(chunks)
                            span.set_output({"indexed": len(chunks)})
                        batch_span.set_output({
                            "chunks_in_batch": len(chunks),
                            "total_docs": total_docs,
                            "total_indexed": total_chunks_indexed,
                        })
                    logger.info(
                        "Batch complete",
                        batch=batch_num,
                        docs_in_batch=len(doc_batch),
                        chunks_in_batch=len(chunks),
                        total_docs=total_docs,
                        total_indexed=total_chunks_indexed,
                    )
            duration_ms = int((time.monotonic() - started) * 1000)
            trace.set_output({
                "documents": total_docs,
                "chunks": total_chunks_indexed,
                "duration_ms": duration_ms,
            })
        logger.info(
            "Ingestion complete",
            documents=total_docs,
            chunks_created=total_chunks_created,
            chunks_indexed=total_chunks_indexed,
            duration_ms=duration_ms,
        )
        return PolicyHubIngestionResult(
            documents_loaded=total_docs,
            chunks_created=total_chunks_created,
            chunks_indexed=total_chunks_indexed,
            duration_ms=duration_ms,
        )
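# Illustrative sketch (not part of the app): the two-level batching used by the
# pipeline, plus the payload arithmetic behind the 200-chunk upload limit. The
# ``batched`` helper mirrors ``_batched``; the 1536-dimension float32 embedding
# and the 16 MB request limit are taken from the comments in this module, and
# the byte count covers embeddings only, not chunk text or metadata.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items."""
    batch: List[T] = []
    for item in items:
        batch.append(item)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch


# Five "documents" in batches of two: the last batch is short.
doc_batches = list(batched(range(5), 2))

# Embedding payload only: 1536 float32 values at 4 bytes each.
EMBEDDING_BYTES = 1536 * 4                            # 6144 bytes per chunk
UPLOAD_BATCH = 200
embedding_payload = EMBEDDING_BYTES * UPLOAD_BATCH    # bytes per upload call
LIMIT_BYTES = 16 * 1024 * 1024

print(doc_batches)
print(embedding_payload, embedding_payload < LIMIT_BYTES)
```

# Embeddings alone account for roughly 1.2 MB per upload call, which leaves
# the rest of the 3-4 MB estimate for chunk text and metadata.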
src/query/__init__.py
from .pipeline import QueryPipeline, QueryResult
from .generation import GenerationPipeline, GenerationResult
__all__ = [
"QueryPipeline",
"QueryResult",
"GenerationPipeline",
"GenerationResult",
]
src/query/generation.py
"""Generation pipeline: assembles context into a structured LLM prompt and
returns a templated response.

Response template (LLM is instructed to produce exactly this):

    <SummarizedContent>
    LLM-generated answer to the user's question
    </SummarizedContent>
    <Citations>
    Verbatim excerpts from source documents that support the answer
    </Citations>
    <References>
    <Item1>url or document name</Item1>
    <Item2>url or document name</Item2>
    </References>
"""
import re
import time
from dataclasses import dataclass
from typing import Dict, List
from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.context import WindowedContext
from config import AppConfig
from registry import prompt_registry
from registry.llm_registry import llm_registry
logger = BasicLogger(__name__)
@dataclass
class GenerationResult:
    query: str
    answer: str  # full templated LLM response
    summarized_content: str  # extracted <SummarizedContent> text
    citations: str  # extracted <Citations> text
    references: List[str]  # links/names of the sources cited in <Citations>
    sources: List[Dict[str, str]]  # [{document_name, documentlink, score, rank}]
    duration_ms: int


class GenerationPipeline:
    """
    Assembles windowed context into a structured prompt and calls the LLM.

    The LLM is instructed to respond in a fixed XML-like template. The
    template is parsed so each section (summary, citations, references) is
    returned separately for independent rendering in the API response or UI.
    """

    def __init__(self, config: AppConfig):
        self._tracer = get_tracer()
        # Pass all registered LLMs to UnifiedLLMGateway
        self._llm_gateway = UnifiedLLMGateway(
            provider_registry=llm_registry
        )
        self._deployment = config.chat_deployment

    async def generate(self, query: str, context: WindowedContext) -> GenerationResult:
        started = time.monotonic()
        context_blocks: List[str] = []
        sources: List[Dict[str, str]] = []
        # Maps source number (1-based) -> link or name, used to derive references
        # from whichever sources the LLM actually cited.
        source_map: Dict[int, str] = {}
        for i, result in enumerate(context.results, 1):
            doc = result.document
            meta = doc.metadata
            # Prefer typed fields (PolicyHubChunkDocument); fall back to metadata dict
            name = (
                getattr(doc, "document_name", None)
                or meta.get("document_name", meta.get("file_name", f"Document {i}"))
            )
            link = (
                getattr(doc, "documentlink", None)
                or meta.get("documentlink", meta.get("source", ""))
            )
            context_blocks.append(f"[Source {i}: {name}]\n{result.document.content}")
            sources.append({
                "document_name": name,
                "documentlink": link,
                "score": f"{result.score:.4f}",
                "rank": str(result.rank),
            })
            source_map[i] = link or name
        prompt = (
            prompt_registry.format("policyhub_system", context_blocks="\n\n".join(context_blocks))
            + "\n\n"
            + prompt_registry.format("policyhub_user", query=query)
        )
        with self._tracer.trace("policyhub_generation", input=query) as trace:
            with trace.generation("llm_response", model=self._deployment) as span:
                # Use UnifiedLLMGateway to make the call
                response = await self._llm_gateway.complete(prompt=prompt, temperature=0.2)
                answer = response.content if hasattr(response, "content") else str(response)
                span.set_output({"chars": len(answer), "sources": len(sources)})
        duration_ms = int((time.monotonic() - started) * 1000)
        summarized, citations, _ = _parse_template(answer)
        references = _cited_references(citations, source_map)
        logger.info("Generation complete", query=query, chars=len(answer), sources=len(sources))
        return GenerationResult(
            query=query,
            answer=answer,
            summarized_content=summarized,
            citations=citations,
            references=references,
            sources=sources,
            duration_ms=duration_ms,
        )
def _parse_template(text: str):
    """Extract the SummarizedContent and Citations sections from the LLM response.

    Returns a 3-tuple for call-site compatibility. The third slot (References)
    is always None because references are derived from the citations by
    ``_cited_references`` rather than parsed from the LLM output.
    """
    def _extract(tag: str) -> str:
        m = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        return m.group(1).strip() if m else ""

    summarized = _extract("SummarizedContent")
    citations = _extract("Citations")
    return summarized, citations, None


def _cited_references(citations: str, source_map: Dict[int, str]) -> List[str]:
    """Return deduplicated links/names for only the sources cited by the LLM.

    Parses '[Source N:' patterns from the Citations block and maps each N back
    to the link or document name stored in source_map. Citations that do not
    carry a '[Source N:' label produce no references.
    """
    cited_nums = [int(m) for m in re.findall(r"\[Source\s+(\d+):", citations)]
    seen: set[str] = set()
    refs: List[str] = []
    for n in cited_nums:
        ref = source_map.get(n)
        if ref and ref not in seen:
            seen.add(ref)
            refs.append(ref)
    return refs
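# Illustrative sketch (not part of the app): the parsing steps above applied to
# a made-up LLM response. ``parse_sections`` and ``cited_references`` are
# simplified copies of the helpers in this module; the sample text, the source
# map and the example.invalid URL are invented for the demonstration.

```python
import re
from typing import Dict, List, Tuple


def parse_sections(text: str) -> Tuple[str, str]:
    """Pull the summary and citations out of the XML-like template."""
    def extract(tag: str) -> str:
        m = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL)
        return m.group(1).strip() if m else ""
    return extract("SummarizedContent"), extract("Citations")


def cited_references(citations: str, source_map: Dict[int, str]) -> List[str]:
    """Map each cited '[Source N:' label to its link or name, without repeats."""
    refs: List[str] = []
    for n in re.findall(r"\[Source\s+(\d+):", citations):
        ref = source_map.get(int(n))
        if ref and ref not in refs:
            refs.append(ref)
    return refs


sample = """<SummarizedContent>
Employees may carry over five days of leave.
</SummarizedContent>
<Citations>
[Source 2: Leave Policy] "Up to five days may be carried over."
[Source 2: Leave Policy] "Carry-over expires on 31 March."
</Citations>"""

source_map = {1: "Travel Policy", 2: "https://example.invalid/leave-policy"}
summary, citations = parse_sections(sample)
refs = cited_references(citations, source_map)
unlabeled = cited_references('[Leave Policy] "Up to five days."', source_map)
print(refs, unlabeled)
```

# Source 2 is cited twice but listed once, the uncited Source 1 is omitted, and
# a citation without a '[Source N:' label yields no reference at all.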
src/query/pipeline.py
"""Online query pipeline: rewrites, embeds, retrieves, filters, and windows context.

Pipeline:
    QueryRewriter               (LLM-powered query improvement)
        -> AzureOpenAIEmbeddings  (embed the rewritten query)
        -> HybridRetriever        (vector + keyword search against Azure AI Search)
        -> RelevanceFilter        (drop results below the score threshold)
        -> ContextWindowManager   (fit remaining results into the token budget)
"""
import time
from dataclasses import dataclass
from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import get_tracer
from gmf_forge_ai_data.context import ContextWindowManager, RelevanceFilter, WindowedContext
from gmf_forge_ai_data.embeddings import AzureOpenAIEmbeddings
from gmf_forge_ai_data.query import QueryRewriter
from gmf_forge_ai_data.retrieval import HybridRetriever, RetrievalQuery
from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore
from config import AppConfig
from ingestion import PolicyHubChunkDocument
from registry.llm_registry import llm_registry
logger = BasicLogger(__name__)
@dataclass
class QueryResult:
    original_query: str
    rewritten_query: str
    context: WindowedContext
    duration_ms: int


class QueryPipeline:
    """
    Online query pipeline, run once per user request.

    QueryRewriter
        -> AzureOpenAIEmbeddings
        -> HybridRetriever
        -> RelevanceFilter
        -> ContextWindowManager
    """

    def __init__(self, config: AppConfig):
        cfg = config
        self._tracer = get_tracer()
        self._top_k = cfg.top_k
        self._embedder = AzureOpenAIEmbeddings(
            endpoint=cfg.openai_endpoint,
            api_key=cfg.openai_api_key,
            deployment_name=cfg.embedding_deployment,
            ssl_cert_path=cfg.ssl_cert_path,
        )
        self._retriever = HybridRetriever(
            AzureAISearchVectorStore(
                endpoint=cfg.search_endpoint,
                index_name=cfg.search_index,
                api_key=cfg.search_api_key,
                embedding_dimension=cfg.embedding_dimension,
                document_type=PolicyHubChunkDocument,
            )
        )
        self._filter = RelevanceFilter(min_score=cfg.min_relevance_score)
        self._window = ContextWindowManager(max_tokens=cfg.max_context_tokens)
        llm = UnifiedLLMGateway(
            provider_registry=llm_registry
        )
        self._rewriter = QueryRewriter(llm_gateway=llm)

    async def run(self, query_text: str) -> QueryResult:
        started = time.monotonic()
        with self._tracer.trace("policyhub_query", input=query_text) as trace:
            # 1. Rewrite
            with trace.span("rewrite", input=query_text) as span:
                rewritten = await self._rewriter.rewrite(query_text)
                span.set_output({"rewritten": rewritten.rewritten})
            logger.info("Query rewritten", original=query_text, rewritten=rewritten.rewritten)
            # 2. Embed
            with trace.span("embed", input=rewritten.rewritten) as span:
                embedding = self._embedder.embed_text(rewritten.rewritten)
                span.set_output({"dimension": len(embedding)})
            # 3. Retrieve
            with trace.span("retrieve", input={"query": rewritten.rewritten, "top_k": self._top_k}) as span:
                results = self._retriever.retrieve(
                    RetrievalQuery(text=rewritten.rewritten, embedding=embedding, top_k=self._top_k)
                )
                span.set_output({"results": len(results)})
            logger.info("Retrieved chunks", count=len(results))
            # 4. Filter
            with trace.span("filter", input={"results": len(results)}) as span:
                filtered = self._filter.filter(results)
                span.set_output({"kept": len(filtered), "dropped": len(results) - len(filtered)})
            logger.info("Filtered chunks", kept=len(filtered), dropped=len(results) - len(filtered))
            # 5. Window
            with trace.span("window", input={"results": len(filtered)}) as span:
                context = self._window.fit(filtered)
                span.set_output({"tokens": context.total_tokens, "docs": len(context.results)})
            duration_ms = int((time.monotonic() - started) * 1000)
            trace.set_output({"rewritten": rewritten.rewritten, "context_docs": len(context.results)})
        return QueryResult(
            original_query=query_text,
            rewritten_query=rewritten.rewritten,
            context=context,
            duration_ms=duration_ms,
        )
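# Illustrative sketch (not part of the app): the filter and window steps above,
# reduced to plain Python. The real RelevanceFilter and ContextWindowManager
# live in gmf_forge_ai_data.context and their internals are not shown in this
# repo, so ``Hit``, ``filter_by_score`` and ``fit_window`` are hypothetical
# stand-ins, and counting whitespace-separated words is only a rough
# substitute for real token counting.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Hit:
    text: str
    score: float


def filter_by_score(hits: List[Hit], min_score: float) -> List[Hit]:
    """Drop results scoring below the threshold, keeping rank order."""
    return [h for h in hits if h.score >= min_score]


def fit_window(hits: List[Hit], max_tokens: int) -> List[Hit]:
    """Greedily keep results in rank order until the budget would be exceeded."""
    kept: List[Hit] = []
    used = 0
    for hit in hits:
        cost = len(hit.text.split())  # crude word count in place of tokens
        if used + cost > max_tokens:
            break
        kept.append(hit)
        used += cost
    return kept


hits = [Hit("a b c", 0.9), Hit("d e", 0.7), Hit("f g h i", 0.6), Hit("j", 0.2)]
relevant = filter_by_score(hits, min_score=0.5)
kept = fit_window(relevant, max_tokens=6)
print([h.text for h in kept])
```

# Filtering runs before windowing so that low-scoring results never consume
# part of the token budget.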
src/registry/__init__.py
from .prompt_registry import prompt_registry
from .llm_registry import llm_registry
__all__ = ["prompt_registry", "llm_registry"]
src/registry/llm_registry.py
"""LLM provider registry for managing and registering LLM configurations.

This file centralizes the registration of LLM providers, allowing the application
components to retrieve LLM instances dynamically based on configuration.
"""
import os

from gmf_forge_ai_shared_core.llm_gateway.providers.azure_openai_provider import AzureOpenAIProvider
from gmf_forge_ai_shared_core.registry import LLMProviderRegistry

from config import load_config

llm_registry = LLMProviderRegistry()

# Load configuration from AppConfig
config = load_config()

# ssl_cert_path comes from AppConfig and may be unset. Validate it only when a
# path is configured, so os.path.exists() is never called with None.
ssl_cert_path = config.ssl_cert_path
if ssl_cert_path and not os.path.exists(ssl_cert_path):
    raise FileNotFoundError(f"SSL certificate not found at {ssl_cert_path}")

# Create an instance of AzureOpenAIProvider
azure_openai_provider = AzureOpenAIProvider(
    endpoint=config.openai_endpoint,
    api_key=config.openai_api_key,
    deployment_name=config.chat_deployment,
    api_version=config.chat_api_version,
    ssl_cert_path=ssl_cert_path,
)

# Register the provider
llm_registry.register(
    name="openai",
    provider=azure_openai_provider,
)
src/registry/prompt_registry.py
"""Prompt registry for the PolicyHub generation pipeline.

All LLM prompts are versioned and registered here. To iterate on a prompt,
add a new registration with a bumped version; the pipeline always picks up
the latest version automatically via PromptRegistry.get().
"""
from gmf_forge_ai_shared_core.registry import PromptRegistry
prompt_registry = PromptRegistry()
prompt_registry.register(
    name="policyhub_system",
    version="1.0",
    variables=["context_blocks"],
    description="System prompt for the PolicyHub corporate policy assistant.",
    template="""\
You are a corporate policy assistant helping employees understand company policies \
and compliance requirements.
Answer the user's question using ONLY the provided policy document excerpts below.
If the answer cannot be determined from the provided context, state that clearly.
You MUST format your response using EXACTLY this structure; do not add any text \
outside these tags:
<SummarizedContent>
Write a clear, concise answer to the user's question based solely on the policy \
documents. Use plain language. Use bullet points if listing multiple items.
</SummarizedContent>
<Citations>
For each document excerpt that directly supports your answer, quote it verbatim \
on its own line, keeping the source label shown above the excerpt, using this format:
[Source N: Document Name] "exact quoted text"
</Citations>
<References>
List the URL (or document name if no URL) for each source you actually cited above,
one entry per unique document using <Item1>, <Item2>, ... format.
Omit any sources you did not cite. If nothing was cited, leave this section empty.
</References>
--- Policy Document Excerpts ---
{context_blocks}""",
)
prompt_registry.register(
    name="policyhub_user",
    version="1.0",
    variables=["query"],
    description="User turn prompt wrapping the employee's question.",
    template="Question: {query}",
)
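# Illustrative sketch (not part of the app): one way a versioned registry could
# resolve "latest version" and fill template variables. The real PromptRegistry
# comes from gmf_forge_ai_shared_core.registry and its internals are not shown
# here; ``TinyPromptRegistry`` is a hypothetical stand-in, and the extra
# versions registered below are invented for the demonstration.

```python
from typing import Dict, Tuple


class TinyPromptRegistry:
    """Hypothetical stand-in; not the gmf_forge_ai_shared_core PromptRegistry."""

    def __init__(self) -> None:
        self._templates: Dict[str, Dict[Tuple[int, ...], str]] = {}

    def register(self, name: str, version: str, template: str) -> None:
        # Store versions as integer tuples so "1.10" sorts after "1.2".
        key = tuple(int(part) for part in version.split("."))
        self._templates.setdefault(name, {})[key] = template

    def format(self, name: str, **variables: str) -> str:
        versions = self._templates[name]
        latest = max(versions)  # tuple comparison: (1, 10) > (1, 2)
        return versions[latest].format(**variables)


registry = TinyPromptRegistry()
registry.register("policyhub_user", "1.0", "Question: {query}")
registry.register("policyhub_user", "1.10", "Employee question: {query}")
registry.register("policyhub_user", "1.2", "Q: {query}")
rendered = registry.format("policyhub_user", query="How many leave days carry over?")
print(rendered)
```

# Comparing versions as integer tuples rather than strings avoids "1.2"
# outranking "1.10" once a prompt has gone through ten or more revisions.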