gmf_forge_ai_shared_core.observability.tracing
Tracing sub-package — provider-agnostic observability for RAG and agent apps.
Quick start
Set one env var to pick a backend; application code never changes::
# .env
TRACING_PROVIDER=splunk # or "null" (default)
SPLUNK_OTLP_ENDPOINT=https://ingest.us0.signalfx.com/v2/trace/otlp
SPLUNK_ACCESS_TOKEN=your-token
Then in your app — explicit style (most control)::
    from gmf_forge_ai_shared_core.observability.tracing import get_tracer

    tracer = get_tracer()  # reads env; returns NullTracer if nothing configured

    with tracer.trace("rag_query", input=question, user_id="u1") as trace:

        with trace.span("retrieval", input=question) as span:
            docs = retriever.retrieve(question)
            span.set_output({"doc_count": len(docs)})

        with trace.generation("llm", model="gpt-4o-mini", input=prompt) as gen:
            reply = llm.complete(prompt)
            gen.set_output(reply.text)
            gen.set_token_usage(prompt_tokens=120, completion_tokens=80)

        trace.set_output(reply.text)

    tracer.flush()
Or decorator style (less boilerplate)::
    from gmf_forge_ai_shared_core.observability.tracing import traced, trace_span, trace_generation, get_tracer

    @traced(name="rag_query")
    async def answer(question: str):
        docs = await retrieve(question)
        reply = await generate(question, docs)
        return reply

    @trace_span(name="retrieval")
    async def retrieve(question: str):
        return ["doc1", "doc2"]

    @trace_generation(name="llm_call", model="gpt-4o-mini")
    async def generate(question: str, docs: list[str]):
        return "answer"

    tracer = get_tracer()
    tracer.flush()  # at shutdown
Supported TRACING_PROVIDER values
null: no-op, zero overhead (default when env var is absent)
none: alias for null
splunk: Splunk Observability Cloud via OpenTelemetry OTLP/HTTP
langfuse: Langfuse tracing backend
ai_foundry: Azure AI Foundry backend (AiFoundryTracer)
Adding a new backend
1. Create my_tracer.py alongside this file, implementing gmf_forge_ai_shared_core.observability.tracing.base.TracingProvider.
2. Add an entry to _PROVIDERS in this module.
3. Set TRACING_PROVIDER=my_backend in .env — done.
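The three steps above can be sketched as follows. This is a minimal, self-contained illustration rather than the package's own code: `TracingProvider` and `Span` are simplified stand-ins for the classes in `base.py`, and `ListTracer` and the `my_backend` key are hypothetical names. The real `get_tracer` passes a `TracingConfig` to the provider constructor, which the sketch accepts but ignores.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Type


class Span:
    """Simplified stand-in for the package's Span (assumption)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.output: Any = None

    def set_output(self, output: Any) -> "Span":
        self.output = output
        return self

    def __enter__(self) -> "Span":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        return False  # never swallow exceptions


class TracingProvider(ABC):
    """Stand-in declaring the four abstract methods of the real base class."""

    @abstractmethod
    def trace(self, name: str, **kwargs: Any) -> Span: ...

    @abstractmethod
    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None: ...

    @abstractmethod
    def flush(self) -> None: ...

    @abstractmethod
    def shutdown(self) -> None: ...


class ListTracer(TracingProvider):
    """Step 1: hypothetical backend that records started trace names in memory."""

    def __init__(self, config: Any = None) -> None:
        self.started: List[str] = []

    def trace(self, name: str, **kwargs: Any) -> Span:
        self.started.append(name)
        return Span(name)

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        pass

    def flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass


# Step 2: register the class under the key that TRACING_PROVIDER will carry.
_PROVIDERS: Dict[str, Type[TracingProvider]] = {"my_backend": ListTracer}

# Step 3: resolve the key (hard-coded here instead of being read from .env).
tracer = _PROVIDERS["my_backend"]()
with tracer.trace("rag_query") as span:
    span.set_output("done")
```

Because application code only calls `trace()`, `score()`, `flush()`, and `shutdown()`, nothing outside the registry needs to know that `ListTracer` exists.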
"""
Tracing sub-package — provider-agnostic observability for RAG and agent apps.

Quick start
-----------
Set one env var to pick a backend; application code never changes::

    # .env
    TRACING_PROVIDER=splunk          # or "null" (default)
    SPLUNK_OTLP_ENDPOINT=https://ingest.us0.signalfx.com/v2/trace/otlp
    SPLUNK_ACCESS_TOKEN=your-token

Then in your app — explicit style (most control)::

    from gmf_forge_ai_shared_core.observability.tracing import get_tracer

    tracer = get_tracer()  # reads env; returns NullTracer if nothing configured

    with tracer.trace("rag_query", input=question, user_id="u1") as trace:

        with trace.span("retrieval", input=question) as span:
            docs = retriever.retrieve(question)
            span.set_output({"doc_count": len(docs)})

        with trace.generation("llm", model="gpt-4o-mini", input=prompt) as gen:
            reply = llm.complete(prompt)
            gen.set_output(reply.text)
            gen.set_token_usage(prompt_tokens=120, completion_tokens=80)

        trace.set_output(reply.text)

    tracer.flush()

Or decorator style (less boilerplate)::

    from gmf_forge_ai_shared_core.observability.tracing import traced, trace_span, trace_generation, get_tracer

    @traced(name="rag_query")
    async def answer(question: str):
        docs = await retrieve(question)
        reply = await generate(question, docs)
        return reply

    @trace_span(name="retrieval")
    async def retrieve(question: str):
        return ["doc1", "doc2"]

    @trace_generation(name="llm_call", model="gpt-4o-mini")
    async def generate(question: str, docs: list[str]):
        return "answer"

    tracer = get_tracer()
    tracer.flush()  # at shutdown

Supported TRACING_PROVIDER values
----------------------------------
null      — no-op, zero overhead (default when env var is absent)
splunk    — Splunk Observability Cloud via OpenTelemetry OTLP/HTTP
langfuse  — Langfuse tracing backend

Adding a new backend
--------------------
1. Create ``my_tracer.py`` alongside this file implementing
   :class:`~gmf_forge_ai_shared_core.observability.tracing.base.TracingProvider`.
2. Add an entry to ``_PROVIDERS`` in this module.
3. Set ``TRACING_PROVIDER=my_backend`` in .env — done.
"""

from __future__ import annotations

import os
from typing import Optional

from .base import Span, TracingConfig, TracingProvider
from .decorators import traced, trace_generation, trace_span
from .ai_foundry_tracer import AiFoundryTracer
from .langfuse_tracer import LangfuseTracer
from .null_tracer import NullTracer
from .splunk_tracer import SplunkTracer
from gmf_forge_ai_shared_core.observability.logger import BasicLogger

_logger = BasicLogger(__name__)

__all__ = [
    "TracingProvider",
    "TracingConfig",
    "Span",
    "NullTracer",
    "SplunkTracer",
    "LangfuseTracer",
    "AiFoundryTracer",
    "get_tracer",
    "traced",
    "trace_span",
    "trace_generation",
]

# Registry: TRACING_PROVIDER value → provider class
_PROVIDERS: dict[str, type[TracingProvider]] = {
    "null": NullTracer,
    "none": NullTracer,
    "splunk": SplunkTracer,
    "langfuse": LangfuseTracer,
    "ai_foundry": AiFoundryTracer,
}

# Module-level singleton — one tracer per process
_tracer_instance: Optional[TracingProvider] = None


def get_tracer(reset: bool = False) -> TracingProvider:
    """
    Return the process-wide tracing provider, initialising it on first call.

    Reads configuration from environment variables.  Pass ``reset=True``
    to force re-initialisation (useful in tests).

    Environment variables
    ~~~~~~~~~~~~~~~~~~~~~
    TRACING_PROVIDER
        Which backend to use.  One of: ``null``, ``splunk``, ``langfuse``.
        Defaults to ``null`` when not set.

    (Splunk)
    SPLUNK_OTLP_ENDPOINT   OTLP/HTTP export endpoint
    SPLUNK_ACCESS_TOKEN    Splunk Observability Cloud access token
    SPLUNK_SERVICE_NAME    Service name shown in APM (default: gmf-forge-ai)

    (Langfuse)
    LANGFUSE_PUBLIC_KEY    Langfuse public key
    LANGFUSE_SECRET_KEY    Langfuse secret key
    LANGFUSE_HOST          Langfuse host (default: https://cloud.langfuse.com)

    (Common)
    ENVIRONMENT            Deployment environment tag (default: development)
    RELEASE                App release/version tag (optional)

    Returns:
        A :class:`TracingProvider` instance ready to use.
    """
    global _tracer_instance
    if _tracer_instance is not None and not reset:
        return _tracer_instance

    config = _config_from_env()
    provider_key = config.provider.lower().strip()
    provider_cls = _PROVIDERS.get(provider_key, NullTracer)

    if provider_key not in _PROVIDERS:
        _logger.warning(
            "Unknown TRACING_PROVIDER — falling back to NullTracer",
            provider=provider_key,
            supported=list(_PROVIDERS),
        )

    _tracer_instance = provider_cls(config) if provider_cls is not NullTracer else NullTracer()
    return _tracer_instance


def _config_from_env() -> TracingConfig:
    """Build a TracingConfig by reading environment variables."""
    return TracingConfig(
        provider=os.getenv("TRACING_PROVIDER", "null"),

        # Langfuse
        langfuse_public_key=os.getenv("LANGFUSE_PUBLIC_KEY", ""),
        langfuse_secret_key=os.getenv("LANGFUSE_SECRET_KEY", ""),
        langfuse_host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"),

        # Azure AI Foundry
        azure_ai_connection_string=os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING", ""),

        # Splunk
        splunk_otlp_endpoint=os.getenv("SPLUNK_OTLP_ENDPOINT", ""),
        splunk_access_token=os.getenv("SPLUNK_ACCESS_TOKEN", ""),
        splunk_service_name=os.getenv("SPLUNK_SERVICE_NAME", "gmf-forge-ai"),

        # Common
        environment=os.getenv("ENVIRONMENT", "development"),
        release=os.getenv("RELEASE", ""),
    )
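The lookup, fallback, and caching behaviour of `get_tracer` can be illustrated in isolation. The sketch below is an assumption-laden miniature, not the module itself: `NullTracer` and `SplunkTracer` are empty stand-in classes, the registry is trimmed to three keys, and no `TracingConfig` is built.

```python
import os
from typing import Optional


class NullTracer:
    """Stand-in for the package's default provider."""


class SplunkTracer:
    """Stand-in; the real class takes a TracingConfig."""


_PROVIDERS = {"null": NullTracer, "none": NullTracer, "splunk": SplunkTracer}
_instance: Optional[object] = None


def get_tracer(reset: bool = False) -> object:
    """Return the cached provider, rebuilding it only when reset is True."""
    global _instance
    if _instance is not None and not reset:
        return _instance
    # Normalise the key the same way the module does: lower-case and strip.
    key = os.getenv("TRACING_PROVIDER", "null").lower().strip()
    _instance = _PROVIDERS.get(key, NullTracer)()  # unknown keys fall back
    return _instance


os.environ["TRACING_PROVIDER"] = " Splunk "
first = get_tracer(reset=True)

os.environ["TRACING_PROVIDER"] = "does-not-exist"
cached = get_tracer()             # environment is NOT re-read: singleton is cached
rebuilt = get_tracer(reset=True)  # re-reads the environment, falls back to null
```

This is why tests that change `TRACING_PROVIDER` need `reset=True`: without it, the first provider created in the process keeps being returned.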
class TracingProvider(ABC):
    """
    Abstract tracing provider.

    Implement this class to add a new backend.  Applications only ever
    import :class:`TracingProvider` and call :func:`get_tracer` — they
    have no knowledge of the concrete backend in use.
    """

    # ── Core operations ───────────────────────────────────────────────────────

    @abstractmethod
    def trace(self, name: str, **kwargs: Any) -> Span:
        """
        Start a new root trace.

        Args:
            name:     Human-readable name for the operation (e.g. ``"rag_query"``).
            **kwargs: Optional fields forwarded to the backend:
                - ``input``      — the operation input (question, prompt, …)
                - ``user_id``    — end-user identifier for session grouping
                - ``session_id`` — conversation or session ID
                - ``metadata``   — dict of arbitrary key/value pairs
                - ``tags``       — list of string tags

        Returns:
            A :class:`Span` to use as a context manager.
        """

    @abstractmethod
    def score(
        self,
        trace_id: str,
        name: str,
        value: float,
        comment: str = "",
    ) -> None:
        """
        Attach a quality score to a completed trace.

        Args:
            trace_id: ID of the trace to score (``span.trace_id``).
            name:     Score dimension name, e.g. ``"faithfulness"``, ``"relevance"``.
            value:    Numeric score — typically 0.0–1.0 but provider-specific.
            comment:  Optional free-text explanation.
        """

    @abstractmethod
    def flush(self) -> None:
        """Flush any buffered events to the backend.  Call before process exit."""

    @abstractmethod
    def shutdown(self) -> None:
        """Gracefully shut down the provider and release resources."""

    # ── Convenience helpers (non-abstract, built on trace()) ──────────────────

    def trace_rag_query(
        self,
        question: str,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Shortcut to open a root trace for a RAG question-answering call."""
        return self.trace(
            "rag_query",
            input=question,
            user_id=user_id or None,
            session_id=session_id or None,
            metadata=metadata or None,
            tags=["rag"],
        )

    def trace_agent_run(
        self,
        agent_name: str,
        input: Any,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Shortcut to open a root trace for an agent invocation."""
        return self.trace(
            f"agent:{agent_name}",
            input=input,
            user_id=user_id or None,
            session_id=session_id or None,
            metadata=metadata or None,
            tags=["agent", agent_name],
        )
Abstract tracing provider.
Implement this class to add a new backend. Applications only ever
import TracingProvider and call get_tracer() — they
have no knowledge of the concrete backend in use.
    @abstractmethod
    def trace(self, name: str, **kwargs: Any) -> Span:
        """
        Start a new root trace.

        Args:
            name:     Human-readable name for the operation (e.g. ``"rag_query"``).
            **kwargs: Optional fields forwarded to the backend:
                - ``input``      — the operation input (question, prompt, …)
                - ``user_id``    — end-user identifier for session grouping
                - ``session_id`` — conversation or session ID
                - ``metadata``   — dict of arbitrary key/value pairs
                - ``tags``       — list of string tags

        Returns:
            A :class:`Span` to use as a context manager.
        """
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
    @abstractmethod
    def score(
        self,
        trace_id: str,
        name: str,
        value: float,
        comment: str = "",
    ) -> None:
        """
        Attach a quality score to a completed trace.

        Args:
            trace_id: ID of the trace to score (``span.trace_id``).
            name:     Score dimension name, e.g. ``"faithfulness"``, ``"relevance"``.
            value:    Numeric score — typically 0.0–1.0 but provider-specific.
            comment:  Optional free-text explanation.
        """
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
    @abstractmethod
    def flush(self) -> None:
        """Flush any buffered events to the backend.  Call before process exit."""
Flush any buffered events to the backend. Call before process exit.
    @abstractmethod
    def shutdown(self) -> None:
        """Gracefully shut down the provider and release resources."""
Gracefully shut down the provider and release resources.
    def trace_rag_query(
        self,
        question: str,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Shortcut to open a root trace for a RAG question-answering call."""
        return self.trace(
            "rag_query",
            input=question,
            user_id=user_id or None,
            session_id=session_id or None,
            metadata=metadata or None,
            tags=["rag"],
        )
Shortcut to open a root trace for a RAG question-answering call.
    def trace_agent_run(
        self,
        agent_name: str,
        input: Any,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Shortcut to open a root trace for an agent invocation."""
        return self.trace(
            f"agent:{agent_name}",
            input=input,
            user_id=user_id or None,
            session_id=session_id or None,
            metadata=metadata or None,
            tags=["agent", agent_name],
        )
Shortcut to open a root trace for an agent invocation.
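Both convenience helpers normalise empty values before delegating to `trace()`: an empty `user_id` or `session_id` becomes `None`, and extra keyword arguments are collected into `metadata` (or `None` when there are none). The sketch below reproduces only that argument mapping for `trace_rag_query`; `build_rag_trace_kwargs` is a hypothetical helper written for illustration and does not exist in the package.

```python
from typing import Any, Dict


def build_rag_trace_kwargs(
    question: str,
    user_id: str = "",
    session_id: str = "",
    **metadata: Any,
) -> Dict[str, Any]:
    """Mirror the keyword arguments that trace_rag_query forwards to trace()."""
    return {
        "input": question,
        "user_id": user_id or None,        # "" becomes None
        "session_id": session_id or None,  # "" becomes None
        "metadata": metadata or None,      # {} becomes None
        "tags": ["rag"],
    }


with_extras = build_rag_trace_kwargs("What is covered?", user_id="u1", tenant="acme")
bare = build_rag_trace_kwargs("What is covered?")
```

A backend therefore has to tolerate `None` for these fields rather than assume an empty string or empty dict.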
@dataclass
class TracingConfig:
    """
    Configuration for a tracing provider, loaded from env vars by
    :func:`get_tracer`.

    All fields are optional — providers only read what they need.
    """
    provider: str = "null"

    # Langfuse
    langfuse_public_key: str = ""
    langfuse_secret_key: str = ""
    langfuse_host: str = "https://cloud.langfuse.com"

    # LangSmith
    langsmith_api_key: str = ""
    langsmith_project: str = "default"
    langsmith_endpoint: str = "https://api.smith.langchain.com"

    # Azure AI Foundry
    azure_ai_connection_string: str = ""  # from AI Foundry project settings
    azure_ai_project_name: str = ""

    # Splunk (via OpenTelemetry OTLP)
    splunk_otlp_endpoint: str = ""
    splunk_access_token: str = ""
    splunk_service_name: str = "gmf-forge-ai"

    # Common
    environment: str = "development"
    release: str = ""
    extra: Dict[str, Any] = field(default_factory=dict)
Configuration for a tracing provider, loaded from env vars by
get_tracer().
All fields are optional — providers only read what they need.
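Since every field has a default, a config can also be built by hand, for instance in a unit test. The sketch below uses a trimmed stand-in dataclass that copies only a handful of the real fields and their defaults; it is meant to show the behaviour of the defaults, not to replace the real class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class TracingConfig:
    """Trimmed stand-in: only some of the real fields are reproduced."""
    provider: str = "null"
    splunk_otlp_endpoint: str = ""
    splunk_access_token: str = ""
    splunk_service_name: str = "gmf-forge-ai"
    environment: str = "development"
    extra: Dict[str, Any] = field(default_factory=dict)


splunk_cfg = TracingConfig(provider="splunk", splunk_service_name="policy-rag")
default_cfg = TracingConfig()

# default_factory gives each instance its own dict, so this does not leak
# into default_cfg.extra.
splunk_cfg.extra["team"] = "search"
```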
class Span:
    """
    A single unit of work inside a trace.

    Use as a context manager::

        with tracer.trace("rag_query", input=question) as span:
            span.set_output(answer)

    Instances are returned by :meth:`TracingProvider.trace`,
    :meth:`Span.span`, and :meth:`Span.generation`.  Child spans created
    via :meth:`Span.span` or :meth:`Span.generation` are automatically
    linked to the parent.

    Attributes:
        id:       Unique identifier for this span/generation.
        trace_id: Identifier of the root trace this span belongs to.
        name:     Human-readable operation name.
        is_root:  True when this span *is* the root trace.
    """

    def __init__(
        self,
        name: str,
        trace_id: Optional[str] = None,
        span_id: Optional[str] = None,
        is_root: bool = False,
        kind: str = "span",  # "trace" | "span" | "generation"
    ):
        self.name = name
        self.id: str = span_id or str(uuid.uuid4())
        self.trace_id: str = trace_id or self.id
        self.is_root = is_root
        self.kind = kind

        self._input: Any = None
        self._output: Any = None
        self._metadata: Dict[str, Any] = {}
        self._error: Optional[Exception] = None
        self._token_usage: Dict[str, int] = {}
        self._model: Optional[str] = None
        self._active_token: Any = None  # contextvars token for cleanup

    # ── Mutators ─────────────────────────────────────────────────────────────

    def set_input(self, input: Any) -> "Span":
        self._input = input
        return self

    def set_output(self, output: Any) -> "Span":
        self._output = output
        return self

    def set_metadata(self, **kwargs: Any) -> "Span":
        self._metadata.update(kwargs)
        return self

    def set_error(self, error: Exception) -> "Span":
        self._error = error
        return self

    def set_token_usage(
        self,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        total_tokens: Optional[int] = None,
    ) -> "Span":
        self._token_usage = {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        if total_tokens is not None:
            self._token_usage["total_tokens"] = total_tokens
        return self

    # ── Child span factories (overridden by backend implementations) ──────────

    def span(self, name: str, **kwargs: Any) -> "Span":
        """Create a child span under this span.  Override in subclasses."""
        return Span(name=name, trace_id=self.trace_id, kind="span")

    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
        """Create a child generation span (LLM call).  Override in subclasses."""
        s = Span(name=name, trace_id=self.trace_id, kind="generation")
        s._model = model
        return s

    # ── Context manager ───────────────────────────────────────────────────────

    def __enter__(self) -> "Span":
        # Track this span as active in the current context for decorator nesting
        self._active_token = _set_active_span(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_val is not None:
            self.set_error(exc_val)
        self._end()
        # Restore previous active span (only if token was set)
        if self._active_token is not None:
            try:
                _active_span.reset(self._active_token)  # type: ignore[attr-defined]
            except (AttributeError, LookupError):
                pass
        return False  # do not suppress exceptions

    def _end(self) -> None:
        """Called when the context manager exits.  Override to flush to backend."""

    # ── Helpers ───────────────────────────────────────────────────────────────

    def __repr__(self) -> str:
        return (
            f"<Span name={self.name!r} id={self.id[:8]} "
            f"trace_id={self.trace_id[:8]} kind={self.kind}>"
        )
A single unit of work inside a trace.
Use as a context manager::
    with tracer.trace("rag_query", input=question) as span:
        span.set_output(answer)
Instances are returned by TracingProvider.trace(),
Span.span(), and Span.generation(). Child spans created
via Span.span() or Span.generation() are automatically
linked to the parent.
Attributes:
    id: Unique identifier for this span/generation.
    trace_id: Identifier of the root trace this span belongs to.
    name: Human-readable operation name.
    is_root: True when this span is the root trace.
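A backend typically customises `Span` by overriding `_end()` and the child factories. The following sketch shows that pattern under stated assumptions: the `Span` base here is a trimmed stand-in without the contextvars tracking of the real class, and `RecordingSpan` and `FINISHED` are hypothetical names used only for this illustration.

```python
import uuid
from typing import Any, List, Optional


class Span:
    """Trimmed stand-in for the package's Span (no active-span tracking)."""

    def __init__(self, name: str, trace_id: Optional[str] = None, kind: str = "span") -> None:
        self.name = name
        self.id = str(uuid.uuid4())
        self.trace_id = trace_id or self.id  # a root span is its own trace
        self.kind = kind
        self._output: Any = None
        self._error: Optional[BaseException] = None

    def set_output(self, output: Any) -> "Span":
        self._output = output
        return self

    def span(self, name: str, **kwargs: Any) -> "Span":
        return Span(name=name, trace_id=self.trace_id, kind="span")

    def __enter__(self) -> "Span":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_val is not None:
            self._error = exc_val
        self._end()
        return False  # exceptions propagate to the caller

    def _end(self) -> None:
        """Hook for subclasses; the base implementation does nothing."""


FINISHED: List[str] = []


class RecordingSpan(Span):
    """Hypothetical backend span: records one line when the span ends."""

    def span(self, name: str, **kwargs: Any) -> "RecordingSpan":
        # Override the factory so children use the backend class too.
        return RecordingSpan(name=name, trace_id=self.trace_id, kind="span")

    def _end(self) -> None:
        status = "error" if self._error else "ok"
        FINISHED.append(f"{self.kind}:{self.name}:{status}")


with RecordingSpan("rag_query", kind="trace") as root:
    with root.span("retrieval") as child:
        child.set_output({"doc_count": 2})
```

The child finishes before its parent, and both share the root's `trace_id`, which is what lets a backend group them into one trace.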
    def __init__(
        self,
        name: str,
        trace_id: Optional[str] = None,
        span_id: Optional[str] = None,
        is_root: bool = False,
        kind: str = "span",  # "trace" | "span" | "generation"
    ):
        self.name = name
        self.id: str = span_id or str(uuid.uuid4())
        self.trace_id: str = trace_id or self.id
        self.is_root = is_root
        self.kind = kind

        self._input: Any = None
        self._output: Any = None
        self._metadata: Dict[str, Any] = {}
        self._error: Optional[Exception] = None
        self._token_usage: Dict[str, int] = {}
        self._model: Optional[str] = None
        self._active_token: Any = None  # contextvars token for cleanup
    def set_token_usage(
        self,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        total_tokens: Optional[int] = None,
    ) -> "Span":
        self._token_usage = {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
        }
        if total_tokens is not None:
            self._token_usage["total_tokens"] = total_tokens
        return self
    def span(self, name: str, **kwargs: Any) -> "Span":
        """Create a child span under this span.  Override in subclasses."""
        return Span(name=name, trace_id=self.trace_id, kind="span")
Create a child span under this span. Override in subclasses.
    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
        """Create a child generation span (LLM call).  Override in subclasses."""
        s = Span(name=name, trace_id=self.trace_id, kind="generation")
        s._model = model
        return s
Create a child generation span (LLM call). Override in subclasses.
class NullTracer(TracingProvider):
    """
    Default tracing provider — logs span lifecycle to the console.

    Used when ``TRACING_PROVIDER`` is not set (or set to ``"null"`` /
    ``"none"``).  Each span emits a *started* and *finished* log line so
    the tracing structure is visible during local development and in
    example runs, without shipping data to any backend.

    Switch to a real backend at any time by setting ``TRACING_PROVIDER``
    in ``.env`` — application code stays untouched.

    Example::

        tracer = NullTracer()

        with tracer.trace("rag_query", input=question) as trace:
            with trace.span("retrieval", input=question) as span:
                docs = retriever.retrieve(question)
                span.set_output({"doc_count": len(docs)})
    """

    def trace(self, name: str, **kwargs: Any) -> _NullSpan:
        span = _NullSpan(name=name, is_root=True, kind="trace")
        if "input" in kwargs:
            span.set_input(kwargs["input"])
        return span

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
        if comment:
            log_kw["comment"] = comment
        _logger.info("[trace] score", **log_kw)

    def flush(self) -> None:
        pass

    def shutdown(self) -> None:
        pass
Default tracing provider — logs span lifecycle to the console.
Used when TRACING_PROVIDER is not set (or set to "null" /
"none"). Each span emits a started and finished log line so
the tracing structure is visible during local development and in
example runs, without shipping data to any backend.
Switch to a real backend at any time by setting TRACING_PROVIDER
in .env — application code stays untouched.
Example::
    tracer = NullTracer()

    with tracer.trace("rag_query", input=question) as trace:
        with trace.span("retrieval", input=question) as span:
            docs = retriever.retrieve(question)
            span.set_output({"doc_count": len(docs)})
    def trace(self, name: str, **kwargs: Any) -> _NullSpan:
        span = _NullSpan(name=name, is_root=True, kind="trace")
        if "input" in kwargs:
            span.set_input(kwargs["input"])
        return span
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
        if comment:
            log_kw["comment"] = comment
        _logger.info("[trace] score", **log_kw)
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
class SplunkTracer(TracingProvider):
    """
    Tracing provider that exports to Splunk Observability Cloud via
    OpenTelemetry OTLP/HTTP.

    Traces appear in **Splunk APM** under the configured service name.
    Token usage, model info, and scores are stored as span attributes
    and can be surfaced via Splunk's MetricSets or custom dashboards.

    Example::

        import os
        os.environ["TRACING_PROVIDER"]      = "splunk"
        os.environ["SPLUNK_OTLP_ENDPOINT"]  = "https://ingest.us0.signalfx.com/v2/trace/otlp"
        os.environ["SPLUNK_ACCESS_TOKEN"]   = "your-token"
        os.environ["SPLUNK_SERVICE_NAME"]   = "policy-rag"

        from gmf_forge_ai_shared_core.observability.tracing import get_tracer

        tracer = get_tracer()

        with tracer.trace("rag_query", input=question) as trace:
            with trace.span("retrieval", input=question) as span:
                docs = retriever.retrieve(question)
                span.set_output({"doc_count": len(docs)})
            with trace.generation("llm", model="gpt-4o", input=prompt) as gen:
                reply = llm.complete(prompt)
                gen.set_output(reply.text)
                gen.set_token_usage(prompt_tokens=120, completion_tokens=80)
            trace.set_output(reply.text)

        tracer.flush()
    """

    def __init__(self, config: TracingConfig):
        self._config = config
        self._otel_tracer: Optional[Any] = None
        self._init_client()

    def _init_client(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore
            from opentelemetry.sdk.trace import TracerProvider  # type: ignore
            from opentelemetry.sdk.trace.export import BatchSpanProcessor  # type: ignore
            from opentelemetry.sdk.resources import Resource  # type: ignore
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import (  # type: ignore
                OTLPSpanExporter,
            )
        except ImportError:
            _logger.warning(
                "opentelemetry-sdk or opentelemetry-exporter-otlp-proto-http not found — falling back to no-op tracing",
                install_cmd="pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http",
            )
            return

        endpoint = self._config.splunk_otlp_endpoint
        token = self._config.splunk_access_token
        service = self._config.splunk_service_name or "gmf-forge-ai"
        environment = self._config.environment or "development"

        if not endpoint or not token:
            _logger.warning(
                "SPLUNK_OTLP_ENDPOINT / SPLUNK_ACCESS_TOKEN not set — falling back to no-op tracing",
                missing=[v for v, val in [("SPLUNK_OTLP_ENDPOINT", endpoint), ("SPLUNK_ACCESS_TOKEN", token)] if not val],
            )
            return

        try:
            resource = Resource.create({
                "service.name": service,
                "deployment.environment": environment,
            })
            exporter = OTLPSpanExporter(
                endpoint=endpoint,
                headers={"X-SF-Token": token},
            )
            provider = TracerProvider(resource=resource)
            provider.add_span_processor(BatchSpanProcessor(exporter))
            otel_trace.set_tracer_provider(provider)
            self._otel_tracer = provider.get_tracer(service)
        except Exception as exc:
            _logger.error("Splunk tracer init failed", error=str(exc))

    # ── TracingProvider interface ─────────────────────────────────────────────

    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags") or []

        try:
            otel_span = self._otel_tracer.start_span(name)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))
            if user_id:
                otel_span.set_attribute("user_id", str(user_id))
            if session_id:
                otel_span.set_attribute("session_id", str(session_id))
            if tags:
                otel_span.set_attribute("tags", ",".join(tags))

            trace_id = format(otel_span.get_span_context().trace_id, "032x")
            span = _SplunkSpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            if input_data is not None:
                span._input = input_data
            return span
        except Exception as exc:
            _logger.error("Splunk trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        # Splunk APM doesn't have a native scoring API; scores can be
        # emitted as custom metrics via the Splunk metrics ingest endpoint.
        # For now we record it as a log-level attribute on the active span.
        pass

    def flush(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore
            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "force_flush"):
                provider.force_flush()
        except Exception:
            pass

    def shutdown(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore
            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception:
            pass
Tracing provider that exports to Splunk Observability Cloud via OpenTelemetry OTLP/HTTP.
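Traces appear in Splunk APM under the configured service name. Token usage and model info are stored as span attributes and can be surfaced via Splunk's MetricSets or custom dashboards. Note that score() is currently a no-op in this provider (its body is a bare pass), so scores are not exported to Splunk.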
Example::
    import os
    os.environ["TRACING_PROVIDER"] = "splunk"
    os.environ["SPLUNK_OTLP_ENDPOINT"] = "https://ingest.us0.signalfx.com/v2/trace/otlp"
    os.environ["SPLUNK_ACCESS_TOKEN"] = "your-token"
    os.environ["SPLUNK_SERVICE_NAME"] = "policy-rag"

    from gmf_forge_ai_shared_core.observability.tracing import get_tracer

    tracer = get_tracer()

    with tracer.trace("rag_query", input=question) as trace:
        with trace.span("retrieval", input=question) as span:
            docs = retriever.retrieve(question)
            span.set_output({"doc_count": len(docs)})
        with trace.generation("llm", model="gpt-4o", input=prompt) as gen:
            reply = llm.complete(prompt)
            gen.set_output(reply.text)
            gen.set_token_usage(prompt_tokens=120, completion_tokens=80)
        trace.set_output(reply.text)

    tracer.flush()
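If either `SPLUNK_OTLP_ENDPOINT` or `SPLUNK_ACCESS_TOKEN` is empty, `_init_client` logs a warning naming the missing settings and the provider silently degrades to no-op spans. The check can be reproduced on its own as below; `missing_splunk_settings` is a hypothetical helper extracted for illustration, not a function the package exposes.

```python
from typing import List


def missing_splunk_settings(endpoint: str, token: str) -> List[str]:
    """Return the names of required Splunk settings whose values are empty."""
    pairs = [
        ("SPLUNK_OTLP_ENDPOINT", endpoint),
        ("SPLUNK_ACCESS_TOKEN", token),
    ]
    return [name for name, value in pairs if not value]


only_token = missing_splunk_settings("", "your-token")
complete = missing_splunk_settings(
    "https://ingest.us0.signalfx.com/v2/trace/otlp", "your-token"
)
```

A start-up check along these lines can be useful in deployments where silently falling back to no-op tracing would go unnoticed.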
    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags") or []

        try:
            otel_span = self._otel_tracer.start_span(name)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))
            if user_id:
                otel_span.set_attribute("user_id", str(user_id))
            if session_id:
                otel_span.set_attribute("session_id", str(session_id))
            if tags:
                otel_span.set_attribute("tags", ",".join(tags))

            trace_id = format(otel_span.get_span_context().trace_id, "032x")
            span = _SplunkSpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            if input_data is not None:
                span._input = input_data
            return span
        except Exception as exc:
            _logger.error("Splunk trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
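In the Splunk provider, the `trace_id` exposed on the returned span comes from the OpenTelemetry span context, where it is a 128-bit integer. The source renders it with `format(..., "032x")`, which yields a zero-padded, 32-character lowercase hex string. The snippet below shows just that formatting step; `format_trace_id` is a hypothetical wrapper and the integer values are made-up examples.

```python
def format_trace_id(trace_id: int) -> str:
    """Render a 128-bit integer trace ID as 32 lowercase hex characters."""
    return format(trace_id, "032x")


full = format_trace_id(0x5B8EFFF798038103D269B633813FC60C)
padded = format_trace_id(255)  # small values are left-padded with zeros
```

This is the string to pass as `trace_id` when calling `score()` for a trace created by this provider.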
    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        # Splunk APM doesn't have a native scoring API; scores can be
        # emitted as custom metrics via the Splunk metrics ingest endpoint.
        # For now we record it as a log-level attribute on the active span.
        pass
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
    def flush(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore
            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "force_flush"):
                provider.force_flush()
        except Exception:
            pass
Flush any buffered events to the backend. Call before process exit.
    def shutdown(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore
            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception:
            pass
Gracefully shut down the provider and release resources.
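Since `flush()` should run before process exit and `shutdown()` releases resources, one way to wire both in is an `atexit` hook. The sketch below assumes only that the tracer exposes `flush()` and `shutdown()`; `StubTracer`, `close_tracer`, and `register_shutdown` are hypothetical helpers, not part of the package.

```python
import atexit
from typing import Any


class StubTracer:
    """Hypothetical tracer that records the calls it receives."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def flush(self) -> None:
        self.calls.append("flush")

    def shutdown(self) -> None:
        self.calls.append("shutdown")


def close_tracer(tracer: Any) -> None:
    """Flush buffered events, then release resources; never raise at exit."""
    for step in (tracer.flush, tracer.shutdown):
        try:
            step()
        except Exception:
            pass  # mirror the providers: tracing must not break shutdown


def register_shutdown(tracer: Any) -> None:
    """Arrange for close_tracer(tracer) to run when the interpreter exits."""
    atexit.register(close_tracer, tracer)


tracer = StubTracer()
close_tracer(tracer)  # called directly here; register_shutdown(tracer) would defer it
```

Flushing first gives buffered events a chance to be exported before the provider is torn down.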
class LangfuseTracer(TracingProvider):
    """Tracing provider that exports traces to Langfuse (SDK v3+)."""

    def __init__(self, config: TracingConfig):
        self._config = config
        self._client: Optional[Any] = None
        self._init_client()

    def _init_client(self) -> None:
        try:
            from langfuse import Langfuse  # type: ignore
        except ImportError:
            _logger.warning(
                "langfuse package not found — falling back to no-op tracing",
                install_cmd="pip install langfuse",
            )
            return

        public_key = self._config.langfuse_public_key
        secret_key = self._config.langfuse_secret_key
        host = self._config.langfuse_host or "https://cloud.langfuse.com"

        missing = [
            name
            for name, value in [
                ("LANGFUSE_PUBLIC_KEY", public_key),
                ("LANGFUSE_SECRET_KEY", secret_key),
            ]
            if not value
        ]
        if missing:
            _logger.warning(
                "LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set — falling back to no-op tracing",
                missing=missing,
            )
            return

        try:
            self._client = Langfuse(
                public_key=public_key,
                secret_key=secret_key,
                host=host,
            )
        except Exception as exc:
            _logger.error("Langfuse tracer init failed", error=str(exc))

    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._client is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags")

        # Merge caller metadata with trace-level fields not supported as direct
        # parameters on start_as_current_observation in SDK v3+.
        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
        if user_id:
            metadata["user_id"] = user_id
        if session_id:
            metadata["session_id"] = session_id
        if tags:
            metadata["tags"] = tags
        if self._config.environment:
            metadata["environment"] = self._config.environment

        try:
            from langfuse.types import TraceContext  # type: ignore

            # Pre-allocate a deterministic trace ID so callers can reference it
            # before the observation is flushed.
            trace_id = self._client.create_trace_id()

            lf_ctx = self._client.start_as_current_observation(
                trace_context=TraceContext(trace_id=trace_id),
                name=name,
                as_type="span",
                input=input_data,
                metadata=metadata or None,
                version=self._config.release or None,
            )
            span = _LangfuseSpan(
                lf_client=self._client,
                lf_ctx=lf_ctx,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.id = trace_id
            span.is_root = True
            if input_data is not None:
                span.set_input(input_data)
            return span
        except Exception as exc:
            _logger.error("Langfuse trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        if self._client is None:
            return
        try:
            self._client.create_score(
                trace_id=trace_id,
                name=name,
                value=value,
                comment=comment or None,
            )
        except Exception:
            pass

    def flush(self) -> None:
        if self._client is None:
            return
        try:
            if hasattr(self._client, "flush"):
                self._client.flush()
        except Exception:
            pass

    def shutdown(self) -> None:
        if self._client is None:
            return
        try:
            if hasattr(self._client, "shutdown"):
                self._client.shutdown()
            elif hasattr(self._client, "flush"):
                self._client.flush()
        except Exception:
            pass
Tracing provider that exports traces to Langfuse (SDK v3+).
    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._client is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags")

        # Merge caller metadata with trace-level fields not supported as direct
        # parameters on start_as_current_observation in SDK v3+.
        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
        if user_id:
            metadata["user_id"] = user_id
        if session_id:
            metadata["session_id"] = session_id
        if tags:
            metadata["tags"] = tags
        if self._config.environment:
            metadata["environment"] = self._config.environment

        try:
            from langfuse.types import TraceContext  # type: ignore

            # Pre-allocate a deterministic trace ID so callers can reference it
            # before the observation is flushed.
            trace_id = self._client.create_trace_id()

            lf_ctx = self._client.start_as_current_observation(
                trace_context=TraceContext(trace_id=trace_id),
                name=name,
                as_type="span",
                input=input_data,
                metadata=metadata or None,
                version=self._config.release or None,
            )
            span = _LangfuseSpan(
                lf_client=self._client,
                lf_ctx=lf_ctx,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.id = trace_id
            span.is_root = True
            if input_data is not None:
                span.set_input(input_data)
            return span
        except Exception as exc:
            _logger.error("Langfuse trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
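The Langfuse provider folds `user_id`, `session_id`, `tags`, and the configured environment into the trace metadata before opening the observation. The merge step can be sketched in isolation as below; `merge_trace_metadata` is a hypothetical helper written for this example, not a function the package exposes.

```python
from typing import Any, Optional


def merge_trace_metadata(
    metadata: Optional[dict[str, Any]] = None,
    *,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    tags: Optional[list[str]] = None,
    environment: Optional[str] = None,
) -> Optional[dict[str, Any]]:
    """Copy caller metadata and overlay trace-level fields that are set."""
    merged: dict[str, Any] = dict(metadata or {})  # copy: never mutate the caller's dict
    if user_id:
        merged["user_id"] = user_id
    if session_id:
        merged["session_id"] = session_id
    if tags:
        merged["tags"] = tags
    if environment:
        merged["environment"] = environment
    return merged or None  # an empty dict is sent as None


caller_meta = {"experiment": "rerank-v2", "user_id": "stale"}
merged = merge_trace_metadata(caller_meta, user_id="u1", tags=["rag"])
```

As in the source, a trace-level field overwrites a metadata key of the same name, so `user_id="u1"` replaces the `"stale"` value supplied in the caller's dict.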
    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        if self._client is None:
            return
        try:
            self._client.create_score(
                trace_id=trace_id,
                name=name,
                value=value,
                comment=comment or None,
            )
        except Exception:
            pass
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
    def flush(self) -> None:
        if self._client is None:
            return
        try:
            if hasattr(self._client, "flush"):
                self._client.flush()
        except Exception:
            pass
Flush any buffered events to the backend. Call before process exit.
    def shutdown(self) -> None:
        if self._client is None:
            return
        try:
            if hasattr(self._client, "shutdown"):
                self._client.shutdown()
            elif hasattr(self._client, "flush"):
                self._client.flush()
        except Exception:
            pass
Gracefully shut down the provider and release resources.
class AiFoundryTracer(TracingProvider):
    """
    Tracing provider that exports spans to Azure Monitor / Application Insights
    via the Azure Monitor OpenTelemetry distro.

    Mirrors the ``AIFoundryObserver`` class from ``observability_decorator.py``
    but implements the gmf-forge-ai ``TracingProvider`` interface so it can be
    used interchangeably with ``LangfuseTracer`` and ``SplunkTracer``.
    """

    def __init__(self, config: TracingConfig):
        self._config = config
        self._otel_tracer: Optional[Any] = None
        self._init_tracer()

    def _init_tracer(self) -> None:
        try:
            from azure.monitor.opentelemetry import configure_azure_monitor  # type: ignore
            from opentelemetry import trace as otel_trace  # type: ignore
        except ImportError:
            _logger.warning(
                "azure-monitor-opentelemetry / opentelemetry-sdk not installed "
                "— falling back to no-op tracing",
                install_cmd="pip install azure-monitor-opentelemetry",
            )
            return

        connection_string = self._config.azure_ai_connection_string
        if not connection_string:
            _logger.warning(
                "APPLICATIONINSIGHTS_CONNECTION_STRING not set "
                "— falling back to no-op tracing",
            )
            return

        service_name = (
            self._config.splunk_service_name  # reuse existing config field
            or "gmf-forge-ai"
        )

        try:
            configure_azure_monitor(
                connection_string=connection_string,
                service_name=service_name,
                disable_offline_storage=False,
            )
            self._otel_tracer = otel_trace.get_tracer(
                __name__,
                schema_url="https://opentelemetry.io/schemas/1.11.0",
            )
            _logger.info(
                "AI Foundry tracer (Azure Monitor) initialized",
                service=service_name,
            )
        except Exception as exc:
            _logger.error("AiFoundryTracer init failed", error=str(exc))

    # ── TracingProvider interface ─────────────────────────────────────────────

    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
        tags = kwargs.get("tags")

        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            otel_span = self._otel_tracer.start_span(name)

            # Attach standard attributes matching AIFoundryObserver pattern
            otel_span.set_attribute("function.name", name)
            otel_span.set_attribute("span.kind", "trace")
            if self._config.environment:
                otel_span.set_attribute("deployment.environment", self._config.environment)
            if self._config.release:
                otel_span.set_attribute("service.version", self._config.release)
            if user_id:
                otel_span.set_attribute("enduser.id", user_id)
            if session_id:
                otel_span.set_attribute("session.id", session_id)
            if tags:
                otel_span.set_attribute("tags", ", ".join(tags) if isinstance(tags, list) else str(tags))
            for key, value in metadata.items():
                if isinstance(value, (str, int, float, bool)):
                    otel_span.set_attribute(f"metadata.{key}", value)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))

            # Derive trace_id from OTel span context
            otel_ctx = otel_span.get_span_context()
            trace_id = format(otel_ctx.trace_id, "032x") if otel_ctx else ""

            span = _AIFoundrySpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.is_root = True
            span.id = trace_id
            if input_data is not None:
                span.set_input(input_data)
            return span

        except Exception as exc:
            _logger.error("AiFoundryTracer.trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        # Application Insights has no built-in quality-score concept;
        # emit it as a custom event attribute on the active span instead.
        if self._otel_tracer is None:
            return
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            current = otel_trace.get_current_span()
            if current:
                current.set_attribute(f"score.{name}", value)
                if comment:
                    current.set_attribute(f"score.{name}.comment", comment)
        except Exception:
            pass

    def flush(self) -> None:
        # The Azure Monitor exporter flushes automatically; nothing extra needed.
        pass

    def shutdown(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception:
            pass
Tracing provider that exports spans to Azure Monitor / Application Insights via the Azure Monitor OpenTelemetry distro.
Mirrors the AIFoundryObserver class from observability_decorator.py
but implements the gmf-forge-ai TracingProvider interface so it can be
used interchangeably with LangfuseTracer and SplunkTracer.
    def trace(self, name: str, **kwargs: Any) -> Span:
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
        tags = kwargs.get("tags")

        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            otel_span = self._otel_tracer.start_span(name)

            # Attach standard attributes matching AIFoundryObserver pattern
            otel_span.set_attribute("function.name", name)
            otel_span.set_attribute("span.kind", "trace")
            if self._config.environment:
                otel_span.set_attribute("deployment.environment", self._config.environment)
            if self._config.release:
                otel_span.set_attribute("service.version", self._config.release)
            if user_id:
                otel_span.set_attribute("enduser.id", user_id)
            if session_id:
                otel_span.set_attribute("session.id", session_id)
            if tags:
                otel_span.set_attribute("tags", ", ".join(tags) if isinstance(tags, list) else str(tags))
            for key, value in metadata.items():
                if isinstance(value, (str, int, float, bool)):
                    otel_span.set_attribute(f"metadata.{key}", value)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))

            # Derive trace_id from OTel span context
            otel_ctx = otel_span.get_span_context()
            trace_id = format(otel_ctx.trace_id, "032x") if otel_ctx else ""

            span = _AIFoundrySpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.is_root = True
            span.id = trace_id
            if input_data is not None:
                span.set_input(input_data)
            return span

        except Exception as exc:
            _logger.error("AiFoundryTracer.trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")
Start a new root trace.
Args:
name: Human-readable name for the operation (e.g. "rag_query").
**kwargs: Optional fields forwarded to the backend:
- input — the operation input (question, prompt, …)
- user_id — end-user identifier for session grouping
- session_id — conversation or session ID
- metadata — dict of arbitrary key/value pairs
- tags — list of string tags
Returns:
A Span to use as a context manager.
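Two details of the source above are easy to miss: only primitive metadata values (`str`, `int`, `float`, `bool`) become span attributes, and the integer OpenTelemetry trace ID is rendered as 32 lowercase hex characters. The sketch below isolates both steps; `to_span_attributes` and `format_trace_id` are hypothetical helper names used only for illustration.

```python
from typing import Any


def to_span_attributes(metadata: dict[str, Any]) -> dict[str, Any]:
    """Keep primitive values only and prefix each key with 'metadata.'."""
    return {
        f"metadata.{key}": value
        for key, value in metadata.items()
        if isinstance(value, (str, int, float, bool))
    }


def format_trace_id(trace_id: int) -> str:
    """Render a 128-bit trace ID as zero-padded lowercase hex."""
    return format(trace_id, "032x")


attrs = to_span_attributes({"tenant": "acme", "top_k": 5, "filters": {"lang": "en"}})
# The nested "filters" dict is dropped because it is not a primitive value.
short_id = format_trace_id(255)
```

Callers who need nested metadata to appear on the span could serialise it to a string before passing it in, since non-primitive values are skipped rather than converted.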
    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        # Application Insights has no built-in quality-score concept;
        # emit it as a custom event attribute on the active span instead.
        if self._otel_tracer is None:
            return
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            current = otel_trace.get_current_span()
            if current:
                current.set_attribute(f"score.{name}", value)
                if comment:
                    current.set_attribute(f"score.{name}.comment", comment)
        except Exception:
            pass
Attach a quality score to a completed trace.
Args:
trace_id: ID of the trace to score (span.trace_id).
name: Score dimension name, e.g. "faithfulness", "relevance".
value: Numeric score — typically 0.0–1.0 but provider-specific.
comment: Optional free-text explanation.
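In this provider the score is written as attributes on whichever span is currently active; the `trace_id` argument is not used to look up a span. The attribute naming can be sketched as follows, where `score_attributes` is a hypothetical helper that mirrors the keys used in the source.

```python
from typing import Union


def score_attributes(name: str, value: float, comment: str = "") -> dict[str, Union[float, str]]:
    """Build the span attributes that represent one quality score."""
    attrs: dict[str, Union[float, str]] = {f"score.{name}": value}
    if comment:
        attrs[f"score.{name}.comment"] = comment
    return attrs


with_comment = score_attributes("faithfulness", 0.82, comment="one unsupported claim")
without_comment = score_attributes("relevance", 1.0)
```

Because the attributes land on the active span, a score recorded after the trace has closed may attach to an unrelated or non-recording span.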
    def flush(self) -> None:
        # The Azure Monitor exporter flushes automatically; nothing extra needed.
        pass
Flush any buffered events to the backend. Call before process exit.
    def shutdown(self) -> None:
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception:
            pass
Gracefully shut down the provider and release resources.
def get_tracer(reset: bool = False) -> TracingProvider:
    """
    Return the process-wide tracing provider, initialising it on first call.

    Reads configuration from environment variables.  Pass ``reset=True``
    to force re-initialisation (useful in tests).

    Environment variables
    ~~~~~~~~~~~~~~~~~~~~~
    TRACING_PROVIDER
        Which backend to use.  One of: ``null``, ``splunk``, ``langfuse``.
        Defaults to ``null`` when not set.

    (Splunk)
        SPLUNK_OTLP_ENDPOINT    OTLP/HTTP export endpoint
        SPLUNK_ACCESS_TOKEN     Splunk Observability Cloud access token
        SPLUNK_SERVICE_NAME     Service name shown in APM (default: gmf-forge-ai)

    (Langfuse)
        LANGFUSE_PUBLIC_KEY     Langfuse public key
        LANGFUSE_SECRET_KEY     Langfuse secret key
        LANGFUSE_HOST           Langfuse host (default: https://cloud.langfuse.com)

    (Common)
        ENVIRONMENT             Deployment environment tag (default: development)
        RELEASE                 App release/version tag (optional)

    Returns:
        A :class:`TracingProvider` instance ready to use.
    """
    global _tracer_instance
    if _tracer_instance is not None and not reset:
        return _tracer_instance

    config = _config_from_env()
    provider_key = config.provider.lower().strip()
    provider_cls = _PROVIDERS.get(provider_key, NullTracer)

    if provider_key not in _PROVIDERS:
        _logger.warning(
            "Unknown TRACING_PROVIDER — falling back to NullTracer",
            provider=provider_key,
            supported=list(_PROVIDERS),
        )

    _tracer_instance = provider_cls(config) if provider_cls is not NullTracer else NullTracer()
    return _tracer_instance
Return the process-wide tracing provider, initialising it on first call.
Reads configuration from environment variables. Pass reset=True
to force re-initialisation (useful in tests).
Environment variables
~~~~~~~~~~~~~~~~~~~~~
TRACING_PROVIDER
    Which backend to use. One of: null, splunk, langfuse.
    Defaults to null when not set.

(Splunk)
    SPLUNK_OTLP_ENDPOINT    OTLP/HTTP export endpoint
    SPLUNK_ACCESS_TOKEN     Splunk Observability Cloud access token
    SPLUNK_SERVICE_NAME     Service name shown in APM (default: gmf-forge-ai)

(Langfuse)
    LANGFUSE_PUBLIC_KEY     Langfuse public key
    LANGFUSE_SECRET_KEY     Langfuse secret key
    LANGFUSE_HOST           Langfuse host (default: https://cloud.langfuse.com)

(Common)
    ENVIRONMENT             Deployment environment tag (default: development)
    RELEASE                 App release/version tag (optional)
Returns:
A TracingProvider instance ready to use.
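The lookup in `get_tracer` is a registry keyed by the normalised `TRACING_PROVIDER` value, with a cached process-wide instance and a no-op fallback for unknown keys. A self-contained sketch of that pattern is shown below. `ConsoleTracer`, `PROVIDERS`, and `get_tracer_sketch` are hypothetical names; unlike the real provider classes, the stand-ins here take no config object.

```python
import os
from typing import Optional


class NullTracer:
    """No-op stand-in used when nothing (or something unknown) is configured."""


class ConsoleTracer:
    """Hypothetical backend used only to show the registry lookup."""


PROVIDERS: dict[str, type] = {"null": NullTracer, "console": ConsoleTracer}
_instance: Optional[object] = None


def get_tracer_sketch(reset: bool = False) -> object:
    """Return the cached provider, building it from the environment if needed."""
    global _instance
    if _instance is not None and not reset:
        return _instance
    key = os.environ.get("TRACING_PROVIDER", "null").lower().strip()
    provider_cls = PROVIDERS.get(key, NullTracer)  # unknown keys fall back to no-op
    _instance = provider_cls()
    return _instance


os.environ["TRACING_PROVIDER"] = " Console "  # case and whitespace are normalised
first = get_tracer_sketch(reset=True)
os.environ["TRACING_PROVIDER"] = "unknown-backend"
cached = get_tracer_sketch()               # env change is ignored without reset
rebuilt = get_tracer_sketch(reset=True)    # reset re-reads the environment
```

In tests, calling with `reset=True` after patching the environment avoids leaking a provider from one test into the next.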
def traced(
    func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
    tags: Optional[list[str]] = None,
) -> Union[F, Callable[[F], F]]:
    """
    Decorator to open a root trace around a function.

    If a parent span is already active (e.g., called from within @trace_span),
    this still opens a new root trace.

    Args:
        func: The function to decorate (when used without parens).
        name: Override trace name; defaults to function name.
        capture_input: Include function args in trace input (default: True).
        capture_output: Include return value in trace output (default: True).
        metadata_fn: Callback(result, metadata_dict) to add custom metadata.
        tags: List of string tags to attach to the trace.

    Example::

        @traced(name="rag_pipeline", tags=["rag"])
        async def answer(question: str) -> str:
            ...
    """

    def decorator(fn: F) -> F:
        trace_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                from . import get_tracer
                tracer = get_tracer()
                input_data = (args, kwargs) if capture_input else None
                with tracer.trace(
                    trace_name,
                    input=input_data,
                    tags=tags,
                ) as trace:
                    try:
                        result = await fn(*args, **kwargs)
                    except Exception as exc:
                        trace.set_error(exc)
                        raise
                    if capture_output:
                        trace.set_output(result)
                    if metadata_fn is not None:
                        meta: dict[str, Any] = {}
                        metadata_fn(result, meta)
                        trace.set_metadata(**meta)
                    return result

            return async_wrapper  # type: ignore[return-value]
        else:

            @functools.wraps(fn)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                from . import get_tracer
                tracer = get_tracer()
                input_data = (args, kwargs) if capture_input else None
                with tracer.trace(
                    trace_name,
                    input=input_data,
                    tags=tags,
                ) as trace:
                    try:
                        result = fn(*args, **kwargs)
                    except Exception as exc:
                        trace.set_error(exc)
                        raise
                    if capture_output:
                        trace.set_output(result)
                    if metadata_fn is not None:
                        meta: dict[str, Any] = {}
                        metadata_fn(result, meta)
                        trace.set_metadata(**meta)
                    return result

            return sync_wrapper  # type: ignore[return-value]

    if func is not None:
        # Called as @traced without parens
        return decorator(func)
    # Called as @traced(...) with parens
    return decorator
Decorator to open a root trace around a function.
If a parent span is already active (e.g., called from within @trace_span), this still opens a new root trace.
Args:
    func: The function to decorate (when used without parens).
    name: Override trace name; defaults to function name.
    capture_input: Include function args in trace input (default: True).
    capture_output: Include return value in trace output (default: True).
    metadata_fn: Callback(result, metadata_dict) to add custom metadata.
    tags: List of string tags to attach to the trace.
Example::
    @traced(name="rag_pipeline", tags=["rag"])
    async def answer(question: str) -> str:
        ...
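The decorator works both bare and with arguments, wraps sync and async functions, and hands `metadata_fn` an empty dict to fill in place. The following sketch reproduces that shape against an in-memory list instead of a tracing backend. `traced_sketch`, `EVENTS`, and `add_doc_count` are hypothetical names, and `inspect.iscoroutinefunction` is used here where the source uses `asyncio.iscoroutinefunction`.

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Optional

# Hypothetical in-memory sink standing in for a real tracing backend.
EVENTS: list[tuple[str, str, Any]] = []


def traced_sketch(
    func: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
):
    """Illustrative stand-in for @traced; not the package's implementation."""

    def decorator(fn: Callable) -> Callable:
        trace_name = name or fn.__name__

        def finish(result: Any) -> Any:
            EVENTS.append(("output", trace_name, result))
            if metadata_fn is not None:
                meta: dict[str, Any] = {}
                metadata_fn(result, meta)  # the callback fills the dict in place
                EVENTS.append(("metadata", trace_name, meta))
            return result

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                return finish(await fn(*args, **kwargs))

            return async_wrapper

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            return finish(fn(*args, **kwargs))

        return sync_wrapper

    # Bare @traced_sketch receives the function directly; @traced_sketch(...) does not.
    return decorator(func) if func is not None else decorator


def add_doc_count(result: Any, meta: dict) -> None:
    """Example metadata_fn: record how many documents were returned."""
    meta["doc_count"] = len(result)


@traced_sketch
def retrieve(question: str) -> list[str]:
    return ["doc1", "doc2"]


@traced_sketch(name="rag_pipeline", metadata_fn=add_doc_count)
async def answer(question: str) -> list[str]:
    return retrieve(question)
```

A `metadata_fn` returns nothing; whatever keys it writes into the dict are what the wrapper forwards as metadata.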
def trace_span(
    func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
) -> Union[F, Callable[[F], F]]:
    """
    Decorator to open a child span around a function.

    If no parent span is active, creates a root trace (failsafe).
    If a parent span is active, creates a child span under it.

    Args:
        func: The function to decorate (when used without parens).
        name: Override span name; defaults to function name.
        capture_input: Include function args in span input (default: True).
        capture_output: Include return value in span output (default: True).
        metadata_fn: Callback(result, metadata_dict) to add custom metadata.

    Example::

        @trace_span(name="retrieval")
        async def retrieve(question: str) -> list[str]:
            ...
    """

    def decorator(fn: F) -> F:
        span_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                active = _get_active_span()
                input_data = (args, kwargs) if capture_input else None

                if active is not None:
                    # Parent span exists: create child
                    with active.span(span_name, input=input_data) as span:
                        try:
                            result = await fn(*args, **kwargs)
                        except Exception as exc:
                            span.set_error(exc)
                            raise
                        if capture_output:
                            span.set_output(result)
                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            span.set_metadata(**meta)
                        return result
                else:
                    # No parent: create root trace as fallback
                    from . import get_tracer
                    tracer = get_tracer()
                    with tracer.trace(span_name, input=input_data) as trace:
                        try:
                            result = await fn(*args, **kwargs)
                        except Exception as exc:
                            trace.set_error(exc)
                            raise
                        if capture_output:
                            trace.set_output(result)
                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            trace.set_metadata(**meta)
                        return result

            return async_wrapper  # type: ignore[return-value]
        else:

            @functools.wraps(fn)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                active = _get_active_span()
                input_data = (args, kwargs) if capture_input else None

                if active is not None:
                    # Parent span exists: create child
                    with active.span(span_name, input=input_data) as span:
                        try:
                            result = fn(*args, **kwargs)
                        except Exception as exc:
                            span.set_error(exc)
                            raise
                        if capture_output:
                            span.set_output(result)
                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            span.set_metadata(**meta)
                        return result
                else:
                    # No parent: create root trace as fallback
                    from . import get_tracer
                    tracer = get_tracer()
                    with tracer.trace(span_name, input=input_data) as trace:
                        try:
                            result = fn(*args, **kwargs)
                        except Exception as exc:
                            trace.set_error(exc)
                            raise
                        if capture_output:
                            trace.set_output(result)
                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            trace.set_metadata(**meta)
                        return result

            return sync_wrapper  # type: ignore[return-value]

    if func is not None:
        return decorator(func)
    return decorator
Decorator to open a child span around a function.
If no parent span is active, creates a root trace (failsafe). If a parent span is active, creates a child span under it.
Args:
    func: The function to decorate (when used without parens).
    name: Override span name; defaults to function name.
    capture_input: Include function args in span input (default: True).
    capture_output: Include return value in span output (default: True).
    metadata_fn: Callback(result, metadata_dict) to add custom metadata.
Example::
    @trace_span(name="retrieval")
    async def retrieve(question: str) -> list[str]:
        ...
def trace_generation(
    func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    model: Optional[str] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    capture_usage: bool = True,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
) -> Union[F, Callable[[F], F]]:
    """
    Decorator to open a generation (LLM) span around a function.

    Use for any LLM call. Automatically creates a child span if a parent
    is active; otherwise creates a root trace.

    Args:
        func: The function to decorate (when used without parens).
        name: Override span name; defaults to function name.
        model: Model name to attach (e.g. "gpt-4o").
        capture_input: Include function args in span input (default: True).
        capture_output: Include return value in span output (default: True).
        capture_usage: Try to extract token usage from return dict (default: True).
        metadata_fn: Callback(result, metadata_dict) to add custom metadata.

    Example::

        @trace_generation(name="llm_call", model="gpt-4o")
        async def call_llm(prompt: str) -> dict[str, Any]:
            # Should return {"text": "...", "usage": {"prompt_tokens": 50, ...}}
            ...
    """

    def decorator(fn: F) -> F:
        gen_name = name or fn.__name__

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                active = _get_active_span()
                input_data = (args, kwargs) if capture_input else None

                if active is not None:
                    # Parent span exists: create child generation
                    with active.generation(gen_name, model=model or "", input=input_data) as gen:
                        try:
                            result = await fn(*args, **kwargs)
                        except Exception as exc:
                            gen.set_error(exc)
                            raise

                        if capture_output:
                            gen.set_output(result)

                        # Try to extract token usage if result is a dict
                        if capture_usage and isinstance(result, dict) and "usage" in result:
                            usage = result["usage"]
                            gen.set_token_usage(
                                prompt_tokens=usage.get("prompt_tokens", 0),
                                completion_tokens=usage.get("completion_tokens", 0),
                                total_tokens=usage.get("total_tokens"),
                            )

                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            gen.set_metadata(**meta)

                        return result
                else:
                    # No parent: create root trace as fallback
                    from . import get_tracer
                    tracer = get_tracer()
                    with tracer.trace(gen_name, input=input_data) as trace:
                        try:
                            result = await fn(*args, **kwargs)
                        except Exception as exc:
                            trace.set_error(exc)
                            raise

                        if capture_output:
                            trace.set_output(result)

                        if capture_usage and isinstance(result, dict) and "usage" in result:
                            usage = result["usage"]
                            trace.set_token_usage(
                                prompt_tokens=usage.get("prompt_tokens", 0),
                                completion_tokens=usage.get("completion_tokens", 0),
                                total_tokens=usage.get("total_tokens"),
                            )

                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            trace.set_metadata(**meta)

                        return result

            return async_wrapper  # type: ignore[return-value]
        else:

            @functools.wraps(fn)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                active = _get_active_span()
                input_data = (args, kwargs) if capture_input else None

                if active is not None:
                    # Parent span exists: create child generation
                    with active.generation(gen_name, model=model or "", input=input_data) as gen:
                        try:
                            result = fn(*args, **kwargs)
                        except Exception as exc:
                            gen.set_error(exc)
                            raise

                        if capture_output:
                            gen.set_output(result)

                        if capture_usage and isinstance(result, dict) and "usage" in result:
                            usage = result["usage"]
                            gen.set_token_usage(
                                prompt_tokens=usage.get("prompt_tokens", 0),
                                completion_tokens=usage.get("completion_tokens", 0),
                                total_tokens=usage.get("total_tokens"),
                            )

                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            gen.set_metadata(**meta)

                        return result
                else:
                    # No parent: create root trace as fallback
                    from . import get_tracer
                    tracer = get_tracer()
                    with tracer.trace(gen_name, input=input_data) as trace:
                        try:
                            result = fn(*args, **kwargs)
                        except Exception as exc:
                            trace.set_error(exc)
                            raise

                        if capture_output:
                            trace.set_output(result)

                        if capture_usage and isinstance(result, dict) and "usage" in result:
                            usage = result["usage"]
                            trace.set_token_usage(
                                prompt_tokens=usage.get("prompt_tokens", 0),
                                completion_tokens=usage.get("completion_tokens", 0),
                                total_tokens=usage.get("total_tokens"),
                            )

                        if metadata_fn is not None:
                            meta: dict[str, Any] = {}
                            metadata_fn(result, meta)
                            trace.set_metadata(**meta)

                        return result

            return sync_wrapper  # type: ignore[return-value]

    if func is not None:
        return decorator(func)
    return decorator
Decorator to open a generation (LLM) span around a function.
Use for any LLM call. Automatically creates a child span if a parent is active; otherwise creates a root trace.
Args:
    func: The function to decorate (when used without parens).
    name: Override span name; defaults to function name.
    model: Model name to attach (e.g. "gpt-4o").
    capture_input: Include function args in span input (default: True).
    capture_output: Include return value in span output (default: True).
    capture_usage: Try to extract token usage from return dict (default: True).
    metadata_fn: Callback(result, metadata_dict) to add custom metadata.
Example::
    @trace_generation(name="llm_call", model="gpt-4o")
    async def call_llm(prompt: str) -> dict[str, Any]:
        # Should return {"text": "...", "usage": {"prompt_tokens": 50, ...}}
        ...
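Token usage is captured only when the decorated function returns a dict containing a `"usage"` key; other return types are recorded as output without usage. The extraction rule from the source can be sketched on its own as below, where `extract_usage` is a hypothetical helper name.

```python
from typing import Any, Optional


def extract_usage(result: Any) -> Optional[dict[str, Optional[int]]]:
    """Return token counts if result is a dict with a 'usage' entry, else None."""
    if not isinstance(result, dict) or "usage" not in result:
        return None
    usage = result["usage"]
    return {
        "prompt_tokens": usage.get("prompt_tokens", 0),
        "completion_tokens": usage.get("completion_tokens", 0),
        "total_tokens": usage.get("total_tokens"),  # stays None when absent
    }


full = extract_usage(
    {"text": "answer", "usage": {"prompt_tokens": 50, "completion_tokens": 20, "total_tokens": 70}}
)
partial = extract_usage({"text": "answer", "usage": {"prompt_tokens": 50}})
plain_text = extract_usage("answer")  # a bare string carries no usage
```

A function that returns an SDK response object or a plain string would therefore need to be adapted to return such a dict, or have its usage recorded explicitly, for token counts to appear on the generation span.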