complete_workflow.py
"""
Complete Workflow - Main Orchestrator
Demonstrates end-to-end integration of all shared-core components
using a modular architecture:
- config.py: Configuration management
- auth_service.py: Authentication logic
- registry_setup.py: Registry initialization
- llm_service.py: LLM calling logic
This shows how to structure a real AI application with proper
separation of concerns and observability throughout.
"""
# ============================================================================
# SSL FIX for Corporate Networks with SSL Inspection
# ============================================================================
# If you get "SSL: CERTIFICATE_VERIFY_FAILED" errors, uncomment this:
#
# import ssl
# ssl._create_default_https_context = ssl._create_unverified_context
#
# WARNING: Only use for development in trusted corporate networks!
# For production, install corporate SSL certificates properly.
# NOTE: This example now automatically uses corporate certificates from certs/
# ============================================================================
import asyncio
import time
from datetime import datetime, timezone
# Import modular components
from config import validate_configuration, get_azure_config, get_app_config
from auth_service import create_auth_service
from registry_setup import create_and_initialize_registries
from llm_service import create_llm_service
# Import observability for main orchestrator
from gmf_forge_ai_shared_core.observability import BasicLogger, BasicMetricsCollector, BasicPerformanceMonitor
# Initialize observability
# Module-level structured logger and metrics collector; the platform class and
# the demo functions below all log and count through these two objects.
logger = BasicLogger("complete_workflow")
metrics = BasicMetricsCollector()
class AIApplicationPlatform:
    """
    Complete AI application platform using modular components.

    Demonstrates integration of:
    - AuthService: API key management
    - RegistryManager: LLM Provider, Prompt, and Tool registries
    - LLMService: LLM gateway and completions
    - BasicPerformanceMonitor: LLM-specific performance tracking
    - Observability: Logging and metrics throughout
    """

    def __init__(self, azure_config, app_config):
        """
        Initialize the platform with all components.

        Components are created in order: authentication service (and one
        API key named "platform-app"), registries, performance monitor and
        finally the LLM service, which receives the performance monitor.

        Args:
            azure_config: Azure OpenAI configuration
            app_config: Application configuration
        """
        logger.info("Initializing AI Application Platform")

        # 1. Initialize Authentication Service
        logger.info("Initializing authentication service", key_expiry_days=app_config.api_key_expiry_days)
        self.auth_service = create_auth_service(
            key_expiry_days=app_config.api_key_expiry_days
        )
        self.api_key = self.auth_service.create_api_key(name="platform-app")
        logger.info("Authentication service initialized")

        # 2. Initialize Registry Manager
        logger.info("Initializing registries")
        self.registry_manager = create_and_initialize_registries(azure_config=azure_config)
        logger.info("Registries initialized")

        # 3. Initialize Performance Monitor (explicit opt-in for LLM tracking)
        logger.info("Initializing performance monitor")
        self.performance_monitor = BasicPerformanceMonitor()
        logger.info("Performance monitor initialized")

        # 4. Initialize LLM Service with performance monitoring
        logger.info("Initializing LLM service", endpoint=azure_config.endpoint, deployment=azure_config.deployment)
        self.llm_service = create_llm_service(azure_config, performance_monitor=self.performance_monitor)
        logger.info("LLM service initialized with performance tracking")

        # Store config for later use (default temperature / max_tokens per request)
        self.app_config = app_config
        metrics.increment("platform.initialized")
        logger.info("Platform initialization complete")

    async def process_request(self, request_data: dict) -> dict:
        """
        Process a complete AI request with authentication, LLM call, and monitoring.

        Any exception raised while handling the request is logged and turned
        into an error response; it is not propagated to the caller.

        Args:
            request_data: Request containing api_key, prompt_name, variables, etc.
                Optional keys read here: use_tools, temperature, max_tokens.

        Returns:
            On success: dict with status "success", request_id, result and a
            metadata dict (model, tokens_used, prompt_name, latency_ms,
            llm_latency_ms). On failure: dict with status "error", message
            and request_id.
        """
        # perf_counter() is monotonic, so the measured latency cannot go
        # negative or jump when the system clock is adjusted mid-request.
        request_start_time = time.perf_counter()
        # The wall clock is still used for the ID: it is a timestamp, not a duration.
        request_id = f"req-{int(datetime.now(timezone.utc).timestamp() * 1000)}"
        logger.info(
            "Processing request",
            request_id=request_id,
            prompt_name=request_data.get("prompt_name"),
        )
        metrics.increment("requests.total")

        try:
            # Step 1: Authenticate using AuthService
            api_key = request_data.get("api_key")
            if not self.auth_service.validate_api_key(api_key, context=request_id):
                logger.warning("Authentication failed", request_id=request_id)
                metrics.increment("requests.failed")
                return {
                    "status": "error",
                    "message": "Invalid or missing API key",
                    "request_id": request_id
                }
            logger.info("Authentication successful", request_id=request_id)

            # Step 2: Execute tools if needed using Tool Registry.
            # The search results are only counted for logging here; they are
            # not forwarded to the LLM call below.
            if request_data.get("use_tools"):
                search_results = await self.registry_manager.tool_registry.execute(
                    "search_knowledge_base",
                    query=request_data.get("variables", {}).get("question", ""),
                    limit=3
                )
                logger.info(
                    "Tool executed",
                    request_id=request_id,
                    tool="search_knowledge_base",
                    results_count=len(search_results),
                )

            # Step 3: Call LLM using LLMService with Prompt Registry
            prompt_name = request_data.get("prompt_name")
            variables = request_data.get("variables", {})
            temperature = request_data.get("temperature", self.app_config.default_temperature)
            max_tokens = request_data.get("max_tokens", self.app_config.default_max_tokens)
            result = await self.llm_service.complete_with_prompt_template(
                prompt_registry=self.registry_manager.prompt_registry,
                prompt_name=prompt_name,
                variables=variables,
                temperature=temperature,
                max_tokens=max_tokens,
                request_id=request_id
            )

            # Step 4: Track overall metrics
            metrics.increment("requests.successful")

            # Calculate total request latency (auth + tools + LLM call)
            total_latency_ms = (time.perf_counter() - request_start_time) * 1000

            # Step 5: Return response
            return {
                "status": "success",
                "request_id": request_id,
                "result": result["content"],
                "metadata": {
                    "model": result["model"],
                    "tokens_used": result["usage"]["total_tokens"],
                    "prompt_name": prompt_name,
                    "latency_ms": round(total_latency_ms, 2),
                    "llm_latency_ms": result["latency_ms"]
                }
            }
        except Exception as e:
            # Top-level request boundary: report the failure instead of raising.
            logger.error(
                "Request failed",
                request_id=request_id,
                error=str(e),
            )
            metrics.increment("requests.failed")
            return {
                "status": "error",
                "message": str(e),
                "request_id": request_id
            }
def _log_platform_metrics(platform):
    """Log counters, gauges and histogram summaries collected by every service."""
    # Aggregate metrics from all services
    all_metrics = {
        "main": metrics.get_metrics(),
        "auth": platform.auth_service.get_metrics(),
        "registry": platform.registry_manager.get_metrics(),
        "llm": platform.llm_service.get_metrics()
    }

    # Display counters. A missing or empty "counters" section is skipped,
    # exactly like the gauges and histograms sections below.
    for service, service_metrics in all_metrics.items():
        for name, value in sorted((service_metrics.get("counters") or {}).items()):
            logger.info("Counter", service=service.upper(), name=name, value=value)

    # Display gauges
    for service, service_metrics in all_metrics.items():
        for name, value in sorted(service_metrics.get("gauges", {}).items()):
            logger.info("Gauge", service=service.upper(), name=name, value=value)

    # Display histogram summaries (empty histograms are skipped to avoid
    # min()/max() on an empty sequence and a division by zero)
    for service, service_metrics in all_metrics.items():
        for name, values in sorted(service_metrics.get("histograms", {}).items()):
            if values:
                logger.info(
                    "Histogram",
                    service=service.upper(),
                    name=name,
                    count=len(values),
                    min=round(min(values), 2),
                    max=round(max(values), 2),
                    avg=round(sum(values) / len(values), 2),
                )


def _log_llm_performance(platform):
    """Log aggregate and per-provider LLM performance statistics, if any were tracked."""
    perf_stats = platform.llm_service.get_performance_stats()
    if perf_stats is None or perf_stats.get('total_requests', 0) <= 0:
        logger.info(
            "No LLM performance statistics available",
            note="monitor may not be enabled or no requests tracked",
        )
        return

    logger.info("LLM Performance Statistics")
    logger.info(
        "Requests",
        total=perf_stats['total_requests'],
        successful=perf_stats['successful_requests'],
        failed=perf_stats['failed_requests'],
        success_rate=f"{perf_stats['success_rate'] * 100:.1f}%",
    )
    logger.info(
        "Latency",
        avg_ms=round(perf_stats['avg_latency_ms'], 2),
        min_ms=round(perf_stats['min_latency_ms'], 2),
        max_ms=round(perf_stats['max_latency_ms'], 2),
    )
    logger.info(
        "Token usage",
        total=perf_stats['total_tokens'],
        prompt=perf_stats['total_prompt_tokens'],
        completion=perf_stats['total_completion_tokens'],
    )
    logger.info("Throughput", avg_tokens_per_sec=round(perf_stats['avg_tokens_per_second'], 2))

    provider_stats = platform.performance_monitor.get_provider_stats()
    if provider_stats:
        for provider_name, stats in provider_stats.items():
            logger.info(
                "Provider stats",
                provider=provider_name,
                requests=stats['total_requests'],
                avg_latency_ms=round(stats['avg_latency_ms'], 2),
                total_tokens=stats['total_tokens'],
                avg_tokens_per_sec=round(stats['avg_tokens_per_second'], 2),
            )


async def demo_complete_workflow():
    """
    Demonstrate the complete workflow.

    Validates configuration, builds the platform, sends three requests
    (question answering, summarization, and one with an invalid API key),
    then logs the metrics and LLM performance statistics gathered along
    the way. Returns early, without running anything, when configuration
    validation fails.
    """
    logger.info("COMPLETE WORKFLOW DEMO")

    # Validate configuration
    if not validate_configuration():
        logger.warning("Running without Azure OpenAI - some features disabled")
        return

    # Load configuration
    azure_config = get_azure_config()
    app_config = get_app_config()

    # Initialize platform
    platform = AIApplicationPlatform(azure_config, app_config)
    logger.info("Platform initialized")
    logger.info("")

    # Test Case 1: Question Answering
    logger.info("Starting test", test="1: Question Answering")
    request1 = {
        "api_key": platform.api_key,
        "prompt_name": "question_answering",
        "variables": {
            "question": "What is the difference between RAG and fine-tuning?"
        },
        "temperature": 0.7,
        "max_tokens": 300
    }
    logger.info("Question", text=request1['variables']['question'])
    response1 = await platform.process_request(request1)
    if response1["status"] == "success":
        logger.info(
            "Response",
            result=response1["result"],
            request_id=response1["request_id"],
            model=response1["metadata"]["model"],
            latency_ms=response1["metadata"]["latency_ms"],
            llm_latency_ms=response1["metadata"]["llm_latency_ms"],
        )
    else:
        logger.error("Request failed", message=response1["message"])
    logger.info("")

    # Test Case 2: Summarization
    logger.info("Starting test", test="2: Summarization")
    request2 = {
        "api_key": platform.api_key,
        "prompt_name": "summarization",
        "variables": {
            "text": "Vector databases are specialized databases designed to store and query high-dimensional vectors efficiently. They are essential for AI applications that use embeddings, such as semantic search, recommendation systems, and retrieval-augmented generation (RAG). Popular vector databases include Pinecone, Weaviate, Qdrant, and Milvus.",
            "num_sentences": "2"
        },
        "temperature": 0.5,
        "max_tokens": 150
    }
    logger.info("Text to summarize", text=request2['variables']['text'])
    response2 = await platform.process_request(request2)
    if response2["status"] == "success":
        logger.info("Summary", result=response2["result"], latency_ms=response2["metadata"]["latency_ms"])
    else:
        logger.error("Request failed", message=response2["message"])
    logger.info("")

    # Test Case 3: Invalid API Key
    logger.info("Starting test", test="3: Invalid API Key (Security)")
    request3 = {
        "api_key": "invalid-key-123",
        "prompt_name": "question_answering",
        "variables": {
            "question": "This should fail"
        }
    }
    response3 = await platform.process_request(request3)
    # Success responses carry no "message" key; .get() keeps the demo from
    # crashing with a KeyError if the invalid key is unexpectedly accepted.
    logger.info("Security check", status=response3['status'], detail=response3.get('message'))
    logger.info("")

    logger.info("PLATFORM METRICS")
    _log_platform_metrics(platform)

    # Display LLM performance statistics
    _log_llm_performance(platform)
async def main():
    """Run the workflow demo, then log a one-line description of each module involved."""
    logger.info("COMPLETE WORKFLOW — MODULAR ARCHITECTURE")
    await demo_complete_workflow()

    # Keyword order is preserved when the dict is unpacked into the log call.
    overview = {
        "module_config": "config.py — Configuration management (env vars, validation)",
        "module_auth": "auth_service.py — Authentication with API key management",
        "module_registry": "registry_setup.py — Initialize LLM Provider, Prompt, Tool registries",
        "module_llm": "llm_service.py — LLM gateway with observability",
        "module_main": "complete_workflow.py — Main orchestrator",
        "observability_1": "BasicLogger — Structured logging at every layer",
        "observability_2": "BasicMetricsCollector — Counters, gauges, histograms per service",
        "observability_3": "BasicPerformanceMonitor — LLM-specific tracking (tokens, latency, throughput)",
    }
    logger.info("MODULAR ARCHITECTURE OVERVIEW", **overview)
if __name__ == "__main__":
    # Script entry point: run the async demo to completion.
    asyncio.run(main())
decorator_examples.py
"""
Decorator-based tracing examples.
This module demonstrates three decorators from the tracing package and when
to use each one:
1) @traced
Use on top-level entry points (for example: a full request handler,
pipeline function, or agent run). It creates a root trace.
2) @trace_span
Use on regular internal steps (retrieval, parsing, transformation,
routing, tool orchestration). If a parent span/trace is active, this
becomes a child span. If no parent is active, it safely creates a root
trace so the function can still be called standalone.
3) @trace_generation
Use on LLM call boundaries. It creates a generation span (child when a
parent exists, root fallback otherwise) and can attach model + token usage.
Capture controls:
- capture_input controls whether function args are stored in trace data.
- capture_output controls whether return values are stored in trace data.
- capture_usage (generation only) controls token usage extraction from
result["usage"] when present.
Run these with::
export TRACING_PROVIDER=langfuse # or splunk, or leave as null
python decorator_examples.py
All examples use one tracer instance. Changing TRACING_PROVIDER switches
backend behavior without changing application code.
"""
import asyncio
from typing import Any, Dict, Optional
from gmf_forge_ai_shared_core.observability.tracing import (
get_tracer,
trace_generation,
trace_span,
traced,
)
from gmf_forge_ai_shared_core.observability.tracing.base import _get_active_span
# ============================================================================
# Example A: Pure decorator style — straightforward linear flow
# ============================================================================
@traced(name="simple_flow", tags=["demo"])
async def example_a():
    """Example A: a root trace around a single decorated child step."""
    print("[Example A] Pure decorator style:")
    weather = await fetch_weather("London")
    print(f" Weather: {weather}")
    return weather
@trace_span(name="weather_fetch")
async def fetch_weather(city: str) -> str:
    """Return a canned weather report for *city* after a short simulated delay."""
    # Stand-in for a real API round trip.
    await asyncio.sleep(0.1)
    report = f"Sunny in {city}"
    return report
# ============================================================================
# Example B: Decorators with custom input/output capture
# ============================================================================
@traced(
    name="pipeline_with_capture",
    capture_input=True,
    capture_output=True,
)
async def example_b():
    """Example B: root trace and generation span with input/output capture switched on."""
    print("[Example B] Decorator with capture control:")
    question = "What is 2+2?"
    reply = await query_llm(question)
    print(f" Q: {question} → A: {reply}")
    return reply
@trace_generation(
    name="llm_call",
    model="gpt-4o-mini",
    capture_input=True,
    capture_output=True,
    capture_usage=True,
)
async def query_llm(question: str) -> str:
    """
    Simulated LLM call wrapped in a generation span.

    This stand-in returns a plain string, so there is no "usage" entry to
    capture. In real usage, return a dict with a "usage" key for token
    capture::

        return {
            "text": "answer",
            "usage": {
                "prompt_tokens": 15,
                "completion_tokens": 42,
                "total_tokens": 57,
            },
        }
    """
    # Pretend the model took a moment to answer.
    await asyncio.sleep(0.05)
    canned_answer = "4"
    return canned_answer
# ============================================================================
# Example C: Mixed explicit + decorator style (most control)
# ============================================================================
@traced(name="mixed_style")
async def example_c():
    """
    Example C: combine decorator tracing with an explicitly created span.

    The span opened by @traced is fetched with _get_active_span() and its
    .span() method is used to open a child span by hand; the decorated
    helper is awaited inside that hand-made span.
    """
    print("[Example C] Mixed explicit + decorator:")

    # Span currently active for this call (set up by the @traced decorator).
    parent_span = _get_active_span()
    with parent_span.span("orchestration_step", input={"phase": "init"}) as step:
        # Awaited while orchestration_step is open.
        items = await load_and_process()
        step.set_output({"processed": len(items)})

    print(f" Processed {len(items)} items")
    return items
@trace_span(name="load_and_process", capture_input=True, capture_output=True)
async def load_and_process() -> list[dict[str, Any]]:
    """Return two fixed records after a short simulated delay (called from example_c)."""
    await asyncio.sleep(0.05)
    records = [{"id": item_id, "status": "ok"} for item_id in (1, 2)]
    return records
# ============================================================================
# Example D: Metadata callbacks for dynamic annotation
# ============================================================================
def _analyze_result(result: Dict[str, Any], metadata: Dict[str, Any]) -> None:
    """
    Metadata callback: derive timing annotations from *result*.

    When *result* has a "duration" entry (seconds), writes "duration_ms"
    (rounded to 2 decimals) and "performance" ("fast" below 0.2 s, "slow"
    otherwise) into *metadata*. Otherwise *metadata* is left untouched.
    """
    if "duration" not in result:
        return
    seconds = result["duration"]
    metadata["duration_ms"] = round(seconds * 1000, 2)
    metadata["performance"] = "fast" if seconds < 0.2 else "slow"
@traced(
    name="analytics_flow",
    capture_input=False,
    capture_output=False,
    metadata_fn=_analyze_result,
)
async def example_d():
    """
    Use metadata_fn callback to add custom computed metadata.

    Returns a dict with a "duration" entry (seconds, measured on the event
    loop clock), which is the key _analyze_result looks for.
    """
    print("[Example D] Decorator with metadata callback:")
    # Inside a coroutine, get_running_loop() is the documented way to reach
    # the current loop; it does not depend on the event loop policy.
    loop = asyncio.get_running_loop()
    start = loop.time()
    await asyncio.sleep(0.1)
    return {"duration": loop.time() - start}
# ============================================================================
# Example E: Async/sync transparently supported
# ============================================================================
@traced(name="mixed_async_sync")
async def example_e():
    """Example E: call one sync and one async decorated helper from the same trace."""
    print("[Example E] Mixed async and sync functions:")

    # The sync helper is called directly; the async helper is awaited.
    from_sync = process_data([1, 2, 3])
    from_async = await async_process_data([4, 5, 6])

    print(f" Sync result: {from_sync}")
    print(f" Async result: {from_async}")
    summary = f"sync={from_sync}, async={from_async}"
    return summary
@trace_span(name="sync_processing")
def process_data(items: list[int]) -> int:
    """Synchronous decorated helper: return the sum of *items*."""
    total = sum(items)
    return total
@trace_span(name="async_processing")
async def async_process_data(items: list[int]) -> int:
    """Asynchronous decorated helper: return the sum of *items* after a brief pause."""
    await asyncio.sleep(0.01)
    total = sum(items)
    return total
# ============================================================================
# Example F: Error handling — exceptions are captured and re-raised
# ============================================================================
@traced(name="error_handling")
async def example_f():
    """Example F: a decorated child raises ValueError, which is caught and printed here."""
    print("[Example F] Error handling:")
    try:
        outcome = await potentially_failing_operation()
        print(f" Result: {outcome}")
    except ValueError as err:
        # The helper always raises, so this branch is the expected path.
        print(f" Caught error (as expected): {err}")
@trace_span(name="failing_op", capture_input=False)
async def potentially_failing_operation() -> str:
    """Always raise ValueError, to show an exception passing through a traced span."""
    message = "Simulated error for demonstration"
    raise ValueError(message)
# ============================================================================
# Example G: Nesting depth — arbitrary levels work transparently
# ============================================================================
@traced(name="deep_nesting")
async def example_g():
    """Example G: root trace over a three-level chain of decorated calls."""
    print("[Example G] Deep nesting (3 levels):")
    nested = await level_1()
    print(f" Deep result: {nested}")
    return nested
@trace_span(name="level_1")
async def level_1() -> str:
    """Level 1 — called by example_g, so its parent is the deep_nesting root trace."""
    result = await level_2()
    return f"L1[{result}]"
@trace_span(name="level_2")
async def level_2() -> str:
    """Level 2 — called by level_1, so its parent is the level_1 span."""
    result = await level_3()
    return f"L2[{result}]"
@trace_span(name="level_3")
async def level_3() -> str:
    """Level 3 — innermost step; called by level_2, so its parent is the level_2 span."""
    return "L3[leaf]"
# ============================================================================
# Main
# ============================================================================
async def main():
    """Run examples A–G in order, flushing the tracer afterwards even if one fails."""
    rule = "=" * 70
    print(rule)
    print("Decorator-based tracing examples")
    print(rule)

    # Get tracer once at startup
    tracer = get_tracer()
    examples = (
        example_a,
        example_b,
        example_c,
        example_d,
        example_e,
        example_f,
        example_g,
    )
    try:
        for run_example in examples:
            await run_example()
    finally:
        # Flush at shutdown to ensure delivery
        tracer.flush()

    print("\n" + rule)
    print("All examples completed. Check your tracing backend for traces.")
    print(rule)
if __name__ == "__main__":
    # Script entry point: run every decorator example.
    asyncio.run(main())
multi_llm_provider_demo.py
"""
Multi-Provider Example
Demonstrates using multiple LLM providers with LLMProviderRegistry.
Shows how to switch between Azure OpenAI (cloud) and Ollama (local).
"""
import asyncio
import os
from pathlib import Path
from dotenv import load_dotenv
from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway
from gmf_forge_ai_shared_core.llm_gateway.providers import AzureOpenAIProvider, OllamaProvider
from gmf_forge_ai_shared_core.registry import LLMProviderRegistry
from gmf_forge_ai_shared_core.observability import BasicLogger
# Load environment variables
load_dotenv()
logger = BasicLogger("multi_llm_provider_demo")
# Corporate SSL certificate path
# WORKSPACE_ROOT is the fourth ancestor directory of this file; the certificate
# bundle is looked up under <WORKSPACE_ROOT>/certs/ and is only passed to the
# Azure provider when the file exists (see setup_providers).
WORKSPACE_ROOT = Path(__file__).parent.parent.parent.parent
CORPORATE_CERT = WORKSPACE_ROOT / "certs" / "gmf_and_public_cas.pem"
async def setup_providers():
    """
    Set up multiple providers and register them.

    Azure OpenAI is registered (as the default) when AZURE_OPENAI_ENDPOINT is
    set together with either AZURE_OPENAI_API_KEY or a truthy
    AZURE_USE_MANAGED_IDENTITY ("1", "true" or "yes"). Ollama is registered
    when its credential check succeeds, and becomes the default only if
    Azure was not registered.

    Returns:
        Tuple of (provider_registry, gateway)
    """
    logger.info("MULTI-PROVIDER SETUP")

    # Initialize registry
    registry = LLMProviderRegistry()

    # 1. Setup Azure OpenAI (if credentials available)
    azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    azure_key = os.getenv("AZURE_OPENAI_API_KEY")
    azure_deployment = os.getenv("AZURE_OPENAI_DEPLOYMENT", "gpt-4")
    use_managed_identity = os.getenv("AZURE_USE_MANAGED_IDENTITY", "").lower() in ("1", "true", "yes")
    has_azure_auth = bool(azure_key or use_managed_identity)

    if azure_endpoint and has_azure_auth:
        logger.info("Azure OpenAI configured")
        provider_kwargs = {
            "endpoint": azure_endpoint,
            "deployment_name": azure_deployment,
        }
        if use_managed_identity:
            # Imported lazily so azure-identity is only needed on this path.
            from azure.identity import DefaultAzureCredential, get_bearer_token_provider
            provider_kwargs["token_provider"] = get_bearer_token_provider(
                DefaultAzureCredential(),
                "https://cognitiveservices.azure.com/.default",
            )
            logger.info("Using managed identity authentication")
        else:
            provider_kwargs["api_key"] = azure_key
            logger.info("Using API key authentication")

        # Add corporate certificate if available
        if CORPORATE_CERT.exists():
            provider_kwargs["ssl_cert_path"] = str(CORPORATE_CERT)
            logger.info("Using corporate SSL certificate", cert=CORPORATE_CERT.name)
        else:
            logger.info("Using default SSL")

        azure_provider = AzureOpenAIProvider(**provider_kwargs)
        registry.register("azure", azure_provider, is_default=True)

        # Register models through the registry
        registry.register_model(
            "azure",
            azure_deployment,
            capabilities={
                "max_tokens": 16384,
                "context_window": 128000,
                "supports_functions": True,
                "supports_vision": True,
                "supports_streaming": True,
            },
            deployment=azure_deployment
        )
        logger.info("Azure provider registered", name="azure", default=True, model=azure_deployment)
    else:
        # Report only the settings that are actually absent, so the warning
        # points at the real gap instead of always naming both variables.
        missing = []
        if not azure_endpoint:
            missing.append("AZURE_OPENAI_ENDPOINT")
        if not has_azure_auth:
            missing.append("AZURE_OPENAI_API_KEY")
        logger.warning("Azure OpenAI not configured", missing=missing)

    # 2. Setup Ollama (local)
    logger.info("Ollama configured")
    ollama_provider = OllamaProvider(
        base_url="http://localhost:11434",
        model="llama3"
    )

    # Check if Ollama is running
    if await ollama_provider.validate_credentials():
        # Ollama becomes the default only when Azure was not registered above.
        registry.register("ollama", ollama_provider, is_default=not registry.has_provider("azure"))

        # Register models through the registry
        registry.register_model(
            "ollama",
            "llama3",
            capabilities={
                "context_window": 8192,
                "supports_chat": True,
                "supports_streaming": True,
                "local": True,
            }
        )
        registry.register_model(
            "ollama",
            "mistral",
            capabilities={
                "context_window": 8192,
                "supports_chat": True,
                "supports_streaming": True,
                "local": True,
            }
        )
        logger.info("Ollama provider registered", name="ollama", models=["llama3", "mistral"])
    else:
        logger.warning(
            "Ollama not running",
            tip_1="To start: ollama serve",
            tip_2="To install: https://ollama.ai/download",
        )

    # 3. Create gateway with registry
    gateway = UnifiedLLMGateway(provider_registry=registry)
    logger.info(
        "Providers ready",
        registered=list(registry.list_providers()),
        default=registry.get_default_name(),
    )
    return registry, gateway
async def test_azure_provider(gateway: UnifiedLLMGateway):
    """Send one prompt through the gateway with the "azure" provider selected and log the reply."""
    logger.info("Starting test", test="1: Azure OpenAI (Cloud)")
    try:
        question = "Explain what RAG is in one sentence."
        logger.info("Prompt", text=question)
        call_args = {
            "prompt": question,
            "provider": "azure",  # Explicitly select Azure
            "temperature": 0.7,
            "max_tokens": 100,
        }
        reply = await gateway.complete(**call_args)
        logger.info(
            "Response",
            content=reply.content,
            tokens=reply.usage['total_tokens'],
            model=reply.model,
        )
    except ValueError as err:
        logger.error("Azure provider not registered", reason="credentials not configured", error=str(err))
    except Exception as err:
        logger.error("Azure test failed", error=str(err))
async def test_ollama_provider(gateway: UnifiedLLMGateway):
    """Send one prompt through the gateway with the "ollama" provider selected and log the reply."""
    logger.info("Starting test", test="2: Ollama (Local)")
    try:
        question = "What is the capital of France?"
        logger.info("Prompt", text=question)
        call_args = {
            "prompt": question,
            "provider": "ollama",  # Explicitly select Ollama
            "temperature": 0.7,
            "max_tokens": 50,
        }
        reply = await gateway.complete(**call_args)
        logger.info(
            "Response",
            content=reply.content,
            tokens=reply.usage['total_tokens'],
            model=reply.model,
        )
    except ValueError as err:
        logger.error("Ollama provider not registered", reason="not running", error=str(err))
    except ConnectionError as err:
        # Connection problems get setup hints in addition to the error text.
        logger.error(
            "Cannot connect to Ollama",
            error=str(err),
            fix_1="Install: https://ollama.ai/download",
            fix_2="Run: ollama serve",
            fix_3="Pull model: ollama pull llama3",
        )
    except Exception as err:
        logger.error("Ollama test failed", error=str(err))
async def test_default_provider(gateway: UnifiedLLMGateway):
    """
    Test using default provider.

    No provider is named in the call, so the gateway picks one. The logged
    provider comes from the response metadata when present; otherwise it
    falls back to the registry's default name, so the log stays correct
    when Ollama (not Azure) is the default.
    """
    logger.info("Starting test", test="3: Default Provider (Auto-Select)")
    try:
        prompt = "Name one programming language."
        logger.info("Prompt", text=prompt)

        # No provider specified - uses default
        response = await gateway.complete(
            prompt=prompt,
            temperature=0.7,
            max_tokens=20
        )

        registry = gateway.provider_registry
        fallback_provider = registry.get_default_name() if registry else "unknown"
        logger.info(
            "Response",
            content=response.content,
            tokens=response.usage['total_tokens'],
            model=response.model,
            provider=response.metadata.get('provider', fallback_provider),
        )
    except Exception as e:
        logger.error("Default provider test failed", error=str(e))
async def test_streaming(gateway: UnifiedLLMGateway):
    """
    Test streaming with Ollama.

    Fetches the "ollama" provider from the gateway's registry and calls its
    stream_complete() directly, collecting the chunks and logging the
    joined text. Logs an error and returns if the provider is not registered.
    """
    logger.info("Starting test", test="4: Streaming (Ollama)")
    try:
        # Get the Ollama provider directly for streaming
        registry = gateway.provider_registry
        if not registry or not registry.has_provider("ollama"):
            logger.error("Ollama provider not available")
            return
        ollama_provider = registry.get("ollama")

        prompt = "Count from 1 to 5."
        logger.info("Prompt", text=prompt)

        chunks = []
        async for chunk in ollama_provider.stream_complete(
            prompt=prompt,
            temperature=0.7,
            max_tokens=50
        ):
            chunks.append(chunk)
        logger.info("Streaming complete", response="".join(chunks))
    except ValueError as e:
        # Include the error text, as the other provider tests do.
        logger.error("Ollama provider not registered", error=str(e))
    except Exception as e:
        logger.error("Streaming test failed", error=str(e))
async def main():
    """Register the providers, run the four provider tests in order, then log a summary."""
    logger.info("MULTI-PROVIDER LLM GATEWAY DEMO")

    # Setup providers
    registry, gateway = await setup_providers()

    # Run tests, each preceded by an empty log line as a separator
    provider_tests = (
        test_azure_provider,
        test_ollama_provider,
        test_default_provider,
        test_streaming,
    )
    for run_test in provider_tests:
        logger.info("")
        await run_test(gateway)
    logger.info("")

    summary = {
        "total_providers": len(registry),
        "default": registry.get_default_name(),
        "use_case_1": "Azure OpenAI for production (cloud, enterprise-grade)",
        "use_case_2": "Ollama for development (local, no API costs)",
        "use_case_3": "Switch providers without code changes",
    }
    logger.info("SUMMARY", **summary)
if __name__ == "__main__":
    # Script entry point: run the multi-provider demo.
    asyncio.run(main())
tracing_example.py
"""
Tracing Example — gmf_forge_ai_shared_core observability
Shows how to instrument a RAG query and an agent run with the
provider-agnostic tracing API. Switch backends by changing a single
env var — application code is identical for every provider.
Tracing Model
-------------
Concept What it is
----------- -----------------------------------------------------------
trace One end-to-end request (e.g. a full RAG query or agent run).
Created with tracer.trace(...) or tracer.trace_agent_run(...).
span A named step inside a trace (e.g. retrieval, reranking, tool
call). Created with trace.span(...). Spans can be nested.
generation A specialised span for LLM calls that additionally captures
token usage (prompt + completion tokens) and the model name.
Created with trace.generation(...).
All three support context-manager usage (with ...):
span.set_output(...) — attach result metadata to the span
gen.set_token_usage(...) — record prompt/completion token counts
trace.set_output(...) — attach the final answer to the trace
Provider-agnostic pattern
-------------------------
Call get_tracer() once at startup. It reads TRACING_PROVIDER from the
environment and returns a singleton — NullTracer by default, SplunkTracer
when TRACING_PROVIDER=splunk, or LangfuseTracer when
TRACING_PROVIDER=langfuse. Your instrumentation code never imports a
concrete provider class directly, so switching backends requires only an
env-var change, no code changes.
Supported TRACING_PROVIDER values (set in .env):
null — no-op, no dependencies required (default)
splunk — Splunk Observability Cloud via OpenTelemetry OTLP
langfuse — Langfuse backend
Usage::
cd packages/shared-core
python examples/tracing_example.py
Prerequisites:
pip install python-dotenv
# For Splunk backend only:
pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http
# For Langfuse backend only:
pip install langfuse
"""
import os
import sys
import time
from pathlib import Path
from dotenv import load_dotenv
# ── Env ───────────────────────────────────────────────────────────────────────
# Load the .env file located next to this script before the package imports below.
# NOTE(review): presumably loaded first so that env vars such as TRACING_PROVIDER
# are already set when the package is imported — confirm.
env_path = Path(__file__).parent / ".env"
load_dotenv(env_path)
from gmf_forge_ai_shared_core.observability import BasicLogger
from gmf_forge_ai_shared_core.observability.tracing import (
    LangfuseTracer,
    NullTracer,
    SplunkTracer,
    get_tracer,
)
# Module-level structured logger used by every example below.
logger = BasicLogger(__name__)
# ══════════════════════════════════════════════════════════════════════════════
# Helpers — stand-ins for real RAG / LLM components
# ══════════════════════════════════════════════════════════════════════════════
def _fake_retrieve(question: str) -> list[dict]:
    """Stand-in for a vector search: pause briefly, then return two fixed documents.

    *question* is accepted for signature realism but does not affect the result.
    """
    time.sleep(0.05)
    canned_docs = (
        ("doc_1", "Policy A defines the approval workflow."),
        ("doc_2", "Policy B covers exception handling."),
    )
    return [{"id": doc_id, "content": text} for doc_id, text in canned_docs]
def _fake_llm(prompt: str) -> tuple[str, int, int]:
    """Stand-in for an LLM call: pause briefly, then return (reply, prompt_tokens, completion_tokens).

    The reply and token counts are fixed; *prompt* does not affect them.
    """
    time.sleep(0.1)
    reply = "Based on the retrieved context, the answer is..."
    prompt_tokens, completion_tokens = 140, 60
    return reply, prompt_tokens, completion_tokens
# ══════════════════════════════════════════════════════════════════════════════
# Example 1 — null (no-op)
# ══════════════════════════════════════════════════════════════════════════════
def example_null_tracer():
    """Example 1: run a fake RAG query through a directly constructed NullTracer.

    Opens a trace with a retrieval span and an LLM generation span, records
    outputs and token usage on them, and logs the trace identifiers.
    """
    logger.info("Starting example", example="1: NullTracer (no-op default)")
    logger.info(
        "Scenario",
        note="When TRACING_PROVIDER is unset or 'null', get_tracer() returns a NullTracer",
        behaviour="Every call is a no-op — zero overhead, no dependencies",
    )

    tracer = NullTracer()
    logger.info("Provider", name=type(tracer).__name__)

    question = "What is the approval workflow?"
    with tracer.trace("rag_query", input=question, user_id="demo") as trace:
        # Step 1: retrieval span
        with trace.span("retrieval", input=question) as retrieval_span:
            documents = _fake_retrieve(question)
            retrieval_span.set_output({"doc_count": len(documents)})

        # Step 2: generation span, with token usage attached
        llm_prompt = f"Context: {documents[0]['content']}\nQuestion: {question}"
        with trace.generation("llm", model="gpt-4o-mini", input=llm_prompt) as generation:
            answer, prompt_tokens, completion_tokens = _fake_llm(llm_prompt)
            generation.set_output(answer)
            generation.set_token_usage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens)

        trace.set_output(answer)

    logger.info("Trace complete", trace_id=trace.trace_id, span_id=trace.id, answer=answer)
    logger.info("NullTracer — all calls silently succeeded, nothing was sent anywhere")
# ══════════════════════════════════════════════════════════════════════════════
# Example 2 — Splunk
# ══════════════════════════════════════════════════════════════════════════════
def example_splunk_tracer():
    """Example 2: export a small RAG-style trace to Splunk via ``get_tracer()``.

    The live demo only runs when all of the following are set in the
    environment:

    * ``SPLUNK_OTLP_ENDPOINT`` and ``SPLUNK_ACCESS_TOKEN`` (credentials), and
    * ``TRACING_PROVIDER=splunk`` (so ``get_tracer()`` resolves the Splunk
      backend rather than the no-op default or another provider).

    If any of them is missing the example logs a warning with setup tips and
    returns without tracing, so it never claims that spans were exported to
    Splunk when a different tracer was actually used.
    """
    logger.info("Starting example", example="2: SplunkTracer (Splunk Observability Cloud)")
    logger.info(
        "Scenario",
        note="Exports spans to Splunk APM via OpenTelemetry OTLP/HTTP",
        requires="SPLUNK_OTLP_ENDPOINT and SPLUNK_ACCESS_TOKEN in .env",
    )

    endpoint = os.getenv("SPLUNK_OTLP_ENDPOINT")
    token = os.getenv("SPLUNK_ACCESS_TOKEN")
    if not endpoint or not token:
        logger.warning(
            "SPLUNK_OTLP_ENDPOINT / SPLUNK_ACCESS_TOKEN not set — skipping live demo",
            tip_1="Add TRACING_PROVIDER=splunk to your .env",
            tip_2="Add SPLUNK_OTLP_ENDPOINT=https://ingest.<realm>.signalfx.com/v2/trace/otlp",
            tip_3="Add SPLUNK_ACCESS_TOKEN=your-token",
            tip_4="Optionally add SPLUNK_SERVICE_NAME=policy-rag",
            install="pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http",
        )
        return

    # get_tracer() selects the backend from TRACING_PROVIDER, not from the
    # presence of Splunk credentials. Without this guard the example would run
    # against the no-op (or another) tracer and still report a Splunk export.
    provider = os.getenv("TRACING_PROVIDER", "null").strip().lower()
    if provider != "splunk":
        logger.warning(
            "TRACING_PROVIDER is not 'splunk' — skipping live demo",
            current=provider,
            tip="Add TRACING_PROVIDER=splunk to your .env",
        )
        return

    # In normal app code you'd just call get_tracer() — it reads the env vars.
    tracer = get_tracer(reset=True)
    logger.info("Provider", name=type(tracer).__name__)

    question = "What are the exception handling steps?"
    logger.info("Tracing RAG query", query=question)

    with tracer.trace("rag_query", input=question, user_id="demo", session_id="sess_1") as trace:
        with trace.span("retrieval", input=question) as span:
            docs = _fake_retrieve(question)
            span.set_output({"doc_count": len(docs)})
            # Only the first 8 characters of the id are logged to keep output short.
            logger.info("Retrieval span", doc_count=len(docs), span_id=span.id[:8])

        # Unlike example 1, every retrieved document goes into the prompt.
        prompt = "\n".join(d["content"] for d in docs) + f"\n\nQuestion: {question}"
        with trace.generation("llm", model="gpt-4o-mini", input=prompt) as gen:
            reply, pt, ct = _fake_llm(prompt)
            gen.set_output(reply)
            gen.set_token_usage(prompt_tokens=pt, completion_tokens=ct)
            logger.info("LLM generation span", prompt_tokens=pt, completion_tokens=ct, span_id=gen.id[:8])

        trace.set_output(reply)

    logger.info("Trace complete", trace_id=trace.trace_id)
    # Flush before reporting success so buffered spans are sent first.
    tracer.flush()
    logger.info("Spans exported to Splunk APM — check your Splunk Observability dashboard")
# ══════════════════════════════════════════════════════════════════════════════
# Example 3 — get_tracer() singleton with env-var switching
# ══════════════════════════════════════════════════════════════════════════════
def example_get_tracer():
    """Example 3: resolve the tracer through ``get_tracer()`` and trace an agent run.

    Logs which provider ``TRACING_PROVIDER`` names and which tracer class was
    actually returned, then records a ``trace_agent_run`` containing two tool
    spans, flushes the tracer, and logs a closing note.
    """
    logger.info("Starting example", example="3: get_tracer() — env-var driven singleton")
    logger.info(
        "Scenario",
        note="get_tracer() reads TRACING_PROVIDER from env and returns a singleton",
        benefit="Application code never imports a concrete provider class",
    )

    configured_provider = os.getenv("TRACING_PROVIDER", "null")
    active_tracer = get_tracer(reset=True)
    logger.info(
        "Provider resolved",
        env_var=configured_provider,
        provider=type(active_tracer).__name__,
    )

    # Agent-run shortcut: one trace, two simulated tool invocations.
    agent_input = "Summarise all exception policies"
    with active_tracer.trace_agent_run("policy-agent", input=agent_input) as trace:
        with trace.span("tool:search_policies") as search_span:
            time.sleep(0.02)
            search_span.set_output({"results": 5})

        with trace.span("tool:summarise") as summarise_span:
            time.sleep(0.02)
            summarise_span.set_output("Summary: ...")

    logger.info("Agent trace complete", trace_id=trace.trace_id)
    active_tracer.flush()
    logger.info("Switching backends requires only a TRACING_PROVIDER env-var change")
# ══════════════════════════════════════════════════════════════════════════════
# Main
# ══════════════════════════════════════════════════════════════════════════════
def main():
    """Run the three tracing examples in order, then log a provider summary.

    An empty log line is emitted before each example and once more before the
    summary to separate the sections in the output.
    """
    logger.info(
        "TRACING EXAMPLES",
        note="Demonstrating the provider-agnostic tracing API",
        tip="Set TRACING_PROVIDER in .env to switch backends without code changes",
    )

    examples = (example_null_tracer, example_splunk_tracer, example_get_tracer)
    for run_example in examples:
        logger.info("")  # visual separator
        run_example()
    logger.info("")

    logger.info("SUMMARY")
    provider_summaries = (
        ("NullTracer", "default, zero overhead, no dependencies"),
        ("SplunkTracer", "OpenTelemetry OTLP export to Splunk Observability Cloud"),
        ("LangfuseTracer", "Langfuse backend via LANGFUSE_PUBLIC_KEY/SECRET_KEY"),
    )
    for tracer_name, summary in provider_summaries:
        logger.info(tracer_name, description=summary)

    logger.info(
        "Production tip",
        note="Switch backends by changing TRACING_PROVIDER env var — application code is identical for every provider",
    )
# Script entry point: run the examples only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
.env.example
# Azure OpenAI Configuration (Primary Provider)
AZURE_OPENAI_ENDPOINT=your-azure-openai-endpoint
AZURE_OPENAI_API_KEY=your-azure-openai-api-key
AZURE_OPENAI_DEPLOYMENT=your-azure-openai-deployment
AZURE_OPENAI_API_VERSION=your-azure-openai-api-version
# OpenAI Configuration (Fallback Provider)
OPENAI_API_KEY=sk-your-openai-api-key
# Anthropic Configuration (Optional)
ANTHROPIC_API_KEY=your-anthropic-api-key
# Application Settings
LOG_LEVEL=INFO
ENVIRONMENT=development
RELEASE=
# =============================================================================
# Tracing (gmf_forge_ai_shared_core.observability.tracing)
# Set TRACING_PROVIDER to activate a backend. Defaults to "null" (no-op).
# Supported values: null, splunk, langfuse
# =============================================================================
TRACING_PROVIDER=null
# Splunk Observability Cloud (TRACING_PROVIDER=splunk)
# Install: pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http
SPLUNK_OTLP_ENDPOINT=https://ingest.<realm>.signalfx.com/v2/trace/otlp
SPLUNK_ACCESS_TOKEN=your-splunk-access-token
SPLUNK_SERVICE_NAME=gmf-forge-ai
# Langfuse (TRACING_PROVIDER=langfuse)
# Install: pip install langfuse
LANGFUSE_PUBLIC_KEY=your-langfuse-public-key
LANGFUSE_SECRET_KEY=your-langfuse-secret-key
LANGFUSE_HOST=https://cloud.langfuse.com