gmf_forge_ai_shared_core.auth

Basic authentication - API key and token validation.

1"""Basic authentication - API key and token validation."""
2
3from gmf_forge_ai_shared_core.auth.api_key_manager import APIKeyManager
4from gmf_forge_ai_shared_core.auth.token_validator import TokenValidator
5
6__all__ = [
7    "APIKeyManager",
8    "TokenValidator",
9]
class APIKeyManager:
class APIKeyManager:
    """
    In-memory manager for API keys used in authentication.

    Handles creation, validation, and revocation of API keys. Only the
    SHA-256 hex digest of each key is stored; the plaintext key is available
    solely as the return value of ``create_key``.
    """

    def __init__(self):
        """Initialize an empty API key manager."""
        # Local import keeps this block self-contained; the annotation below
        # previously referenced the builtin function ``any`` by mistake.
        from typing import Any

        # Maps key digest -> metadata ("name", "created_at", "expiry", "active").
        self._keys: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``."""
        # Local import: only ``datetime`` and ``timedelta`` are known to be
        # in scope at module level.
        from datetime import timezone

        # Non-deprecated equivalent of ``datetime.utcnow()``; the result stays
        # naive so stored timestamps keep their original type.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def create_key(
        self,
        name: str,
        expiry_days: Optional[int] = None
    ) -> str:
        """
        Create a new API key.

        Args:
            name: Identifier for the key
            expiry_days: Optional lifetime in days. ``None`` means the key
                never expires; ``0`` or a negative value yields a key that
                is already expired.

        Returns:
            The generated API key (plaintext; it is not stored)
        """
        # Generate a secure random key
        key = secrets.token_urlsafe(32)

        # One clock reading so ``created_at`` and ``expiry`` are consistent.
        now = self._utcnow()

        # Compare against None: a plain truthiness test would silently turn
        # ``expiry_days=0`` into a key that never expires.
        expiry = None
        if expiry_days is not None:
            expiry = now + timedelta(days=expiry_days)

        self._keys[self._hash_key(key)] = {
            "name": name,
            "created_at": now,
            "expiry": expiry,
            "active": True
        }

        return key

    def validate_key(self, key: str) -> bool:
        """
        Validate an API key.

        Args:
            key: The API key to validate

        Returns:
            True if the key is known, active, and not expired; False
            otherwise (including for empty or non-string input)
        """
        # Reject malformed input instead of failing inside ``encode()``.
        if not isinstance(key, str) or not key:
            return False

        key_info = self._keys.get(self._hash_key(key))
        if key_info is None or not key_info["active"]:
            return False

        # A key is expired from its expiry instant onwards.
        expiry = key_info["expiry"]
        return expiry is None or self._utcnow() < expiry

    def revoke_key(self, key: str) -> None:
        """
        Revoke an API key.

        Unknown keys are ignored. The record is kept and only marked
        inactive.

        Args:
            key: The API key to revoke
        """
        key_info = self._keys.get(self._hash_key(key))
        if key_info is not None:
            key_info["active"] = False

    @staticmethod
    def _hash_key(key: str) -> str:
        """Return the SHA-256 hex digest under which *key* is stored."""
        return hashlib.sha256(key.encode()).hexdigest()

Manager for API keys used in authentication.

Handles creation, validation, and revocation of API keys.

APIKeyManager()
17    def __init__(self):
18        """Initialize API key manager."""
19        self._keys: Dict[str, Dict[str, any]] = {}

Initialize API key manager.

def create_key(self, name: str, expiry_days: Optional[int] = None) -> str:
21    def create_key(
22        self,
23        name: str,
24        expiry_days: Optional[int] = None
25    ) -> str:
26        """
27        Create a new API key.
28        
29        Args:
30            name: Identifier for the key
31            expiry_days: Optional expiration in days
32            
33        Returns:
34            The generated API key
35        """
36        # Generate a secure random key
37        key = secrets.token_urlsafe(32)
38        key_hash = self._hash_key(key)
39        
40        expiry = None
41        if expiry_days:
42            expiry = datetime.utcnow() + timedelta(days=expiry_days)
43        
44        self._keys[key_hash] = {
45            "name": name,
46            "created_at": datetime.utcnow(),
47            "expiry": expiry,
48            "active": True
49        }
50        
51        return key

Create a new API key.

Args:
- name: Identifier for the key
- expiry_days: Optional expiration in days

Returns: The generated API key

def validate_key(self, key: str) -> bool:
53    def validate_key(self, key: str) -> bool:
54        """
55        Validate an API key.
56        
57        Args:
58            key: The API key to validate
59            
60        Returns:
61            True if valid, False otherwise
62        """
63        key_hash = self._hash_key(key)
64        key_info = self._keys.get(key_hash)
65        
66        if not key_info or not key_info["active"]:
67            return False
68        
69        # Check expiry
70        if key_info["expiry"] and datetime.utcnow() > key_info["expiry"]:
71            return False
72        
73        return True

Validate an API key.

Args: key: The API key to validate

Returns: True if valid, False otherwise

def revoke_key(self, key: str) -> None:
75    def revoke_key(self, key: str) -> None:
76        """Revoke an API key."""
77        key_hash = self._hash_key(key)
78        if key_hash in self._keys:
79            self._keys[key_hash]["active"] = False

Revoke an API key.

class TokenValidator:
 9class TokenValidator:
10    """
11    Validator for JWT and bearer tokens.
12    
13    Handles token verification and claims extraction.
14    """
15    
16    def __init__(self, secret_key: str, algorithm: str = "HS256"):
17        """
18        Initialize token validator.
19        
20        Args:
21            secret_key: Secret key for token validation
22            algorithm: JWT algorithm to use
23        """
24        self.secret_key = secret_key
25        self.algorithm = algorithm
26    
27    def validate(self, token: str) -> Optional[Dict[str, Any]]:
28        """
29        Validate a JWT token.
30        
31        Args:
32            token: The JWT token to validate
33            
34        Returns:
35            Decoded claims if valid, None otherwise
36        """
37        try:
38            # Decode and verify token
39            payload = jwt.decode(
40                token,
41                self.secret_key,
42                algorithms=[self.algorithm]
43            )
44            
45            # Check expiration
46            exp = payload.get("exp")
47            if exp and datetime.utcfromtimestamp(exp) < datetime.utcnow():
48                return None
49            
50            return payload
51            
52        except jwt.InvalidTokenError:
53            return None
54    
55    def create_token(
56        self,
57        claims: Dict[str, Any],
58        expiry_seconds: int = 3600
59    ) -> str:
60        """
61        Create a new JWT token.
62        
63        Args:
64            claims: Claims to include in the token
65            expiry_seconds: Token expiration in seconds
66            
67        Returns:
68            The encoded JWT token
69        """
70        payload = {
71            **claims,
72            "exp": datetime.utcnow().timestamp() + expiry_seconds,
73            "iat": datetime.utcnow().timestamp()
74        }
75        
76        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

Validator for JWT and bearer tokens.

Handles token verification and claims extraction.

TokenValidator(secret_key: str, algorithm: str = 'HS256')
16    def __init__(self, secret_key: str, algorithm: str = "HS256"):
17        """
18        Initialize token validator.
19        
20        Args:
21            secret_key: Secret key for token validation
22            algorithm: JWT algorithm to use
23        """
24        self.secret_key = secret_key
25        self.algorithm = algorithm

Initialize token validator.

Args:
- secret_key: Secret key for token validation
- algorithm: JWT algorithm to use

secret_key
algorithm
def validate(self, token: str) -> Optional[Dict[str, Any]]:
27    def validate(self, token: str) -> Optional[Dict[str, Any]]:
28        """
29        Validate a JWT token.
30        
31        Args:
32            token: The JWT token to validate
33            
34        Returns:
35            Decoded claims if valid, None otherwise
36        """
37        try:
38            # Decode and verify token
39            payload = jwt.decode(
40                token,
41                self.secret_key,
42                algorithms=[self.algorithm]
43            )
44            
45            # Check expiration
46            exp = payload.get("exp")
47            if exp and datetime.utcfromtimestamp(exp) < datetime.utcnow():
48                return None
49            
50            return payload
51            
52        except jwt.InvalidTokenError:
53            return None

Validate a JWT token.

Args: token: The JWT token to validate

Returns: Decoded claims if valid, None otherwise

def create_token(self, claims: Dict[str, Any], expiry_seconds: int = 3600) -> str:
55    def create_token(
56        self,
57        claims: Dict[str, Any],
58        expiry_seconds: int = 3600
59    ) -> str:
60        """
61        Create a new JWT token.
62        
63        Args:
64            claims: Claims to include in the token
65            expiry_seconds: Token expiration in seconds
66            
67        Returns:
68            The encoded JWT token
69        """
70        payload = {
71            **claims,
72            "exp": datetime.utcnow().timestamp() + expiry_seconds,
73            "iat": datetime.utcnow().timestamp()
74        }
75        
76        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

Create a new JWT token.

Args:
- claims: Claims to include in the token
- expiry_seconds: Token expiration in seconds

Returns: The encoded JWT token