gmf_forge_ai_shared_core.observability
Observability — logging, metrics, performance monitoring, and tracing.
"""Observability — logging, metrics, performance monitoring, and tracing.

Public entry points of the shared observability toolkit. Every name in
``__all__`` is re-exported from its implementation submodule so callers can
import directly from ``gmf_forge_ai_shared_core.observability``.
"""

# Structured logging
from gmf_forge_ai_shared_core.observability.logger import BasicLogger

# In-process counters / gauges / histograms
from gmf_forge_ai_shared_core.observability.metrics_collector import (
    BasicMetricsCollector,
)

# LLM request latency / token tracking
from gmf_forge_ai_shared_core.observability.performance_monitor import (
    BasicPerformanceMonitor,
)

# Pluggable tracing backends
from gmf_forge_ai_shared_core.observability.tracing import (
    NullTracer,
    Span,
    TracingConfig,
    TracingProvider,
    get_tracer,
)

__all__ = [
    "BasicLogger",
    "BasicMetricsCollector",
    "BasicPerformanceMonitor",
    "TracingProvider",
    "TracingConfig",
    "Span",
    "NullTracer",
    "get_tracer",
]
class BasicLogger:
    """
    Thin structured-logging facade shared by platform AI packages.

    Wraps a named ``structlog`` logger so every package logs the same way.
    Respects the ``LOG_LEVEL`` environment variable (default ``INFO``)
    through the structlog setup done outside this class; the class itself
    performs no level filtering and forwards every call unchanged.
    """

    def __init__(self, name: str):
        """Bind the structlog logger identified by ``name``."""
        self.logger = structlog.get_logger(name)

    # Methods are listed from least to most severe.

    def debug(self, message: str, **kwargs: Any) -> None:
        """Emit ``message`` at DEBUG level with ``kwargs`` as structured fields."""
        self.logger.debug(message, **kwargs)

    def info(self, message: str, **kwargs: Any) -> None:
        """Emit ``message`` at INFO level with ``kwargs`` as structured fields."""
        self.logger.info(message, **kwargs)

    def warning(self, message: str, **kwargs: Any) -> None:
        """Emit ``message`` at WARNING level with ``kwargs`` as structured fields."""
        self.logger.warning(message, **kwargs)

    def error(self, message: str, **kwargs: Any) -> None:
        """Emit ``message`` at ERROR level with ``kwargs`` as structured fields."""
        self.logger.error(message, **kwargs)
Basic structured logger for platform AI applications.
Provides consistent logging across all packages. Respects the
LOG_LEVEL environment variable (default INFO).
def __init__(self, name: str):
    """Create the wrapper around the structlog logger named ``name``."""
    bound_logger = structlog.get_logger(name)
    self.logger = bound_logger
Initialize logger with a name.
class BasicMetricsCollector:
    """
    Basic in-memory metrics collector for tracking application metrics.

    Tracks three metric kinds, each keyed by metric name plus optional tags:

    * counters   — accumulated integers (:meth:`increment`)
    * gauges     — last-written float values (:meth:`gauge`)
    * histograms — every recorded float sample, in insertion order
      (:meth:`histogram`)

    Tags are folded into the storage key as ``name[k1=v1,k2=v2]`` with tag
    keys sorted, so the same tag set always maps to the same series.
    """

    def __init__(self) -> None:
        """Initialize empty counter, gauge, and histogram stores."""
        self._counters: Dict[str, int] = defaultdict(int)
        self._gauges: Dict[str, float] = {}
        self._histograms: Dict[str, list] = defaultdict(list)

    def increment(self, metric_name: str, value: int = 1, **tags: Any) -> None:
        """Add ``value`` (default 1) to the counter for ``metric_name``/``tags``."""
        key = self._make_key(metric_name, tags)
        self._counters[key] += value

    def gauge(self, metric_name: str, value: float, **tags: Any) -> None:
        """Set the gauge for ``metric_name``/``tags`` to ``value``."""
        key = self._make_key(metric_name, tags)
        self._gauges[key] = value

    def histogram(self, metric_name: str, value: float, **tags: Any) -> None:
        """Append ``value`` to the histogram samples for ``metric_name``/``tags``."""
        key = self._make_key(metric_name, tags)
        self._histograms[key].append(value)

    def get_metrics(self) -> Dict[str, Any]:
        """
        Return a snapshot of all metrics.

        Returns:
            ``{"counters": {...}, "gauges": {...}, "histograms": {...}}``.
            The snapshot is independent of the collector: histogram sample
            lists are copied, so mutating the result never alters recorded
            samples.
        """
        return {
            "counters": dict(self._counters),
            "gauges": dict(self._gauges),
            # A shallow dict() copy would hand out the live internal lists.
            "histograms": {key: list(samples) for key, samples in self._histograms.items()},
        }

    @staticmethod
    def _make_key(metric_name: str, tags: Dict[str, Any]) -> str:
        """Create a unique key from metric name and tags (tags sorted by key)."""
        if not tags:
            return metric_name
        tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{metric_name}[{tag_str}]"
Basic metrics collector for tracking application metrics.
Tracks counters, gauges, and histograms.
def __init__(self):
    """Set up empty stores for gauges, counters and histogram samples."""
    self._gauges: Dict[str, float] = dict()
    # defaultdicts let increment()/histogram() write to unseen keys directly.
    self._counters: Dict[str, int] = defaultdict(int)
    self._histograms: Dict[str, list] = defaultdict(list)
Initialize metrics collector.
def increment(self, metric_name: str, value: int = 1, **tags: Any) -> None:
    """Add ``value`` (default 1) to the counter identified by name and tags."""
    series = self._make_key(metric_name, tags)
    self._counters[series] = self._counters[series] + value
Increment a counter.
def gauge(self, metric_name: str, value: float, **tags: Any) -> None:
    """Overwrite the gauge identified by name and tags with ``value``."""
    self._gauges[self._make_key(metric_name, tags)] = value
Set a gauge value.
class BasicPerformanceMonitor:
    """
    Monitor LLM performance with basic in-memory metrics collection.

    Tracks:
    - Request latency (total time between ``start_request`` and
      ``end_request``; time to first token is not measured)
    - Token usage (prompt, completion, total)
    - Throughput (tokens per second)
    - Success/failure rates
    - Provider-level aggregates

    Completed requests are kept in memory until :meth:`clear_history`.

    Example:
        >>> monitor = BasicPerformanceMonitor()
        >>>
        >>> # Start tracking a request
        >>> request_id = monitor.start_request("azure", "gpt-4")
        >>>
        >>> # ... make LLM call ...
        >>>
        >>> # Record completion
        >>> monitor.end_request(
        ...     request_id=request_id,
        ...     prompt_tokens=100,
        ...     completion_tokens=50,
        ...     success=True
        ... )
        >>>
        >>> # Get statistics
        >>> stats = monitor.get_stats()
        >>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
    """

    def __init__(self):
        """Initialize empty active/completed request stores and per-provider totals."""
        self._active_requests: Dict[str, Dict[str, Any]] = {}
        self._completed_requests: List[PerformanceMetrics] = []
        self._provider_stats: Dict[str, Dict[str, Any]] = defaultdict(
            lambda: {
                "total_requests": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "total_tokens": 0,
                "total_latency_ms": 0.0,
            }
        )

    def start_request(
        self,
        provider: str,
        model: str,
        request_id: Optional[str] = None,
        **metadata: Any
    ) -> str:
        """
        Start tracking a new request.

        Args:
            provider: Provider name (e.g., "azure", "openai")
            model: Model name
            request_id: Optional request ID (generated if not provided). A
                caller-supplied ID is used as-is.
            **metadata: Additional metadata to track

        Returns:
            Request ID for tracking. Generated IDs have the form
            ``{provider}_{model}_{microseconds}``, with a ``_{n}`` suffix
            added when that ID is already in flight.
        """
        if request_id is None:
            base_id = f"{provider}_{model}_{int(time.time() * 1000000)}"
            request_id = base_id
            suffix = 1
            # Timestamps can repeat (coarse clock resolution, rapid calls,
            # clock steps); never let a generated ID overwrite an in-flight
            # request.
            while request_id in self._active_requests:
                request_id = f"{base_id}_{suffix}"
                suffix += 1

        self._active_requests[request_id] = {
            "provider": provider,
            "model": model,
            "start_time": datetime.now(timezone.utc),
            # Monotonic reference for latency, unaffected by wall-clock steps.
            "perf_start": time.perf_counter(),
            "metadata": metadata,
        }

        return request_id

    def end_request(
        self,
        request_id: str,
        prompt_tokens: int,
        completion_tokens: int,
        success: bool = True,
        error: Optional[str] = None
    ) -> PerformanceMetrics:
        """
        End tracking for a request and record metrics.

        Latency is measured with the monotonic ``time.perf_counter()`` value
        captured by :meth:`start_request` (falling back to the UTC wall-clock
        ``start_time`` if absent), so system clock adjustments cannot yield
        negative or inflated latencies. ``start_time``/``end_time`` stay UTC
        wall-clock timestamps.

        Args:
            request_id: Request ID from start_request()
            prompt_tokens: Number of tokens in prompt
            completion_tokens: Number of tokens in completion
            success: Whether request succeeded
            error: Optional error message if failed

        Returns:
            PerformanceMetrics object with calculated metrics

        Raises:
            KeyError: If request_id not found
        """
        try:
            request_data = self._active_requests.pop(request_id)
        except KeyError:
            raise KeyError(f"Request ID '{request_id}' not found in active requests") from None

        end_time = datetime.now(timezone.utc)
        start_time = request_data["start_time"]

        # Calculate metrics
        perf_start = request_data.get("perf_start")
        if perf_start is not None:
            latency_ms = (time.perf_counter() - perf_start) * 1000
        else:
            latency_ms = (end_time - start_time).total_seconds() * 1000
        total_tokens = prompt_tokens + completion_tokens
        tokens_per_second = total_tokens / (latency_ms / 1000) if latency_ms > 0 else 0.0

        metrics = PerformanceMetrics(
            request_id=request_id,
            provider=request_data["provider"],
            model=request_data["model"],
            start_time=start_time,
            end_time=end_time,
            latency_ms=latency_ms,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            tokens_per_second=tokens_per_second,
            success=success,
            error=error,
            metadata=request_data["metadata"]
        )

        self._completed_requests.append(metrics)

        # Update cumulative provider stats (survive last_n filtering in get_stats)
        provider_stats = self._provider_stats[request_data["provider"]]
        provider_stats["total_requests"] += 1
        if success:
            provider_stats["successful_requests"] += 1
        else:
            provider_stats["failed_requests"] += 1
        provider_stats["total_tokens"] += total_tokens
        provider_stats["total_latency_ms"] += latency_ms

        return metrics

    def get_stats(
        self,
        provider: Optional[str] = None,
        last_n_requests: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Get aggregated performance statistics.

        Args:
            provider: Optional provider to filter by
            last_n_requests: Optional limit to last N requests

        Returns:
            Dictionary containing performance statistics (all zero when no
            request matches)
        """
        requests = self._completed_requests
        if provider:
            requests = [r for r in requests if r.provider == provider]
        if last_n_requests:
            requests = requests[-last_n_requests:]

        if not requests:
            return {
                "total_requests": 0,
                "successful_requests": 0,
                "failed_requests": 0,
                "success_rate": 0.0,
                "avg_latency_ms": 0.0,
                "min_latency_ms": 0.0,
                "max_latency_ms": 0.0,
                "avg_tokens_per_second": 0.0,
                "total_tokens": 0,
                "total_prompt_tokens": 0,
                "total_completion_tokens": 0,
                "avg_prompt_tokens": 0.0,
                "avg_completion_tokens": 0.0
            }

        total_requests = len(requests)
        successful_requests = sum(1 for r in requests if r.success)
        failed_requests = total_requests - successful_requests

        latencies = [r.latency_ms for r in requests]
        tokens_per_sec = [r.tokens_per_second for r in requests]

        return {
            "total_requests": total_requests,
            "successful_requests": successful_requests,
            "failed_requests": failed_requests,
            "success_rate": successful_requests / total_requests if total_requests > 0 else 0.0,
            "avg_latency_ms": sum(latencies) / len(latencies),
            "min_latency_ms": min(latencies),
            "max_latency_ms": max(latencies),
            "avg_tokens_per_second": sum(tokens_per_sec) / len(tokens_per_sec),
            "total_tokens": sum(r.total_tokens for r in requests),
            "total_prompt_tokens": sum(r.prompt_tokens for r in requests),
            "total_completion_tokens": sum(r.completion_tokens for r in requests),
            "avg_prompt_tokens": sum(r.prompt_tokens for r in requests) / total_requests,
            "avg_completion_tokens": sum(r.completion_tokens for r in requests) / total_requests
        }

    def get_provider_stats(self) -> Dict[str, Dict[str, Any]]:
        """
        Get statistics grouped by provider.

        Returns:
            Dictionary mapping provider names to their statistics; providers
            with no recorded requests are omitted.
        """
        result = {}
        for provider, stats in self._provider_stats.items():
            total = stats["total_requests"]
            if total > 0:
                avg_latency_ms = stats["total_latency_ms"] / total
                avg_tokens_per_request = stats["total_tokens"] / total
                # Aggregate throughput: all tokens over all latency
                avg_tokens_per_second = (stats["total_tokens"] / stats["total_latency_ms"] * 1000) if stats["total_latency_ms"] > 0 else 0

                result[provider] = {
                    "total_requests": total,
                    "successful_requests": stats["successful_requests"],
                    "failed_requests": stats["failed_requests"],
                    "success_rate": stats["successful_requests"] / total,
                    "avg_latency_ms": avg_latency_ms,
                    "total_tokens": stats["total_tokens"],
                    "avg_tokens_per_request": avg_tokens_per_request,
                    "avg_tokens_per_second": avg_tokens_per_second
                }
        return result

    def get_recent_requests(
        self,
        count: int = 10,
        provider: Optional[str] = None
    ) -> List[PerformanceMetrics]:
        """
        Get the most recent completed requests, oldest first.

        Args:
            count: Maximum number of requests to return; values <= 0 return
                an empty list
            provider: Optional provider to filter by

        Returns:
            List of up to ``count`` recent PerformanceMetrics (a new list)
        """
        if count <= 0:
            # requests[-0:] would return the entire history instead of none.
            return []
        requests = self._completed_requests
        if provider:
            requests = [r for r in requests if r.provider == provider]
        return requests[-count:]

    def clear_history(self) -> None:
        """Clear all historical metrics (keep active requests)."""
        self._completed_requests.clear()
        self._provider_stats.clear()

    def __len__(self) -> int:
        """Return number of completed requests."""
        return len(self._completed_requests)
Monitor LLM performance with basic metrics collection.
Tracks:
- Request latency (total time from start_request() to end_request(); time to first token is not measured)
- Token usage (prompt, completion, total)
- Throughput (tokens per second)
- Success/failure rates
- Provider-level aggregates
Example:
>>> monitor = BasicPerformanceMonitor()
>>> # Start tracking a request
>>> request_id = monitor.start_request("azure", "gpt-4")
>>> # ... make LLM call ...
>>> # Record completion
>>> monitor.end_request(
...     request_id=request_id,
...     prompt_tokens=100,
...     completion_tokens=50,
...     success=True
... )
>>> # Get statistics
>>> stats = monitor.get_stats()
>>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
65 def __init__(self): 66 """Initialize the performance monitor.""" 67 self._active_requests: Dict[str, Dict[str, Any]] = {} 68 self._completed_requests: List[PerformanceMetrics] = [] 69 self._provider_stats: Dict[str, Dict[str, Any]] = defaultdict( 70 lambda: { 71 "total_requests": 0, 72 "successful_requests": 0, 73 "failed_requests": 0, 74 "total_tokens": 0, 75 "total_latency_ms": 0.0, 76 } 77 )
Initialize the performance monitor.
def start_request(
    self,
    provider: str,
    model: str,
    request_id: Optional[str] = None,
    **metadata: Any
) -> str:
    """
    Start tracking a new request.

    Args:
        provider: Provider name (e.g., "azure", "openai")
        model: Model name
        request_id: Optional request ID (generated if not provided). A
            caller-supplied ID is used as-is.
        **metadata: Additional metadata to track

    Returns:
        Request ID for tracking. Generated IDs have the form
        ``{provider}_{model}_{microseconds}``, with a ``_{n}`` suffix added
        when that ID is already in flight.
    """
    if request_id is None:
        base_id = f"{provider}_{model}_{int(time.time() * 1000000)}"
        request_id = base_id
        suffix = 1
        # Timestamps can repeat (coarse clock resolution, rapid calls, clock
        # steps); never let a generated ID overwrite an in-flight request.
        while request_id in self._active_requests:
            request_id = f"{base_id}_{suffix}"
            suffix += 1

    self._active_requests[request_id] = {
        "provider": provider,
        "model": model,
        "start_time": datetime.now(timezone.utc),
        # Monotonic start reference so latency can be computed independently
        # of wall-clock adjustments.
        "perf_start": time.perf_counter(),
        "metadata": metadata,
    }

    return request_id
Start tracking a new request.
Args:
    provider: Provider name (e.g., "azure", "openai")
    model: Model name
    request_id: Optional request ID (generated if not provided)
    **metadata: Additional metadata to track
Returns: Request ID for tracking
def end_request(
    self,
    request_id: str,
    prompt_tokens: int,
    completion_tokens: int,
    success: bool = True,
    error: Optional[str] = None
) -> PerformanceMetrics:
    """
    End tracking for a request and record metrics.

    When the active entry carries a ``perf_start`` value from
    ``time.perf_counter()``, latency is measured from it (immune to
    wall-clock adjustments); otherwise it falls back to the UTC wall-clock
    ``start_time``. ``start_time``/``end_time`` stay wall-clock timestamps.

    Args:
        request_id: Request ID from start_request()
        prompt_tokens: Number of tokens in prompt
        completion_tokens: Number of tokens in completion
        success: Whether request succeeded
        error: Optional error message if failed

    Returns:
        PerformanceMetrics object with calculated metrics

    Raises:
        KeyError: If request_id not found
    """
    try:
        request_data = self._active_requests.pop(request_id)
    except KeyError:
        raise KeyError(f"Request ID '{request_id}' not found in active requests") from None

    end_time = datetime.now(timezone.utc)
    start_time = request_data["start_time"]

    # Calculate metrics
    perf_start = request_data.get("perf_start")
    if perf_start is not None:
        latency_ms = (time.perf_counter() - perf_start) * 1000
    else:
        latency_ms = (end_time - start_time).total_seconds() * 1000
    total_tokens = prompt_tokens + completion_tokens
    tokens_per_second = total_tokens / (latency_ms / 1000) if latency_ms > 0 else 0.0

    metrics = PerformanceMetrics(
        request_id=request_id,
        provider=request_data["provider"],
        model=request_data["model"],
        start_time=start_time,
        end_time=end_time,
        latency_ms=latency_ms,
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=total_tokens,
        tokens_per_second=tokens_per_second,
        success=success,
        error=error,
        metadata=request_data["metadata"]
    )

    self._completed_requests.append(metrics)

    # Update cumulative per-provider totals
    provider_stats = self._provider_stats[request_data["provider"]]
    provider_stats["total_requests"] += 1
    if success:
        provider_stats["successful_requests"] += 1
    else:
        provider_stats["failed_requests"] += 1
    provider_stats["total_tokens"] += total_tokens
    provider_stats["total_latency_ms"] += latency_ms

    return metrics
End tracking for a request and record metrics.
Args:
    request_id: Request ID from start_request()
    prompt_tokens: Number of tokens in prompt
    completion_tokens: Number of tokens in completion
    success: Whether request succeeded
    error: Optional error message if failed
Returns: PerformanceMetrics object with calculated metrics
Raises: KeyError: If request_id not found
def get_stats(
    self,
    provider: Optional[str] = None,
    last_n_requests: Optional[int] = None
) -> Dict[str, Any]:
    """
    Summarise completed requests into aggregate statistics.

    Args:
        provider: When truthy, only requests from this provider are counted.
        last_n_requests: When truthy, only the most recent N requests (after
            the provider filter) are counted.

    Returns:
        Dict of counts, success rate, latency (avg/min/max in ms), average
        throughput and token totals/averages; every value is zero when no
        request matches.
    """
    selected = self._completed_requests
    if provider:
        selected = [req for req in selected if req.provider == provider]
    if last_n_requests:
        selected = selected[-last_n_requests:]

    count = len(selected)
    if count == 0:
        return {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "success_rate": 0.0,
            "avg_latency_ms": 0.0,
            "min_latency_ms": 0.0,
            "max_latency_ms": 0.0,
            "avg_tokens_per_second": 0.0,
            "total_tokens": 0,
            "total_prompt_tokens": 0,
            "total_completion_tokens": 0,
            "avg_prompt_tokens": 0.0,
            "avg_completion_tokens": 0.0
        }

    succeeded = sum(1 for req in selected if req.success)
    latencies = [req.latency_ms for req in selected]
    throughputs = [req.tokens_per_second for req in selected]
    prompt_sum = sum(req.prompt_tokens for req in selected)
    completion_sum = sum(req.completion_tokens for req in selected)

    return {
        "total_requests": count,
        "successful_requests": succeeded,
        "failed_requests": count - succeeded,
        "success_rate": succeeded / count,
        "avg_latency_ms": sum(latencies) / count,
        "min_latency_ms": min(latencies),
        "max_latency_ms": max(latencies),
        "avg_tokens_per_second": sum(throughputs) / count,
        "total_tokens": sum(req.total_tokens for req in selected),
        "total_prompt_tokens": prompt_sum,
        "total_completion_tokens": completion_sum,
        "avg_prompt_tokens": prompt_sum / count,
        "avg_completion_tokens": completion_sum / count,
    }
Get aggregated performance statistics.
Args:
    provider: Optional provider to filter by
    last_n_requests: Optional limit to last N requests
Returns: Dictionary containing performance statistics
def get_provider_stats(self) -> Dict[str, Dict[str, Any]]:
    """
    Summarise the cumulative per-provider totals.

    Returns:
        Mapping of provider name to counts, success rate, average latency,
        token totals and aggregate throughput. Providers whose request count
        is not positive are left out.
    """
    summary = {}
    for provider_name, totals in self._provider_stats.items():
        n = totals["total_requests"]
        if n <= 0:
            continue

        latency_sum = totals["total_latency_ms"]
        token_sum = totals["total_tokens"]
        # Aggregate throughput: all tokens over all latency (ms -> s).
        if latency_sum > 0:
            throughput = token_sum / latency_sum * 1000
        else:
            throughput = 0

        summary[provider_name] = {
            "total_requests": n,
            "successful_requests": totals["successful_requests"],
            "failed_requests": totals["failed_requests"],
            "success_rate": totals["successful_requests"] / n,
            "avg_latency_ms": latency_sum / n,
            "total_tokens": token_sum,
            "avg_tokens_per_request": token_sum / n,
            "avg_tokens_per_second": throughput,
        }
    return summary
Get statistics grouped by provider.
Returns: Dictionary mapping provider names to their statistics
def get_recent_requests(
    self,
    count: int = 10,
    provider: Optional[str] = None
) -> List[PerformanceMetrics]:
    """
    Get the most recent completed requests, oldest first.

    Args:
        count: Maximum number of requests to return; values <= 0 return an
            empty list
        provider: Optional provider to filter by

    Returns:
        List of up to ``count`` recent PerformanceMetrics (a new list)
    """
    if count <= 0:
        # requests[-0:] would return the entire history instead of none.
        return []
    requests = self._completed_requests
    if provider:
        requests = [r for r in requests if r.provider == provider]
    return requests[-count:]
Get recent requests.
Args:
    count: Number of recent requests to return
    provider: Optional provider to filter by
Returns: List of recent PerformanceMetrics
class TracingProvider(ABC):
    """
    Backend-agnostic tracing interface.

    Every concrete backend subclasses this and implements :meth:`trace`,
    :meth:`score`, :meth:`flush` and :meth:`shutdown`. Application code
    depends only on this interface (obtained via :func:`get_tracer`) and
    never on the concrete backend class.
    """

    # ── Abstract interface ───────────────────────────────────────────────────

    @abstractmethod
    def trace(self, name: str, **kwargs: Any) -> Span:
        """
        Open a new root trace and return its span.

        Args:
            name: Human-readable operation name (e.g. ``"rag_query"``).
            **kwargs: Optional fields passed through to the backend:
                - ``input`` — the operation input (question, prompt, …)
                - ``user_id`` — end-user identifier for session grouping
                - ``session_id`` — conversation or session ID
                - ``metadata`` — dict of arbitrary key/value pairs
                - ``tags`` — list of string tags

        Returns:
            A :class:`Span` intended to be used as a context manager.
        """

    @abstractmethod
    def score(
        self,
        trace_id: str,
        name: str,
        value: float,
        comment: str = "",
    ) -> None:
        """
        Attach a quality score to a finished trace.

        Args:
            trace_id: ID of the trace being scored (``span.trace_id``).
            name: Score dimension, e.g. ``"faithfulness"`` or ``"relevance"``.
            value: Numeric score; commonly 0.0–1.0, but backend-defined.
            comment: Optional free-text justification.
        """

    @abstractmethod
    def flush(self) -> None:
        """Push any buffered events to the backend; call before process exit."""

    @abstractmethod
    def shutdown(self) -> None:
        """Shut the provider down gracefully and release its resources."""

    # ── Convenience wrappers around trace() ──────────────────────────────────

    def trace_rag_query(
        self,
        question: str,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """
        Open a root trace named ``"rag_query"`` for a RAG question.

        Empty ``user_id``/``session_id`` strings and empty ``metadata`` are
        forwarded as ``None``; the trace is tagged ``["rag"]``.
        """
        return self.trace(
            "rag_query",
            input=question,
            user_id=user_id if user_id else None,
            session_id=session_id if session_id else None,
            metadata=metadata if metadata else None,
            tags=["rag"],
        )

    def trace_agent_run(
        self,
        agent_name: str,
        input: Any,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """
        Open a root trace named ``"agent:<agent_name>"`` for an agent run.

        Empty ``user_id``/``session_id`` strings and empty ``metadata`` are
        forwarded as ``None``; the trace is tagged ``["agent", agent_name]``.
        """
        trace_name = f"agent:{agent_name}"
        return self.trace(
            trace_name,
            input=input,
            user_id=user_id if user_id else None,
            session_id=session_id if session_id else None,
            metadata=metadata if metadata else None,
            tags=["agent", agent_name],
        )
Abstract tracing provider.
Implement this class to add a new backend. Applications only ever
import TracingProvider and call get_tracer() — they
have no knowledge of the concrete backend in use.
@abstractmethod
def trace(self, name: str, **kwargs: Any) -> Span:
    """
    Open a new root trace and return its span.

    Args:
        name: Human-readable operation name (e.g. ``"rag_query"``).
        **kwargs: Optional fields passed through to the backend:
            - ``input`` — the operation input (question, prompt, …)
            - ``user_id`` — end-user identifier for session grouping
            - ``session_id`` — conversation or session ID
            - ``metadata`` — dict of arbitrary key/value pairs
            - ``tags`` — list of string tags

    Returns:
        A :class:`Span` intended to be used as a context manager.
    """
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
@abstractmethod
def score(
    self,
    trace_id: str,
    name: str,
    value: float,
    comment: str = "",
) -> None:
    """
    Attach a quality score to a finished trace.

    Args:
        trace_id: ID of the trace being scored (``span.trace_id``).
        name: Score dimension, e.g. ``"faithfulness"`` or ``"relevance"``.
        value: Numeric score; commonly 0.0–1.0, but backend-defined.
        comment: Optional free-text justification.
    """
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
@abstractmethod
def flush(self) -> None:
    """Push any buffered events to the backend; call before process exit."""
Flush any buffered events to the backend. Call before process exit.
@abstractmethod
def shutdown(self) -> None:
    """Shut the provider down gracefully and release its resources."""
Gracefully shut down the provider and release resources.
def trace_rag_query(
    self,
    question: str,
    user_id: str = "",
    session_id: str = "",
    **metadata: Any,
) -> Span:
    """
    Open a root trace named ``"rag_query"`` for a RAG question.

    Empty ``user_id``/``session_id`` strings and empty ``metadata`` are
    forwarded as ``None``; the trace is tagged ``["rag"]``.
    """
    return self.trace(
        "rag_query",
        input=question,
        user_id=user_id if user_id else None,
        session_id=session_id if session_id else None,
        metadata=metadata if metadata else None,
        tags=["rag"],
    )
Shortcut to open a root trace for a RAG question-answering call.
def trace_agent_run(
    self,
    agent_name: str,
    input: Any,
    user_id: str = "",
    session_id: str = "",
    **metadata: Any,
) -> Span:
    """
    Open a root trace named ``"agent:<agent_name>"`` for an agent run.

    Empty ``user_id``/``session_id`` strings and empty ``metadata`` are
    forwarded as ``None``; the trace is tagged ``["agent", agent_name]``.
    """
    trace_name = f"agent:{agent_name}"
    return self.trace(
        trace_name,
        input=input,
        user_id=user_id if user_id else None,
        session_id=session_id if session_id else None,
        metadata=metadata if metadata else None,
        tags=["agent", agent_name],
    )
Shortcut to open a root trace for an agent invocation.
@dataclass
class TracingConfig:
    """
    Configuration for a tracing provider, loaded from env vars by
    :func:`get_tracer`.

    All fields are optional — providers only read what they need.

    Credential fields (``langfuse_secret_key``, ``langsmith_api_key``,
    ``azure_ai_connection_string``, ``splunk_access_token``) are excluded
    from ``repr()`` so logging or printing a config never leaks secrets.
    They still participate in ``__init__`` and equality as before.
    """
    provider: str = "null"

    # Langfuse
    langfuse_public_key: str = ""
    langfuse_secret_key: str = field(default="", repr=False)
    langfuse_host: str = "https://cloud.langfuse.com"

    # LangSmith
    langsmith_api_key: str = field(default="", repr=False)
    langsmith_project: str = "default"
    langsmith_endpoint: str = "https://api.smith.langchain.com"

    # Azure AI Foundry
    # Connection string from AI Foundry project settings; may embed credentials.
    azure_ai_connection_string: str = field(default="", repr=False)
    azure_ai_project_name: str = ""

    # Splunk (via OpenTelemetry OTLP)
    splunk_otlp_endpoint: str = ""
    splunk_access_token: str = field(default="", repr=False)
    splunk_service_name: str = "gmf-forge-ai"

    # Common
    environment: str = "development"
    release: str = ""
    extra: Dict[str, Any] = field(default_factory=dict)
Configuration for a tracing provider, loaded from env vars by
get_tracer().
All fields are optional — providers only read what they need.
class Span:
    """
    A single unit of work inside a trace.

    Use as a context manager::

        with tracer.trace("rag_query", input=question) as span:
            span.set_output(answer)

    Instances are returned by :meth:`TracingProvider.trace`,
    :meth:`Span.span`, and :meth:`Span.generation`. Child spans created
    via :meth:`Span.span` or :meth:`Span.generation` share the parent's
    ``trace_id`` (backend subclasses may add richer parent links).

    While entered, the span is registered as the active span of the current
    context; on exit the previously active span is restored, even if
    :meth:`_end` raises.

    Attributes:
        id: Unique identifier for this span/generation.
        trace_id: Identifier of the root trace this span belongs to.
        name: Human-readable operation name.
        is_root: True when this span *is* the root trace.
        kind: ``"trace"``, ``"span"`` or ``"generation"``.
    """

    def __init__(
        self,
        name: str,
        trace_id: Optional[str] = None,
        span_id: Optional[str] = None,
        is_root: bool = False,
        kind: str = "span",  # "trace" | "span" | "generation"
    ):
        self.name = name
        self.id: str = span_id or str(uuid.uuid4())
        # Without an explicit trace_id this span starts its own trace.
        self.trace_id: str = trace_id or self.id
        self.is_root = is_root
        self.kind = kind

        self._input: Any = None
        self._output: Any = None
        self._metadata: Dict[str, Any] = {}
        self._error: Optional[Exception] = None
        self._token_usage: Dict[str, int] = {}
        self._model: Optional[str] = None
        self._active_token: Any = None  # contextvars token for cleanup

    # ── Mutators (all return self for chaining) ──────────────────────────────

    def set_input(self, input: Any) -> "Span":
        """Record the operation input."""
        self._input = input
        return self

    def set_output(self, output: Any) -> "Span":
        """Record the operation output."""
        self._output = output
        return self

    def set_metadata(self, **kwargs: Any) -> "Span":
        """Merge ``kwargs`` into this span's metadata."""
        self._metadata.update(kwargs)
        return self

    def set_error(self, error: Exception) -> "Span":
        """Record an exception raised during the span."""
        self._error = error
        return self

    def set_token_usage(
        self,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        total_tokens: Optional[int] = None,
    ) -> "Span":
        """Replace token counts; ``total_tokens`` is stored only when given."""
        self._token_usage = {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        if total_tokens is not None:
            self._token_usage["total_tokens"] = total_tokens
        return self

    # ── Child span factories (overridden by backend implementations) ──────────

    def span(self, name: str, **kwargs: Any) -> "Span":
        """Create a child span under this span. Override in subclasses."""
        return Span(name=name, trace_id=self.trace_id, kind="span")

    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
        """Create a child generation span (LLM call). Override in subclasses."""
        s = Span(name=name, trace_id=self.trace_id, kind="generation")
        s._model = model
        return s

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "Span":
        # Track this span as active in the current context for decorator nesting
        self._active_token = _set_active_span(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_val is not None:
            self.set_error(exc_val)
        try:
            self._end()
        finally:
            # Restore the previously active span even when _end() raises;
            # otherwise the context would keep pointing at this finished span.
            # The token is single-use, so clear it before resetting.
            token, self._active_token = self._active_token, None
            if token is not None:
                try:
                    _active_span.reset(token)  # type: ignore[attr-defined]
                except (AttributeError, LookupError, ValueError, RuntimeError):
                    # ValueError: token created in another Context (e.g. the
                    # span exited from a different asyncio task);
                    # RuntimeError: token already used. Restoration is
                    # best-effort.
                    pass
        return False  # do not suppress exceptions

    def _end(self) -> None:
        """Called when the context manager exits. Override to flush to backend."""

    # ── Helpers ───────────────────────────────────────────────────────────────

    def __repr__(self) -> str:
        return (
            f"<Span name={self.name!r} id={self.id[:8]} "
            f"trace_id={self.trace_id[:8]} kind={self.kind}>"
        )
A single unit of work inside a trace.
Use as a context manager::
with tracer.trace("rag_query", input=question) as span:
span.set_output(answer)
Instances are returned by TracingProvider.trace(),
Span.span(), and Span.generation(). Child spans created
via Span.span() or Span.generation() are automatically
linked to the parent.
Attributes:
    id: Unique identifier for this span/generation.
    trace_id: Identifier of the root trace this span belongs to.
    name: Human-readable operation name.
    is_root: True when this span is the root trace.
def __init__(
    self,
    name: str,
    trace_id: Optional[str] = None,
    span_id: Optional[str] = None,
    is_root: bool = False,
    kind: str = "span",  # "trace" | "span" | "generation"
):
    # Identity: a span with no explicit trace_id uses its own id as trace id.
    self.name = name
    self.id: str = span_id if span_id else str(uuid.uuid4())
    self.trace_id: str = trace_id if trace_id else self.id
    self.is_root = is_root
    self.kind = kind

    # Payload captured over the span's lifetime.
    self._input: Any = None
    self._output: Any = None
    self._metadata: Dict[str, Any] = {}
    self._error: Optional[Exception] = None
    self._token_usage: Dict[str, int] = {}
    self._model: Optional[str] = None
    # contextvars token from __enter__, used to restore the prior active span.
    self._active_token: Any = None
145 def set_token_usage( 146 self, 147 prompt_tokens: int = 0, 148 completion_tokens: int = 0, 149 total_tokens: Optional[int] = None, 150 ) -> "Span": 151 self._token_usage = { 152 "prompt_tokens": prompt_tokens, 153 "completion_tokens": completion_tokens, 154 } 155 if total_tokens is not None: 156 self._token_usage["total_tokens"] = total_tokens 157 return self
161 def span(self, name: str, **kwargs: Any) -> "Span": 162 """Create a child span under this span. Override in subclasses.""" 163 return Span(name=name, trace_id=self.trace_id, kind="span")
Create a child span under this span. Override in subclasses.
165 def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span": 166 """Create a child generation span (LLM call). Override in subclasses.""" 167 s = Span(name=name, trace_id=self.trace_id, kind="generation") 168 s._model = model 169 return s
Create a child generation span (LLM call). Override in subclasses.
68class NullTracer(TracingProvider): 69 """ 70 Default tracing provider — logs span lifecycle to the console. 71 72 Used when ``TRACING_PROVIDER`` is not set (or set to ``"null"`` / 73 ``"none"``). Each span emits a *started* and *finished* log line so 74 the tracing structure is visible during local development and in 75 example runs, without shipping data to any backend. 76 77 Switch to a real backend at any time by setting ``TRACING_PROVIDER`` 78 in ``.env`` — application code stays untouched. 79 80 Example:: 81 82 tracer = NullTracer() 83 84 with tracer.trace("rag_query", input=question) as trace: 85 with trace.span("retrieval", input=question) as span: 86 docs = retriever.retrieve(question) 87 span.set_output({"doc_count": len(docs)}) 88 """ 89 90 def trace(self, name: str, **kwargs: Any) -> _NullSpan: 91 span = _NullSpan(name=name, is_root=True, kind="trace") 92 if "input" in kwargs: 93 span.set_input(kwargs["input"]) 94 return span 95 96 def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None: 97 log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value} 98 if comment: 99 log_kw["comment"] = comment 100 _logger.info("[trace] score", **log_kw) 101 102 def flush(self) -> None: 103 pass 104 105 def shutdown(self) -> None: 106 pass
Default tracing provider — logs span lifecycle to the console.
Used when TRACING_PROVIDER is not set (or set to "null" /
"none"). Each span emits a "started" and a "finished" log line so
the tracing structure is visible during local development and in
example runs, without shipping data to any backend.
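Switch to a real backend by setting TRACING_PROVIDER
in .env — application code stays untouched. The provider is selected
when get_tracer() first runs and is then cached, so set the variable
before that call (or re-initialise with get_tracer(reset=True)).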
Example::
    tracer = NullTracer()

    with tracer.trace("rag_query", input=question) as trace:
        with trace.span("retrieval", input=question) as span:
            docs = retriever.retrieve(question)
            span.set_output({"doc_count": len(docs)})
90 def trace(self, name: str, **kwargs: Any) -> _NullSpan: 91 span = _NullSpan(name=name, is_root=True, kind="trace") 92 if "input" in kwargs: 93 span.set_input(kwargs["input"]) 94 return span
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend (NullTracer itself only uses input; the other fields are accepted and ignored):
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
96 def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None: 97 log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value} 98 if comment: 99 log_kw["comment"] = comment 100 _logger.info("[trace] score", **log_kw)
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
112def get_tracer(reset: bool = False) -> TracingProvider: 113 """ 114 Return the process-wide tracing provider, initialising it on first call. 115 116 Reads configuration from environment variables. Pass ``reset=True`` 117 to force re-initialisation (useful in tests). 118 119 Environment variables 120 ~~~~~~~~~~~~~~~~~~~~~ 121 TRACING_PROVIDER 122 Which backend to use. One of: ``null``, ``splunk``, ``langfuse``. 123 Defaults to ``null`` when not set. 124 125 (Splunk) 126 SPLUNK_OTLP_ENDPOINT OTLP/HTTP export endpoint 127 SPLUNK_ACCESS_TOKEN Splunk Observability Cloud access token 128 SPLUNK_SERVICE_NAME Service name shown in APM (default: gmf-forge-ai) 129 130 (Langfuse) 131 LANGFUSE_PUBLIC_KEY Langfuse public key 132 LANGFUSE_SECRET_KEY Langfuse secret key 133 LANGFUSE_HOST Langfuse host (default: https://cloud.langfuse.com) 134 135 (Common) 136 ENVIRONMENT Deployment environment tag (default: development) 137 RELEASE App release/version tag (optional) 138 139 Returns: 140 A :class:`TracingProvider` instance ready to use. 141 """ 142 global _tracer_instance 143 if _tracer_instance is not None and not reset: 144 return _tracer_instance 145 146 config = _config_from_env() 147 provider_key = config.provider.lower().strip() 148 provider_cls = _PROVIDERS.get(provider_key, NullTracer) 149 150 if provider_key not in _PROVIDERS: 151 _logger.warning( 152 "Unknown TRACING_PROVIDER — falling back to NullTracer", 153 provider=provider_key, 154 supported=list(_PROVIDERS), 155 ) 156 157 _tracer_instance = provider_cls(config) if provider_cls is not NullTracer else NullTracer() 158 return _tracer_instance
Return the process-wide tracing provider, initialising it on first call.
Reads configuration from environment variables. Pass reset=True
to force re-initialisation (useful in tests).
Environment variables
~~~~~~~~~~~~~~~~~~~~~
TRACING_PROVIDER
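Which backend to use. One of: null, splunk, langfuse
(case-insensitive; surrounding whitespace is ignored).
Defaults to null when not set. Unrecognised values fall back
to NullTracer and log a warning listing the supported providers.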
(Splunk)
SPLUNK_OTLP_ENDPOINT     OTLP/HTTP export endpoint
SPLUNK_ACCESS_TOKEN      Splunk Observability Cloud access token
SPLUNK_SERVICE_NAME      Service name shown in APM (default: gmf-forge-ai)
(Langfuse)
LANGFUSE_PUBLIC_KEY      Langfuse public key
LANGFUSE_SECRET_KEY      Langfuse secret key
LANGFUSE_HOST            Langfuse host (default: https://cloud.langfuse.com)
(Common)
ENVIRONMENT              Deployment environment tag (default: development)
RELEASE                  App release/version tag (optional)
Returns:
A TracingProvider instance ready to use.