gmf_forge_ai_shared_core.observability

Observability — logging, metrics, performance monitoring, and tracing.

 1"""Observability — logging, metrics, performance monitoring, and tracing."""
 2
 3from gmf_forge_ai_shared_core.observability.logger import BasicLogger
 4from gmf_forge_ai_shared_core.observability.metrics_collector import BasicMetricsCollector
 5from gmf_forge_ai_shared_core.observability.performance_monitor import BasicPerformanceMonitor
 6from gmf_forge_ai_shared_core.observability.tracing import (
 7    TracingProvider,
 8    TracingConfig,
 9    Span,
10    NullTracer,
11    get_tracer,
12)
13
14__all__ = [
15    "BasicLogger",
16    "BasicMetricsCollector",
17    "BasicPerformanceMonitor",
18    "TracingProvider",
19    "TracingConfig",
20    "Span",
21    "NullTracer",
22    "get_tracer",
23]
class BasicLogger:
32class BasicLogger:
33    """
34    Basic structured logger for platform AI applications.
35
36    Provides consistent logging across all packages.  Respects the
37    ``LOG_LEVEL`` environment variable (default ``INFO``).
38    """
39
40    def __init__(self, name: str):
41        """Initialize logger with a name."""
42        self.logger = structlog.get_logger(name)
43
44    def info(self, message: str, **kwargs: Any) -> None:
45        self.logger.info(message, **kwargs)
46
47    def error(self, message: str, **kwargs: Any) -> None:
48        self.logger.error(message, **kwargs)
49
50    def warning(self, message: str, **kwargs: Any) -> None:
51        self.logger.warning(message, **kwargs)
52
53    def debug(self, message: str, **kwargs: Any) -> None:
54        self.logger.debug(message, **kwargs)

Basic structured logger for platform AI applications.

Provides consistent logging across all packages. Respects the LOG_LEVEL environment variable (default INFO).

BasicLogger(name: str)
40    def __init__(self, name: str):
41        """Initialize logger with a name."""
42        self.logger = structlog.get_logger(name)

Initialize logger with a name.

logger
def info(self, message: str, **kwargs: Any) -> None:
44    def info(self, message: str, **kwargs: Any) -> None:
45        self.logger.info(message, **kwargs)
def error(self, message: str, **kwargs: Any) -> None:
47    def error(self, message: str, **kwargs: Any) -> None:
48        self.logger.error(message, **kwargs)
def warning(self, message: str, **kwargs: Any) -> None:
50    def warning(self, message: str, **kwargs: Any) -> None:
51        self.logger.warning(message, **kwargs)
def debug(self, message: str, **kwargs: Any) -> None:
53    def debug(self, message: str, **kwargs: Any) -> None:
54        self.logger.debug(message, **kwargs)
class BasicMetricsCollector:
 8class BasicMetricsCollector:
 9    """
10    Basic metrics collector for tracking application metrics.
11    
12    Tracks counters, gauges, and histograms.
13    """
14    
15    def __init__(self):
16        """Initialize metrics collector."""
17        self._counters: Dict[str, int] = defaultdict(int)
18        self._gauges: Dict[str, float] = {}
19        self._histograms: Dict[str, list] = defaultdict(list)
20    
21    def increment(self, metric_name: str, value: int = 1, **tags: Any) -> None:
22        """Increment a counter."""
23        key = self._make_key(metric_name, tags)
24        self._counters[key] += value
25    
26    def gauge(self, metric_name: str, value: float, **tags: Any) -> None:
27        """Set a gauge value."""
28        key = self._make_key(metric_name, tags)
29        self._gauges[key] = value
30    
31    def histogram(self, metric_name: str, value: float, **tags: Any) -> None:
32        """Record a histogram value."""
33        key = self._make_key(metric_name, tags)
34        self._histograms[key].append(value)
35    
36    def get_metrics(self) -> Dict[str, Any]:
37        """Get all metrics."""
38        return {
39            "counters": dict(self._counters),
40            "gauges": dict(self._gauges),
41            "histograms": dict(self._histograms),
42        }
43    
44    @staticmethod
45    def _make_key(metric_name: str, tags: Dict[str, Any]) -> str:
46        """Create a unique key from metric name and tags."""
47        if not tags:
48            return metric_name
49        tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
50        return f"{metric_name}[{tag_str}]"

Basic metrics collector for tracking application metrics.

Tracks counters, gauges, and histograms.

BasicMetricsCollector()
15    def __init__(self):
16        """Initialize metrics collector."""
17        self._counters: Dict[str, int] = defaultdict(int)
18        self._gauges: Dict[str, float] = {}
19        self._histograms: Dict[str, list] = defaultdict(list)

Initialize metrics collector.

def increment(self, metric_name: str, value: int = 1, **tags: Any) -> None:
21    def increment(self, metric_name: str, value: int = 1, **tags: Any) -> None:
22        """Increment a counter."""
23        key = self._make_key(metric_name, tags)
24        self._counters[key] += value

Increment a counter.

def gauge(self, metric_name: str, value: float, **tags: Any) -> None:
26    def gauge(self, metric_name: str, value: float, **tags: Any) -> None:
27        """Set a gauge value."""
28        key = self._make_key(metric_name, tags)
29        self._gauges[key] = value

Set a gauge value.

def histogram(self, metric_name: str, value: float, **tags: Any) -> None:
31    def histogram(self, metric_name: str, value: float, **tags: Any) -> None:
32        """Record a histogram value."""
33        key = self._make_key(metric_name, tags)
34        self._histograms[key].append(value)

Record a histogram value.

def get_metrics(self) -> Dict[str, Any]:
36    def get_metrics(self) -> Dict[str, Any]:
37        """Get all metrics."""
38        return {
39            "counters": dict(self._counters),
40            "gauges": dict(self._gauges),
41            "histograms": dict(self._histograms),
42        }

Get all metrics.

class BasicPerformanceMonitor:
 33class BasicPerformanceMonitor:
 34    """
 35    Monitor LLM performance with basic metrics collection.
 36    
 37    Tracks:
 38    - Request latency (time to first token, total time)
 39    - Token usage (prompt, completion, total)
 40    - Throughput (tokens per second)
 41    - Success/failure rates
 42    - Provider-level aggregates
 43    
 44    Example:
 45        >>> monitor = BasicPerformanceMonitor()
 46        >>> 
 47        >>> # Start tracking a request
 48        >>> request_id = monitor.start_request("azure", "gpt-4")
 49        >>> 
 50        >>> # ... make LLM call ...
 51        >>> 
 52        >>> # Record completion
 53        >>> monitor.end_request(
 54        ...     request_id=request_id,
 55        ...     prompt_tokens=100,
 56        ...     completion_tokens=50,
 57        ...     success=True
 58        ... )
 59        >>> 
 60        >>> # Get statistics
 61        >>> stats = monitor.get_stats()
 62        >>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
 63    """
 64    
 65    def __init__(self):
 66        """Initialize the performance monitor."""
 67        self._active_requests: Dict[str, Dict[str, Any]] = {}
 68        self._completed_requests: List[PerformanceMetrics] = []
 69        self._provider_stats: Dict[str, Dict[str, Any]] = defaultdict(
 70            lambda: {
 71                "total_requests": 0,
 72                "successful_requests": 0,
 73                "failed_requests": 0,
 74                "total_tokens": 0,
 75                "total_latency_ms": 0.0,
 76            }
 77        )
 78    
 79    def start_request(
 80        self,
 81        provider: str,
 82        model: str,
 83        request_id: Optional[str] = None,
 84        **metadata: Any
 85    ) -> str:
 86        """
 87        Start tracking a new request.
 88        
 89        Args:
 90            provider: Provider name (e.g., "azure", "openai")
 91            model: Model name
 92            request_id: Optional request ID (generated if not provided)
 93            **metadata: Additional metadata to track
 94            
 95        Returns:
 96            Request ID for tracking
 97        """
 98        if request_id is None:
 99            request_id = f"{provider}_{model}_{int(time.time() * 1000000)}"
100        
101        self._active_requests[request_id] = {
102            "provider": provider,
103            "model": model,
104            "start_time": datetime.now(timezone.utc),
105            "metadata": metadata
106        }
107        
108        return request_id
109    
110    def end_request(
111        self,
112        request_id: str,
113        prompt_tokens: int,
114        completion_tokens: int,
115        success: bool = True,
116        error: Optional[str] = None
117    ) -> PerformanceMetrics:
118        """
119        End tracking for a request and record metrics.
120        
121        Args:
122            request_id: Request ID from start_request()
123            prompt_tokens: Number of tokens in prompt
124            completion_tokens: Number of tokens in completion
125            success: Whether request succeeded
126            error: Optional error message if failed
127            
128        Returns:
129            PerformanceMetrics object with calculated metrics
130            
131        Raises:
132            KeyError: If request_id not found
133        """
134        if request_id not in self._active_requests:
135            raise KeyError(f"Request ID '{request_id}' not found in active requests")
136        
137        request_data = self._active_requests.pop(request_id)
138        end_time = datetime.now(timezone.utc)
139        start_time = request_data["start_time"]
140        
141        # Calculate metrics
142        latency_ms = (end_time - start_time).total_seconds() * 1000
143        total_tokens = prompt_tokens + completion_tokens
144        tokens_per_second = total_tokens / (latency_ms / 1000) if latency_ms > 0 else 0
145        
146        # Create metrics object
147        metrics = PerformanceMetrics(
148            request_id=request_id,
149            provider=request_data["provider"],
150            model=request_data["model"],
151            start_time=start_time,
152            end_time=end_time,
153            latency_ms=latency_ms,
154            prompt_tokens=prompt_tokens,
155            completion_tokens=completion_tokens,
156            total_tokens=total_tokens,
157            tokens_per_second=tokens_per_second,
158            success=success,
159            error=error,
160            metadata=request_data["metadata"]
161        )
162        
163        # Store completed request
164        self._completed_requests.append(metrics)
165        
166        # Update provider stats
167        provider_stats = self._provider_stats[request_data["provider"]]
168        provider_stats["total_requests"] += 1
169        if success:
170            provider_stats["successful_requests"] += 1
171        else:
172            provider_stats["failed_requests"] += 1
173        provider_stats["total_tokens"] += total_tokens
174        provider_stats["total_latency_ms"] += latency_ms
175        
176        return metrics
177    
178    def get_stats(
179        self,
180        provider: Optional[str] = None,
181        last_n_requests: Optional[int] = None
182    ) -> Dict[str, Any]:
183        """
184        Get aggregated performance statistics.
185        
186        Args:
187            provider: Optional provider to filter by
188            last_n_requests: Optional limit to last N requests
189            
190        Returns:
191            Dictionary containing performance statistics
192        """
193        # Filter requests
194        requests = self._completed_requests
195        if provider:
196            requests = [r for r in requests if r.provider == provider]
197        if last_n_requests:
198            requests = requests[-last_n_requests:]
199        
200        if not requests:
201            return {
202                "total_requests": 0,
203                "successful_requests": 0,
204                "failed_requests": 0,
205                "success_rate": 0.0,
206                "avg_latency_ms": 0.0,
207                "min_latency_ms": 0.0,
208                "max_latency_ms": 0.0,
209                "avg_tokens_per_second": 0.0,
210                "total_tokens": 0,
211                "total_prompt_tokens": 0,
212                "total_completion_tokens": 0,
213                "avg_prompt_tokens": 0.0,
214                "avg_completion_tokens": 0.0
215            }
216        
217        # Calculate statistics
218        total_requests = len(requests)
219        successful_requests = sum(1 for r in requests if r.success)
220        failed_requests = total_requests - successful_requests
221        
222        latencies = [r.latency_ms for r in requests]
223        tokens_per_sec = [r.tokens_per_second for r in requests]
224        
225        return {
226            "total_requests": total_requests,
227            "successful_requests": successful_requests,
228            "failed_requests": failed_requests,
229            "success_rate": successful_requests / total_requests if total_requests > 0 else 0.0,
230            "avg_latency_ms": sum(latencies) / len(latencies),
231            "min_latency_ms": min(latencies),
232            "max_latency_ms": max(latencies),
233            "avg_tokens_per_second": sum(tokens_per_sec) / len(tokens_per_sec),
234            "total_tokens": sum(r.total_tokens for r in requests),
235            "total_prompt_tokens": sum(r.prompt_tokens for r in requests),
236            "total_completion_tokens": sum(r.completion_tokens for r in requests),
237            "avg_prompt_tokens": sum(r.prompt_tokens for r in requests) / total_requests,
238            "avg_completion_tokens": sum(r.completion_tokens for r in requests) / total_requests
239        }
240    
241    def get_provider_stats(self) -> Dict[str, Dict[str, Any]]:
242        """
243        Get statistics grouped by provider.
244        
245        Returns:
246            Dictionary mapping provider names to their statistics
247        """
248        result = {}
249        for provider, stats in self._provider_stats.items():
250            total = stats["total_requests"]
251            if total > 0:
252                avg_latency_ms = stats["total_latency_ms"] / total
253                avg_tokens_per_request = stats["total_tokens"] / total
254                # Calculate tokens per second (throughput)
255                avg_tokens_per_second = (stats["total_tokens"] / stats["total_latency_ms"] * 1000) if stats["total_latency_ms"] > 0 else 0
256                
257                result[provider] = {
258                    "total_requests": total,
259                    "successful_requests": stats["successful_requests"],
260                    "failed_requests": stats["failed_requests"],
261                    "success_rate": stats["successful_requests"] / total,
262                    "avg_latency_ms": avg_latency_ms,
263                    "total_tokens": stats["total_tokens"],
264                    "avg_tokens_per_request": avg_tokens_per_request,
265                    "avg_tokens_per_second": avg_tokens_per_second
266                }
267        return result
268    
269    def get_recent_requests(
270        self,
271        count: int = 10,
272        provider: Optional[str] = None
273    ) -> List[PerformanceMetrics]:
274        """
275        Get recent requests.
276        
277        Args:
278            count: Number of recent requests to return
279            provider: Optional provider to filter by
280            
281        Returns:
282            List of recent PerformanceMetrics
283        """
284        requests = self._completed_requests
285        if provider:
286            requests = [r for r in requests if r.provider == provider]
287        return requests[-count:]
288    
289    def clear_history(self) -> None:
290        """Clear all historical metrics (keep active requests)."""
291        self._completed_requests.clear()
292        self._provider_stats.clear()
293    
294    def __len__(self) -> int:
295        """Return number of completed requests."""
296        return len(self._completed_requests)

Monitor LLM performance with basic metrics collection.

Tracks:

  • Request latency (total time from start_request() to end_request(); time to first token is not currently recorded)
  • Token usage (prompt, completion, total)
  • Throughput (tokens per second)
  • Success/failure rates
  • Provider-level aggregates

Example:

    >>> monitor = BasicPerformanceMonitor()
    >>> # Start tracking a request
    >>> request_id = monitor.start_request("azure", "gpt-4")
    >>> # ... make LLM call ...
    >>> # Record completion
    >>> monitor.end_request(
    ...     request_id=request_id,
    ...     prompt_tokens=100,
    ...     completion_tokens=50,
    ...     success=True
    ... )
    >>> # Get statistics
    >>> stats = monitor.get_stats()
    >>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")

BasicPerformanceMonitor()
65    def __init__(self):
66        """Initialize the performance monitor."""
67        self._active_requests: Dict[str, Dict[str, Any]] = {}
68        self._completed_requests: List[PerformanceMetrics] = []
69        self._provider_stats: Dict[str, Dict[str, Any]] = defaultdict(
70            lambda: {
71                "total_requests": 0,
72                "successful_requests": 0,
73                "failed_requests": 0,
74                "total_tokens": 0,
75                "total_latency_ms": 0.0,
76            }
77        )

Initialize the performance monitor.

def start_request( self, provider: str, model: str, request_id: Optional[str] = None, **metadata: Any) -> str:
 79    def start_request(
 80        self,
 81        provider: str,
 82        model: str,
 83        request_id: Optional[str] = None,
 84        **metadata: Any
 85    ) -> str:
 86        """
 87        Start tracking a new request.
 88        
 89        Args:
 90            provider: Provider name (e.g., "azure", "openai")
 91            model: Model name
 92            request_id: Optional request ID (generated if not provided)
 93            **metadata: Additional metadata to track
 94            
 95        Returns:
 96            Request ID for tracking
 97        """
 98        if request_id is None:
 99            request_id = f"{provider}_{model}_{int(time.time() * 1000000)}"
100        
101        self._active_requests[request_id] = {
102            "provider": provider,
103            "model": model,
104            "start_time": datetime.now(timezone.utc),
105            "metadata": metadata
106        }
107        
108        return request_id

Start tracking a new request.

Args:

  • provider: Provider name (e.g., "azure", "openai")
  • model: Model name
  • request_id: Optional request ID (generated if not provided)
  • **metadata: Additional metadata to track

Returns: Request ID for tracking.

def end_request( self, request_id: str, prompt_tokens: int, completion_tokens: int, success: bool = True, error: Optional[str] = None) -> gmf_forge_ai_shared_core.observability.performance_monitor.PerformanceMetrics:
110    def end_request(
111        self,
112        request_id: str,
113        prompt_tokens: int,
114        completion_tokens: int,
115        success: bool = True,
116        error: Optional[str] = None
117    ) -> PerformanceMetrics:
118        """
119        End tracking for a request and record metrics.
120        
121        Args:
122            request_id: Request ID from start_request()
123            prompt_tokens: Number of tokens in prompt
124            completion_tokens: Number of tokens in completion
125            success: Whether request succeeded
126            error: Optional error message if failed
127            
128        Returns:
129            PerformanceMetrics object with calculated metrics
130            
131        Raises:
132            KeyError: If request_id not found
133        """
134        if request_id not in self._active_requests:
135            raise KeyError(f"Request ID '{request_id}' not found in active requests")
136        
137        request_data = self._active_requests.pop(request_id)
138        end_time = datetime.now(timezone.utc)
139        start_time = request_data["start_time"]
140        
141        # Calculate metrics
142        latency_ms = (end_time - start_time).total_seconds() * 1000
143        total_tokens = prompt_tokens + completion_tokens
144        tokens_per_second = total_tokens / (latency_ms / 1000) if latency_ms > 0 else 0
145        
146        # Create metrics object
147        metrics = PerformanceMetrics(
148            request_id=request_id,
149            provider=request_data["provider"],
150            model=request_data["model"],
151            start_time=start_time,
152            end_time=end_time,
153            latency_ms=latency_ms,
154            prompt_tokens=prompt_tokens,
155            completion_tokens=completion_tokens,
156            total_tokens=total_tokens,
157            tokens_per_second=tokens_per_second,
158            success=success,
159            error=error,
160            metadata=request_data["metadata"]
161        )
162        
163        # Store completed request
164        self._completed_requests.append(metrics)
165        
166        # Update provider stats
167        provider_stats = self._provider_stats[request_data["provider"]]
168        provider_stats["total_requests"] += 1
169        if success:
170            provider_stats["successful_requests"] += 1
171        else:
172            provider_stats["failed_requests"] += 1
173        provider_stats["total_tokens"] += total_tokens
174        provider_stats["total_latency_ms"] += latency_ms
175        
176        return metrics

End tracking for a request and record metrics.

Args:

  • request_id: Request ID returned by start_request()
  • prompt_tokens: Number of tokens in the prompt
  • completion_tokens: Number of tokens in the completion
  • success: Whether the request succeeded
  • error: Optional error message if the request failed

Returns: PerformanceMetrics object with calculated metrics.

Raises:

  • KeyError: If request_id is not found among the active requests.

def get_stats( self, provider: Optional[str] = None, last_n_requests: Optional[int] = None) -> Dict[str, Any]:
178    def get_stats(
179        self,
180        provider: Optional[str] = None,
181        last_n_requests: Optional[int] = None
182    ) -> Dict[str, Any]:
183        """
184        Get aggregated performance statistics.
185        
186        Args:
187            provider: Optional provider to filter by
188            last_n_requests: Optional limit to last N requests
189            
190        Returns:
191            Dictionary containing performance statistics
192        """
193        # Filter requests
194        requests = self._completed_requests
195        if provider:
196            requests = [r for r in requests if r.provider == provider]
197        if last_n_requests:
198            requests = requests[-last_n_requests:]
199        
200        if not requests:
201            return {
202                "total_requests": 0,
203                "successful_requests": 0,
204                "failed_requests": 0,
205                "success_rate": 0.0,
206                "avg_latency_ms": 0.0,
207                "min_latency_ms": 0.0,
208                "max_latency_ms": 0.0,
209                "avg_tokens_per_second": 0.0,
210                "total_tokens": 0,
211                "total_prompt_tokens": 0,
212                "total_completion_tokens": 0,
213                "avg_prompt_tokens": 0.0,
214                "avg_completion_tokens": 0.0
215            }
216        
217        # Calculate statistics
218        total_requests = len(requests)
219        successful_requests = sum(1 for r in requests if r.success)
220        failed_requests = total_requests - successful_requests
221        
222        latencies = [r.latency_ms for r in requests]
223        tokens_per_sec = [r.tokens_per_second for r in requests]
224        
225        return {
226            "total_requests": total_requests,
227            "successful_requests": successful_requests,
228            "failed_requests": failed_requests,
229            "success_rate": successful_requests / total_requests if total_requests > 0 else 0.0,
230            "avg_latency_ms": sum(latencies) / len(latencies),
231            "min_latency_ms": min(latencies),
232            "max_latency_ms": max(latencies),
233            "avg_tokens_per_second": sum(tokens_per_sec) / len(tokens_per_sec),
234            "total_tokens": sum(r.total_tokens for r in requests),
235            "total_prompt_tokens": sum(r.prompt_tokens for r in requests),
236            "total_completion_tokens": sum(r.completion_tokens for r in requests),
237            "avg_prompt_tokens": sum(r.prompt_tokens for r in requests) / total_requests,
238            "avg_completion_tokens": sum(r.completion_tokens for r in requests) / total_requests
239        }

Get aggregated performance statistics.

Args:

  • provider: Optional provider to filter by
  • last_n_requests: Optional limit to the last N requests (None or 0 means no limit)

Returns: Dictionary containing performance statistics.

def get_provider_stats(self) -> Dict[str, Dict[str, Any]]:
241    def get_provider_stats(self) -> Dict[str, Dict[str, Any]]:
242        """
243        Get statistics grouped by provider.
244        
245        Returns:
246            Dictionary mapping provider names to their statistics
247        """
248        result = {}
249        for provider, stats in self._provider_stats.items():
250            total = stats["total_requests"]
251            if total > 0:
252                avg_latency_ms = stats["total_latency_ms"] / total
253                avg_tokens_per_request = stats["total_tokens"] / total
254                # Calculate tokens per second (throughput)
255                avg_tokens_per_second = (stats["total_tokens"] / stats["total_latency_ms"] * 1000) if stats["total_latency_ms"] > 0 else 0
256                
257                result[provider] = {
258                    "total_requests": total,
259                    "successful_requests": stats["successful_requests"],
260                    "failed_requests": stats["failed_requests"],
261                    "success_rate": stats["successful_requests"] / total,
262                    "avg_latency_ms": avg_latency_ms,
263                    "total_tokens": stats["total_tokens"],
264                    "avg_tokens_per_request": avg_tokens_per_request,
265                    "avg_tokens_per_second": avg_tokens_per_second
266                }
267        return result

Get statistics grouped by provider.

Returns: Dictionary mapping provider names to their statistics

def get_recent_requests( self, count: int = 10, provider: Optional[str] = None) -> List[gmf_forge_ai_shared_core.observability.performance_monitor.PerformanceMetrics]:
269    def get_recent_requests(
270        self,
271        count: int = 10,
272        provider: Optional[str] = None
273    ) -> List[PerformanceMetrics]:
274        """
275        Get recent requests.
276        
277        Args:
278            count: Number of recent requests to return
279            provider: Optional provider to filter by
280            
281        Returns:
282            List of recent PerformanceMetrics
283        """
284        requests = self._completed_requests
285        if provider:
286            requests = [r for r in requests if r.provider == provider]
287        return requests[-count:]

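Get recent requests.

Args:

  • count: Number of recent requests to return (note: a count of 0 currently returns all requests, because the list is sliced with [-count:])
  • provider: Optional provider to filter by

Returns: List of recent PerformanceMetrics.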

def clear_history(self) -> None:
289    def clear_history(self) -> None:
290        """Clear all historical metrics (keep active requests)."""
291        self._completed_requests.clear()
292        self._provider_stats.clear()

Clear all historical metrics (keep active requests).

class TracingProvider(abc.ABC):
206class TracingProvider(ABC):
207    """
208    Abstract tracing provider.
209
210    Implement this class to add a new backend.  Applications only ever
211    import :class:`TracingProvider` and call :func:`get_tracer` — they
212    have no knowledge of the concrete backend in use.
213    """
214
215    # ── Core operations ───────────────────────────────────────────────────────
216
217    @abstractmethod
218    def trace(self, name: str, **kwargs: Any) -> Span:
219        """
220        Start a new root trace.
221
222        Args:
223            name:   Human-readable name for the operation (e.g. ``"rag_query"``).
224            **kwargs: Optional fields forwarded to the backend:
225                - ``input``   — the operation input (question, prompt, …)
226                - ``user_id`` — end-user identifier for session grouping
227                - ``session_id`` — conversation or session ID
228                - ``metadata`` — dict of arbitrary key/value pairs
229                - ``tags``    — list of string tags
230
231        Returns:
232            A :class:`Span` to use as a context manager.
233        """
234
235    @abstractmethod
236    def score(
237        self,
238        trace_id: str,
239        name: str,
240        value: float,
241        comment: str = "",
242    ) -> None:
243        """
244        Attach a quality score to a completed trace.
245
246        Args:
247            trace_id: ID of the trace to score (``span.trace_id``).
248            name:     Score dimension name, e.g. ``"faithfulness"``, ``"relevance"``.
249            value:    Numeric score — typically 0.0–1.0 but provider-specific.
250            comment:  Optional free-text explanation.
251        """
252
253    @abstractmethod
254    def flush(self) -> None:
255        """Flush any buffered events to the backend. Call before process exit."""
256
257    @abstractmethod
258    def shutdown(self) -> None:
259        """Gracefully shut down the provider and release resources."""
260
261    # ── Convenience helpers (non-abstract, built on trace()) ──────────────────
262
263    def trace_rag_query(
264        self,
265        question: str,
266        user_id: str = "",
267        session_id: str = "",
268        **metadata: Any,
269    ) -> Span:
270        """Shortcut to open a root trace for a RAG question-answering call."""
271        return self.trace(
272            "rag_query",
273            input=question,
274            user_id=user_id or None,
275            session_id=session_id or None,
276            metadata=metadata or None,
277            tags=["rag"],
278        )
279
280    def trace_agent_run(
281        self,
282        agent_name: str,
283        input: Any,
284        user_id: str = "",
285        session_id: str = "",
286        **metadata: Any,
287    ) -> Span:
288        """Shortcut to open a root trace for an agent invocation."""
289        return self.trace(
290            f"agent:{agent_name}",
291            input=input,
292            user_id=user_id or None,
293            session_id=session_id or None,
294            metadata=metadata or None,
295            tags=["agent", agent_name],
296        )

Abstract tracing provider.

Implement this class to add a new backend. Applications only ever import TracingProvider and call get_tracer() — they have no knowledge of the concrete backend in use.

@abstractmethod
def trace( self, name: str, **kwargs: Any) -> Span:
217    @abstractmethod
218    def trace(self, name: str, **kwargs: Any) -> Span:
219        """
220        Start a new root trace.
221
222        Args:
223            name:   Human-readable name for the operation (e.g. ``"rag_query"``).
224            **kwargs: Optional fields forwarded to the backend:
225                - ``input``   — the operation input (question, prompt, …)
226                - ``user_id`` — end-user identifier for session grouping
227                - ``session_id`` — conversation or session ID
228                - ``metadata`` — dict of arbitrary key/value pairs
229                - ``tags``    — list of string tags
230
231        Returns:
232            A :class:`Span` to use as a context manager.
233        """

Start a new root trace.

Args:

  • name: Human-readable name for the operation (e.g. "rag_query").
  • **kwargs: Optional fields forwarded to the backend:
      ◦ input — the operation input (question, prompt, …)
      ◦ user_id — end-user identifier for session grouping
      ◦ session_id — conversation or session ID
      ◦ metadata — dict of arbitrary key/value pairs
      ◦ tags — list of string tags

Returns: A Span to use as a context manager.

@abstractmethod
def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
@abstractmethod
def score(
    self,
    trace_id: str,
    name: str,
    value: float,
    comment: str = "",
) -> None:
    """
    Record a quality score against an already-completed trace.

    Args:
        trace_id: Identifier of the trace being scored (``span.trace_id``).
        name:     Dimension being scored, such as ``"faithfulness"`` or ``"relevance"``.
        value:    Numeric score; usually in the 0.0–1.0 range, but the scale is provider-specific.
        comment:  Optional free-text justification for the score.
    """

Attach a quality score to a completed trace.

Args:
- trace_id: ID of the trace to score (span.trace_id).
- name: Score dimension name, e.g. "faithfulness" or "relevance".
- value: Numeric score — typically 0.0–1.0, but the range is provider-specific.
- comment: Optional free-text explanation.

@abstractmethod
def flush(self) -> None:
@abstractmethod
def flush(self) -> None:
    """Push any buffered events out to the backend; call this before the process exits."""

Flush any buffered events to the backend. Call before process exit.

@abstractmethod
def shutdown(self) -> None:
@abstractmethod
def shutdown(self) -> None:
    """Shut the provider down gracefully, releasing whatever resources it holds."""

Gracefully shut down the provider and release resources.

def trace_rag_query( self, question: str, user_id: str = '', session_id: str = '', **metadata: Any) -> Span:
def trace_rag_query(
    self,
    question: str,
    user_id: str = "",
    session_id: str = "",
    **metadata: Any,
) -> Span:
    """Open a root ``rag_query`` trace for one RAG question-answering call.

    The question becomes the trace input and the trace is tagged ``"rag"``.
    Empty ``user_id`` / ``session_id`` strings and an empty ``metadata``
    mapping are passed to :meth:`trace` as ``None``.
    """
    # Keyword order below matches the order the fields reach ``trace``.
    optional_fields = {
        "user_id": user_id if user_id else None,
        "session_id": session_id if session_id else None,
        "metadata": metadata if metadata else None,
    }
    return self.trace("rag_query", input=question, **optional_fields, tags=["rag"])

Shortcut to open a root trace for a RAG question-answering call.

def trace_agent_run( self, agent_name: str, input: Any, user_id: str = '', session_id: str = '', **metadata: Any) -> Span:
def trace_agent_run(
    self,
    agent_name: str,
    input: Any,
    user_id: str = "",
    session_id: str = "",
    **metadata: Any,
) -> Span:
    """Open a root trace named ``agent:<agent_name>`` for one agent invocation.

    Empty ``user_id`` / ``session_id`` strings and an empty ``metadata``
    mapping are forwarded to :meth:`trace` as ``None``.  The trace is tagged
    with ``"agent"`` and with the agent's own name.
    """
    trace_name = f"agent:{agent_name}"
    trace_tags = ["agent", agent_name]
    return self.trace(
        trace_name,
        input=input,
        user_id=user_id if user_id else None,
        session_id=session_id if session_id else None,
        metadata=metadata if metadata else None,
        tags=trace_tags,
    )

Shortcut to open a root trace for an agent invocation.

@dataclass
class TracingConfig:
@dataclass
class TracingConfig:
    """
    Configuration for a tracing provider, loaded from env vars by
    :func:`get_tracer`.

    All fields are optional — providers only read what they need.

    Credential fields (secret keys, API keys, access tokens) are excluded
    from the generated ``repr()`` so that logging or printing a config
    object does not leak secrets.  They still take part in ``__init__`` and
    ``__eq__`` exactly like the other fields.
    """
    provider: str = "null"

    # Langfuse
    langfuse_public_key: str = ""
    langfuse_secret_key: str = field(default="", repr=False)  # secret: hidden from repr
    langfuse_host: str = "https://cloud.langfuse.com"

    # LangSmith
    langsmith_api_key: str = field(default="", repr=False)  # secret: hidden from repr
    langsmith_project: str = "default"
    langsmith_endpoint: str = "https://api.smith.langchain.com"

    # Azure AI Foundry
    azure_ai_connection_string: str = ""   # from AI Foundry project settings
    azure_ai_project_name: str = ""

    # Splunk (via OpenTelemetry OTLP)
    splunk_otlp_endpoint: str = ""
    splunk_access_token: str = field(default="", repr=False)  # secret: hidden from repr
    splunk_service_name: str = "gmf-forge-ai"

    # Common
    environment: str = "development"
    release: str = ""
    # Provider-specific settings not covered by the fields above; a fresh
    # dict per instance (default_factory) so instances never share state.
    extra: Dict[str, Any] = field(default_factory=dict)

Configuration for a tracing provider, loaded from env vars by get_tracer().

All fields are optional — providers only read what they need.

TracingConfig( provider: str = 'null', langfuse_public_key: str = '', langfuse_secret_key: str = '', langfuse_host: str = 'https://cloud.langfuse.com', langsmith_api_key: str = '', langsmith_project: str = 'default', langsmith_endpoint: str = 'https://api.smith.langchain.com', azure_ai_connection_string: str = '', azure_ai_project_name: str = '', splunk_otlp_endpoint: str = '', splunk_access_token: str = '', splunk_service_name: str = 'gmf-forge-ai', environment: str = 'development', release: str = '', extra: Dict[str, Any] = <factory>)
provider: str = 'null'
langfuse_public_key: str = ''
langfuse_secret_key: str = ''
langfuse_host: str = 'https://cloud.langfuse.com'
langsmith_api_key: str = ''
langsmith_project: str = 'default'
langsmith_endpoint: str = 'https://api.smith.langchain.com'
azure_ai_connection_string: str = ''
azure_ai_project_name: str = ''
splunk_otlp_endpoint: str = ''
splunk_access_token: str = ''
splunk_service_name: str = 'gmf-forge-ai'
environment: str = 'development'
release: str = ''
extra: Dict[str, Any]
class Span:
 84class Span:
 85    """
 86    A single unit of work inside a trace.
 87
 88    Use as a context manager::
 89
 90        with tracer.trace("rag_query", input=question) as span:
 91            span.set_output(answer)
 92
 93    Instances are returned by :meth:`TracingProvider.trace`,
 94    :meth:`Span.span`, and :meth:`Span.generation`.  Child spans created
 95    via :meth:`Span.span` or :meth:`Span.generation` are automatically
 96    linked to the parent.
 97
 98    Attributes:
 99        id:        Unique identifier for this span/generation.
100        trace_id:  Identifier of the root trace this span belongs to.
101        name:      Human-readable operation name.
102        is_root:   True when this span *is* the root trace.
103    """
104
105    def __init__(
106        self,
107        name: str,
108        trace_id: Optional[str] = None,
109        span_id: Optional[str] = None,
110        is_root: bool = False,
111        kind: str = "span",          # "trace" | "span" | "generation"
112    ):
113        self.name = name
114        self.id: str = span_id or str(uuid.uuid4())
115        self.trace_id: str = trace_id or self.id
116        self.is_root = is_root
117        self.kind = kind
118
119        self._input: Any = None
120        self._output: Any = None
121        self._metadata: Dict[str, Any] = {}
122        self._error: Optional[Exception] = None
123        self._token_usage: Dict[str, int] = {}
124        self._model: Optional[str] = None
125        self._active_token: Any = None  # contextvars token for cleanup
126
127    # ── Mutators ─────────────────────────────────────────────────────────────
128
129    def set_input(self, input: Any) -> "Span":
130        self._input = input
131        return self
132
133    def set_output(self, output: Any) -> "Span":
134        self._output = output
135        return self
136
137    def set_metadata(self, **kwargs: Any) -> "Span":
138        self._metadata.update(kwargs)
139        return self
140
141    def set_error(self, error: Exception) -> "Span":
142        self._error = error
143        return self
144
145    def set_token_usage(
146        self,
147        prompt_tokens: int = 0,
148        completion_tokens: int = 0,
149        total_tokens: Optional[int] = None,
150    ) -> "Span":
151        self._token_usage = {
152            "prompt_tokens": prompt_tokens,
153            "completion_tokens": completion_tokens,
154        }
155        if total_tokens is not None:
156            self._token_usage["total_tokens"] = total_tokens
157        return self
158
159    # ── Child span factories (overridden by backend implementations) ──────────
160
161    def span(self, name: str, **kwargs: Any) -> "Span":
162        """Create a child span under this span. Override in subclasses."""
163        return Span(name=name, trace_id=self.trace_id, kind="span")
164
165    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
166        """Create a child generation span (LLM call). Override in subclasses."""
167        s = Span(name=name, trace_id=self.trace_id, kind="generation")
168        s._model = model
169        return s
170
171    # ── Context manager ───────────────────────────────────────────────────────
172
173    def __enter__(self) -> "Span":
174        # Track this span as active in the current context for decorator nesting
175        self._active_token = _set_active_span(self)
176        return self
177
178    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
179        if exc_val is not None:
180            self.set_error(exc_val)
181        self._end()
182        # Restore previous active span (only if token was set)
183        if self._active_token is not None:
184            try:
185                _active_span.reset(self._active_token)  # type: ignore[attr-defined]
186            except (AttributeError, LookupError):
187                pass
188        return False  # do not suppress exceptions
189
190    def _end(self) -> None:
191        """Called when the context manager exits. Override to flush to backend."""
192
193    # ── Helpers ───────────────────────────────────────────────────────────────
194
195    def __repr__(self) -> str:
196        return (
197            f"<Span name={self.name!r} id={self.id[:8]} "
198            f"trace_id={self.trace_id[:8]} kind={self.kind}>"
199        )

A single unit of work inside a trace.

Use as a context manager::

with tracer.trace("rag_query", input=question) as span:
    span.set_output(answer)

Instances are returned by TracingProvider.trace(), Span.span(), and Span.generation(). Child spans created via Span.span() or Span.generation() are automatically linked to the parent.

Attributes:
- id: Unique identifier for this span/generation.
- trace_id: Identifier of the root trace this span belongs to.
- name: Human-readable operation name.
- is_root: True when this span is itself the root trace.

Span( name: str, trace_id: Optional[str] = None, span_id: Optional[str] = None, is_root: bool = False, kind: str = 'span')
def __init__(
    self,
    name: str,
    trace_id: Optional[str] = None,
    span_id: Optional[str] = None,
    is_root: bool = False,
    kind: str = "span",          # "trace" | "span" | "generation"
):
    # Identity: a fresh UUID4 string unless a (truthy) span id is supplied;
    # without a (truthy) trace id the span uses its own id as the trace id.
    own_id: str = span_id if span_id else str(uuid.uuid4())
    self.name = name
    self.id: str = own_id
    self.trace_id: str = trace_id if trace_id else own_id
    self.is_root = is_root
    self.kind = kind

    # Payload, filled in later through the fluent set_* mutators.
    self._input: Any = None
    self._output: Any = None
    self._metadata: Dict[str, Any] = {}
    self._error: Optional[Exception] = None
    self._token_usage: Dict[str, int] = {}
    self._model: Optional[str] = None
    # contextvars token saved on __enter__ so __exit__ can restore the
    # previously active span.
    self._active_token: Any = None
name
id: str
trace_id: str
is_root
kind
def set_input( self, input: Any) -> Span:
def set_input(self, input: Any) -> "Span":
    """Record *input* as this span's input; returns the span for chaining."""
    self._input = input
    return self
def set_output( self, output: Any) -> Span:
def set_output(self, output: Any) -> "Span":
    """Record *output* as this span's result; returns the span for chaining."""
    self._output = output
    return self
def set_metadata( self, **kwargs: Any) -> Span:
def set_metadata(self, **kwargs: Any) -> "Span":
    """Merge the keyword pairs into this span's metadata (later values win); returns the span."""
    for key, value in kwargs.items():
        self._metadata[key] = value
    return self
def set_error( self, error: Exception) -> Span:
def set_error(self, error: Exception) -> "Span":
    """Attach *error* to this span, replacing any earlier one; returns the span."""
    self._error = error
    return self
def set_token_usage( self, prompt_tokens: int = 0, completion_tokens: int = 0, total_tokens: Optional[int] = None) -> Span:
def set_token_usage(
    self,
    prompt_tokens: int = 0,
    completion_tokens: int = 0,
    total_tokens: Optional[int] = None,
) -> "Span":
    """
    Replace this span's token-usage record and return the span.

    ``total_tokens`` is stored only when it is explicitly given (even if 0);
    it is never derived from the other two counts.
    """
    usage: Dict[str, int] = dict(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
    )
    if total_tokens is not None:
        usage["total_tokens"] = total_tokens
    self._token_usage = usage
    return self
def span( self, name: str, **kwargs: Any) -> Span:
def span(self, name: str, **kwargs: Any) -> "Span":
    """
    Create a child span under this span. Override in subclasses.

    The child shares this span's ``trace_id``.  The base implementation
    applies the ``input`` keyword (if given), which ``tracer.trace(...)``
    also accepts, and ignores any other keywords.
    """
    child = Span(name=name, trace_id=self.trace_id, kind="span")
    if "input" in kwargs:
        child.set_input(kwargs["input"])
    return child

Create a child span under this span. Override in subclasses.

def generation( self, name: str, model: str = '', **kwargs: Any) -> Span:
def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
    """
    Create a child generation span (LLM call). Override in subclasses.

    The child shares this span's ``trace_id`` and stores *model* as given
    (including the empty-string default).  The base implementation applies
    the ``input`` keyword (if given) and ignores any other keywords.
    """
    gen = Span(name=name, trace_id=self.trace_id, kind="generation")
    gen._model = model
    if "input" in kwargs:
        gen.set_input(kwargs["input"])
    return gen

Create a child generation span (LLM call). Override in subclasses.

 68class NullTracer(TracingProvider):
 69    """
 70    Default tracing provider — logs span lifecycle to the console.
 71
 72    Used when ``TRACING_PROVIDER`` is not set (or set to ``"null"`` /
 73    ``"none"``).  Each span emits a *started* and *finished* log line so
 74    the tracing structure is visible during local development and in
 75    example runs, without shipping data to any backend.
 76
 77    Switch to a real backend at any time by setting ``TRACING_PROVIDER``
 78    in ``.env`` — application code stays untouched.
 79
 80    Example::
 81
 82        tracer = NullTracer()
 83
 84        with tracer.trace("rag_query", input=question) as trace:
 85            with trace.span("retrieval", input=question) as span:
 86                docs = retriever.retrieve(question)
 87                span.set_output({"doc_count": len(docs)})
 88    """
 89
 90    def trace(self, name: str, **kwargs: Any) -> _NullSpan:
 91        span = _NullSpan(name=name, is_root=True, kind="trace")
 92        if "input" in kwargs:
 93            span.set_input(kwargs["input"])
 94        return span
 95
 96    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
 97        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
 98        if comment:
 99            log_kw["comment"] = comment
100        _logger.info("[trace] score", **log_kw)
101
102    def flush(self) -> None:
103        pass
104
105    def shutdown(self) -> None:
106        pass

Default tracing provider — logs span lifecycle to the console.

Used when TRACING_PROVIDER is not set (or set to "null" / "none"). Each span emits a started and finished log line so the tracing structure is visible during local development and in example runs, without shipping data to any backend.

Switch to a real backend at any time by setting TRACING_PROVIDER in .env — application code stays untouched.

Example::

tracer = NullTracer()

with tracer.trace("rag_query", input=question) as trace:
    with trace.span("retrieval", input=question) as span:
        docs = retriever.retrieve(question)
        span.set_output({"doc_count": len(docs)})
def trace( self, name: str, **kwargs: Any) -> gmf_forge_ai_shared_core.observability.tracing.null_tracer._NullSpan:
def trace(self, name: str, **kwargs: Any) -> _NullSpan:
    # Only ``input`` is applied to the root span; user/session ids,
    # metadata and tags have no destination without a backend.
    root = _NullSpan(name=name, is_root=True, kind="trace")
    if "input" in kwargs:
        root.set_input(kwargs["input"])
    return root

Start a new root trace.

Args:
- name: Human-readable name for the operation (e.g. "rag_query").
- **kwargs: Optional fields forwarded to the backend:
  - input — the operation input (question, prompt, …)
  - user_id — end-user identifier for session grouping
  - session_id — conversation or session ID
  - metadata — dict of arbitrary key/value pairs
  - tags — list of string tags

Returns: A Span to use as a context manager.

def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
 96    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
 97        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
 98        if comment:
 99            log_kw["comment"] = comment
100        _logger.info("[trace] score", **log_kw)

Attach a quality score to a completed trace.

Args:
- trace_id: ID of the trace to score (span.trace_id).
- name: Score dimension name, e.g. "faithfulness" or "relevance".
- value: Numeric score — typically 0.0–1.0, but the range is provider-specific.
- comment: Optional free-text explanation.

def flush(self) -> None:
def flush(self) -> None:
    """No-op: the null tracer buffers nothing, so there is nothing to flush."""

Flush any buffered events to the backend. Call before process exit.

def shutdown(self) -> None:
def shutdown(self) -> None:
    """No-op: the null tracer holds no resources to release."""

Gracefully shut down the provider and release resources.

def get_tracer( reset: bool = False) -> TracingProvider:
def get_tracer(reset: bool = False) -> TracingProvider:
    """
    Return the process-wide tracing provider, creating it on first use.

    The provider is built from environment variables and cached in the
    module-level ``_tracer_instance``; later calls return the cached object.
    Pass ``reset=True`` to rebuild it from the current environment (handy in
    tests).  The previously cached instance is not flushed or shut down.

    Environment variables
    ~~~~~~~~~~~~~~~~~~~~~
    TRACING_PROVIDER
        Which backend to use.  One of: ``null``, ``splunk``, ``langfuse``.
        Defaults to ``null`` when not set.  The value is lower-cased and
        stripped; unrecognised values log a warning and fall back to
        :class:`NullTracer`.

    (Splunk)
    SPLUNK_OTLP_ENDPOINT    OTLP/HTTP export endpoint
    SPLUNK_ACCESS_TOKEN     Splunk Observability Cloud access token
    SPLUNK_SERVICE_NAME     Service name shown in APM (default: gmf-forge-ai)

    (Langfuse)
    LANGFUSE_PUBLIC_KEY     Langfuse public key
    LANGFUSE_SECRET_KEY     Langfuse secret key
    LANGFUSE_HOST           Langfuse host (default: https://cloud.langfuse.com)

    (Common)
    ENVIRONMENT             Deployment environment tag (default: development)
    RELEASE                 App release/version tag (optional)

    Returns:
        A :class:`TracingProvider` instance ready to use.
    """
    global _tracer_instance
    if not reset and _tracer_instance is not None:
        return _tracer_instance

    config = _config_from_env()
    provider_key = config.provider.lower().strip()

    if provider_key in _PROVIDERS:
        provider_cls = _PROVIDERS[provider_key]
    else:
        _logger.warning(
            "Unknown TRACING_PROVIDER — falling back to NullTracer",
            provider=provider_key,
            supported=list(_PROVIDERS),
        )
        provider_cls = NullTracer

    # NullTracer takes no config; every other provider is built from it.
    if provider_cls is NullTracer:
        _tracer_instance = NullTracer()
    else:
        _tracer_instance = provider_cls(config)
    return _tracer_instance

Return the process-wide tracing provider, initialising it on first call.

Reads configuration from environment variables. Pass reset=True to force re-initialisation (useful in tests).

Environment variables:

- TRACING_PROVIDER — which backend to use: one of null, splunk, langfuse. Defaults to null when not set.

Splunk:
- SPLUNK_OTLP_ENDPOINT — OTLP/HTTP export endpoint
- SPLUNK_ACCESS_TOKEN — Splunk Observability Cloud access token
- SPLUNK_SERVICE_NAME — service name shown in APM (default: gmf-forge-ai)

Langfuse:
- LANGFUSE_PUBLIC_KEY — Langfuse public key
- LANGFUSE_SECRET_KEY — Langfuse secret key
- LANGFUSE_HOST — Langfuse host (default: https://cloud.langfuse.com)

Common:
- ENVIRONMENT — deployment environment tag (default: development)
- RELEASE — app release/version tag (optional)

Returns: A TracingProvider instance ready to use.