gmf_forge_ai_shared_core.llm_gateway
LLM Gateway - Unified interface for multiple LLM providers.
class UnifiedLLMGateway:
    """
    Unified interface for multiple LLM providers.

    Supports Azure OpenAI (primary), OpenAI, Anthropic, Cohere, vLLM, and Ollama providers.
    Can use a provider registry for flexible multi-provider setups or a single default provider.

    **Performance Monitoring:**
    Optionally integrate BasicPerformanceMonitor for automatic LLM performance tracking.
    When enabled, the gateway reports the start and end of every provider call
    (including fallback attempts) to the monitor, together with token counts.

    Example with single provider (simple):
        >>> gateway = UnifiedLLMGateway(default_provider=azure_provider)
        >>> response = await gateway.complete("What is RAG?")

    Example with registry (multi-provider):
        >>> registry = LLMProviderRegistry()
        >>> registry.register("azure", azure_provider, is_default=True)
        >>> registry.register("ollama", ollama_provider)
        >>> gateway = UnifiedLLMGateway(provider_registry=registry)
        >>>
        >>> # Use default provider (azure)
        >>> response = await gateway.complete("What is RAG?")
        >>>
        >>> # Use specific provider (ollama)
        >>> response = await gateway.complete("What is RAG?", provider="ollama")

    Example with performance monitoring (explicit opt-in):
        >>> from gmf_forge_ai_shared_core.observability import BasicPerformanceMonitor
        >>>
        >>> monitor = BasicPerformanceMonitor()
        >>> gateway = UnifiedLLMGateway(
        ...     default_provider=azure_provider,
        ...     performance_monitor=monitor  # Enable automatic performance tracking
        ... )
        >>>
        >>> response = await gateway.complete("What is RAG?")
        >>> # Performance automatically tracked! No manual token counting needed.
        >>>
        >>> # View performance statistics
        >>> stats = monitor.get_stats()
        >>> print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
        >>> print(f"Total tokens used: {stats['total_tokens']}")
    """

    def __init__(
        self,
        default_provider: Optional[BaseProvider] = None,
        fallback_providers: Optional[List[BaseProvider]] = None,
        provider_registry: Optional["LLMProviderRegistry"] = None,
        performance_monitor: Optional["BasicPerformanceMonitor"] = None,
    ):
        """
        Initialize the unified LLM gateway.

        Args:
            default_provider: Primary provider to use (for simple single-provider setup)
            fallback_providers: List of fallback providers if primary fails
            provider_registry: Provider registry for multi-provider setup (recommended)
            performance_monitor: Optional performance monitor for automatic tracking.
                When provided, the gateway reports every provider call to it.
        """
        self.default_provider = default_provider
        self.fallback_providers = fallback_providers or []
        self.provider_registry = provider_registry
        self.performance_monitor = performance_monitor
        self._providers: Dict[str, BaseProvider] = {}

        # Compare against None so a provider object that happens to evaluate
        # falsy (e.g. one defining __len__) is still registered by name.
        if default_provider is not None:
            self.register_provider(default_provider.name, default_provider)

    def register_provider(self, name: str, provider: BaseProvider) -> None:
        """Register a provider under ``name``, replacing any previous entry."""
        self._providers[name] = provider

    def _get_provider(self, provider_name: Optional[str] = None) -> BaseProvider:
        """
        Get a provider by name or return the default.

        A named lookup consults the registry first, then the providers
        registered directly on this gateway. Without a name, the registry's
        default is preferred and ``default_provider`` is the last resort.

        Args:
            provider_name: Optional provider name. If None or empty, uses default.

        Returns:
            Provider instance

        Raises:
            ValueError: If provider not found or no default configured
        """
        if provider_name:
            if self.provider_registry and self.provider_registry.has_provider(provider_name):
                return self.provider_registry.get(provider_name)
            if provider_name in self._providers:
                return self._providers[provider_name]
            raise ValueError(f"Provider '{provider_name}' not found")

        if self.provider_registry:
            try:
                return self.provider_registry.get_default()
            except RuntimeError:
                # Deliberate fall-through: the registry has no default, so try
                # the provider handed to the constructor instead.
                pass

        if self.default_provider is not None:
            return self.default_provider

        raise ValueError("No default provider configured")

    def _start_tracking(self, provider_name: str, model: Optional[str], **details: Any) -> Any:
        """Open a monitor record for one provider call; returns None when monitoring is off."""
        if self.performance_monitor is None:
            return None
        return self.performance_monitor.start_request(
            provider=provider_name,
            model=model or "default",
            **details,
        )

    def _finish_tracking(
        self,
        request_id: Any,
        response: Any = None,
        error: Optional[BaseException] = None,
    ) -> None:
        """Close a monitor record opened by ``_start_tracking`` as a success or a failure."""
        if self.performance_monitor is None or not request_id:
            return

        if error is not None:
            self.performance_monitor.end_request(
                request_id=request_id,
                prompt_tokens=0,
                completion_tokens=0,
                success=False,
                error=str(error),
            )
            return

        # A response without usage data is recorded with zero tokens rather
        # than failing the request. Assumes usage is dict-like when present.
        usage = getattr(response, "usage", None) or {}
        self.performance_monitor.end_request(
            request_id=request_id,
            prompt_tokens=usage.get("prompt_tokens", 0),
            completion_tokens=usage.get("completion_tokens", 0),
            success=True,
        )

    async def complete(
        self,
        prompt: str,
        model: Optional[str] = None,
        provider: Optional[str] = None,
        **kwargs: Any
    ) -> Any:
        """
        Generate a completion using the configured provider.

        When ``performance_monitor`` is set, each provider call is bracketed by
        ``start_request`` / ``end_request`` with the token counts taken from
        ``response.usage`` (zero on failure or when usage is missing).

        Fallback providers are tried, in order, only when the caller did not
        name a provider and the selected provider's ``complete`` call itself
        raised. Errors raised by the monitor are never treated as provider
        failures, so they cannot trigger a second LLM call.

        Args:
            prompt: The prompt to complete
            model: Optional model name override
            provider: Optional provider name (e.g., "azure", "ollama")
            **kwargs: Additional provider-specific parameters

        Returns:
            Whatever the provider's ``complete`` returns (documented by the
            providers as a CompletionResponse).

        Raises:
            ValueError: If no matching or default provider is configured.
            Exception: The primary provider's original exception when it and
                every applicable fallback fail.
        """
        selected_provider = self._get_provider(provider)
        request_id = self._start_tracking(
            selected_provider.name,
            model,
            prompt_preview=prompt[:50],
        )

        # Keep the try body to the provider call alone so that only provider
        # errors reach the fallback logic.
        try:
            response = await selected_provider.complete(prompt=prompt, model=model, **kwargs)
        except Exception as primary_error:
            self._finish_tracking(request_id, error=primary_error)

            # An explicitly requested provider is never silently substituted.
            if not provider:
                for fallback in self.fallback_providers:
                    fallback_request_id = self._start_tracking(
                        fallback.name,
                        model,
                        fallback_attempt=True,
                    )
                    try:
                        response = await fallback.complete(prompt=prompt, model=model, **kwargs)
                    except Exception as fallback_error:
                        self._finish_tracking(fallback_request_id, error=fallback_error)
                        continue

                    self._finish_tracking(fallback_request_id, response=response)
                    return response

            # Every candidate failed: surface the primary provider's error.
            raise

        self._finish_tracking(request_id, response=response)
        return response
Unified interface for multiple LLM providers.
Supports Azure OpenAI (primary), OpenAI, Anthropic, Cohere, vLLM, and Ollama providers. Can use a provider registry for flexible multi-provider setups or a single default provider.
Performance Monitoring: Optionally integrate BasicPerformanceMonitor for automatic LLM performance tracking. When enabled, the gateway automatically tracks latency, token usage, and throughput.
Example with single provider (simple):
gateway = UnifiedLLMGateway(default_provider=azure_provider)
response = await gateway.complete("What is RAG?")
Example with registry (multi-provider):
registry = LLMProviderRegistry()
registry.register("azure", azure_provider, is_default=True)
registry.register("ollama", ollama_provider)
gateway = UnifiedLLMGateway(provider_registry=registry)
Use default provider (azure)
response = await gateway.complete("What is RAG?")
Use specific provider (ollama)
response = await gateway.complete("What is RAG?", provider="ollama")
Example with performance monitoring (explicit opt-in):
from gmf_forge_ai_shared_core.observability import BasicPerformanceMonitor
monitor = BasicPerformanceMonitor()
gateway = UnifiedLLMGateway(
    default_provider=azure_provider,
    performance_monitor=monitor,  # Enable automatic performance tracking
)
response = await gateway.complete("What is RAG?")
Performance automatically tracked! No manual token counting needed.
View performance statistics
stats = monitor.get_stats()
print(f"Average latency: {stats['avg_latency_ms']:.2f}ms")
print(f"Total tokens used: {stats['total_tokens']}")
def __init__(
    self,
    default_provider: Optional[BaseProvider] = None,
    fallback_providers: Optional[List[BaseProvider]] = None,
    provider_registry: Optional["LLMProviderRegistry"] = None,
    performance_monitor: Optional["BasicPerformanceMonitor"] = None,
):
    """
    Initialize the unified LLM gateway.

    Args:
        default_provider: Primary provider to use (for simple single-provider setup)
        fallback_providers: List of fallback providers if primary fails
        provider_registry: Provider registry for multi-provider setup (recommended)
        performance_monitor: Optional performance monitor for automatic tracking.
            When provided, gateway automatically tracks latency and token usage.
    """
    self.default_provider = default_provider
    self.fallback_providers = fallback_providers or []
    self.provider_registry = provider_registry
    self.performance_monitor = performance_monitor
    self._providers: Dict[str, BaseProvider] = {}

    # Compare against None so a provider object that happens to evaluate
    # falsy (e.g. one defining __len__) is still registered under its name.
    if default_provider is not None:
        self.register_provider(default_provider.name, default_provider)
Initialize the unified LLM gateway.
Args: default_provider: Primary provider to use (for simple single-provider setup) fallback_providers: List of fallback providers if primary fails provider_registry: Provider registry for multi-provider setup (recommended) performance_monitor: Optional performance monitor for automatic tracking. When provided, gateway automatically tracks latency and token usage.
def register_provider(self, name: str, provider: BaseProvider) -> None:
    """
    Add ``provider`` to this gateway's own provider table.

    Args:
        name: Key under which the provider can later be looked up.
        provider: Provider instance to store. An existing entry with the
            same name is replaced.
    """
    self._providers.update({name: provider})
Register a new provider.
async def complete(
    self,
    prompt: str,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    **kwargs: Any
) -> Any:
    """
    Generate a completion using the configured provider.

    When ``performance_monitor`` is set, each provider call is bracketed by
    ``start_request`` / ``end_request`` with the token counts taken from
    ``response.usage`` (zero on failure or when usage is missing).

    Fallback providers are tried, in order, only when the caller did not
    name a provider and the selected provider's ``complete`` call itself
    raised. Errors raised by the monitor are never treated as provider
    failures, so they cannot trigger a second LLM call.

    Args:
        prompt: The prompt to complete
        model: Optional model name override
        provider: Optional provider name (e.g., "azure", "ollama")
        **kwargs: Additional provider-specific parameters

    Returns:
        Whatever the provider's ``complete`` returns (documented by the
        providers as a CompletionResponse).

    Raises:
        Exception: The primary provider's original exception when it and
            every applicable fallback fail. Provider lookup errors from
            ``_get_provider`` propagate unchanged.
    """
    monitor = self.performance_monitor

    def start_tracking(provider_name: str, **details: Any) -> Any:
        # Opens a monitor record; None means "nothing to close later".
        if monitor is None:
            return None
        return monitor.start_request(
            provider=provider_name,
            model=model or "default",
            **details,
        )

    def finish_tracking(request_id: Any, result: Any = None, error: Any = None) -> None:
        # Closes a monitor record as a failure (error given) or a success.
        if monitor is None or not request_id:
            return
        if error is not None:
            monitor.end_request(
                request_id=request_id,
                prompt_tokens=0,
                completion_tokens=0,
                success=False,
                error=str(error),
            )
            return
        # A response without usage data is recorded with zero tokens rather
        # than failing the request. Assumes usage is dict-like when present.
        usage = getattr(result, "usage", None) or {}
        monitor.end_request(
            request_id=request_id,
            prompt_tokens=usage.get("prompt_tokens", 0),
            completion_tokens=usage.get("completion_tokens", 0),
            success=True,
        )

    selected_provider = self._get_provider(provider)
    request_id = start_tracking(selected_provider.name, prompt_preview=prompt[:50])

    # Keep the try body to the provider call alone so that only provider
    # errors reach the fallback logic.
    try:
        response = await selected_provider.complete(prompt=prompt, model=model, **kwargs)
    except Exception as primary_error:
        finish_tracking(request_id, error=primary_error)

        # An explicitly requested provider is never silently substituted.
        if not provider:
            for fallback in self.fallback_providers:
                fallback_request_id = start_tracking(fallback.name, fallback_attempt=True)
                try:
                    response = await fallback.complete(prompt=prompt, model=model, **kwargs)
                except Exception as fallback_error:
                    finish_tracking(fallback_request_id, error=fallback_error)
                    continue

                finish_tracking(fallback_request_id, result=response)
                return response

        # Every candidate failed: surface the primary provider's error.
        raise

    finish_tracking(request_id, result=response)
    return response
Generate a completion using the configured provider.
When performance_monitor is enabled, automatically tracks:
- Request latency
- Token usage (prompt, completion, total)
- Tokens per second
- Success/failure status
Args: prompt: The prompt to complete model: Optional model name override provider: Optional provider name (e.g., "azure", "ollama") **kwargs: Additional provider-specific parameters
Returns: CompletionResponse with content, model, usage, and metadata
class BaseProvider(ABC):
    """
    Abstract base class for LLM providers.

    All providers (Azure OpenAI, OpenAI, Anthropic, etc.) must implement this interface.

    Note: Model registration is handled by LLMProviderRegistry, not by individual providers.
    Providers focus on LLM operations (complete, stream, validate).
    """

    def __init__(self, name: str):
        """
        Initialize the provider.

        Args:
            name: Unique identifier for this provider
        """
        self.name = name

    @abstractmethod
    async def complete(
        self,
        prompt: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs: Any
    ) -> CompletionResponse:
        """
        Generate a completion.

        Args:
            prompt: The prompt to complete
            model: Model name
            temperature: Sampling temperature (0-1)
            max_tokens: Maximum tokens to generate
            **kwargs: Provider-specific parameters

        Returns:
            CompletionResponse object
        """

    @abstractmethod
    async def stream_complete(
        self,
        prompt: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: Optional[int] = None,
        **kwargs: Any
    ) -> AsyncIterator[str]:
        """
        Stream a completion.

        Implementations are async generators: callers iterate the result
        directly with ``async for`` and do not ``await`` the call.

        Args:
            prompt: The prompt to complete
            model: Model name
            temperature: Sampling temperature (0-1)
            max_tokens: Maximum tokens to generate
            **kwargs: Provider-specific parameters

        Yields:
            Chunks of the completion
        """
        # An ``async def`` without ``yield`` is a coroutine function, which
        # would give this declaration a different call shape from the async
        # generators that implement it. The never-executed ``yield`` keeps the
        # base declaration an async generator that produces nothing.
        if False:  # pragma: no cover
            yield ""

    @abstractmethod
    async def validate_credentials(self) -> bool:
        """
        Validate that the provider credentials are correct.

        Returns:
            True if credentials are valid, False otherwise
        """
Abstract base class for LLM providers.
All providers (Azure OpenAI, OpenAI, Anthropic, etc.) must implement this interface.
Note: Model registration is handled by LLMProviderRegistry, not by individual providers. Providers focus on LLM operations (complete, stream, validate).
def __init__(self, name: str):
    """
    Set up the provider with its identifying name.

    Args:
        name: Unique identifier for this provider; stored on the instance
            as ``name``.
    """
    setattr(self, "name", name)
Initialize the provider.
Args: name: Unique identifier for this provider
@abstractmethod
async def complete(
    self,
    prompt: str,
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs: Any
) -> CompletionResponse:
    """
    Produce a single, non-streamed completion for ``prompt``.

    This is the abstract declaration; concrete providers supply the body.

    Args:
        prompt: The prompt to complete
        model: Model name
        temperature: Sampling temperature (0-1)
        max_tokens: Maximum tokens to generate
        **kwargs: Provider-specific parameters

    Returns:
        CompletionResponse object
    """
    ...
Generate a completion.
Args: prompt: The prompt to complete model: Model name temperature: Sampling temperature (0-1) max_tokens: Maximum tokens to generate **kwargs: Provider-specific parameters
Returns: CompletionResponse object
@abstractmethod
async def stream_complete(
    self,
    prompt: str,
    model: Optional[str] = None,
    temperature: float = 0.7,
    max_tokens: Optional[int] = None,
    **kwargs: Any
) -> AsyncIterator[str]:
    """
    Stream a completion.

    Implementations are async generators: callers iterate the result
    directly with ``async for`` and do not ``await`` the call.

    Args:
        prompt: The prompt to complete
        model: Model name
        temperature: Sampling temperature (0-1)
        max_tokens: Maximum tokens to generate
        **kwargs: Provider-specific parameters

    Yields:
        Chunks of the completion
    """
    # An ``async def`` without ``yield`` is a coroutine function, which would
    # give this declaration a different call shape from the async generators
    # that implement it. The never-executed ``yield`` keeps the base
    # declaration an async generator that produces nothing.
    if False:  # pragma: no cover
        yield ""
Stream a completion.
Args: prompt: The prompt to complete model: Model name temperature: Sampling temperature (0-1) max_tokens: Maximum tokens to generate **kwargs: Provider-specific parameters
Yields: Chunks of the completion
@abstractmethod
async def validate_credentials(self) -> bool:
    """
    Check whether this provider's credentials are usable.

    This is the abstract declaration; concrete providers supply the check.

    Returns:
        True if credentials are valid, False otherwise
    """
    ...
Validate that the provider credentials are correct.
Returns: True if credentials are valid, False otherwise