gmf_forge_ai_shared_core.observability.tracing

Tracing sub-package — provider-agnostic observability for RAG and agent apps.

Quick start

Set one env var to pick a backend; application code never changes::

# .env
TRACING_PROVIDER=splunk          # or "null" (default)
SPLUNK_OTLP_ENDPOINT=https://ingest.us0.signalfx.com/v2/trace/otlp
SPLUNK_ACCESS_TOKEN=your-token

Then in your app — explicit style (most control)::

from gmf_forge_ai_shared_core.observability.tracing import get_tracer

tracer = get_tracer()   # reads env; returns NullTracer if nothing configured

with tracer.trace("rag_query", input=question, user_id="u1") as trace:

    with trace.span("retrieval", input=question) as span:
        docs = retriever.retrieve(question)
        span.set_output({"doc_count": len(docs)})

    with trace.generation("llm", model="gpt-4o-mini", input=prompt) as gen:
        reply = llm.complete(prompt)
        gen.set_output(reply.text)
        gen.set_token_usage(prompt_tokens=120, completion_tokens=80)

    trace.set_output(reply.text)

tracer.flush()

Or decorator style (less boilerplate)::

from gmf_forge_ai_shared_core.observability.tracing import traced, trace_span, trace_generation, get_tracer

@traced(name="rag_query")
async def answer(question: str):
    docs = await retrieve(question)
    reply = await generate(question, docs)
    return reply

@trace_span(name="retrieval")
async def retrieve(question: str):
    return ["doc1", "doc2"]

@trace_generation(name="llm_call", model="gpt-4o-mini")
async def generate(question: str, docs: list[str]):
    return "answer"

tracer = get_tracer()
tracer.flush()  # at shutdown

Supported TRACING_PROVIDER values

null — no-op, zero overhead (default when env var is absent)
none — alias for null
splunk — Splunk Observability Cloud via OpenTelemetry OTLP/HTTP
langfuse — Langfuse tracing backend
ai_foundry — Azure AI Foundry tracing backend

Adding a new backend

  1. Create my_tracer.py alongside this file, implementing gmf_forge_ai_shared_core.observability.tracing.base.TracingProvider.
  2. Add an entry to _PROVIDERS in this module.
  3. Set TRACING_PROVIDER=my_backend in .env — done.
  1"""
  2Tracing sub-package — provider-agnostic observability for RAG and agent apps.
  3
  4Quick start
  5-----------
  6Set one env var to pick a backend; application code never changes::
  7
  8    # .env
  9    TRACING_PROVIDER=splunk          # or "null" (default)
 10    SPLUNK_OTLP_ENDPOINT=https://ingest.us0.signalfx.com/v2/trace/otlp
 11    SPLUNK_ACCESS_TOKEN=your-token
 12
 13Then in your app — explicit style (most control)::
 14
 15    from gmf_forge_ai_shared_core.observability.tracing import get_tracer
 16
 17    tracer = get_tracer()   # reads env; returns NullTracer if nothing configured
 18
 19    with tracer.trace("rag_query", input=question, user_id="u1") as trace:
 20
 21        with trace.span("retrieval", input=question) as span:
 22            docs = retriever.retrieve(question)
 23            span.set_output({"doc_count": len(docs)})
 24
 25        with trace.generation("llm", model="gpt-4o-mini", input=prompt) as gen:
 26            reply = llm.complete(prompt)
 27            gen.set_output(reply.text)
 28            gen.set_token_usage(prompt_tokens=120, completion_tokens=80)
 29
 30        trace.set_output(reply.text)
 31
 32    tracer.flush()
 33
 34Or decorator style (less boilerplate)::
 35
 36    from gmf_forge_ai_shared_core.observability.tracing import traced, trace_span, trace_generation, get_tracer
 37
 38    @traced(name="rag_query")
 39    async def answer(question: str):
 40        docs = await retrieve(question)
 41        reply = await generate(question, docs)
 42        return reply
 43
 44    @trace_span(name="retrieval")
 45    async def retrieve(question: str):
 46        return ["doc1", "doc2"]
 47
 48    @trace_generation(name="llm_call", model="gpt-4o-mini")
 49    async def generate(question: str, docs: list[str]):
 50        return "answer"
 51
 52    tracer = get_tracer()
 53    tracer.flush()  # at shutdown
 54
 55Supported TRACING_PROVIDER values
 56----------------------------------
 57null    — no-op, zero overhead (default when env var is absent)
 58splunk  — Splunk Observability Cloud via OpenTelemetry OTLP/HTTP
 59langfuse — Langfuse tracing backend
 60
 61Adding a new backend
 62--------------------
 631. Create ``my_tracer.py`` alongside this file implementing
 64   :class:`~gmf_forge_ai_shared_core.observability.tracing.base.TracingProvider`.
 652. Add an entry to ``_PROVIDERS`` in this module.
 663. Set ``TRACING_PROVIDER=my_backend`` in .env — done.
 67"""
 68
 69from __future__ import annotations
 70
 71import os
 72from typing import Optional
 73
 74from .base import Span, TracingConfig, TracingProvider
 75from .decorators import traced, trace_generation, trace_span
 76from .ai_foundry_tracer import AiFoundryTracer
 77from .langfuse_tracer import LangfuseTracer
 78from .null_tracer import NullTracer
 79from .splunk_tracer import SplunkTracer
 80from gmf_forge_ai_shared_core.observability.logger import BasicLogger
 81
 82_logger = BasicLogger(__name__)
 83
 84__all__ = [
 85    "TracingProvider",
 86    "TracingConfig",
 87    "Span",
 88    "NullTracer",
 89    "SplunkTracer",
 90    "LangfuseTracer",
 91    "AiFoundryTracer",
 92    "get_tracer",
 93    "traced",
 94    "trace_span",
 95    "trace_generation",
 96]
 97
 98# Registry: TRACING_PROVIDER value → provider class
 99_PROVIDERS: dict[str, type[TracingProvider]] = {
100    "null":       NullTracer,
101    "none":       NullTracer,
102    "splunk":     SplunkTracer,
103    "langfuse":   LangfuseTracer,
104    "ai_foundry": AiFoundryTracer,
105}
106
107# Module-level singleton — one tracer per process
108_tracer_instance: Optional[TracingProvider] = None
109
110
def get_tracer(reset: bool = False) -> TracingProvider:
    """
    Return the process-wide tracing provider, initialising it on first call.

    Reads configuration from environment variables.  Pass ``reset=True``
    to force re-initialisation (useful in tests).  On reset the cached
    instance is simply replaced; it is not flushed or shut down here.

    Environment variables
    ~~~~~~~~~~~~~~~~~~~~~
    TRACING_PROVIDER
        Which backend to use.  One of: ``null``, ``none`` (alias for
        ``null``), ``splunk``, ``langfuse``, ``ai_foundry``.  Matching is
        case-insensitive and ignores surrounding whitespace.
        Defaults to ``null`` when not set or blank.  An unrecognised value
        logs a warning and falls back to :class:`NullTracer`.

    (Splunk)
    SPLUNK_OTLP_ENDPOINT    OTLP/HTTP export endpoint
    SPLUNK_ACCESS_TOKEN     Splunk Observability Cloud access token
    SPLUNK_SERVICE_NAME     Service name shown in APM (default: gmf-forge-ai)

    (Langfuse)
    LANGFUSE_PUBLIC_KEY     Langfuse public key
    LANGFUSE_SECRET_KEY     Langfuse secret key
    LANGFUSE_HOST           Langfuse host (default: https://cloud.langfuse.com)

    (Azure AI Foundry)
    APPLICATIONINSIGHTS_CONNECTION_STRING
                            Read into ``TracingConfig.azure_ai_connection_string``

    (Common)
    ENVIRONMENT             Deployment environment tag (default: development)
    RELEASE                 App release/version tag (optional)

    Returns:
        A :class:`TracingProvider` instance ready to use.
    """
    global _tracer_instance
    if _tracer_instance is not None and not reset:
        return _tracer_instance

    config = _config_from_env()
    # A blank value (e.g. ``TRACING_PROVIDER=`` in a .env file) means
    # "not configured", not "unknown backend", so map it to the default.
    provider_key = config.provider.lower().strip() or "null"
    provider_cls = _PROVIDERS.get(provider_key)

    if provider_cls is None:
        _logger.warning(
            "Unknown TRACING_PROVIDER — falling back to NullTracer",
            provider=provider_key,
            supported=list(_PROVIDERS),
        )
        provider_cls = NullTracer

    # NullTracer is constructed without a config; every other provider gets it.
    if provider_cls is NullTracer:
        _tracer_instance = NullTracer()
    else:
        _tracer_instance = provider_cls(config)
    return _tracer_instance
158
159
def _config_from_env() -> TracingConfig:
    """Assemble a TracingConfig from the process environment, applying defaults."""
    env = os.getenv

    langfuse_settings = {
        "langfuse_public_key": env("LANGFUSE_PUBLIC_KEY", ""),
        "langfuse_secret_key": env("LANGFUSE_SECRET_KEY", ""),
        "langfuse_host": env("LANGFUSE_HOST", "https://cloud.langfuse.com"),
    }
    azure_settings = {
        "azure_ai_connection_string": env("APPLICATIONINSIGHTS_CONNECTION_STRING", ""),
    }
    splunk_settings = {
        "splunk_otlp_endpoint": env("SPLUNK_OTLP_ENDPOINT", ""),
        "splunk_access_token": env("SPLUNK_ACCESS_TOKEN", ""),
        "splunk_service_name": env("SPLUNK_SERVICE_NAME", "gmf-forge-ai"),
    }
    common_settings = {
        "environment": env("ENVIRONMENT", "development"),
        "release": env("RELEASE", ""),
    }

    return TracingConfig(
        provider=env("TRACING_PROVIDER", "null"),
        **langfuse_settings,
        **azure_settings,
        **splunk_settings,
        **common_settings,
    )
class TracingProvider(abc.ABC):
class TracingProvider(ABC):
    """
    Backend-neutral tracing interface.

    Subclass this to plug in a new backend.  Application code depends only
    on :class:`TracingProvider` and :func:`get_tracer`, never on a concrete
    backend class.
    """

    # ── Core operations ───────────────────────────────────────────────────────

    @abstractmethod
    def trace(self, name: str, **kwargs: Any) -> Span:
        """
        Open a new root trace.

        Args:
            name:   Human-readable operation name (e.g. ``"rag_query"``).
            **kwargs: Optional fields forwarded to the backend:
                - ``input``   — the operation input (question, prompt, …)
                - ``user_id`` — end-user identifier for session grouping
                - ``session_id`` — conversation or session ID
                - ``metadata`` — dict of arbitrary key/value pairs
                - ``tags``    — list of string tags

        Returns:
            A :class:`Span` intended to be used as a context manager.
        """

    @abstractmethod
    def score(
        self,
        trace_id: str,
        name: str,
        value: float,
        comment: str = "",
    ) -> None:
        """
        Record a quality score against a completed trace.

        Args:
            trace_id: ID of the trace being scored (``span.trace_id``).
            name:     Score dimension, e.g. ``"faithfulness"``, ``"relevance"``.
            value:    Numeric score — typically 0.0–1.0 but provider-specific.
            comment:  Optional free-text explanation.
        """

    @abstractmethod
    def flush(self) -> None:
        """Send any buffered events to the backend. Call before process exit."""

    @abstractmethod
    def shutdown(self) -> None:
        """Shut the provider down gracefully and release its resources."""

    # ── Convenience helpers (non-abstract, built on trace()) ──────────────────

    def trace_rag_query(
        self,
        question: str,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Open a root trace named ``"rag_query"`` for a RAG question.

        Empty ``user_id`` / ``session_id`` strings and an empty metadata
        mapping are forwarded to :meth:`trace` as ``None``.
        """
        trace_fields = {
            "input": question,
            "user_id": user_id if user_id else None,
            "session_id": session_id if session_id else None,
            "metadata": metadata if metadata else None,
            "tags": ["rag"],
        }
        return self.trace("rag_query", **trace_fields)

    def trace_agent_run(
        self,
        agent_name: str,
        input: Any,
        user_id: str = "",
        session_id: str = "",
        **metadata: Any,
    ) -> Span:
        """Open a root trace named ``"agent:<agent_name>"`` for an agent run.

        Empty ``user_id`` / ``session_id`` strings and an empty metadata
        mapping are forwarded to :meth:`trace` as ``None``.
        """
        trace_name = f"agent:{agent_name}"
        trace_fields = {
            "input": input,
            "user_id": user_id if user_id else None,
            "session_id": session_id if session_id else None,
            "metadata": metadata if metadata else None,
            "tags": ["agent", agent_name],
        }
        return self.trace(trace_name, **trace_fields)

Abstract tracing provider.

Implement this class to add a new backend. Applications only ever import TracingProvider and call get_tracer() — they have no knowledge of the concrete backend in use.

@abstractmethod
def trace( self, name: str, **kwargs: Any) -> Span:
217    @abstractmethod
218    def trace(self, name: str, **kwargs: Any) -> Span:
219        """
220        Start a new root trace.
221
222        Args:
223            name:   Human-readable name for the operation (e.g. ``"rag_query"``).
224            **kwargs: Optional fields forwarded to the backend:
225                - ``input``   — the operation input (question, prompt, …)
226                - ``user_id`` — end-user identifier for session grouping
227                - ``session_id`` — conversation or session ID
228                - ``metadata`` — dict of arbitrary key/value pairs
229                - ``tags``    — list of string tags
230
231        Returns:
232            A :class:`Span` to use as a context manager.
233        """

Start a new root trace.

Args:
    name: Human-readable name for the operation (e.g. "rag_query").
    **kwargs: Optional fields forwarded to the backend:
        - input — the operation input (question, prompt, …)
        - user_id — end-user identifier for session grouping
        - session_id — conversation or session ID
        - metadata — dict of arbitrary key/value pairs
        - tags — list of string tags

Returns: A Span to use as a context manager.

@abstractmethod
def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
235    @abstractmethod
236    def score(
237        self,
238        trace_id: str,
239        name: str,
240        value: float,
241        comment: str = "",
242    ) -> None:
243        """
244        Attach a quality score to a completed trace.
245
246        Args:
247            trace_id: ID of the trace to score (``span.trace_id``).
248            name:     Score dimension name, e.g. ``"faithfulness"``, ``"relevance"``.
249            value:    Numeric score — typically 0.0–1.0 but provider-specific.
250            comment:  Optional free-text explanation.
251        """

Attach a quality score to a completed trace.

Args:
    trace_id: ID of the trace to score (span.trace_id).
    name: Score dimension name, e.g. "faithfulness", "relevance".
    value: Numeric score — typically 0.0–1.0 but provider-specific.
    comment: Optional free-text explanation.

@abstractmethod
def flush(self) -> None:
253    @abstractmethod
254    def flush(self) -> None:
255        """Flush any buffered events to the backend. Call before process exit."""

Flush any buffered events to the backend. Call before process exit.

@abstractmethod
def shutdown(self) -> None:
257    @abstractmethod
258    def shutdown(self) -> None:
259        """Gracefully shut down the provider and release resources."""

Gracefully shut down the provider and release resources.

def trace_rag_query( self, question: str, user_id: str = '', session_id: str = '', **metadata: Any) -> Span:
263    def trace_rag_query(
264        self,
265        question: str,
266        user_id: str = "",
267        session_id: str = "",
268        **metadata: Any,
269    ) -> Span:
270        """Shortcut to open a root trace for a RAG question-answering call."""
271        return self.trace(
272            "rag_query",
273            input=question,
274            user_id=user_id or None,
275            session_id=session_id or None,
276            metadata=metadata or None,
277            tags=["rag"],
278        )

Shortcut to open a root trace for a RAG question-answering call.

def trace_agent_run( self, agent_name: str, input: Any, user_id: str = '', session_id: str = '', **metadata: Any) -> Span:
280    def trace_agent_run(
281        self,
282        agent_name: str,
283        input: Any,
284        user_id: str = "",
285        session_id: str = "",
286        **metadata: Any,
287    ) -> Span:
288        """Shortcut to open a root trace for an agent invocation."""
289        return self.trace(
290            f"agent:{agent_name}",
291            input=input,
292            user_id=user_id or None,
293            session_id=session_id or None,
294            metadata=metadata or None,
295            tags=["agent", agent_name],
296        )

Shortcut to open a root trace for an agent invocation.

@dataclass
class TracingConfig:
@dataclass
class TracingConfig:
    """
    Configuration for a tracing provider, loaded from env vars by
    :func:`get_tracer`.

    All fields are optional — providers only read what they need.

    Credential-bearing fields are declared with ``repr=False`` so that
    logging or printing a config object does not leak secrets.  They are
    still ordinary constructor arguments and still take part in equality.
    """
    provider: str = "null"

    # Langfuse
    langfuse_public_key: str = ""
    langfuse_secret_key: str = field(default="", repr=False)   # secret — hidden from repr
    langfuse_host: str = "https://cloud.langfuse.com"

    # LangSmith
    langsmith_api_key: str = field(default="", repr=False)     # secret — hidden from repr
    langsmith_project: str = "default"
    langsmith_endpoint: str = "https://api.smith.langchain.com"

    # Azure AI Foundry
    # Connection string from AI Foundry project settings; hidden from repr
    # because connection strings typically embed a key.
    azure_ai_connection_string: str = field(default="", repr=False)
    azure_ai_project_name: str = ""

    # Splunk (via OpenTelemetry OTLP)
    splunk_otlp_endpoint: str = ""
    splunk_access_token: str = field(default="", repr=False)   # secret — hidden from repr
    splunk_service_name: str = "gmf-forge-ai"

    # Common
    environment: str = "development"
    release: str = ""
    extra: Dict[str, Any] = field(default_factory=dict)

Configuration for a tracing provider, loaded from env vars by get_tracer().

All fields are optional — providers only read what they need.

TracingConfig( provider: str = 'null', langfuse_public_key: str = '', langfuse_secret_key: str = '', langfuse_host: str = 'https://cloud.langfuse.com', langsmith_api_key: str = '', langsmith_project: str = 'default', langsmith_endpoint: str = 'https://api.smith.langchain.com', azure_ai_connection_string: str = '', azure_ai_project_name: str = '', splunk_otlp_endpoint: str = '', splunk_access_token: str = '', splunk_service_name: str = 'gmf-forge-ai', environment: str = 'development', release: str = '', extra: Dict[str, Any] = <factory>)
provider: str = 'null'
langfuse_public_key: str = ''
langfuse_secret_key: str = ''
langfuse_host: str = 'https://cloud.langfuse.com'
langsmith_api_key: str = ''
langsmith_project: str = 'default'
langsmith_endpoint: str = 'https://api.smith.langchain.com'
azure_ai_connection_string: str = ''
azure_ai_project_name: str = ''
splunk_otlp_endpoint: str = ''
splunk_access_token: str = ''
splunk_service_name: str = 'gmf-forge-ai'
environment: str = 'development'
release: str = ''
extra: Dict[str, Any]
class Span:
 84class Span:
 85    """
 86    A single unit of work inside a trace.
 87
 88    Use as a context manager::
 89
 90        with tracer.trace("rag_query", input=question) as span:
 91            span.set_output(answer)
 92
 93    Instances are returned by :meth:`TracingProvider.trace`,
 94    :meth:`Span.span`, and :meth:`Span.generation`.  Child spans created
 95    via :meth:`Span.span` or :meth:`Span.generation` are automatically
 96    linked to the parent.
 97
 98    Attributes:
 99        id:        Unique identifier for this span/generation.
100        trace_id:  Identifier of the root trace this span belongs to.
101        name:      Human-readable operation name.
102        is_root:   True when this span *is* the root trace.
103    """
104
105    def __init__(
106        self,
107        name: str,
108        trace_id: Optional[str] = None,
109        span_id: Optional[str] = None,
110        is_root: bool = False,
111        kind: str = "span",          # "trace" | "span" | "generation"
112    ):
113        self.name = name
114        self.id: str = span_id or str(uuid.uuid4())
115        self.trace_id: str = trace_id or self.id
116        self.is_root = is_root
117        self.kind = kind
118
119        self._input: Any = None
120        self._output: Any = None
121        self._metadata: Dict[str, Any] = {}
122        self._error: Optional[Exception] = None
123        self._token_usage: Dict[str, int] = {}
124        self._model: Optional[str] = None
125        self._active_token: Any = None  # contextvars token for cleanup
126
127    # ── Mutators ─────────────────────────────────────────────────────────────
128
129    def set_input(self, input: Any) -> "Span":
130        self._input = input
131        return self
132
133    def set_output(self, output: Any) -> "Span":
134        self._output = output
135        return self
136
137    def set_metadata(self, **kwargs: Any) -> "Span":
138        self._metadata.update(kwargs)
139        return self
140
141    def set_error(self, error: Exception) -> "Span":
142        self._error = error
143        return self
144
145    def set_token_usage(
146        self,
147        prompt_tokens: int = 0,
148        completion_tokens: int = 0,
149        total_tokens: Optional[int] = None,
150    ) -> "Span":
151        self._token_usage = {
152            "prompt_tokens": prompt_tokens,
153            "completion_tokens": completion_tokens,
154        }
155        if total_tokens is not None:
156            self._token_usage["total_tokens"] = total_tokens
157        return self
158
159    # ── Child span factories (overridden by backend implementations) ──────────
160
161    def span(self, name: str, **kwargs: Any) -> "Span":
162        """Create a child span under this span. Override in subclasses."""
163        return Span(name=name, trace_id=self.trace_id, kind="span")
164
165    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
166        """Create a child generation span (LLM call). Override in subclasses."""
167        s = Span(name=name, trace_id=self.trace_id, kind="generation")
168        s._model = model
169        return s
170
171    # ── Context manager ───────────────────────────────────────────────────────
172
173    def __enter__(self) -> "Span":
174        # Track this span as active in the current context for decorator nesting
175        self._active_token = _set_active_span(self)
176        return self
177
178    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
179        if exc_val is not None:
180            self.set_error(exc_val)
181        self._end()
182        # Restore previous active span (only if token was set)
183        if self._active_token is not None:
184            try:
185                _active_span.reset(self._active_token)  # type: ignore[attr-defined]
186            except (AttributeError, LookupError):
187                pass
188        return False  # do not suppress exceptions
189
190    def _end(self) -> None:
191        """Called when the context manager exits. Override to flush to backend."""
192
193    # ── Helpers ───────────────────────────────────────────────────────────────
194
195    def __repr__(self) -> str:
196        return (
197            f"<Span name={self.name!r} id={self.id[:8]} "
198            f"trace_id={self.trace_id[:8]} kind={self.kind}>"
199        )

A single unit of work inside a trace.

Use as a context manager::

with tracer.trace("rag_query", input=question) as span:
    span.set_output(answer)

Instances are returned by TracingProvider.trace(), Span.span(), and Span.generation(). Child spans created via Span.span() or Span.generation() are automatically linked to the parent.

Attributes:
    id: Unique identifier for this span/generation.
    trace_id: Identifier of the root trace this span belongs to.
    name: Human-readable operation name.
    is_root: True when this span is the root trace.

Span( name: str, trace_id: Optional[str] = None, span_id: Optional[str] = None, is_root: bool = False, kind: str = 'span')
105    def __init__(
106        self,
107        name: str,
108        trace_id: Optional[str] = None,
109        span_id: Optional[str] = None,
110        is_root: bool = False,
111        kind: str = "span",          # "trace" | "span" | "generation"
112    ):
113        self.name = name
114        self.id: str = span_id or str(uuid.uuid4())
115        self.trace_id: str = trace_id or self.id
116        self.is_root = is_root
117        self.kind = kind
118
119        self._input: Any = None
120        self._output: Any = None
121        self._metadata: Dict[str, Any] = {}
122        self._error: Optional[Exception] = None
123        self._token_usage: Dict[str, int] = {}
124        self._model: Optional[str] = None
125        self._active_token: Any = None  # contextvars token for cleanup
name
id: str
trace_id: str
is_root
kind
def set_input( self, input: Any) -> Span:
129    def set_input(self, input: Any) -> "Span":
130        self._input = input
131        return self
def set_output( self, output: Any) -> Span:
133    def set_output(self, output: Any) -> "Span":
134        self._output = output
135        return self
def set_metadata( self, **kwargs: Any) -> Span:
137    def set_metadata(self, **kwargs: Any) -> "Span":
138        self._metadata.update(kwargs)
139        return self
def set_error( self, error: Exception) -> Span:
141    def set_error(self, error: Exception) -> "Span":
142        self._error = error
143        return self
def set_token_usage( self, prompt_tokens: int = 0, completion_tokens: int = 0, total_tokens: Optional[int] = None) -> Span:
145    def set_token_usage(
146        self,
147        prompt_tokens: int = 0,
148        completion_tokens: int = 0,
149        total_tokens: Optional[int] = None,
150    ) -> "Span":
151        self._token_usage = {
152            "prompt_tokens": prompt_tokens,
153            "completion_tokens": completion_tokens,
154        }
155        if total_tokens is not None:
156            self._token_usage["total_tokens"] = total_tokens
157        return self
def span( self, name: str, **kwargs: Any) -> Span:
161    def span(self, name: str, **kwargs: Any) -> "Span":
162        """Create a child span under this span. Override in subclasses."""
163        return Span(name=name, trace_id=self.trace_id, kind="span")

Create a child span under this span. Override in subclasses.

def generation( self, name: str, model: str = '', **kwargs: Any) -> Span:
165    def generation(self, name: str, model: str = "", **kwargs: Any) -> "Span":
166        """Create a child generation span (LLM call). Override in subclasses."""
167        s = Span(name=name, trace_id=self.trace_id, kind="generation")
168        s._model = model
169        return s

Create a child generation span (LLM call). Override in subclasses.

 68class NullTracer(TracingProvider):
 69    """
 70    Default tracing provider — logs span lifecycle to the console.
 71
 72    Used when ``TRACING_PROVIDER`` is not set (or set to ``"null"`` /
 73    ``"none"``).  Each span emits a *started* and *finished* log line so
 74    the tracing structure is visible during local development and in
 75    example runs, without shipping data to any backend.
 76
 77    Switch to a real backend at any time by setting ``TRACING_PROVIDER``
 78    in ``.env`` — application code stays untouched.
 79
 80    Example::
 81
 82        tracer = NullTracer()
 83
 84        with tracer.trace("rag_query", input=question) as trace:
 85            with trace.span("retrieval", input=question) as span:
 86                docs = retriever.retrieve(question)
 87                span.set_output({"doc_count": len(docs)})
 88    """
 89
 90    def trace(self, name: str, **kwargs: Any) -> _NullSpan:
 91        span = _NullSpan(name=name, is_root=True, kind="trace")
 92        if "input" in kwargs:
 93            span.set_input(kwargs["input"])
 94        return span
 95
 96    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
 97        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
 98        if comment:
 99            log_kw["comment"] = comment
100        _logger.info("[trace] score", **log_kw)
101
102    def flush(self) -> None:
103        pass
104
105    def shutdown(self) -> None:
106        pass

Default tracing provider — logs span lifecycle to the console.

Used when TRACING_PROVIDER is not set (or set to "null" / "none"). Each span emits a started and finished log line so the tracing structure is visible during local development and in example runs, without shipping data to any backend.

Switch to a real backend at any time by setting TRACING_PROVIDER in .env — application code stays untouched.

Example::

tracer = NullTracer()

with tracer.trace("rag_query", input=question) as trace:
    with trace.span("retrieval", input=question) as span:
        docs = retriever.retrieve(question)
        span.set_output({"doc_count": len(docs)})
def trace( self, name: str, **kwargs: Any) -> gmf_forge_ai_shared_core.observability.tracing.null_tracer._NullSpan:
90    def trace(self, name: str, **kwargs: Any) -> _NullSpan:
91        span = _NullSpan(name=name, is_root=True, kind="trace")
92        if "input" in kwargs:
93            span.set_input(kwargs["input"])
94        return span

Start a new root trace.

Args:
    name: Human-readable name for the operation (e.g. "rag_query").
    **kwargs: Optional fields forwarded to the backend:
        - input — the operation input (question, prompt, …)
        - user_id — end-user identifier for session grouping
        - session_id — conversation or session ID
        - metadata — dict of arbitrary key/value pairs
        - tags — list of string tags

Returns: A Span to use as a context manager.

def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
 96    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
 97        log_kw: dict[str, Any] = {"trace_id": trace_id[:8], "name": name, "value": value}
 98        if comment:
 99            log_kw["comment"] = comment
100        _logger.info("[trace] score", **log_kw)

Attach a quality score to a completed trace.

Args:
    trace_id: ID of the trace to score (span.trace_id).
    name: Score dimension name, e.g. "faithfulness", "relevance".
    value: Numeric score — typically 0.0–1.0 but provider-specific.
    comment: Optional free-text explanation.

def flush(self) -> None:
102    def flush(self) -> None:
103        pass

Flush any buffered events to the backend. Call before process exit.

def shutdown(self) -> None:
105    def shutdown(self) -> None:
106        pass

Gracefully shut down the provider and release resources.

class SplunkTracer(TracingProvider):
    """
    Tracing provider that exports to Splunk Observability Cloud via
    OpenTelemetry OTLP/HTTP.

    Traces appear in **Splunk APM** under the configured service name.
    Token usage and model info are stored as span attributes and can be
    surfaced via Splunk's MetricSets or custom dashboards.  Quality scores
    are not sent to Splunk yet: :meth:`score` only logs them.

    When the OpenTelemetry packages are missing, the endpoint/token are not
    configured, or initialisation fails, the tracer degrades to no-op spans
    instead of raising.

    Example::

        import os
        os.environ["TRACING_PROVIDER"]       = "splunk"
        os.environ["SPLUNK_OTLP_ENDPOINT"]   = "https://ingest.us0.signalfx.com/v2/trace/otlp"
        os.environ["SPLUNK_ACCESS_TOKEN"]    = "your-token"
        os.environ["SPLUNK_SERVICE_NAME"]    = "policy-rag"

        from gmf_forge_ai_shared_core.observability.tracing import get_tracer

        tracer = get_tracer()

        with tracer.trace("rag_query", input=question) as trace:
            with trace.span("retrieval", input=question) as span:
                docs = retriever.retrieve(question)
                span.set_output({"doc_count": len(docs)})
            with trace.generation("llm", model="gpt-4o", input=prompt) as gen:
                reply = llm.complete(prompt)
                gen.set_output(reply.text)
                gen.set_token_usage(prompt_tokens=120, completion_tokens=80)
            trace.set_output(reply.text)

        tracer.flush()
    """

    def __init__(self, config: TracingConfig):
        """Store *config* and try to build the OTLP export pipeline."""
        self._config = config
        self._otel_tracer: Optional[Any] = None
        # SDK TracerProvider built by this instance.  flush()/shutdown() act on
        # it directly because the process-global provider can be a different
        # object when one was registered before this tracer was created.
        self._provider: Optional[Any] = None
        self._init_client()

    def _init_client(self) -> None:
        """Create the OTLP exporter and tracer; leave the tracer unset on failure."""
        try:
            from opentelemetry import trace as otel_trace                           # type: ignore
            from opentelemetry.sdk.trace import TracerProvider                      # type: ignore
            from opentelemetry.sdk.trace.export import BatchSpanProcessor           # type: ignore
            from opentelemetry.sdk.resources import Resource                        # type: ignore
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import (    # type: ignore
                OTLPSpanExporter,
            )
        except ImportError:
            _logger.warning(
                "opentelemetry-sdk or opentelemetry-exporter-otlp-proto-http not found — falling back to no-op tracing",
                install_cmd="pip install opentelemetry-sdk opentelemetry-exporter-otlp-proto-http",
            )
            return

        endpoint    = self._config.splunk_otlp_endpoint
        token       = self._config.splunk_access_token
        service     = self._config.splunk_service_name or "gmf-forge-ai"
        environment = self._config.environment or "development"

        if not endpoint or not token:
            _logger.warning(
                "SPLUNK_OTLP_ENDPOINT / SPLUNK_ACCESS_TOKEN not set — falling back to no-op tracing",
                missing=[v for v, val in [("SPLUNK_OTLP_ENDPOINT", endpoint), ("SPLUNK_ACCESS_TOKEN", token)] if not val],
            )
            return

        try:
            resource = Resource.create({
                "service.name":        service,
                "deployment.environment": environment,
            })
            exporter = OTLPSpanExporter(
                endpoint=endpoint,
                headers={"X-SF-Token": token},
            )
            provider = TracerProvider(resource=resource)
            provider.add_span_processor(BatchSpanProcessor(exporter))
            otel_trace.set_tracer_provider(provider)
            tracer = provider.get_tracer(service)
            # Publish both together so a half-initialised state is never visible.
            self._provider = provider
            self._otel_tracer = tracer
        except Exception as exc:
            _logger.error("Splunk tracer init failed", error=str(exc))

    # ── TracingProvider interface ─────────────────────────────────────────────

    def trace(self, name: str, **kwargs: Any) -> Span:
        """Start a new root trace.

        Args:
            name: Human-readable name for the operation.
            **kwargs: Optional fields.  ``input``, ``user_id``, ``session_id``
                and ``tags`` are written as span attributes; other keys are
                ignored by this provider.

        Returns:
            A Splunk-backed span, or a no-op span when the tracer is not
            initialised or span creation fails.
        """
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id    = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags       = kwargs.get("tags") or []

        otel_span: Optional[Any] = None
        try:
            otel_span = self._otel_tracer.start_span(name)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))
            if user_id:
                otel_span.set_attribute("user_id", str(user_id))
            if session_id:
                otel_span.set_attribute("session_id", str(session_id))
            if tags:
                # A bare string is one tag, not an iterable of characters, and
                # non-string tags must not abort the whole trace.
                if isinstance(tags, str):
                    tag_text = tags
                else:
                    tag_text = ",".join(str(tag) for tag in tags)
                otel_span.set_attribute("tags", tag_text)

            trace_id = format(otel_span.get_span_context().trace_id, "032x")
            span = _SplunkSpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            if input_data is not None:
                span._input = input_data
            return span
        except Exception as exc:
            _logger.error("Splunk trace() failed", error=str(exc))
            if otel_span is not None:
                # The span was started but will never reach the caller; end it
                # so it does not stay open forever.
                try:
                    otel_span.end()
                except Exception:
                    pass  # best-effort cleanup inside an error path
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        """Accept a quality score; it is logged but not sent to Splunk.

        Splunk APM doesn't have a native scoring API; scores could be emitted
        as custom metrics via the Splunk metrics ingest endpoint, but that is
        not implemented.  The score is logged at debug level so it is not
        dropped without a trace.
        """
        log_kw: dict[str, Any] = {"trace_id": trace_id, "name": name, "value": value}
        if comment:
            log_kw["comment"] = comment
        _logger.debug("Splunk score not exported", **log_kw)

    def flush(self) -> None:
        """Force-export buffered spans of the provider this tracer created."""
        if self._provider is None:
            return
        try:
            self._provider.force_flush()
        except Exception as exc:
            _logger.warning("Splunk flush failed", error=str(exc))

    def shutdown(self) -> None:
        """Shut down the provider this tracer created, if any."""
        if self._provider is None:
            return
        try:
            self._provider.shutdown()
        except Exception as exc:
            _logger.warning("Splunk shutdown failed", error=str(exc))

Tracing provider that exports to Splunk Observability Cloud via OpenTelemetry OTLP/HTTP.

Traces appear in Splunk APM under the configured service name. Token usage and model info are stored as span attributes and can be surfaced via Splunk's MetricSets or custom dashboards. Quality scores are not sent to Splunk yet: score() does not export them.

Example::

import os
os.environ["TRACING_PROVIDER"]       = "splunk"
os.environ["SPLUNK_OTLP_ENDPOINT"]   = "https://ingest.us0.signalfx.com/v2/trace/otlp"
os.environ["SPLUNK_ACCESS_TOKEN"]    = "your-token"
os.environ["SPLUNK_SERVICE_NAME"]    = "policy-rag"

from gmf_forge_ai_shared_core.observability.tracing import get_tracer

tracer = get_tracer()

with tracer.trace("rag_query", input=question) as trace:
    with trace.span("retrieval", input=question) as span:
        docs = retriever.retrieve(question)
        span.set_output({"doc_count": len(docs)})
    with trace.generation("llm", model="gpt-4o", input=prompt) as gen:
        reply = llm.complete(prompt)
        gen.set_output(reply.text)
        gen.set_token_usage(prompt_tokens=120, completion_tokens=80)
    trace.set_output(reply.text)

tracer.flush()
SplunkTracer( config: TracingConfig)
172    def __init__(self, config: TracingConfig):
173        self._config = config
174        self._otel_tracer: Optional[Any] = None
175        self._init_client()
def trace( self, name: str, **kwargs: Any) -> Span:
223    def trace(self, name: str, **kwargs: Any) -> Span:
224        from .null_tracer import _NullSpan
225
226        if self._otel_tracer is None:
227            return _NullSpan(name=name, is_root=True, kind="trace")
228
229        input_data = kwargs.get("input")
230        user_id    = kwargs.get("user_id")
231        session_id = kwargs.get("session_id")
232        tags       = kwargs.get("tags") or []
233
234        try:
235            otel_span = self._otel_tracer.start_span(name)
236            if input_data is not None:
237                otel_span.set_attribute("input", str(input_data))
238            if user_id:
239                otel_span.set_attribute("user_id", str(user_id))
240            if session_id:
241                otel_span.set_attribute("session_id", str(session_id))
242            if tags:
243                otel_span.set_attribute("tags", ",".join(tags))
244
245            trace_id = format(otel_span.get_span_context().trace_id, "032x")
246            span = _SplunkSpan(
247                otel_span=otel_span,
248                otel_tracer=self._otel_tracer,
249                trace_id=trace_id,
250                name=name,
251                kind="trace",
252            )
253            if input_data is not None:
254                span._input = input_data
255            return span
256        except Exception as exc:
257            _logger.error("Splunk trace() failed", error=str(exc))
258            return _NullSpan(name=name, is_root=True, kind="trace")

Start a new root trace.

Args: name: Human-readable name for the operation (e.g. "rag_query"). **kwargs: Optional fields forwarded to the backend: - input — the operation input (question, prompt, …) - user_id — end-user identifier for session grouping - session_id — conversation or session ID - metadata — dict of arbitrary key/value pairs - tags — list of string tags

Returns: A Span to use as a context manager.

def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
260    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
261        # Splunk APM doesn't have a native scoring API; scores can be
262        # emitted as custom metrics via the Splunk metrics ingest endpoint.
263        # For now we record it as a log-level attribute on the active span.
264        pass

Attach a quality score to a completed trace.

Args: trace_id: ID of the trace to score (span.trace_id). name: Score dimension name, e.g. "faithfulness", "relevance". value: Numeric score — typically 0.0–1.0 but provider-specific. comment: Optional free-text explanation.

def flush(self) -> None:
266    def flush(self) -> None:
267        try:
268            from opentelemetry import trace as otel_trace  # type: ignore
269            provider = otel_trace.get_tracer_provider()
270            if hasattr(provider, "force_flush"):
271                provider.force_flush()
272        except Exception:
273            pass

Flush any buffered events to the backend. Call before process exit.

def shutdown(self) -> None:
275    def shutdown(self) -> None:
276        try:
277            from opentelemetry import trace as otel_trace  # type: ignore
278            provider = otel_trace.get_tracer_provider()
279            if hasattr(provider, "shutdown"):
280                provider.shutdown()
281        except Exception:
282            pass

Gracefully shut down the provider and release resources.

class LangfuseTracer(TracingProvider):
    """Tracing provider that exports traces to Langfuse (SDK v3+).

    When the ``langfuse`` package is missing, the keys are not configured, or
    client creation fails, ``_client`` stays ``None`` and every method
    degrades to a no-op instead of raising.
    """

    def __init__(self, config: TracingConfig):
        """Store *config* and try to create the Langfuse client."""
        self._config = config
        self._client: Optional[Any] = None
        self._init_client()

    def _init_client(self) -> None:
        """Create the Langfuse client; leave ``_client`` unset on any failure."""
        try:
            from langfuse import Langfuse  # type: ignore
        except ImportError:
            _logger.warning(
                "langfuse package not found — falling back to no-op tracing",
                install_cmd="pip install langfuse",
            )
            return

        public_key = self._config.langfuse_public_key
        secret_key = self._config.langfuse_secret_key
        host = self._config.langfuse_host or "https://cloud.langfuse.com"

        missing = [
            name
            for name, value in [
                ("LANGFUSE_PUBLIC_KEY", public_key),
                ("LANGFUSE_SECRET_KEY", secret_key),
            ]
            if not value
        ]
        if missing:
            _logger.warning(
                "LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY not set — falling back to no-op tracing",
                missing=missing,
            )
            return

        try:
            self._client = Langfuse(
                public_key=public_key,
                secret_key=secret_key,
                host=host,
            )
        except Exception as exc:
            _logger.error("Langfuse tracer init failed", error=str(exc))

    def trace(self, name: str, **kwargs: Any) -> Span:
        """Start a new root trace.

        Args:
            name: Human-readable name for the operation.
            **kwargs: Optional fields: ``input``, ``user_id``, ``session_id``,
                ``metadata`` and ``tags``.  ``user_id``, ``session_id``,
                ``tags`` and the configured environment are merged into the
                observation metadata.

        Returns:
            A Langfuse-backed root span, or a no-op span when the client is
            not initialised or the observation cannot be created.
        """
        from .null_tracer import _NullSpan

        if self._client is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags")

        # Merge caller metadata with trace-level fields not supported as direct
        # parameters on start_as_current_observation in SDK v3+.  A value that
        # cannot be turned into a dict is dropped rather than raised into
        # application code.
        try:
            metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
        except (TypeError, ValueError) as exc:
            _logger.warning("Langfuse trace() ignoring invalid metadata", error=str(exc))
            metadata = {}
        if user_id:
            metadata["user_id"] = user_id
        if session_id:
            metadata["session_id"] = session_id
        if tags:
            metadata["tags"] = tags
        if self._config.environment:
            metadata["environment"] = self._config.environment

        try:
            from langfuse.types import TraceContext  # type: ignore

            # Pre-allocate the trace ID so callers can reference it before the
            # observation is flushed.
            trace_id = self._client.create_trace_id()

            lf_ctx = self._client.start_as_current_observation(
                trace_context=TraceContext(trace_id=trace_id),
                name=name,
                as_type="span",
                input=input_data,
                metadata=metadata or None,
                version=self._config.release or None,
            )
            span = _LangfuseSpan(
                lf_client=self._client,
                lf_ctx=lf_ctx,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.id = trace_id
            span.is_root = True
            if input_data is not None:
                span.set_input(input_data)
            return span
        except Exception as exc:
            _logger.error("Langfuse trace() failed", error=str(exc))
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        """Attach a score to *trace_id*; failures are logged, never raised."""
        if self._client is None:
            return
        try:
            self._client.create_score(
                trace_id=trace_id,
                name=name,
                value=value,
                comment=comment or None,
            )
        except Exception as exc:
            _logger.warning("Langfuse score() failed", error=str(exc))

    def flush(self) -> None:
        """Flush the client's buffered events; failures are logged, never raised."""
        if self._client is None:
            return
        try:
            if hasattr(self._client, "flush"):
                self._client.flush()
        except Exception as exc:
            _logger.warning("Langfuse flush failed", error=str(exc))

    def shutdown(self) -> None:
        """Shut the client down, falling back to a flush when it has no shutdown()."""
        if self._client is None:
            return
        try:
            if hasattr(self._client, "shutdown"):
                self._client.shutdown()
            elif hasattr(self._client, "flush"):
                self._client.flush()
        except Exception as exc:
            _logger.warning("Langfuse shutdown failed", error=str(exc))

Tracing provider that exports traces to Langfuse (SDK v3+).

LangfuseTracer( config: TracingConfig)
172    def __init__(self, config: TracingConfig):
173        self._config = config
174        self._client: Optional[Any] = None
175        self._init_client()
def trace( self, name: str, **kwargs: Any) -> Span:
215    def trace(self, name: str, **kwargs: Any) -> Span:
216        from .null_tracer import _NullSpan
217
218        if self._client is None:
219            return _NullSpan(name=name, is_root=True, kind="trace")
220
221        input_data = kwargs.get("input")
222        user_id = kwargs.get("user_id")
223        session_id = kwargs.get("session_id")
224        tags = kwargs.get("tags")
225
226        # Merge caller metadata with trace-level fields not supported as direct
227        # parameters on start_as_current_observation in SDK v3+.
228        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
229        if user_id:
230            metadata["user_id"] = user_id
231        if session_id:
232            metadata["session_id"] = session_id
233        if tags:
234            metadata["tags"] = tags
235        if self._config.environment:
236            metadata["environment"] = self._config.environment
237
238        try:
239            from langfuse.types import TraceContext  # type: ignore
240
241            # Pre-allocate a deterministic trace ID so callers can reference it
242            # before the observation is flushed.
243            trace_id = self._client.create_trace_id()
244
245            lf_ctx = self._client.start_as_current_observation(
246                trace_context=TraceContext(trace_id=trace_id),
247                name=name,
248                as_type="span",
249                input=input_data,
250                metadata=metadata or None,
251                version=self._config.release or None,
252            )
253            span = _LangfuseSpan(
254                lf_client=self._client,
255                lf_ctx=lf_ctx,
256                trace_id=trace_id,
257                name=name,
258                kind="trace",
259            )
260            span.id = trace_id
261            span.is_root = True
262            if input_data is not None:
263                span.set_input(input_data)
264            return span
265        except Exception as exc:
266            _logger.error("Langfuse trace() failed", error=str(exc))
267            return _NullSpan(name=name, is_root=True, kind="trace")

Start a new root trace.

Args: name: Human-readable name for the operation (e.g. "rag_query"). **kwargs: Optional fields forwarded to the backend: - input — the operation input (question, prompt, …) - user_id — end-user identifier for session grouping - session_id — conversation or session ID - metadata — dict of arbitrary key/value pairs - tags — list of string tags

Returns: A Span to use as a context manager.

def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
269    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
270        if self._client is None:
271            return
272        try:
273            self._client.create_score(
274                trace_id=trace_id,
275                name=name,
276                value=value,
277                comment=comment or None,
278            )
279        except Exception:
280            pass

Attach a quality score to a completed trace.

Args: trace_id: ID of the trace to score (span.trace_id). name: Score dimension name, e.g. "faithfulness", "relevance". value: Numeric score — typically 0.0–1.0 but provider-specific. comment: Optional free-text explanation.

def flush(self) -> None:
282    def flush(self) -> None:
283        if self._client is None:
284            return
285        try:
286            if hasattr(self._client, "flush"):
287                self._client.flush()
288        except Exception:
289            pass

Flush any buffered events to the backend. Call before process exit.

def shutdown(self) -> None:
291    def shutdown(self) -> None:
292        if self._client is None:
293            return
294        try:
295            if hasattr(self._client, "shutdown"):
296                self._client.shutdown()
297            elif hasattr(self._client, "flush"):
298                self._client.flush()
299        except Exception:
300            pass

Gracefully shut down the provider and release resources.

class AiFoundryTracer(TracingProvider):
    """
    Tracing provider that exports spans to Azure Monitor / Application Insights
    via the Azure Monitor OpenTelemetry distro.

    Mirrors the ``AIFoundryObserver`` class from ``observability_decorator.py``
    but implements the gmf-forge-ai ``TracingProvider`` interface so it can be
    used interchangeably with ``LangfuseTracer`` and ``SplunkTracer``.

    When the Azure/OpenTelemetry packages are missing, the connection string is
    not configured, or initialisation fails, the tracer degrades to no-op
    spans instead of raising.
    """

    def __init__(self, config: TracingConfig):
        """Store *config* and try to configure Azure Monitor export."""
        self._config = config
        self._otel_tracer: Optional[Any] = None
        self._init_tracer()

    def _init_tracer(self) -> None:
        """Configure Azure Monitor and obtain a tracer; leave it unset on failure."""
        try:
            from azure.monitor.opentelemetry import configure_azure_monitor  # type: ignore
            from opentelemetry import trace as otel_trace  # type: ignore
            from opentelemetry.sdk.resources import Resource  # type: ignore
        except ImportError:
            _logger.warning(
                "azure-monitor-opentelemetry / opentelemetry-sdk not installed "
                "— falling back to no-op tracing",
                install_cmd="pip install azure-monitor-opentelemetry",
            )
            return

        connection_string = self._config.azure_ai_connection_string
        if not connection_string:
            _logger.warning(
                "APPLICATIONINSIGHTS_CONNECTION_STRING not set "
                "— falling back to no-op tracing",
            )
            return

        service_name = (
            self._config.splunk_service_name  # reuse existing config field
            or "gmf-forge-ai"
        )

        try:
            # configure_azure_monitor() has no ``service_name`` option; the
            # service name is carried by the OpenTelemetry resource.
            configure_azure_monitor(
                connection_string=connection_string,
                resource=Resource.create({"service.name": service_name}),
                disable_offline_storage=False,
            )
            self._otel_tracer = otel_trace.get_tracer(
                __name__,
                schema_url="https://opentelemetry.io/schemas/1.11.0",
            )
            _logger.info(
                "AI Foundry tracer (Azure Monitor) initialized",
                service=service_name,
            )
        except Exception as exc:
            _logger.error("AiFoundryTracer init failed", error=str(exc))

    # ── TracingProvider interface ─────────────────────────────────────────────

    def trace(self, name: str, **kwargs: Any) -> Span:
        """Start a new root trace.

        Args:
            name: Human-readable name for the operation.
            **kwargs: Optional fields: ``input``, ``user_id``, ``session_id``,
                ``metadata`` and ``tags``.  Only metadata values of type
                ``str``/``int``/``float``/``bool`` are exported, under
                ``metadata.<key>``.

        Returns:
            An Azure Monitor-backed root span, or a no-op span when the tracer
            is not initialised or span creation fails.
        """
        from .null_tracer import _NullSpan

        if self._otel_tracer is None:
            return _NullSpan(name=name, is_root=True, kind="trace")

        input_data = kwargs.get("input")
        user_id = kwargs.get("user_id")
        session_id = kwargs.get("session_id")
        tags = kwargs.get("tags")

        otel_span: Optional[Any] = None
        try:
            # Built inside the try block so that malformed metadata degrades to
            # a no-op span instead of raising into application code.
            metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})

            otel_span = self._otel_tracer.start_span(name)

            # Attach standard attributes matching AIFoundryObserver pattern
            otel_span.set_attribute("function.name", name)
            otel_span.set_attribute("span.kind", "trace")
            if self._config.environment:
                otel_span.set_attribute("deployment.environment", self._config.environment)
            if self._config.release:
                otel_span.set_attribute("service.version", self._config.release)
            if user_id:
                otel_span.set_attribute("enduser.id", str(user_id))
            if session_id:
                otel_span.set_attribute("session.id", str(session_id))
            if tags:
                # Stringify each element so non-string tags cannot abort the trace.
                if isinstance(tags, (list, tuple)):
                    tag_text = ", ".join(str(tag) for tag in tags)
                else:
                    tag_text = str(tags)
                otel_span.set_attribute("tags", tag_text)
            for key, value in metadata.items():
                if isinstance(value, (str, int, float, bool)):
                    otel_span.set_attribute(f"metadata.{key}", value)
            if input_data is not None:
                otel_span.set_attribute("input", str(input_data))

            # Derive trace_id from OTel span context
            otel_ctx = otel_span.get_span_context()
            trace_id = format(otel_ctx.trace_id, "032x") if otel_ctx else ""

            span = _AIFoundrySpan(
                otel_span=otel_span,
                otel_tracer=self._otel_tracer,
                trace_id=trace_id,
                name=name,
                kind="trace",
            )
            span.is_root = True
            span.id = trace_id
            if input_data is not None:
                span.set_input(input_data)
            return span

        except Exception as exc:
            _logger.error("AiFoundryTracer.trace() failed", error=str(exc))
            if otel_span is not None:
                # The span was started but will never reach the caller; end it
                # so it does not stay open forever.
                try:
                    otel_span.end()
                except Exception:
                    pass  # best-effort cleanup inside an error path
            return _NullSpan(name=name, is_root=True, kind="trace")

    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
        """Record a score as attributes on the currently active span.

        Application Insights has no built-in quality-score concept, so the
        score is written as ``score.<name>`` (and ``score.<name>.comment``) on
        whatever span is current; *trace_id* is not used to look up a span.
        """
        if self._otel_tracer is None:
            return
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            current = otel_trace.get_current_span()
            if current:
                current.set_attribute(f"score.{name}", value)
                if comment:
                    current.set_attribute(f"score.{name}.comment", comment)
        except Exception as exc:
            _logger.warning("AiFoundryTracer.score() failed", error=str(exc))

    def flush(self) -> None:
        """Force-export buffered spans so they are not lost at process exit."""
        if self._otel_tracer is None:
            return
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            provider = otel_trace.get_tracer_provider()
            # Only SDK providers expose force_flush(); the API proxy does not.
            if hasattr(provider, "force_flush"):
                provider.force_flush()
        except Exception as exc:
            _logger.warning("AiFoundryTracer flush failed", error=str(exc))

    def shutdown(self) -> None:
        """Shut down the tracer provider; a no-op when the tracer never initialised."""
        if self._otel_tracer is None:
            # This instance configured nothing, so it must not shut down a
            # global provider that something else may own.
            return
        try:
            from opentelemetry import trace as otel_trace  # type: ignore

            provider = otel_trace.get_tracer_provider()
            if hasattr(provider, "shutdown"):
                provider.shutdown()
        except Exception as exc:
            _logger.warning("AiFoundryTracer shutdown failed", error=str(exc))

Tracing provider that exports spans to Azure Monitor / Application Insights via the Azure Monitor OpenTelemetry distro.

Mirrors the AIFoundryObserver class from observability_decorator.py but implements the gmf-forge-ai TracingProvider interface so it can be used interchangeably with LangfuseTracer and SplunkTracer.

AiFoundryTracer( config: TracingConfig)
188    def __init__(self, config: TracingConfig):
189        self._config = config
190        self._otel_tracer: Optional[Any] = None
191        self._init_tracer()
def trace( self, name: str, **kwargs: Any) -> Span:
237    def trace(self, name: str, **kwargs: Any) -> Span:
238        from .null_tracer import _NullSpan
239
240        if self._otel_tracer is None:
241            return _NullSpan(name=name, is_root=True, kind="trace")
242
243        input_data = kwargs.get("input")
244        user_id = kwargs.get("user_id")
245        session_id = kwargs.get("session_id")
246        metadata: dict[str, Any] = dict(kwargs.get("metadata") or {})
247        tags = kwargs.get("tags")
248
249        try:
250            from opentelemetry import trace as otel_trace  # type: ignore
251
252            otel_span = self._otel_tracer.start_span(name)
253
254            # Attach standard attributes matching AIFoundryObserver pattern
255            otel_span.set_attribute("function.name", name)
256            otel_span.set_attribute("span.kind", "trace")
257            if self._config.environment:
258                otel_span.set_attribute("deployment.environment", self._config.environment)
259            if self._config.release:
260                otel_span.set_attribute("service.version", self._config.release)
261            if user_id:
262                otel_span.set_attribute("enduser.id", user_id)
263            if session_id:
264                otel_span.set_attribute("session.id", session_id)
265            if tags:
266                otel_span.set_attribute("tags", ", ".join(tags) if isinstance(tags, list) else str(tags))
267            for key, value in metadata.items():
268                if isinstance(value, (str, int, float, bool)):
269                    otel_span.set_attribute(f"metadata.{key}", value)
270            if input_data is not None:
271                otel_span.set_attribute("input", str(input_data))
272
273            # Derive trace_id from OTel span context
274            otel_ctx = otel_span.get_span_context()
275            trace_id = format(otel_ctx.trace_id, "032x") if otel_ctx else ""
276
277            span = _AIFoundrySpan(
278                otel_span=otel_span,
279                otel_tracer=self._otel_tracer,
280                trace_id=trace_id,
281                name=name,
282                kind="trace",
283            )
284            span.is_root = True
285            span.id = trace_id
286            if input_data is not None:
287                span.set_input(input_data)
288            return span
289
290        except Exception as exc:
291            _logger.error("AiFoundryTracer.trace() failed", error=str(exc))
292            return _NullSpan(name=name, is_root=True, kind="trace")

Start a new root trace.

Args: name: Human-readable name for the operation (e.g. "rag_query"). **kwargs: Optional fields forwarded to the backend: - input — the operation input (question, prompt, …) - user_id — end-user identifier for session grouping - session_id — conversation or session ID - metadata — dict of arbitrary key/value pairs - tags — list of string tags

Returns: A Span to use as a context manager.

def score(self, trace_id: str, name: str, value: float, comment: str = '') -> None:
294    def score(self, trace_id: str, name: str, value: float, comment: str = "") -> None:
295        # Application Insights has no built-in quality-score concept;
296        # emit it as a custom event attribute on the active span instead.
297        if self._otel_tracer is None:
298            return
299        try:
300            from opentelemetry import trace as otel_trace  # type: ignore
301
302            current = otel_trace.get_current_span()
303            if current:
304                current.set_attribute(f"score.{name}", value)
305                if comment:
306                    current.set_attribute(f"score.{name}.comment", comment)
307        except Exception:
308            pass

Attach a quality score to a completed trace.

Args: trace_id: ID of the trace to score (span.trace_id). name: Score dimension name, e.g. "faithfulness", "relevance". value: Numeric score — typically 0.0–1.0 but provider-specific. comment: Optional free-text explanation.

def flush(self) -> None:
310    def flush(self) -> None:
311        # The Azure Monitor exporter flushes automatically; nothing extra needed.
312        pass

Flush any buffered events to the backend. Call before process exit.

def shutdown(self) -> None:
314    def shutdown(self) -> None:
315        try:
316            from opentelemetry import trace as otel_trace  # type: ignore
317
318            provider = otel_trace.get_tracer_provider()
319            if hasattr(provider, "shutdown"):
320                provider.shutdown()
321        except Exception:
322            pass

Gracefully shut down the provider and release resources.

def get_tracer( reset: bool = False) -> TracingProvider:
def get_tracer(reset: bool = False) -> TracingProvider:
    """
    Return the shared tracing provider, building it from the environment
    the first time it is requested.

    The instance is cached in the module-level ``_tracer_instance``; later
    calls return that same object unless ``reset=True`` is passed, which
    rebuilds it from the current environment (useful in tests).

    Environment variables:

    TRACING_PROVIDER
        Which backend to use.  One of: ``null``, ``splunk``, ``langfuse``.
        Defaults to ``null`` when not set.  The value is lower-cased and
        stripped before lookup; an unrecognised value logs a warning and
        falls back to ``NullTracer``.

    (Splunk)
    SPLUNK_OTLP_ENDPOINT    OTLP/HTTP export endpoint
    SPLUNK_ACCESS_TOKEN     Splunk Observability Cloud access token
    SPLUNK_SERVICE_NAME     Service name shown in APM (default: gmf-forge-ai)

    (Langfuse)
    LANGFUSE_PUBLIC_KEY     Langfuse public key
    LANGFUSE_SECRET_KEY     Langfuse secret key
    LANGFUSE_HOST           Langfuse host (default: https://cloud.langfuse.com)

    (Common)
    ENVIRONMENT             Deployment environment tag (default: development)
    RELEASE                 App release/version tag (optional)

    Args:
        reset: When True, ignore any cached instance and re-initialise.

    Returns:
        A :class:`TracingProvider` instance ready to use.
    """
    global _tracer_instance

    cached = _tracer_instance
    if not reset and cached is not None:
        return cached

    config = _config_from_env()
    provider_key = config.provider.lower().strip()

    if provider_key in _PROVIDERS:
        provider_cls = _PROVIDERS[provider_key]
    else:
        # Unknown backend name: warn and degrade to the no-op tracer rather
        # than failing application start-up.
        _logger.warning(
            "Unknown TRACING_PROVIDER — falling back to NullTracer",
            provider=provider_key,
            supported=list(_PROVIDERS),
        )
        provider_cls = NullTracer

    # NullTracer is constructed without arguments; every other provider
    # receives the environment-derived config.
    if provider_cls is NullTracer:
        _tracer_instance = NullTracer()
    else:
        _tracer_instance = provider_cls(config)
    return _tracer_instance

Return the process-wide tracing provider, initialising it on first call.

Reads configuration from environment variables. Pass reset=True to force re-initialisation (useful in tests).

Environment variables: TRACING_PROVIDER — which backend to use. One of: null, splunk, langfuse. Defaults to null when not set.

(Splunk) SPLUNK_OTLP_ENDPOINT OTLP/HTTP export endpoint SPLUNK_ACCESS_TOKEN Splunk Observability Cloud access token SPLUNK_SERVICE_NAME Service name shown in APM (default: gmf-forge-ai)

(Langfuse) LANGFUSE_PUBLIC_KEY Langfuse public key LANGFUSE_SECRET_KEY Langfuse secret key LANGFUSE_HOST Langfuse host (default: https://cloud.langfuse.com)

(Common) ENVIRONMENT Deployment environment tag (default: development) RELEASE App release/version tag (optional)

Returns: A TracingProvider instance ready to use.

def traced( func: Optional[~F] = None, *, name: Optional[str] = None, capture_input: bool = True, capture_output: bool = True, metadata_fn: Optional[Callable[[Any, dict], NoneType]] = None, tags: Optional[list[str]] = None) -> Union[~F, Callable[[~F], ~F]]:
 67def traced(
 68    func: Optional[F] = None,
 69    *,
 70    name: Optional[str] = None,
 71    capture_input: bool = True,
 72    capture_output: bool = True,
 73    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
 74    tags: Optional[list[str]] = None,
 75) -> Union[F, Callable[[F], F]]:
 76    """
 77    Decorator to open a root trace around a function.
 78
 79    If a parent span is already active (e.g., called from within @trace_span),
 80    this still opens a new root trace.
 81
 82    Args:
 83        func: The function to decorate (when used without parens).
 84        name: Override trace name; defaults to function name.
 85        capture_input: Include function args in trace input (default: True).
 86        capture_output: Include return value in trace output (default: True).
 87        metadata_fn: Callback(result, metadata_dict) to add custom metadata.
 88        tags: List of string tags to attach to the trace.
 89
 90    Example::
 91
 92        @traced(name="rag_pipeline", tags=["rag"])
 93        async def answer(question: str) -> str:
 94            ...
 95    """
 96
 97    def decorator(fn: F) -> F:
 98        trace_name = name or fn.__name__
 99
100        if asyncio.iscoroutinefunction(fn):
101
102            @functools.wraps(fn)
103            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
104                from . import get_tracer
105                tracer = get_tracer()
106                input_data = (args, kwargs) if capture_input else None
107                with tracer.trace(
108                    trace_name,
109                    input=input_data,
110                    tags=tags,
111                ) as trace:
112                    try:
113                        result = await fn(*args, **kwargs)
114                    except Exception as exc:
115                        trace.set_error(exc)
116                        raise
117                    if capture_output:
118                        trace.set_output(result)
119                    if metadata_fn is not None:
120                        meta: dict[str, Any] = {}
121                        metadata_fn(result, meta)
122                        trace.set_metadata(**meta)
123                    return result
124
125            return async_wrapper  # type: ignore[return-value]
126        else:
127
128            @functools.wraps(fn)
129            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
130                from . import get_tracer
131                tracer = get_tracer()
132                input_data = (args, kwargs) if capture_input else None
133                with tracer.trace(
134                    trace_name,
135                    input=input_data,
136                    tags=tags,
137                ) as trace:
138                    try:
139                        result = fn(*args, **kwargs)
140                    except Exception as exc:
141                        trace.set_error(exc)
142                        raise
143                    if capture_output:
144                        trace.set_output(result)
145                    if metadata_fn is not None:
146                        meta: dict[str, Any] = {}
147                        metadata_fn(result, meta)
148                        trace.set_metadata(**meta)
149                    return result
150
151            return sync_wrapper  # type: ignore[return-value]
152
153    if func is not None:
154        # Called as @traced without parens
155        return decorator(func)
156    # Called as @traced(...) with parens
157    return decorator

Decorator to open a root trace around a function.

If a parent span is already active (e.g., called from within @trace_span), this still opens a new root trace.

Args: func: The function to decorate (when used without parens). name: Override trace name; defaults to function name. capture_input: Include function args in trace input (default: True). capture_output: Include return value in trace output (default: True). metadata_fn: Callback(result, metadata_dict) to add custom metadata. tags: List of string tags to attach to the trace.

Example::

@traced(name="rag_pipeline", tags=["rag"])
async def answer(question: str) -> str:
    ...
def trace_span( func: Optional[~F] = None, *, name: Optional[str] = None, capture_input: bool = True, capture_output: bool = True, metadata_fn: Optional[Callable[[Any, dict], NoneType]] = None) -> Union[~F, Callable[[~F], ~F]]:
def trace_span(
    func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
) -> Union[F, Callable[[F], F]]:
    """
    Wrap a function so that every call runs inside a span.

    When ``_get_active_span()`` returns a span, the call is recorded as a
    child of it via ``active.span(...)``.  When it returns ``None``, a root
    trace is opened through ``get_tracer().trace(...)`` instead (failsafe).
    Works for both plain and ``async def`` functions.

    Args:
        func: The function to decorate (when used without parens).
        name: Override span name; defaults to function name.
        capture_input: Record ``(args, kwargs)`` as the span input
            (default: True); otherwise the input is ``None``.
        capture_output: Record the return value as the span output
            (default: True).
        metadata_fn: Callback(result, metadata_dict) that fills the dict;
            its contents are passed to ``set_metadata`` as keyword arguments.

    Returns:
        The wrapped function when used bare (``@trace_span``), or a
        decorator when called with arguments (``@trace_span(...)``).

    Example::

        @trace_span(name="retrieval")
        async def retrieve(question: str) -> list[str]:
            ...
    """

    def decorator(fn: F) -> F:
        span_name = name if name else fn.__name__

        def _open_span(call_args: Any, call_kwargs: Any) -> Any:
            # Choose the context manager for this call: a child of the active
            # span when there is one, otherwise a fresh root trace.
            parent = _get_active_span()
            payload = (call_args, call_kwargs) if capture_input else None
            if parent is not None:
                return parent.span(span_name, input=payload)

            from . import get_tracer

            return get_tracer().trace(span_name, input=payload)

        def _record_success(current: Any, outcome: Any) -> None:
            # Shared post-call bookkeeping for every wrapper/branch combination.
            if capture_output:
                current.set_output(outcome)
            if metadata_fn is not None:
                extra: dict[str, Any] = {}
                metadata_fn(outcome, extra)
                current.set_metadata(**extra)

        if not asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                with _open_span(args, kwargs) as current:
                    try:
                        outcome = fn(*args, **kwargs)
                    except Exception as err:
                        # Record the failure on the span, then let it propagate.
                        current.set_error(err)
                        raise
                    _record_success(current, outcome)
                    return outcome

            return sync_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
            with _open_span(args, kwargs) as current:
                try:
                    outcome = await fn(*args, **kwargs)
                except Exception as err:
                    # Record the failure on the span, then let it propagate.
                    current.set_error(err)
                    raise
                _record_success(current, outcome)
                return outcome

        return async_wrapper  # type: ignore[return-value]

    # Bare ``@trace_span`` passes the function directly; ``@trace_span(...)`` does not.
    return decorator if func is None else decorator(func)

Decorator to open a child span around a function.

If no parent span is active, creates a root trace (failsafe). If a parent span is active, creates a child span under it.

Args: func: The function to decorate (when used without parens). name: Override span name; defaults to function name. capture_input: Include function args in span input (default: True). capture_output: Include return value in span output (default: True). metadata_fn: Callback(result, metadata_dict) to add custom metadata.

Example::

@trace_span(name="retrieval")
async def retrieve(question: str) -> list[str]:
    ...
def trace_generation( func: Optional[~F] = None, *, name: Optional[str] = None, model: Optional[str] = None, capture_input: bool = True, capture_output: bool = True, capture_usage: bool = True, metadata_fn: Optional[Callable[[Any, dict], NoneType]] = None) -> Union[~F, Callable[[~F], ~F]]:
def trace_generation(
    func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    model: Optional[str] = None,
    capture_input: bool = True,
    capture_output: bool = True,
    capture_usage: bool = True,
    metadata_fn: Optional[Callable[[Any, dict], None]] = None,
) -> Union[F, Callable[[F], F]]:
    """
    Decorator to open a generation (LLM) span around a function.

    Use for any LLM call.  When ``_get_active_span()`` returns a span, the
    call is recorded as a child generation via ``active.generation(...)``;
    otherwise a root trace is opened through ``get_tracer().trace(...)``.
    Works for both plain and ``async def`` functions.

    Args:
        func: The function to decorate (when used without parens).
        name: Override span name; defaults to function name.
        model: Model name to attach (e.g. "gpt-4o").  Passed as ``""`` when
            not given.  Only forwarded on the child-generation path.
        capture_input: Record ``(args, kwargs)`` as the span input
            (default: True); otherwise the input is ``None``.
        capture_output: Record the return value as the span output
            (default: True).
        capture_usage: Extract token usage from the return value
            (default: True).  Applies only when the result is a dict with a
            ``"usage"`` entry that is mapping-like (has ``.get``); any other
            usage value, such as ``None``, is skipped instead of raising.
        metadata_fn: Callback(result, metadata_dict) that fills the dict;
            its contents are passed to ``set_metadata`` as keyword arguments.

    Returns:
        The wrapped function when used bare (``@trace_generation``), or a
        decorator when called with arguments (``@trace_generation(...)``).

    Example::

        @trace_generation(name="llm_call", model="gpt-4o")
        async def call_llm(prompt: str) -> dict[str, Any]:
            # Should return {"text": "...", "usage": {"prompt_tokens": 50, ...}}
            ...
    """

    def decorator(fn: F) -> F:
        gen_name = name or fn.__name__

        def _open_span(call_args: Any, call_kwargs: Any) -> Any:
            # Choose the context manager for this call: a child generation of
            # the active span when there is one, otherwise a root trace.
            active = _get_active_span()
            input_data = (call_args, call_kwargs) if capture_input else None
            if active is not None:
                return active.generation(gen_name, model=model or "", input=input_data)

            # NOTE(review): this fallback does not forward ``model`` to the
            # backend, so the model name is not recorded on root traces —
            # confirm whether that is intended.
            from . import get_tracer

            return get_tracer().trace(gen_name, input=input_data)

        def _record_usage(span: Any, result: Any) -> None:
            # Copy token counts from ``result["usage"]`` onto the span.
            if not (capture_usage and isinstance(result, dict) and "usage" in result):
                return
            usage = result["usage"]
            if not callable(getattr(usage, "get", None)):
                # ``usage`` may be None or a non-mapping value; the wrapped
                # call already succeeded, so telemetry must not turn that
                # into an AttributeError.
                return
            span.set_token_usage(
                prompt_tokens=usage.get("prompt_tokens", 0),
                completion_tokens=usage.get("completion_tokens", 0),
                total_tokens=usage.get("total_tokens"),
            )

        def _record_success(span: Any, result: Any) -> None:
            # Shared post-call bookkeeping: output, token usage, then metadata.
            if capture_output:
                span.set_output(result)
            _record_usage(span, result)
            if metadata_fn is not None:
                meta: dict[str, Any] = {}
                metadata_fn(result, meta)
                span.set_metadata(**meta)

        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                with _open_span(args, kwargs) as span:
                    try:
                        result = await fn(*args, **kwargs)
                    except Exception as exc:
                        # Record the failure on the span, then let it propagate.
                        span.set_error(exc)
                        raise
                    _record_success(span, result)
                    return result

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            with _open_span(args, kwargs) as span:
                try:
                    result = fn(*args, **kwargs)
                except Exception as exc:
                    # Record the failure on the span, then let it propagate.
                    span.set_error(exc)
                    raise
                _record_success(span, result)
                return result

        return sync_wrapper  # type: ignore[return-value]

    if func is not None:
        # Called as @trace_generation without parens
        return decorator(func)
    # Called as @trace_generation(...) with parens
    return decorator

Decorator to open a generation (LLM) span around a function.

Use for any LLM call. Automatically creates a child span if a parent is active; otherwise creates a root trace.

Args: func: The function to decorate (when used without parens). name: Override span name; defaults to function name. model: Model name to attach (e.g. "gpt-4o"). capture_input: Include function args in span input (default: True). capture_output: Include return value in span output (default: True). capture_usage: Try to extract token usage from return dict (default: True). metadata_fn: Callback(result, metadata_dict) to add custom metadata.

Example::

@trace_generation(name="llm_call", model="gpt-4o")
async def call_llm(prompt: str) -> dict[str, Any]:
    # Should return {"text": "...", "usage": {"prompt_tokens": 50, ...}}
    ...