gmf_forge_ai_orchestration.state
State management — conversation state, checkpoints, blackboard, and store backends.
1"""State management — conversation state, checkpoints, blackboard, and store backends.""" 2 3from gmf_forge_ai_orchestration.state.base import ( 4 BaseStateStore, 5 Checkpoint, 6 ConversationMessage, 7 ConversationState, 8) 9from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore 10from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore 11from gmf_forge_ai_orchestration.state.factory import StateStoreFactory 12from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager 13from gmf_forge_ai_orchestration.state.blackboard import Blackboard, BlackboardEntry 14 15__all__ = [ 16 "BaseStateStore", 17 "Checkpoint", 18 "ConversationMessage", 19 "ConversationState", 20 "InMemoryStateStore", 21 "RedisStateStore", 22 "StateStoreFactory", 23 "CheckpointManager", 24 "Blackboard", 25 "BlackboardEntry", 26]
class BaseStateStore(ABC):
    """
    Contract for an asynchronous key-value state store.

    Concrete backends subclass this and implement every coroutine below, so
    orchestration components can be written against this interface alone.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Look up ``key``; the result is ``None`` when it is missing or expired."""

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Save ``value`` under ``key``.

        Args:
            key: Storage key.
            value: JSON-serialisable value.
            ttl: Lifetime in seconds; ``None`` keeps the entry indefinitely.
        """

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Drop ``key``; deleting a key that does not exist is a no-op."""

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Report whether ``key`` is present and has not expired."""

    @abstractmethod
    async def clear(self) -> None:
        """Remove every key that this store instance manages."""
Abstract key-value state store.
Both InMemoryStateStore and RedisStateStore implement this interface, making them interchangeable in all orchestration components.
@abstractmethod
async def get(self, key: str) -> Optional[Any]:
    """
    Look up the value stored under ``key``.

    Returns:
        The stored value, or ``None`` when the key is missing or expired.
    """
Retrieve a value by key. Returns None if not found or expired.
@abstractmethod
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
    """
    Save ``value`` under ``key``.

    Args:
        key: Storage key.
        value: JSON-serialisable value.
        ttl: Lifetime in seconds; ``None`` keeps the entry indefinitely.
    """
Store a value.
Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.
@abstractmethod
async def delete(self, key: str) -> None:
    """Drop ``key``; deleting a key that does not exist is a no-op."""
Remove a key. No-op if the key does not exist.
@dataclass
class Checkpoint:
    """Snapshot of one agent's execution state captured at a single moment."""

    checkpoint_id: str
    agent_id: str
    state: Dict[str, Any]
    # Defaults to the creation time as a timezone-aware UTC datetime.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Render the checkpoint as a plain dict, with the timestamp as ISO-8601 text."""
        payload: Dict[str, Any] = {}
        payload["checkpoint_id"] = self.checkpoint_id
        payload["agent_id"] = self.agent_id
        payload["state"] = self.state
        payload["timestamp"] = self.timestamp.isoformat()
        payload["metadata"] = self.metadata
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Checkpoint":
        """
        Rebuild a checkpoint from the dict layout produced by :meth:`to_dict`.

        ``metadata`` is optional and falls back to an empty dict; every other
        key is required.
        """
        return cls(
            data["checkpoint_id"],
            data["agent_id"],
            data["state"],
            datetime.fromisoformat(data["timestamp"]),
            data.get("metadata", {}),
        )
A point-in-time snapshot of an agent's execution state.
@dataclass
class ConversationMessage:
    """One message exchanged within a conversation."""

    # Free-form label for who produced the message; no validation is applied here.
    role: str
    # The message text.
    content: str
    # Filled with the current UTC time when the caller does not supply one.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc),
    )
    # Arbitrary extra data; each instance gets its own fresh dict by default.
    metadata: Dict[str, Any] = field(
        default_factory=dict,
    )
A single message in a conversation.
@dataclass
class ConversationState:
    """Complete state of a single agent conversation session."""

    session_id: str
    messages: List[ConversationMessage] = field(default_factory=list)
    agent_metadata: Dict[str, Any] = field(default_factory=dict)
    # Both timestamps default to "now" in UTC at construction time.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def add_message(self, role: str, content: str, **metadata: Any) -> None:
        """Record a new message at the end of the history and bump ``updated_at``."""
        entry = ConversationMessage(role=role, content=content, metadata=metadata)
        self.messages.append(entry)
        self.updated_at = datetime.now(timezone.utc)

    def to_dict(self) -> Dict[str, Any]:
        """Render the session as a plain dict, with all datetimes as ISO-8601 text."""
        encoded_messages = []
        for message in self.messages:
            encoded_messages.append(
                {
                    "role": message.role,
                    "content": message.content,
                    "timestamp": message.timestamp.isoformat(),
                    "metadata": message.metadata,
                }
            )
        return {
            "session_id": self.session_id,
            "messages": encoded_messages,
            "agent_metadata": self.agent_metadata,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
        """
        Rebuild a session from the dict layout produced by :meth:`to_dict`.

        ``messages``, ``agent_metadata`` and each message's ``metadata`` are
        optional; ``session_id`` and the timestamps are required.
        """
        history = []
        for raw in data.get("messages", []):
            history.append(
                ConversationMessage(
                    role=raw["role"],
                    content=raw["content"],
                    timestamp=datetime.fromisoformat(raw["timestamp"]),
                    metadata=raw.get("metadata", {}),
                )
            )
        return cls(
            session_id=data["session_id"],
            messages=history,
            agent_metadata=data.get("agent_metadata", {}),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
        )
Full state of an agent conversation session.
def add_message(self, role: str, content: str, **metadata: Any) -> None:
    """
    Record a new message at the end of the history and bump ``updated_at``.

    Extra keyword arguments are stored as the message's metadata dict.
    """
    entry = ConversationMessage(role=role, content=content, metadata=metadata)
    self.messages.append(entry)
    # Mark the session as modified now (UTC).
    self.updated_at = datetime.now(timezone.utc)
Append a message and update the timestamp.
def to_dict(self) -> Dict[str, Any]:
    """Render the session as a plain dict, with all datetimes as ISO-8601 text."""
    encoded_messages = []
    for message in self.messages:
        encoded_messages.append(
            {
                "role": message.role,
                "content": message.content,
                "timestamp": message.timestamp.isoformat(),
                "metadata": message.metadata,
            }
        )
    return {
        "session_id": self.session_id,
        "messages": encoded_messages,
        "agent_metadata": self.agent_metadata,
        "created_at": self.created_at.isoformat(),
        "updated_at": self.updated_at.isoformat(),
    }
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
    """
    Rebuild a session from the dict layout produced by ``to_dict``.

    ``messages``, ``agent_metadata`` and each message's ``metadata`` are
    optional; ``session_id`` and the timestamps are required.
    """
    history = []
    for raw in data.get("messages", []):
        history.append(
            ConversationMessage(
                role=raw["role"],
                content=raw["content"],
                timestamp=datetime.fromisoformat(raw["timestamp"]),
                metadata=raw.get("metadata", {}),
            )
        )
    return cls(
        session_id=data["session_id"],
        messages=history,
        agent_metadata=data.get("agent_metadata", {}),
        created_at=datetime.fromisoformat(data["created_at"]),
        updated_at=datetime.fromisoformat(data["updated_at"]),
    )
class InMemoryStateStore(BaseStateStore):
    """
    In-memory key-value store with optional per-key TTL.

    Every operation runs under a single ``asyncio.Lock``. Suitable for
    single-process deployments and testing; for multi-process or persistent
    state, use RedisStateStore instead.
    """

    def __init__(self) -> None:
        # Maps key -> (value, monotonic deadline or None for "never expires").
        self._store: Dict[str, Tuple[Any, Optional[float]]] = {}
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _is_expired(self, expires_at: Optional[float]) -> bool:
        """Tell whether a deadline on the monotonic clock has passed."""
        return expires_at is not None and time.monotonic() > expires_at

    def _clean_key(self, key: str) -> None:
        """Evict ``key`` lazily if its deadline has passed."""
        entry = self._store.get(key)
        if entry is None:
            return
        _, deadline = entry
        if self._is_expired(deadline):
            del self._store[key]

    # ------------------------------------------------------------------
    # BaseStateStore implementation
    # ------------------------------------------------------------------

    async def get(self, key: str) -> Optional[Any]:
        """Return the live value for ``key``, or ``None`` if absent or expired."""
        async with self._lock:
            self._clean_key(key)
            entry = self._store.get(key)
            if entry is None:
                return None
            value, _ = entry
            return value

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key``, expiring after ``ttl`` seconds if given."""
        deadline = None
        if ttl is not None:
            deadline = time.monotonic() + ttl
        async with self._lock:
            self._store[key] = (value, deadline)

    async def delete(self, key: str) -> None:
        """Remove ``key``; a missing key is ignored."""
        async with self._lock:
            if key in self._store:
                del self._store[key]

    async def exists(self, key: str) -> bool:
        """Report whether ``key`` is present and not yet expired."""
        async with self._lock:
            self._clean_key(key)
            return key in self._store

    async def clear(self) -> None:
        """Drop every entry held by this store."""
        async with self._lock:
            self._store.clear()
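Coroutine-safe in-memory key-value store with optional TTL. Access is serialised by an asyncio.Lock, which does not protect against concurrent use from multiple threads.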
Suitable for single-process deployments and testing. For multi-process or persistent state, use RedisStateStore instead.
async def get(self, key: str) -> Optional[Any]:
    """Return the live value for ``key``, or ``None`` if absent or expired."""
    async with self._lock:
        # Evict the entry first so an expired value is never returned.
        self._clean_key(key)
        entry = self._store.get(key)
        if entry is None:
            return None
        value, _ = entry
        return value
Retrieve a value by key. Returns None if not found or expired.
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
    """
    Store ``value`` under ``key``.

    Args:
        key: Storage key.
        value: Value to keep; it is stored as-is.
        ttl: Lifetime in seconds, measured on the monotonic clock from the
            moment of the call. ``None`` means the entry never expires.
    """
    deadline = None
    if ttl is not None:
        deadline = time.monotonic() + ttl
    async with self._lock:
        self._store[key] = (value, deadline)
Store a value.
Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.
async def delete(self, key: str) -> None:
    """Remove ``key`` from the store; a missing key is ignored."""
    async with self._lock:
        if key in self._store:
            del self._store[key]
Remove a key. No-op if the key does not exist.
class RedisStateStore(BaseStateStore):
    """
    Redis-backed key-value store.

    Uses redis.asyncio for non-blocking I/O. Values are JSON-serialised
    before storage and deserialised on retrieval.

    Args:
        url: Redis connection URL (e.g. ``"redis://localhost:6379"``).
        key_prefix: Optional prefix applied to every key to avoid collisions
            when sharing a Redis instance across services. Defaults to
            ``"gmf_forge_ai:"``.
        **redis_kwargs: Extra keyword arguments forwarded to
            ``redis.asyncio.from_url``. ``decode_responses`` defaults to
            ``True`` when the caller does not supply it.

    Raises:
        ImportError: If the ``redis`` package is not installed.

    Example::

        store = RedisStateStore(url="redis://localhost:6379")
        await store.set("session:abc", {"key": "value"}, ttl=3600)
        data = await store.get("session:abc")
    """

    def __init__(
        self,
        url: str = "redis://localhost:6379",
        key_prefix: str = "gmf_forge_ai:",
        **redis_kwargs: Any,
    ) -> None:
        # Imported lazily so the package stays importable without redis installed.
        try:
            import redis.asyncio as aioredis  # type: ignore[import]
        except ImportError as exc:
            raise ImportError(
                "redis package is required for RedisStateStore. "
                "Install it with: pip install redis>=5.0.0"
            ) from exc

        # Apply decode_responses as a default instead of a fixed keyword:
        # passing it both explicitly and via **redis_kwargs would raise
        # "got multiple values for keyword argument".
        redis_kwargs.setdefault("decode_responses", True)
        self._client = aioredis.from_url(url, **redis_kwargs)
        self._prefix = key_prefix

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _k(self, key: str) -> str:
        """Return ``key`` with this store's prefix applied."""
        return f"{self._prefix}{key}"

    @staticmethod
    def _serialize(value: Any) -> str:
        """Encode ``value`` as JSON; objects JSON cannot encode are passed through ``str``."""
        return json.dumps(value, default=str)

    @staticmethod
    def _deserialize(raw: str) -> Any:
        """Decode a JSON document read back from Redis."""
        return json.loads(raw)

    # ------------------------------------------------------------------
    # BaseStateStore implementation
    # ------------------------------------------------------------------

    async def get(self, key: str) -> Optional[Any]:
        """Return the decoded value for ``key``, or ``None`` if Redis has no such key."""
        raw = await self._client.get(self._k(key))
        if raw is None:
            return None
        return self._deserialize(raw)

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` as JSON under ``key``, expiring after ``ttl`` seconds if given."""
        serialized = self._serialize(value)
        if ttl is not None:
            await self._client.setex(self._k(key), ttl, serialized)
        else:
            await self._client.set(self._k(key), serialized)

    async def delete(self, key: str) -> None:
        """Delete ``key`` from Redis."""
        await self._client.delete(self._k(key))

    async def exists(self, key: str) -> bool:
        """Report whether Redis currently holds ``key``."""
        result = await self._client.exists(self._k(key))
        return bool(result)

    async def clear(self) -> None:
        """Delete all keys matching this store's prefix."""
        pattern = f"{self._prefix}*"
        cursor = 0
        # SCAN is cursor-based; a returned cursor of 0 marks the end of the walk.
        while True:
            cursor, keys = await self._client.scan(cursor, match=pattern, count=100)
            if keys:
                await self._client.delete(*keys)
            if cursor == 0:
                break

    async def close(self) -> None:
        """Close the underlying Redis connection pool."""
        await self._client.aclose()
Redis-backed key-value store.
Uses redis.asyncio for non-blocking I/O. Values are JSON-serialised before storage and deserialised on retrieval.
Args:
url: Redis connection URL (e.g. "redis://localhost:6379").
key_prefix: Optional prefix applied to every key to avoid collisions
when sharing a Redis instance across services. Defaults to
"gmf_forge_ai:".
decode_responses: Passed through to the Redis client. Defaults to True.
Example::
store = RedisStateStore(url="redis://localhost:6379")
await store.set("session:abc", {"key": "value"}, ttl=3600)
data = await store.get("session:abc")
def __init__(
    self,
    url: str = "redis://localhost:6379",
    key_prefix: str = "gmf_forge_ai:",
    **redis_kwargs: Any,
) -> None:
    """
    Create the async Redis client used by this store.

    Args:
        url: Redis connection URL.
        key_prefix: Prefix prepended to every key written by this store.
        **redis_kwargs: Extra keyword arguments forwarded to
            ``redis.asyncio.from_url``. ``decode_responses`` defaults to
            ``True`` when the caller does not supply it.

    Raises:
        ImportError: If the ``redis`` package is not installed.
    """
    # Imported lazily so the package stays importable without redis installed.
    try:
        import redis.asyncio as aioredis  # type: ignore[import]
    except ImportError as exc:
        raise ImportError(
            "redis package is required for RedisStateStore. "
            "Install it with: pip install redis>=5.0.0"
        ) from exc

    # Apply decode_responses as a default instead of a fixed keyword:
    # passing it both explicitly and via **redis_kwargs would raise
    # "got multiple values for keyword argument".
    redis_kwargs.setdefault("decode_responses", True)
    self._client = aioredis.from_url(url, **redis_kwargs)
    self._prefix = key_prefix
async def get(self, key: str) -> Optional[Any]:
    """Return the decoded value for ``key``, or ``None`` if Redis has no such key."""
    payload = await self._client.get(self._k(key))
    return None if payload is None else self._deserialize(payload)
Retrieve a value by key. Returns None if not found or expired.
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
    """
    Store ``value`` as JSON under the prefixed ``key``.

    Args:
        key: Storage key (the store's prefix is added automatically).
        value: JSON-serialisable value.
        ttl: Lifetime in seconds; ``None`` writes the key without an expiry.
    """
    full_key = self._k(key)
    encoded = self._serialize(value)
    if ttl is None:
        await self._client.set(full_key, encoded)
        return
    await self._client.setex(full_key, ttl, encoded)
Store a value.
Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.
async def exists(self, key: str) -> bool:
    """Report whether Redis currently holds the prefixed ``key``."""
    return bool(await self._client.exists(self._k(key)))
Return True if the key exists and has not expired.
async def clear(self) -> None:
    """Delete every key that starts with this store's prefix."""
    match = f"{self._prefix}*"
    cursor = 0
    more = True
    # Walk the keyspace in batches; SCAN reports completion with cursor 0.
    while more:
        cursor, batch = await self._client.scan(cursor, match=match, count=100)
        if batch:
            await self._client.delete(*batch)
        more = cursor != 0
Delete all keys matching this store's prefix.
class StateStoreFactory:
    """
    Builds state store instances by backend name.

    Example::

        # In-memory (default, no extra deps)
        store = StateStoreFactory.create("memory")

        # Redis
        store = StateStoreFactory.create("redis", url="redis://localhost:6379")
    """

    @staticmethod
    def create(
        backend: Literal["memory", "redis"] = "memory",
        **kwargs: Any,
    ) -> BaseStateStore:
        """
        Build the state store for ``backend``.

        Args:
            backend: ``"memory"`` or ``"redis"``.
            **kwargs: Passed to ``RedisStateStore`` when ``backend`` is
                ``"redis"`` (``url``, ``key_prefix``, and any extra
                ``redis.asyncio.from_url`` arguments). Not used for
                ``"memory"``.

        Returns:
            A :class:`BaseStateStore` instance.

        Raises:
            ValueError: If an unknown backend name is provided.
        """
        if backend not in ("memory", "redis"):
            raise ValueError(
                f"Unknown state store backend: {backend!r}. "
                "Supported values: 'memory', 'redis'."
            )

        # Backend modules are imported on demand, only for the one requested.
        if backend == "redis":
            from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
            return RedisStateStore(**kwargs)

        from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
        return InMemoryStateStore()
Creates configured state store instances.
Example::
# In-memory (default, no extra deps)
store = StateStoreFactory.create("memory")
# Redis
store = StateStoreFactory.create("redis", url="redis://localhost:6379")
@staticmethod
def create(
    backend: Literal["memory", "redis"] = "memory",
    **kwargs: Any,
) -> BaseStateStore:
    """
    Build the state store for ``backend``.

    Args:
        backend: ``"memory"`` or ``"redis"``.
        **kwargs: Passed to ``RedisStateStore`` when ``backend`` is
            ``"redis"`` (``url``, ``key_prefix``, and any extra
            ``redis.asyncio.from_url`` arguments). Not used for ``"memory"``.

    Returns:
        A :class:`BaseStateStore` instance.

    Raises:
        ValueError: If an unknown backend name is provided.
    """
    if backend not in ("memory", "redis"):
        raise ValueError(
            f"Unknown state store backend: {backend!r}. "
            "Supported values: 'memory', 'redis'."
        )

    # Backend modules are imported on demand, only for the one requested.
    if backend == "redis":
        from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
        return RedisStateStore(**kwargs)

    from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
    return InMemoryStateStore()
Instantiate a state store.
Args:
backend: "memory" or "redis".
**kwargs: Forwarded to the store constructor.
For Redis: url, key_prefix, and any
additional kwargs accepted by redis.asyncio.from_url.
Returns:
A BaseStateStore instance.
Raises: ValueError: If an unknown backend name is provided.
class CheckpointManager:
    """
    Saves and restores agent execution state checkpoints.

    Works with any :class:`BaseStateStore` backend (in-memory or Redis).

    Key structure
    -------------
    Three key types are written per agent/execution::

        __ckpt_index__{agent_id}        -> [ "ckpt-uuid-A", "ckpt-uuid-B", ..., "ckpt-uuid-F" ]
        __ckpt_exec__{execution_id_1}   -> [ "ckpt-uuid-A", "ckpt-uuid-B", "ckpt-uuid-C" ]
        __ckpt_exec__{execution_id_2}   -> [ "ckpt-uuid-D", "ckpt-uuid-E", "ckpt-uuid-F" ]
        __ckpt_data__{ckpt-uuid-A}      -> step_number: 0, steps: [ action_0 ]
        __ckpt_data__{ckpt-uuid-B}      -> step_number: 1, steps: [ action_0, action_1 ]
        __ckpt_data__{ckpt-uuid-C}      -> step_number: 2, steps: [ action_0, action_1, action_2 ]

    Each ``__ckpt_data__`` key is expected to hold a **cumulative snapshot** —
    step N contains all steps 0..N — so only the last entry in the exec index
    is needed to fully resume. The manager stores whatever ``state`` dict
    it is given and does not enforce this.

    ``save()`` writes all three key types with the same effective TTL. The
    index and exec keys have their TTL reset on every write, while each data
    key keeps the expiry it was saved with. An index can therefore list IDs
    whose data has already expired; the list/load methods skip those.

    Resuming an execution
    ---------------------
    ::

        checkpoint = await manager.load_latest_for_execution(execution_id)
        if checkpoint:
            steps_so_far = checkpoint.state["steps"]
            # hand steps_so_far back to the agent to continue from

    API usage
    ---------
    ``save()`` is called **internally by the agent** after each step — developers
    should not call it directly. The developer-facing methods are:

    ``load(checkpoint_id)``
        Load a single checkpoint by ID. Returns ``None`` if expired or not found.

    ``list(agent_id)``
        All checkpoints for an agent across all executions, oldest first.
        Expired entries are silently skipped.

    ``list_by_execution(execution_id)``
        All checkpoints for a specific execution, oldest first.

    ``load_latest_for_execution(execution_id)``
        The most recent surviving checkpoint for an execution — the primary
        entry point for resuming a task.
    """

    def __init__(self, store: BaseStateStore, default_ttl: Optional[int] = None) -> None:
        self._store = store
        self._default_ttl = default_ttl

    @property
    def default_ttl(self) -> Optional[int]:
        """TTL (seconds) used for every key written when no explicit ttl is given."""
        return self._default_ttl

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    async def _append_to_index(
        self, index_key: str, checkpoint_id: str, ttl: Optional[int]
    ) -> None:
        """Append ``checkpoint_id`` to the ID list at ``index_key`` and reset its TTL."""
        index: List[str] = await self._store.get(index_key) or []
        index.append(checkpoint_id)
        await self._store.set(index_key, index, ttl=ttl)

    async def _remove_from_index(self, index_key: str, checkpoint_id: str) -> None:
        """Remove ``checkpoint_id`` from the ID list at ``index_key``, if listed."""
        index: List[str] = await self._store.get(index_key) or []
        if checkpoint_id not in index:
            # Nothing to rewrite; this also avoids creating an empty index key.
            return
        remaining = [cid for cid in index if cid != checkpoint_id]
        if remaining:
            # The store interface cannot report a key's remaining TTL, so the
            # rewritten index gets default_ttl instead of becoming persistent.
            await self._store.set(index_key, remaining, ttl=self._default_ttl)
        else:
            await self._store.delete(index_key)

    async def _load_existing(self, checkpoint_ids: List[str]) -> List[Checkpoint]:
        """Load the given IDs in order, skipping any whose data is gone."""
        checkpoints = []
        for ckpt_id in checkpoint_ids:
            checkpoint = await self.load(ckpt_id)
            if checkpoint is not None:
                checkpoints.append(checkpoint)
        return checkpoints

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def save(
        self,
        agent_id: str,
        state: dict,
        execution_id: Optional[str] = None,
        metadata: Optional[dict] = None,
        ttl: Optional[int] = None,
    ) -> str:
        """
        Persist an agent state snapshot.

        Args:
            agent_id: Agent whose index the checkpoint is appended to.
            state: Snapshot payload to store.
            execution_id: When given, the checkpoint is also indexed under
                this execution.
            metadata: Optional extra data stored alongside the state.
            ttl: Expiry in seconds for the keys written by this call; falls
                back to ``default_ttl`` when ``None``.

        Returns:
            The generated checkpoint_id.
        """
        checkpoint_id = str(uuid.uuid4())
        checkpoint = Checkpoint(
            checkpoint_id=checkpoint_id,
            agent_id=agent_id,
            state=state,
            metadata=metadata or {},
        )

        effective_ttl = ttl if ttl is not None else self._default_ttl
        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
        await self._store.set(data_key, checkpoint.to_dict(), ttl=effective_ttl)

        await self._append_to_index(
            f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}", checkpoint_id, effective_ttl
        )

        # Optionally index by execution_id for resume/query flows.
        if execution_id:
            await self._append_to_index(
                f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}", checkpoint_id, effective_ttl
            )

        return checkpoint_id

    async def load(self, checkpoint_id: str) -> Optional[Checkpoint]:
        """
        Load a checkpoint by ID.

        Returns:
            The :class:`Checkpoint` or ``None`` if not found / expired.
        """
        raw = await self._store.get(f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}")
        if raw is None:
            return None
        return Checkpoint.from_dict(raw)

    async def list(self, agent_id: str) -> List[Checkpoint]:
        """
        Return all checkpoints for an agent, oldest first.

        Checkpoints that have been evicted (TTL expired) are silently skipped.
        """
        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
        checkpoint_ids: List[str] = await self._store.get(index_key) or []
        return await self._load_existing(checkpoint_ids)

    async def list_by_execution(self, execution_id: str) -> List[Checkpoint]:
        """Return all checkpoints for an execution, oldest first."""
        exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
        checkpoint_ids: List[str] = await self._store.get(exec_key) or []
        return await self._load_existing(checkpoint_ids)

    async def load_latest_for_execution(self, execution_id: str) -> Optional[Checkpoint]:
        """
        Return the latest surviving checkpoint for an execution, if present.

        The execution index is walked newest-first and the walk stops at the
        first checkpoint whose data still exists, so older snapshots are
        never loaded.
        """
        exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
        checkpoint_ids: List[str] = await self._store.get(exec_key) or []
        for ckpt_id in reversed(checkpoint_ids):
            checkpoint = await self.load(ckpt_id)
            if checkpoint is not None:
                return checkpoint
        return None

    async def delete(
        self,
        checkpoint_id: str,
        agent_id: str,
        execution_id: Optional[str] = None,
    ) -> None:
        """
        Remove a specific checkpoint and its index entries.

        Args:
            checkpoint_id: Checkpoint to remove.
            agent_id: Agent whose index lists the checkpoint.
            execution_id: When given, the checkpoint is also removed from
                that execution's index. If omitted, the execution index is
                left untouched; readers skip the dangling ID because its
                data no longer loads.
        """
        await self._store.delete(f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}")
        await self._remove_from_index(f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}", checkpoint_id)
        if execution_id:
            await self._remove_from_index(
                f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}", checkpoint_id
            )
Saves and restores agent execution state checkpoints.
Works with any BaseStateStore backend (in-memory or Redis).
Redis key structure
Three key types are written per agent/execution::
Redis
│
├── __ckpt_index__{agent_id} (TTL: default_ttl, refreshed on each write)
│ └── [ "ckpt-uuid-A", "ckpt-uuid-B", "ckpt-uuid-C", ... ]
│ │ │
│ │ execution-id-1 │ execution-id-2
│ ▼ ▼
├── __ckpt_exec__{execution_id_1} __ckpt_exec__{execution_id_2}
│ └── [ "ckpt-uuid-A", └── [ "ckpt-uuid-D",
│ "ckpt-uuid-B", "ckpt-uuid-E",
│ "ckpt-uuid-C" ] "ckpt-uuid-F" ]
│ │ │
│ step 0, 1, 2 step 0, 1, 2
│ ▼ ▼
├── __ckpt_data__{ckpt-uuid-A} (TTL) __ckpt_data__{ckpt-uuid-D} (TTL)
│ step_number: 0 step_number: 0
│ steps: [ action_0 ] steps: [ action_0 ]
│
├── __ckpt_data__{ckpt-uuid-B} (TTL) __ckpt_data__{ckpt-uuid-E} (TTL)
│ step_number: 1 step_number: 1
│ steps: [ action_0, action_1 ] steps: [ action_0, action_1 ]
│
└── __ckpt_data__{ckpt-uuid-C} (TTL) __ckpt_data__{ckpt-uuid-F} (TTL)
step_number: 2 step_number: 2
steps: [ action_0, action_1, action_2 ] steps: [ ... ]
Each __ckpt_data__ key is a cumulative snapshot — step N contains all steps
0..N, so only the last entry in the exec index is needed to fully resume.
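All three key types are written by save() with the same effective TTL (the explicit ttl, or default_ttl). The index and exec keys
have their TTL reset on every save, while each data key keeps the expiry it was written with, so an index may still list IDs whose data has expired; list() and list_by_execution() skip those entries.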
Resuming an execution
::
# 1. Look up all checkpoint IDs for the execution
exec_key → __ckpt_exec__{execution_id} → [ id_0, id_1, id_2 ]
# 2. Load the last (highest step_number) checkpoint
last_id = checkpoint_ids[-1]
checkpoint = await manager.load(last_id)
# 3. The checkpoint.state contains the full steps list — resume from there
steps_so_far = checkpoint.state["steps"]
Listing all checkpoints for an agent
::
agent_key → __ckpt_index__{agent_id} → [ id_0, id_1, ... ]
checkpoints = await manager.list(agent_id)
API usage
save() is called internally by the agent after each step — developers
should not call it directly. The developer-facing methods are:
load(checkpoint_id)
Load a single checkpoint by ID. Returns None if expired or not found.
list(agent_id)
All checkpoints for an agent across all executions, oldest first.
Expired entries are silently skipped.
list_by_execution(execution_id)
All checkpoints for a specific execution, oldest first.
load_latest_for_execution(execution_id)
The most recent (highest step_number) checkpoint for an execution —
the primary entry point for resuming a task::
checkpoint = await manager.load_latest_for_execution(execution_id)
if checkpoint:
steps_so_far = checkpoint.state["steps"]
# hand steps_so_far back to the agent to continue from
@property
def default_ttl(self) -> Optional[int]:
    """Fallback TTL in seconds given at construction; ``None`` if none was set."""
    ttl = self._default_ttl
    return ttl
TTL applied to the checkpoint data, index, and exec keys when no explicit ttl is passed to save().
async def save(
    self,
    agent_id: str,
    state: dict,
    execution_id: Optional[str] = None,
    metadata: Optional[dict] = None,
    ttl: Optional[int] = None,
) -> str:
    """
    Write a new checkpoint for ``agent_id`` and register it in the indexes.

    Args:
        agent_id: Agent whose index the checkpoint is appended to.
        state: Snapshot payload to store.
        execution_id: When truthy, the checkpoint is also indexed under
            this execution.
        metadata: Optional extra data stored alongside the state.
        ttl: Expiry in seconds for the keys written by this call; the
            manager's default TTL is used when ``None``.

    Returns:
        The generated checkpoint_id.
    """
    checkpoint_id = str(uuid.uuid4())
    snapshot = Checkpoint(
        checkpoint_id=checkpoint_id,
        agent_id=agent_id,
        state=state,
        metadata=metadata or {},
    )

    effective_ttl = self._default_ttl if ttl is None else ttl

    # The snapshot is written first, so any listed ID already has its data.
    await self._store.set(
        f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}",
        snapshot.to_dict(),
        ttl=effective_ttl,
    )

    # The agent index is always updated; the execution index only when
    # an execution_id was supplied.
    index_keys = [f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"]
    if execution_id:
        index_keys.append(f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}")

    for index_key in index_keys:
        ids = await self._store.get(index_key) or []
        ids.append(checkpoint_id)
        await self._store.set(index_key, ids, ttl=effective_ttl)

    return checkpoint_id
Persist an agent state snapshot.
Returns: The generated checkpoint_id.
async def load(self, checkpoint_id: str) -> Optional[Checkpoint]:
    """
    Fetch one checkpoint by its ID.

    Returns:
        The :class:`Checkpoint`, or ``None`` when the store has no data
        for that ID (never saved, deleted, or expired).
    """
    stored = await self._store.get(f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}")
    return None if stored is None else Checkpoint.from_dict(stored)
Load a checkpoint by ID.
Returns:
The Checkpoint or None if not found / expired.
async def list(self, agent_id: str) -> List[Checkpoint]:
    """
    Return every surviving checkpoint for an agent, in index (oldest-first) order.

    IDs whose data can no longer be loaded (e.g. TTL expired) are skipped.
    """
    ids = await self._store.get(f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}") or []
    # Loads happen one at a time, in index order.
    return [ckpt for cid in ids if (ckpt := await self.load(cid)) is not None]
Return all checkpoints for an agent, oldest first.
Checkpoints that have been evicted (TTL expired) are silently skipped.
async def list_by_execution(self, execution_id: str) -> List[Checkpoint]:
    """
    Return every surviving checkpoint for an execution, oldest first.

    IDs whose data can no longer be loaded are skipped.
    """
    ids = await self._store.get(f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}") or []
    # Loads happen one at a time, in index order.
    return [ckpt for cid in ids if (ckpt := await self.load(cid)) is not None]
Return all checkpoints for an execution, oldest first.
async def load_latest_for_execution(self, execution_id: str) -> Optional[Checkpoint]:
    """
    Return the latest surviving checkpoint for an execution, if present.

    The execution index is walked newest-first and the walk stops at the
    first checkpoint whose data still exists. This returns the same
    checkpoint as taking the last element of ``list_by_execution()``, but
    does not load older snapshots.

    Returns:
        The most recent loadable :class:`Checkpoint`, or ``None`` when the
        execution has no index or none of its checkpoints can be loaded.
    """
    exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
    checkpoint_ids = await self._store.get(exec_key) or []
    for ckpt_id in reversed(checkpoint_ids):
        checkpoint = await self.load(ckpt_id)
        if checkpoint is not None:
            return checkpoint
    return None
Return the latest checkpoint for an execution, if present.
async def delete(
    self,
    checkpoint_id: str,
    agent_id: str,
    execution_id: Optional[str] = None,
) -> None:
    """
    Remove a specific checkpoint and its index entries.

    Args:
        checkpoint_id: Checkpoint to remove.
        agent_id: Agent whose index lists the checkpoint.
        execution_id: When given, the checkpoint is also removed from that
            execution's index. If omitted, the execution index is left
            untouched.
    """
    await self._store.delete(f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}")

    index_keys = [f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"]
    if execution_id:
        index_keys.append(f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}")

    for index_key in index_keys:
        index = await self._store.get(index_key) or []
        if checkpoint_id not in index:
            # Nothing to rewrite; this also avoids creating an empty index key.
            continue
        remaining = [cid for cid in index if cid != checkpoint_id]
        if remaining:
            # The store interface cannot report a key's remaining TTL, so the
            # rewritten index gets the default TTL instead of becoming persistent.
            await self._store.set(index_key, remaining, ttl=self._default_ttl)
        else:
            await self._store.delete(index_key)
Remove a specific checkpoint and its index entry.
class Blackboard:
    """
    Shared scratch space that agents in one orchestration run read and write.

    The whole board for a namespace is kept as a single dict under one key
    of the backing :class:`BaseStateStore`; every operation loads that dict
    and, for mutations, writes it back.

    Example::

        board = Blackboard(store, namespace="run-abc")
        await board.write("search_results", docs, author="search_agent")
        results = await board.read("search_results")
        all_entries = await board.list_entries()
    """

    def __init__(self, store: BaseStateStore, namespace: str = "default") -> None:
        self._store = store
        self._namespace = namespace

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _board_key(self) -> str:
        """Build the store key that holds this namespace's board."""
        return f"{_BLACKBOARD_KEY}{self._namespace}"

    async def _load(self) -> Dict[str, Dict[str, Any]]:
        """Fetch the board dict, falling back to an empty dict when nothing is stored."""
        stored = await self._store.get(self._board_key())
        return stored or {}

    async def _save(self, board: Dict[str, Dict[str, Any]]) -> None:
        """Write the full board dict back to the store (no TTL)."""
        await self._store.set(self._board_key(), board)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def write(
        self,
        key: str,
        value: Any,
        author: str,
        **metadata: Any,
    ) -> None:
        """Post ``value`` under ``key``, replacing any existing entry."""
        board = await self._load()
        record = BlackboardEntry(key=key, value=value, author=author, metadata=dict(metadata))
        board[key] = record.to_dict()
        await self._save(board)

    async def read(self, key: str) -> Optional[Any]:
        """Return just the value posted under ``key``, or ``None`` if there is none."""
        board = await self._load()
        stored = board.get(key)
        if stored is not None:
            return BlackboardEntry.from_dict(stored).value
        return None

    async def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """Return the complete :class:`BlackboardEntry` for ``key``, or ``None``."""
        board = await self._load()
        stored = board.get(key)
        if not stored:
            return None
        return BlackboardEntry.from_dict(stored)

    async def list_entries(self) -> Dict[str, BlackboardEntry]:
        """Return every entry on the board as ``{key: BlackboardEntry}``."""
        entries: Dict[str, BlackboardEntry] = {}
        for name, stored in (await self._load()).items():
            entries[name] = BlackboardEntry.from_dict(stored)
        return entries

    async def delete(self, key: str) -> None:
        """Drop a single entry; the board is written back either way."""
        board = await self._load()
        if key in board:
            del board[key]
        await self._save(board)

    async def clear(self) -> None:
        """Remove the whole board for this namespace from the store."""
        await self._store.delete(self._board_key())

    async def keys(self) -> List[str]:
        """Return the names of all entries currently on the board."""
        return list(await self._load())
Shared read/write board for multi-agent communication.
Agents can post results, facts, and partial outputs here so other agents
in the same orchestration run can build on them. Backed by any
BaseStateStore.
Example::
board = Blackboard(store, namespace="run-abc")
await board.write("search_results", docs, author="search_agent")
results = await board.read("search_results")
all_entries = await board.list_entries()
async def write(
    self,
    key: str,
    value: Any,
    author: str,
    **metadata: Any,
) -> None:
    """Store *value* under *key*, replacing any entry already there.

    Args:
        key: Name of the entry on the board.
        value: Payload to store.
        author: Identifier recorded as the writer of the entry.
        **metadata: Extra keyword arguments recorded as the entry's metadata.
    """
    # Load first, then build the entry, so the entry is created after the
    # board has been fetched (same ordering as before).
    snapshot = await self._load()
    snapshot[key] = BlackboardEntry(
        key=key,
        value=value,
        author=author,
        metadata=dict(metadata),
    ).to_dict()
    await self._save(snapshot)
Write or overwrite a value on the blackboard.
async def read(self, key: str) -> Optional[Any]:
    """Return only the value stored under *key*.

    Returns:
        The entry's raw value (not the full entry), or ``None`` when the
        board holds nothing for *key*.
    """
    stored = (await self._load()).get(key)
    if stored is not None:
        return BlackboardEntry.from_dict(stored).value
    return None
Read a value by key. Returns the raw value (not the full entry).
async def read_entry(self, key: str) -> Optional[BlackboardEntry]:
    """Return the complete :class:`BlackboardEntry` stored under *key*.

    Returns:
        The entry including author and timestamp, or ``None`` when the
        stored record for *key* is missing or falsy.
    """
    stored = (await self._load()).get(key)
    if not stored:
        return None
    return BlackboardEntry.from_dict(stored)
Read the full BlackboardEntry including author and timestamp.
async def list_entries(self) -> Dict[str, BlackboardEntry]:
    """Return every entry on the board as a ``{key: BlackboardEntry}`` dict."""
    entries: Dict[str, BlackboardEntry] = {}
    for name, raw in (await self._load()).items():
        entries[name] = BlackboardEntry.from_dict(raw)
    return entries
Return all entries on the board, keyed by their key name.
async def delete(self, key: str) -> None:
    """Drop the entry stored under *key* from the blackboard.

    The board is written back even when *key* was not present.
    """
    snapshot = await self._load()
    if key in snapshot:
        del snapshot[key]
    await self._save(snapshot)
Remove one entry from the blackboard.
@dataclass
class BlackboardEntry:
    """A single entry written to the blackboard.

    Attributes:
        key: Name under which the entry is stored.
        value: Payload of the entry.
        author: Identifier supplied by whoever wrote the entry.
        written_at: Creation time; defaults to the current UTC time.
        metadata: Extra annotations; defaults to an empty dict.
    """

    key: str
    value: Any
    author: str
    written_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the entry.

        ``written_at`` is rendered as an ISO-8601 string. ``metadata`` is
        shallow-copied so that mutating the returned dict cannot change
        this entry.
        """
        return {
            "key": self.key,
            "value": self.value,
            "author": self.author,
            "written_at": self.written_at.isoformat(),
            # Copy: do not hand out a reference to our own metadata dict.
            "metadata": dict(self.metadata or {}),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BlackboardEntry":
        """Rebuild an entry from the dict form produced by :meth:`to_dict`.

        Args:
            data: Mapping with ``key``, ``value``, ``author`` and
                ``written_at`` (ISO-8601 string); ``metadata`` is optional.

        Raises:
            KeyError: If a required field is missing from *data*.
        """
        return cls(
            key=data["key"],
            value=data["value"],
            author=data["author"],
            written_at=datetime.fromisoformat(data["written_at"]),
            # Copy so the entry does not alias the caller's (or the store's)
            # dict; a missing or None metadata becomes an empty dict.
            metadata=dict(data.get("metadata") or {}),
        )
A single entry written to the blackboard.