gmf_forge_ai_shared_core.llm_gateway

LLM Gateway - Unified interface for multiple LLM providers.

1"""LLM Gateway - Unified interface for multiple LLM providers."""
2
3from gmf_forge_ai_shared_core.llm_gateway.unified_gateway import UnifiedLLMGateway
4from gmf_forge_ai_shared_core.llm_gateway.providers.base_provider import BaseProvider
5
6__all__ = [
7    "UnifiedLLMGateway",
8    "BaseProvider",
9]
class UnifiedLLMGateway:
 15class UnifiedLLMGateway:
 16    """
 17    Unified interface for multiple LLM providers.
 18    
 19    Supports Azure OpenAI (primary), OpenAI, Anthropic, Cohere, vLLM, and Ollama providers.
 20    Can use a provider registry for flexible multi-provider setups or a single default provider.
 21    
 22    **Performance Monitoring:**
 23    Optionally integrate BasicPerformanceMonitor for automatic LLM performance tracking.
 24    When enabled, the gateway automatically tracks latency, token usage, and throughput.
 25    
 26    Example with single provider (simple):
 27        >>> gateway = UnifiedLLMGateway(default_provider=azure_provider)
 28        >>> response = await gateway.complete("What is RAG?")
 29    
 30    Example with registry (multi-provider):
 31        >>> registry = LLMProviderRegistry()
 32        >>> registry.register("azure", azure_provider, is_default=True)
 33        >>> registry.register("ollama", ollama_provider)
 34        >>> gateway = UnifiedLLMGateway(provider_registry=registry)
 35        >>> 
 36        >>> # Use default provider (azure)
 37        >>> response = await gateway.complete("What is RAG?")
 38        >>> 
 39        >>> # Use specific provider (ollama)
 40        >>> response = await gateway.complete("What is RAG?", provider="ollama")
 41    
 42    Example with performance monitoring (explicit opt-in):
 43        >>> from gmf_forge_ai_shared_core.observability import BasicPerformanceMonitor
 44        >>> 
 45        >>> monitor = BasicPerformanceMonitor()
 46        >>> gateway = UnifiedLLMGateway(
 47        ...     default_provider=azure_provider,
 48        ...     performance_monitor=monitor  # Enable automatic performance tracking
 49        ... )
 50        >>> 
 51        >>> response = await gateway.complete("What is RAG?")
 52        >>> # Performance automatically tracked! No manual token counting needed.
 53        >>> 
 54        >>> # View performance statistics
 55        >>> stats = monitor.get_stats()
 56        >>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
 57        >>> print(f"Total tokens used: {stats['total_tokens']}")
 58    """
 59    
 60    def __init__(
 61        self,
 62        default_provider: Optional[BaseProvider] = None,
 63        fallback_providers: Optional[List[BaseProvider]] = None,
 64        provider_registry: Optional["LLMProviderRegistry"] = None,
 65        performance_monitor: Optional["BasicPerformanceMonitor"] = None,
 66    ):
 67        """
 68        Initialize the unified LLM gateway.
 69        
 70        Args:
 71            default_provider: Primary provider to use (for simple single-provider setup)
 72            fallback_providers: List of fallback providers if primary fails
 73            provider_registry: Provider registry for multi-provider setup (recommended)
 74            performance_monitor: Optional performance monitor for automatic tracking.
 75                               When provided, gateway automatically tracks latency and token usage.
 76        """
 77        self.default_provider = default_provider
 78        self.fallback_providers = fallback_providers or []
 79        self.provider_registry = provider_registry
 80        self.performance_monitor = performance_monitor
 81        self._providers: Dict[str, BaseProvider] = {}
 82        
 83        if default_provider:
 84            self.register_provider(default_provider.name, default_provider)
 85    
 86    def register_provider(self, name: str, provider: BaseProvider) -> None:
 87        """Register a new provider."""
 88        self._providers[name] = provider
 89    
 90    def _get_provider(self, provider_name: Optional[str] = None) -> BaseProvider:
 91        """
 92        Get a provider by name or return the default.
 93        
 94        Args:
 95            provider_name: Optional provider name. If None, uses default.
 96            
 97        Returns:
 98            Provider instance
 99            
100        Raises:
101            ValueError: If provider not found or no default configured
102        """
103        # If provider name specified, try to get it from registry first
104        if provider_name:
105            if self.provider_registry and self.provider_registry.has_provider(provider_name):
106                return self.provider_registry.get(provider_name)
107            elif provider_name in self._providers:
108                return self._providers[provider_name]
109            else:
110                raise ValueError(f"Provider '{provider_name}' not found")
111        
112        # Otherwise, get default provider
113        if self.provider_registry:
114            try:
115                return self.provider_registry.get_default()
116            except RuntimeError:
117                pass  # Fall through to try self.default_provider
118        
119        if self.default_provider:
120            return self.default_provider
121        
122        raise ValueError("No default provider configured")
123    
124    async def complete(
125        self,
126        prompt: str,
127        model: Optional[str] = None,
128        provider: Optional[str] = None,
129        **kwargs: Any
130    ) -> Any:
131        """
132        Generate a completion using the configured provider.
133        
134        When performance_monitor is enabled, automatically tracks:
135        - Request latency
136        - Token usage (prompt, completion, total)
137        - Tokens per second
138        - Success/failure status
139        
140        Args:
141            prompt: The prompt to complete
142            model: Optional model name override
143            provider: Optional provider name (e.g., "azure", "ollama")
144            **kwargs: Additional provider-specific parameters
145            
146        Returns:
147            CompletionResponse with content, model, usage, and metadata
148        """
149        selected_provider = self._get_provider(provider)
150        
151        # Start performance tracking if monitor is enabled
152        request_id = None
153        if self.performance_monitor is not None:
154            request_id = self.performance_monitor.start_request(
155                provider=selected_provider.name,
156                model=model or "default",
157                prompt_preview=prompt[:50] if len(prompt) > 50 else prompt
158            )
159        
160        try:
161            # Make the LLM call
162            response = await selected_provider.complete(prompt=prompt, model=model, **kwargs)
163            
164            # End performance tracking on success
165            if self.performance_monitor is not None and request_id:
166                self.performance_monitor.end_request(
167                    request_id=request_id,
168                    prompt_tokens=response.usage.get("prompt_tokens", 0),
169                    completion_tokens=response.usage.get("completion_tokens", 0),
170                    success=True
171                )
172            
173            return response
174            
175        except Exception as e:
176            # End performance tracking on failure
177            if self.performance_monitor is not None and request_id:
178                self.performance_monitor.end_request(
179                    request_id=request_id,
180                    prompt_tokens=0,
181                    completion_tokens=0,
182                    success=False,
183                    error=str(e)
184                )
185            
186            # Try fallback providers only if no specific provider was requested
187            if not provider:
188                for fallback in self.fallback_providers:
189                    # Start tracking for fallback attempt
190                    fallback_request_id = None
191                    if self.performance_monitor is not None:
192                        fallback_request_id = self.performance_monitor.start_request(
193                            provider=fallback.name,
194                            model=model or "default",
195                            fallback_attempt=True
196                        )
197                    
198                    try:
199                        response = await fallback.complete(prompt=prompt, model=model, **kwargs)
200                        
201                        # Track fallback success
202                        if self.performance_monitor is not None and fallback_request_id:
203                            self.performance_monitor.end_request(
204                                request_id=fallback_request_id,
205                                prompt_tokens=response.usage.get("prompt_tokens", 0),
206                                completion_tokens=response.usage.get("completion_tokens", 0),
207                                success=True
208                            )
209                        
210                        return response
211                    except Exception:
212                        # Track fallback failure
213                        if self.performance_monitor is not None and fallback_request_id:
214                            self.performance_monitor.end_request(
215                                request_id=fallback_request_id,
216                                prompt_tokens=0,
217                                completion_tokens=0,
218                                success=False
219                            )
220                        continue
221            
222            # If all providers fail, raise the original exception
223            raise e

Unified interface for multiple LLM providers.

Supports Azure OpenAI (primary), OpenAI, Anthropic, Cohere, vLLM, and Ollama providers. Can use a provider registry for flexible multi-provider setups or a single default provider.

Performance Monitoring: Optionally integrate BasicPerformanceMonitor for automatic LLM performance tracking. When enabled, the gateway automatically tracks latency, token usage, and throughput.

Example with single provider (simple):

gateway = UnifiedLLMGateway(default_provider=azure_provider)
response = await gateway.complete("What is RAG?")

Example with registry (multi-provider):

registry = LLMProviderRegistry()
registry.register("azure", azure_provider, is_default=True)
registry.register("ollama", ollama_provider)
gateway = UnifiedLLMGateway(provider_registry=registry)

Use default provider (azure)

response = await gateway.complete("What is RAG?")

Use specific provider (ollama)

response = await gateway.complete("What is RAG?", provider="ollama")

Example with performance monitoring (explicit opt-in):

from gmf_forge_ai_shared_core.observability import BasicPerformanceMonitor

monitor = BasicPerformanceMonitor()
gateway = UnifiedLLMGateway(
    default_provider=azure_provider,
    performance_monitor=monitor,  # Enable automatic performance tracking
)

response = await gateway.complete("What is RAG?")

Performance automatically tracked! No manual token counting needed.

View performance statistics

stats = monitor.get_stats()
print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
print(f"Total tokens used: {stats['total_tokens']}")

UnifiedLLMGateway( default_provider: Optional[BaseProvider] = None, fallback_providers: Optional[List[BaseProvider]] = None, provider_registry: Optional[gmf_forge_ai_shared_core.registry.LLMProviderRegistry] = None, performance_monitor: Optional[gmf_forge_ai_shared_core.observability.BasicPerformanceMonitor] = None)
60    def __init__(
61        self,
62        default_provider: Optional[BaseProvider] = None,
63        fallback_providers: Optional[List[BaseProvider]] = None,
64        provider_registry: Optional["LLMProviderRegistry"] = None,
65        performance_monitor: Optional["BasicPerformanceMonitor"] = None,
66    ):
67        """
68        Initialize the unified LLM gateway.
69        
70        Args:
71            default_provider: Primary provider to use (for simple single-provider setup)
72            fallback_providers: List of fallback providers if primary fails
73            provider_registry: Provider registry for multi-provider setup (recommended)
74            performance_monitor: Optional performance monitor for automatic tracking.
75                               When provided, gateway automatically tracks latency and token usage.
76        """
77        self.default_provider = default_provider
78        self.fallback_providers = fallback_providers or []
79        self.provider_registry = provider_registry
80        self.performance_monitor = performance_monitor
81        self._providers: Dict[str, BaseProvider] = {}
82        
83        if default_provider:
84            self.register_provider(default_provider.name, default_provider)

Initialize the unified LLM gateway.

Args:
    default_provider: Primary provider to use (for simple single-provider setup)
    fallback_providers: List of fallback providers if primary fails
    provider_registry: Provider registry for multi-provider setup (recommended)
    performance_monitor: Optional performance monitor for automatic tracking. When provided, gateway automatically tracks latency and token usage.

default_provider
fallback_providers
provider_registry
performance_monitor
def register_provider( self, name: str, provider: BaseProvider) -> None:
86    def register_provider(self, name: str, provider: BaseProvider) -> None:
87        """Register a new provider."""
88        self._providers[name] = provider

Register a new provider.

async def complete( self, prompt: str, model: Optional[str] = None, provider: Optional[str] = None, **kwargs: Any) -> Any:
124    async def complete(
125        self,
126        prompt: str,
127        model: Optional[str] = None,
128        provider: Optional[str] = None,
129        **kwargs: Any
130    ) -> Any:
131        """
132        Generate a completion using the configured provider.
133        
134        When performance_monitor is enabled, automatically tracks:
135        - Request latency
136        - Token usage (prompt, completion, total)
137        - Tokens per second
138        - Success/failure status
139        
140        Args:
141            prompt: The prompt to complete
142            model: Optional model name override
143            provider: Optional provider name (e.g., "azure", "ollama")
144            **kwargs: Additional provider-specific parameters
145            
146        Returns:
147            CompletionResponse with content, model, usage, and metadata
148        """
149        selected_provider = self._get_provider(provider)
150        
151        # Start performance tracking if monitor is enabled
152        request_id = None
153        if self.performance_monitor is not None:
154            request_id = self.performance_monitor.start_request(
155                provider=selected_provider.name,
156                model=model or "default",
157                prompt_preview=prompt[:50] if len(prompt) > 50 else prompt
158            )
159        
160        try:
161            # Make the LLM call
162            response = await selected_provider.complete(prompt=prompt, model=model, **kwargs)
163            
164            # End performance tracking on success
165            if self.performance_monitor is not None and request_id:
166                self.performance_monitor.end_request(
167                    request_id=request_id,
168                    prompt_tokens=response.usage.get("prompt_tokens", 0),
169                    completion_tokens=response.usage.get("completion_tokens", 0),
170                    success=True
171                )
172            
173            return response
174            
175        except Exception as e:
176            # End performance tracking on failure
177            if self.performance_monitor is not None and request_id:
178                self.performance_monitor.end_request(
179                    request_id=request_id,
180                    prompt_tokens=0,
181                    completion_tokens=0,
182                    success=False,
183                    error=str(e)
184                )
185            
186            # Try fallback providers only if no specific provider was requested
187            if not provider:
188                for fallback in self.fallback_providers:
189                    # Start tracking for fallback attempt
190                    fallback_request_id = None
191                    if self.performance_monitor is not None:
192                        fallback_request_id = self.performance_monitor.start_request(
193                            provider=fallback.name,
194                            model=model or "default",
195                            fallback_attempt=True
196                        )
197                    
198                    try:
199                        response = await fallback.complete(prompt=prompt, model=model, **kwargs)
200                        
201                        # Track fallback success
202                        if self.performance_monitor is not None and fallback_request_id:
203                            self.performance_monitor.end_request(
204                                request_id=fallback_request_id,
205                                prompt_tokens=response.usage.get("prompt_tokens", 0),
206                                completion_tokens=response.usage.get("completion_tokens", 0),
207                                success=True
208                            )
209                        
210                        return response
211                    except Exception:
212                        # Track fallback failure
213                        if self.performance_monitor is not None and fallback_request_id:
214                            self.performance_monitor.end_request(
215                                request_id=fallback_request_id,
216                                prompt_tokens=0,
217                                completion_tokens=0,
218                                success=False
219                            )
220                        continue
221            
222            # If all providers fail, raise the original exception
223            raise e

Generate a completion using the configured provider.

When performance_monitor is enabled, automatically tracks:

  • Request latency
  • Token usage (prompt, completion, total)
  • Tokens per second
  • Success/failure status

Args:
    prompt: The prompt to complete
    model: Optional model name override
    provider: Optional provider name (e.g., "azure", "ollama")
    **kwargs: Additional provider-specific parameters

Returns: CompletionResponse with content, model, usage, and metadata

class BaseProvider(abc.ABC):
 31class BaseProvider(ABC):
 32    """
 33    Abstract base class for LLM providers.
 34    
 35    All providers (Azure OpenAI, OpenAI, Anthropic, etc.) must implement this interface.
 36    
 37    Note: Model registration is handled by LLMProviderRegistry, not by individual providers.
 38    Providers focus on LLM operations (complete, stream, validate).
 39    """
 40    
 41    def __init__(self, name: str):
 42        """
 43        Initialize the provider.
 44        
 45        Args:
 46            name: Unique identifier for this provider
 47        """
 48        self.name = name
 49    
 50    @abstractmethod
 51    async def complete(
 52        self,
 53        prompt: str,
 54        model: Optional[str] = None,
 55        temperature: float = 0.7,
 56        max_tokens: Optional[int] = None,
 57        **kwargs: Any
 58    ) -> CompletionResponse:
 59        """
 60        Generate a completion.
 61        
 62        Args:
 63            prompt: The prompt to complete
 64            model: Model name
 65            temperature: Sampling temperature (0-1)
 66            max_tokens: Maximum tokens to generate
 67            **kwargs: Provider-specific parameters
 68            
 69        Returns:
 70            CompletionResponse object
 71        """
 72        pass
 73    
 74    @abstractmethod
 75    async def stream_complete(
 76        self,
 77        prompt: str,
 78        model: Optional[str] = None,
 79        temperature: float = 0.7,
 80        max_tokens: Optional[int] = None,
 81        **kwargs: Any
 82    ) -> AsyncIterator[str]:
 83        """
 84        Stream a completion.
 85        
 86        Args:
 87            prompt: The prompt to complete
 88            model: Model name
 89            temperature: Sampling temperature (0-1)
 90            max_tokens: Maximum tokens to generate
 91            **kwargs: Provider-specific parameters
 92            
 93        Yields:
 94            Chunks of the completion
 95        """
 96        pass
 97    
 98    @abstractmethod
 99    async def validate_credentials(self) -> bool:
100        """
101        Validate that the provider credentials are correct.
102        
103        Returns:
104            True if credentials are valid, False otherwise
105        """
106        pass

Abstract base class for LLM providers.

All providers (Azure OpenAI, OpenAI, Anthropic, etc.) must implement this interface.

Note: Model registration is handled by LLMProviderRegistry, not by individual providers. Providers focus on LLM operations (complete, stream, validate).

BaseProvider(name: str)
41    def __init__(self, name: str):
42        """
43        Initialize the provider.
44        
45        Args:
46            name: Unique identifier for this provider
47        """
48        self.name = name

Initialize the provider.

Args: name: Unique identifier for this provider

name
@abstractmethod
async def complete( self, prompt: str, model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs: Any) -> gmf_forge_ai_shared_core.llm_gateway.providers.CompletionResponse:
50    @abstractmethod
51    async def complete(
52        self,
53        prompt: str,
54        model: Optional[str] = None,
55        temperature: float = 0.7,
56        max_tokens: Optional[int] = None,
57        **kwargs: Any
58    ) -> CompletionResponse:
59        """
60        Generate a completion.
61        
62        Args:
63            prompt: The prompt to complete
64            model: Model name
65            temperature: Sampling temperature (0-1)
66            max_tokens: Maximum tokens to generate
67            **kwargs: Provider-specific parameters
68            
69        Returns:
70            CompletionResponse object
71        """
72        pass

Generate a completion.

Args:
    prompt: The prompt to complete
    model: Model name
    temperature: Sampling temperature (0-1)
    max_tokens: Maximum tokens to generate
    **kwargs: Provider-specific parameters

Returns: CompletionResponse object

@abstractmethod
async def stream_complete( self, prompt: str, model: Optional[str] = None, temperature: float = 0.7, max_tokens: Optional[int] = None, **kwargs: Any) -> AsyncIterator[str]:
74    @abstractmethod
75    async def stream_complete(
76        self,
77        prompt: str,
78        model: Optional[str] = None,
79        temperature: float = 0.7,
80        max_tokens: Optional[int] = None,
81        **kwargs: Any
82    ) -> AsyncIterator[str]:
83        """
84        Stream a completion.
85        
86        Args:
87            prompt: The prompt to complete
88            model: Model name
89            temperature: Sampling temperature (0-1)
90            max_tokens: Maximum tokens to generate
91            **kwargs: Provider-specific parameters
92            
93        Yields:
94            Chunks of the completion
95        """
96        pass

Stream a completion.

Args:
    prompt: The prompt to complete
    model: Model name
    temperature: Sampling temperature (0-1)
    max_tokens: Maximum tokens to generate
    **kwargs: Provider-specific parameters

Yields: Chunks of the completion

@abstractmethod
async def validate_credentials(self) -> bool:
 98    @abstractmethod
 99    async def validate_credentials(self) -> bool:
100        """
101        Validate that the provider credentials are correct.
102        
103        Returns:
104            True if credentials are valid, False otherwise
105        """
106        pass

Validate that the provider credentials are correct.

Returns: True if credentials are valid, False otherwise