gmf_forge_ai_orchestration.state

State management — conversation state, checkpoints, blackboard, and store backends.

 1"""State management — conversation state, checkpoints, blackboard, and store backends."""
 2
 3from gmf_forge_ai_orchestration.state.base import (
 4    BaseStateStore,
 5    Checkpoint,
 6    ConversationMessage,
 7    ConversationState,
 8)
 9from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
10from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
11from gmf_forge_ai_orchestration.state.factory import StateStoreFactory
12from gmf_forge_ai_orchestration.state.checkpoint_manager import CheckpointManager
13from gmf_forge_ai_orchestration.state.blackboard import Blackboard, BlackboardEntry
14
15__all__ = [
16    "BaseStateStore",
17    "Checkpoint",
18    "ConversationMessage",
19    "ConversationState",
20    "InMemoryStateStore",
21    "RedisStateStore",
22    "StateStoreFactory",
23    "CheckpointManager",
24    "Blackboard",
25    "BlackboardEntry",
26]
class BaseStateStore(abc.ABC):
class BaseStateStore(ABC):
    """
    Contract for an asynchronous key-value state store.

    Every backend exposes the same five coroutines, so orchestration
    components can be written against this interface instead of a concrete
    store class.
    """

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Look up ``key``; the result is ``None`` when it is missing or expired."""
        ...

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Write ``value`` under ``key``.

        Args:
            key: Storage key.
            value: JSON-serialisable value.
            ttl: Lifetime of the entry in seconds; ``None`` disables expiry.
        """
        ...

    @abstractmethod
    async def delete(self, key: str) -> None:
        """Drop ``key``; deleting an absent key is a no-op."""
        ...

    @abstractmethod
    async def exists(self, key: str) -> bool:
        """Report whether ``key`` is present and has not expired."""
        ...

    @abstractmethod
    async def clear(self) -> None:
        """Remove every key owned by this store instance."""
        ...

Abstract key-value state store.

Both InMemoryStateStore and RedisStateStore implement this interface, making them interchangeable in all orchestration components.

@abstractmethod
async def get(self, key: str) -> Optional[Any]:
110    @abstractmethod
111    async def get(self, key: str) -> Optional[Any]:
112        """Retrieve a value by key. Returns None if not found or expired."""

Retrieve a value by key. Returns None if not found or expired.

@abstractmethod
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
114    @abstractmethod
115    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
116        """
117        Store a value.
118
119        Args:
120            key: Storage key.
121            value: JSON-serialisable value.
122            ttl: Time-to-live in seconds. None means no expiry.
123        """

Store a value.

Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.

@abstractmethod
async def delete(self, key: str) -> None:
125    @abstractmethod
126    async def delete(self, key: str) -> None:
127        """Remove a key. No-op if the key does not exist."""

Remove a key. No-op if the key does not exist.

@abstractmethod
async def exists(self, key: str) -> bool:
129    @abstractmethod
130    async def exists(self, key: str) -> bool:
131        """Return True if the key exists and has not expired."""

Return True if the key exists and has not expired.

@abstractmethod
async def clear(self) -> None:
133    @abstractmethod
134    async def clear(self) -> None:
135        """Delete all keys managed by this store instance."""

Delete all keys managed by this store instance.

@dataclass
class Checkpoint:
@dataclass
class Checkpoint:
    """Snapshot of an agent's execution state captured at one moment in time."""

    checkpoint_id: str
    agent_id: str
    state: Dict[str, Any]
    # Defaults to the (timezone-aware, UTC) moment the instance is created.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the checkpoint with an ISO-8601 timestamp."""
        payload: Dict[str, Any] = {}
        payload["checkpoint_id"] = self.checkpoint_id
        payload["agent_id"] = self.agent_id
        payload["state"] = self.state
        payload["timestamp"] = self.timestamp.isoformat()
        payload["metadata"] = self.metadata
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Checkpoint":
        """
        Rebuild a checkpoint from the mapping produced by :meth:`to_dict`.

        ``metadata`` is optional and falls back to an empty dict; the other
        keys are required and raise ``KeyError`` when absent.
        """
        ckpt_id = data["checkpoint_id"]
        owner = data["agent_id"]
        snapshot = data["state"]
        taken_at = datetime.fromisoformat(data["timestamp"])
        extras = data.get("metadata", {})
        return cls(
            checkpoint_id=ckpt_id,
            agent_id=owner,
            state=snapshot,
            timestamp=taken_at,
            metadata=extras,
        )

A point-in-time snapshot of an agent's execution state.

Checkpoint( checkpoint_id: str, agent_id: str, state: Dict[str, Any], timestamp: datetime.datetime = <factory>, metadata: Dict[str, Any] = <factory>)
checkpoint_id: str
agent_id: str
state: Dict[str, Any]
timestamp: datetime.datetime
metadata: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
82    def to_dict(self) -> Dict[str, Any]:
83        return {
84            "checkpoint_id": self.checkpoint_id,
85            "agent_id": self.agent_id,
86            "state": self.state,
87            "timestamp": self.timestamp.isoformat(),
88            "metadata": self.metadata,
89        }
@classmethod
def from_dict( cls, data: Dict[str, Any]) -> Checkpoint:
91    @classmethod
92    def from_dict(cls, data: Dict[str, Any]) -> "Checkpoint":
93        return cls(
94            checkpoint_id=data["checkpoint_id"],
95            agent_id=data["agent_id"],
96            state=data["state"],
97            timestamp=datetime.fromisoformat(data["timestamp"]),
98            metadata=data.get("metadata", {}),
99        )
@dataclass
class ConversationMessage:
@dataclass
class ConversationMessage:
    """One message exchanged within a conversation."""

    # Speaker label and message text, supplied by the caller.
    role: str
    content: str
    # Timezone-aware UTC creation time, generated per instance.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc),
    )
    # Free-form extra data; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)

A single message in a conversation.

ConversationMessage( role: str, content: str, timestamp: datetime.datetime = <factory>, metadata: Dict[str, Any] = <factory>)
role: str
content: str
timestamp: datetime.datetime
metadata: Dict[str, Any]
@dataclass
class ConversationState:
@dataclass
class ConversationState:
    """Complete state of one agent conversation session."""

    session_id: str
    messages: List[ConversationMessage] = field(default_factory=list)
    agent_metadata: Dict[str, Any] = field(default_factory=dict)
    # Both timestamps default to the timezone-aware UTC creation time.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def add_message(self, role: str, content: str, **metadata: Any) -> None:
        """Record a new message and bump ``updated_at`` to the current UTC time."""
        entry = ConversationMessage(role=role, content=content, metadata=metadata)
        self.messages.append(entry)
        self.updated_at = datetime.now(timezone.utc)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the session with ISO-8601 timestamps."""
        payload: Dict[str, Any] = {"session_id": self.session_id}
        serialised: List[Dict[str, Any]] = []
        for msg in self.messages:
            serialised.append(
                {
                    "role": msg.role,
                    "content": msg.content,
                    "timestamp": msg.timestamp.isoformat(),
                    "metadata": msg.metadata,
                }
            )
        payload["messages"] = serialised
        payload["agent_metadata"] = self.agent_metadata
        payload["created_at"] = self.created_at.isoformat()
        payload["updated_at"] = self.updated_at.isoformat()
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
        """
        Rebuild a session from the mapping produced by :meth:`to_dict`.

        ``messages`` and ``agent_metadata`` are optional, as is each message's
        ``metadata``; all other keys are required.
        """
        restored: List[ConversationMessage] = []
        for raw in data.get("messages", []):
            restored.append(
                ConversationMessage(
                    role=raw["role"],
                    content=raw["content"],
                    timestamp=datetime.fromisoformat(raw["timestamp"]),
                    metadata=raw.get("metadata", {}),
                )
            )
        session = data["session_id"]
        agent_meta = data.get("agent_metadata", {})
        created = datetime.fromisoformat(data["created_at"])
        updated = datetime.fromisoformat(data["updated_at"])
        return cls(
            session_id=session,
            messages=restored,
            agent_metadata=agent_meta,
            created_at=created,
            updated_at=updated,
        )

Full state of an agent conversation session.

ConversationState( session_id: str, messages: List[ConversationMessage] = <factory>, agent_metadata: Dict[str, Any] = <factory>, created_at: datetime.datetime = <factory>, updated_at: datetime.datetime = <factory>)
session_id: str
messages: List[ConversationMessage]
agent_metadata: Dict[str, Any]
created_at: datetime.datetime
updated_at: datetime.datetime
def add_message(self, role: str, content: str, **metadata: Any) -> None:
30    def add_message(self, role: str, content: str, **metadata: Any) -> None:
31        """Append a message and update the timestamp."""
32        self.messages.append(ConversationMessage(role=role, content=content, metadata=metadata))
33        self.updated_at = datetime.now(timezone.utc)

Append a message and update the timestamp.

def to_dict(self) -> Dict[str, Any]:
35    def to_dict(self) -> Dict[str, Any]:
36        return {
37            "session_id": self.session_id,
38            "messages": [
39                {
40                    "role": m.role,
41                    "content": m.content,
42                    "timestamp": m.timestamp.isoformat(),
43                    "metadata": m.metadata,
44                }
45                for m in self.messages
46            ],
47            "agent_metadata": self.agent_metadata,
48            "created_at": self.created_at.isoformat(),
49            "updated_at": self.updated_at.isoformat(),
50        }
@classmethod
def from_dict( cls, data: Dict[str, Any]) -> ConversationState:
52    @classmethod
53    def from_dict(cls, data: Dict[str, Any]) -> "ConversationState":
54        messages = [
55            ConversationMessage(
56                role=m["role"],
57                content=m["content"],
58                timestamp=datetime.fromisoformat(m["timestamp"]),
59                metadata=m.get("metadata", {}),
60            )
61            for m in data.get("messages", [])
62        ]
63        return cls(
64            session_id=data["session_id"],
65            messages=messages,
66            agent_metadata=data.get("agent_metadata", {}),
67            created_at=datetime.fromisoformat(data["created_at"]),
68            updated_at=datetime.fromisoformat(data["updated_at"]),
69        )
class InMemoryStateStore(gmf_forge_ai_orchestration.state.BaseStateStore):
class InMemoryStateStore(BaseStateStore):
    """
    In-memory key-value store with optional per-key TTL.

    All access goes through a single ``asyncio.Lock``.  Intended for
    single-process deployments and tests; use RedisStateStore when state
    must be shared across processes or persisted.
    """

    def __init__(self) -> None:
        # Maps key -> (value, monotonic deadline or None for "never expires").
        self._store: Dict[str, Tuple[Any, Optional[float]]] = {}
        self._lock = asyncio.Lock()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _is_expired(self, expires_at: Optional[float]) -> bool:
        """Tell whether a deadline has passed; ``None`` never expires."""
        return expires_at is not None and time.monotonic() > expires_at

    def _clean_key(self, key: str) -> None:
        """Lazily evict ``key`` when its deadline has passed."""
        record = self._store.get(key)
        if not record:
            return
        _, deadline = record
        if self._is_expired(deadline):
            del self._store[key]

    # ------------------------------------------------------------------
    # BaseStateStore implementation
    # ------------------------------------------------------------------

    async def get(self, key: str) -> Optional[Any]:
        """Return the live value for ``key``, or ``None`` if absent or expired."""
        async with self._lock:
            self._clean_key(key)
            record = self._store.get(key)
            if not record:
                return None
            return record[0]

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Store ``value`` under ``key``; ``ttl`` is in seconds, ``None`` means no expiry."""
        # The deadline is computed before the lock is acquired.
        if ttl is not None:
            deadline: Optional[float] = time.monotonic() + ttl
        else:
            deadline = None
        async with self._lock:
            self._store[key] = (value, deadline)

    async def delete(self, key: str) -> None:
        """Remove ``key``; a missing key is ignored."""
        async with self._lock:
            self._store.pop(key, None)

    async def exists(self, key: str) -> bool:
        """Return True when ``key`` is present and not expired."""
        async with self._lock:
            self._clean_key(key)
            present = key in self._store
        return present

    async def clear(self) -> None:
        """Drop every entry held by this instance."""
        async with self._lock:
            self._store.clear()

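Coroutine-safe in-memory key-value store with optional TTL. Access is serialised with an asyncio.Lock, which coordinates tasks on a single event loop; it does not make the store safe to share across OS threads.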

Suitable for single-process deployments and testing. For multi-process or persistent state, use RedisStateStore instead.

async def get(self, key: str) -> Optional[Any]:
43    async def get(self, key: str) -> Optional[Any]:
44        async with self._lock:
45            self._clean_key(key)
46            entry = self._store.get(key)
47            return entry[0] if entry else None

Retrieve a value by key. Returns None if not found or expired.

async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
49    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
50        expires_at = time.monotonic() + ttl if ttl is not None else None
51        async with self._lock:
52            self._store[key] = (value, expires_at)

Store a value.

Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.

async def delete(self, key: str) -> None:
54    async def delete(self, key: str) -> None:
55        async with self._lock:
56            self._store.pop(key, None)

Remove a key. No-op if the key does not exist.

async def exists(self, key: str) -> bool:
58    async def exists(self, key: str) -> bool:
59        async with self._lock:
60            self._clean_key(key)
61            return key in self._store

Return True if the key exists and has not expired.

async def clear(self) -> None:
63    async def clear(self) -> None:
64        async with self._lock:
65            self._store.clear()

Delete all keys managed by this store instance.

class RedisStateStore(gmf_forge_ai_orchestration.state.BaseStateStore):
 10class RedisStateStore(BaseStateStore):
 11    """
 12    Redis-backed key-value store.
 13
 14    Uses redis.asyncio for non-blocking I/O. Values are JSON-serialised
 15    before storage and deserialised on retrieval.
 16
 17    Args:
 18        url: Redis connection URL (e.g. ``"redis://localhost:6379"``).
 19        key_prefix: Optional prefix applied to every key to avoid collisions
 20            when sharing a Redis instance across services. Defaults to
 21            ``"gmf_forge_ai:"``.
 22        decode_responses: Passed through to the Redis client. Defaults to True.
 23
 24    Example::
 25
 26        store = RedisStateStore(url="redis://localhost:6379")
 27        await store.set("session:abc", {"key": "value"}, ttl=3600)
 28        data = await store.get("session:abc")
 29    """
 30
 31    def __init__(
 32        self,
 33        url: str = "redis://localhost:6379",
 34        key_prefix: str = "gmf_forge_ai:",
 35        **redis_kwargs: Any,
 36    ) -> None:
 37        try:
 38            import redis.asyncio as aioredis  # type: ignore[import]
 39        except ImportError as exc:
 40            raise ImportError(
 41                "redis package is required for RedisStateStore. "
 42                "Install it with: pip install redis>=5.0.0"
 43            ) from exc
 44
 45        self._client = aioredis.from_url(url, decode_responses=True, **redis_kwargs)
 46        self._prefix = key_prefix
 47
 48    # ------------------------------------------------------------------
 49    # Internal helpers
 50    # ------------------------------------------------------------------
 51
 52    def _k(self, key: str) -> str:
 53        return f"{self._prefix}{key}"
 54
 55    @staticmethod
 56    def _serialize(value: Any) -> str:
 57        return json.dumps(value, default=str)
 58
 59    @staticmethod
 60    def _deserialize(raw: str) -> Any:
 61        return json.loads(raw)
 62
 63    # ------------------------------------------------------------------
 64    # BaseStateStore implementation
 65    # ------------------------------------------------------------------
 66
 67    async def get(self, key: str) -> Optional[Any]:
 68        raw = await self._client.get(self._k(key))
 69        if raw is None:
 70            return None
 71        return self._deserialize(raw)
 72
 73    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
 74        serialized = self._serialize(value)
 75        if ttl is not None:
 76            await self._client.setex(self._k(key), ttl, serialized)
 77        else:
 78            await self._client.set(self._k(key), serialized)
 79
 80    async def delete(self, key: str) -> None:
 81        await self._client.delete(self._k(key))
 82
 83    async def exists(self, key: str) -> bool:
 84        result = await self._client.exists(self._k(key))
 85        return bool(result)
 86
 87    async def clear(self) -> None:
 88        """Delete all keys matching this store's prefix."""
 89        pattern = f"{self._prefix}*"
 90        cursor = 0
 91        while True:
 92            cursor, keys = await self._client.scan(cursor, match=pattern, count=100)
 93            if keys:
 94                await self._client.delete(*keys)
 95            if cursor == 0:
 96                break
 97
 98    async def close(self) -> None:
 99        """Close the underlying Redis connection pool."""
100        await self._client.aclose()

Redis-backed key-value store.

Uses redis.asyncio for non-blocking I/O. Values are JSON-serialised before storage and deserialised on retrieval.

Args: url: Redis connection URL (e.g. "redis://localhost:6379"). key_prefix: Optional prefix applied to every key to avoid collisions when sharing a Redis instance across services. Defaults to "gmf_forge_ai:". decode_responses: Passed through to the Redis client. Defaults to True.

Example::

store = RedisStateStore(url="redis://localhost:6379")
await store.set("session:abc", {"key": "value"}, ttl=3600)
data = await store.get("session:abc")
RedisStateStore( url: str = 'redis://localhost:6379', key_prefix: str = 'gmf_forge_ai:', **redis_kwargs: Any)
31    def __init__(
32        self,
33        url: str = "redis://localhost:6379",
34        key_prefix: str = "gmf_forge_ai:",
35        **redis_kwargs: Any,
36    ) -> None:
37        try:
38            import redis.asyncio as aioredis  # type: ignore[import]
39        except ImportError as exc:
40            raise ImportError(
41                "redis package is required for RedisStateStore. "
42                "Install it with: pip install redis>=5.0.0"
43            ) from exc
44
45        self._client = aioredis.from_url(url, decode_responses=True, **redis_kwargs)
46        self._prefix = key_prefix
async def get(self, key: str) -> Optional[Any]:
67    async def get(self, key: str) -> Optional[Any]:
68        raw = await self._client.get(self._k(key))
69        if raw is None:
70            return None
71        return self._deserialize(raw)

Retrieve a value by key. Returns None if not found or expired.

async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
73    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
74        serialized = self._serialize(value)
75        if ttl is not None:
76            await self._client.setex(self._k(key), ttl, serialized)
77        else:
78            await self._client.set(self._k(key), serialized)

Store a value.

Args: key: Storage key. value: JSON-serialisable value. ttl: Time-to-live in seconds. None means no expiry.

async def delete(self, key: str) -> None:
80    async def delete(self, key: str) -> None:
81        await self._client.delete(self._k(key))

Remove a key. No-op if the key does not exist.

async def exists(self, key: str) -> bool:
83    async def exists(self, key: str) -> bool:
84        result = await self._client.exists(self._k(key))
85        return bool(result)

Return True if the key exists and has not expired.

async def clear(self) -> None:
87    async def clear(self) -> None:
88        """Delete all keys matching this store's prefix."""
89        pattern = f"{self._prefix}*"
90        cursor = 0
91        while True:
92            cursor, keys = await self._client.scan(cursor, match=pattern, count=100)
93            if keys:
94                await self._client.delete(*keys)
95            if cursor == 0:
96                break

Delete all keys matching this store's prefix.

async def close(self) -> None:
 98    async def close(self) -> None:
 99        """Close the underlying Redis connection pool."""
100        await self._client.aclose()

Close the underlying Redis connection pool.

class StateStoreFactory:
 9class StateStoreFactory:
10    """
11    Creates configured state store instances.
12
13    Example::
14
15        # In-memory (default, no extra deps)
16        store = StateStoreFactory.create("memory")
17
18        # Redis
19        store = StateStoreFactory.create("redis", url="redis://localhost:6379")
20    """
21
22    @staticmethod
23    def create(
24        backend: Literal["memory", "redis"] = "memory",
25        **kwargs: Any,
26    ) -> BaseStateStore:
27        """
28        Instantiate a state store.
29
30        Args:
31            backend: ``"memory"`` or ``"redis"``.
32            **kwargs: Forwarded to the store constructor.
33                      For Redis: ``url``, ``key_prefix``, and any
34                      additional kwargs accepted by ``redis.asyncio.from_url``.
35
36        Returns:
37            A :class:`BaseStateStore` instance.
38
39        Raises:
40            ValueError: If an unknown backend name is provided.
41        """
42        if backend == "memory":
43            from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
44            return InMemoryStateStore()
45
46        if backend == "redis":
47            from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
48            return RedisStateStore(**kwargs)
49
50        raise ValueError(
51            f"Unknown state store backend: {backend!r}. "
52            "Supported values: 'memory', 'redis'."
53        )

Creates configured state store instances.

Example::

# In-memory (default, no extra deps)
store = StateStoreFactory.create("memory")

# Redis
store = StateStoreFactory.create("redis", url="redis://localhost:6379")
@staticmethod
def create( backend: Literal['memory', 'redis'] = 'memory', **kwargs: Any) -> BaseStateStore:
22    @staticmethod
23    def create(
24        backend: Literal["memory", "redis"] = "memory",
25        **kwargs: Any,
26    ) -> BaseStateStore:
27        """
28        Instantiate a state store.
29
30        Args:
31            backend: ``"memory"`` or ``"redis"``.
32            **kwargs: Forwarded to the store constructor.
33                      For Redis: ``url``, ``key_prefix``, and any
34                      additional kwargs accepted by ``redis.asyncio.from_url``.
35
36        Returns:
37            A :class:`BaseStateStore` instance.
38
39        Raises:
40            ValueError: If an unknown backend name is provided.
41        """
42        if backend == "memory":
43            from gmf_forge_ai_orchestration.state.memory_store import InMemoryStateStore
44            return InMemoryStateStore()
45
46        if backend == "redis":
47            from gmf_forge_ai_orchestration.state.redis_store import RedisStateStore
48            return RedisStateStore(**kwargs)
49
50        raise ValueError(
51            f"Unknown state store backend: {backend!r}. "
52            "Supported values: 'memory', 'redis'."
53        )

Instantiate a state store.

Args: backend: "memory" or "redis". **kwargs: Forwarded to the store constructor. For Redis: url, key_prefix, and any additional kwargs accepted by redis.asyncio.from_url.

Returns: A BaseStateStore instance.

Raises: ValueError: If an unknown backend name is provided.

class CheckpointManager:
 15class CheckpointManager:
 16    """
 17    Saves and restores agent execution state checkpoints.
 18
 19    Works with any :class:`BaseStateStore` backend (in-memory or Redis).
 20
 21    Redis key structure
 22    -------------------
 23    Three key types are written per agent/execution::
 24
 25        Redis
 26
 27        ├── __ckpt_index__{agent_id}               (TTL: default_ttl, refreshed on each write)
 28        │   └── [ "ckpt-uuid-A", "ckpt-uuid-B", "ckpt-uuid-C", ... ]
 29        │         │                                  │
 30        │         │ execution-id-1                   │ execution-id-2
 31        │         ▼                                  ▼
 32        ├── __ckpt_exec__{execution_id_1}        __ckpt_exec__{execution_id_2}
 33        │   └── [ "ckpt-uuid-A",                 └── [ "ckpt-uuid-D",
 34        │          "ckpt-uuid-B",                       "ckpt-uuid-E",
 35        │          "ckpt-uuid-C" ]                      "ckpt-uuid-F" ]
 36        │         │                                      │
 37        │    step 0, 1, 2                           step 0, 1, 2
 38        │         ▼                                      ▼
 39        ├── __ckpt_data__{ckpt-uuid-A}  (TTL)     __ckpt_data__{ckpt-uuid-D}  (TTL)
 40        │   step_number: 0                         step_number: 0
 41        │   steps: [ action_0 ]                    steps: [ action_0 ]
 42
 43        ├── __ckpt_data__{ckpt-uuid-B}  (TTL)     __ckpt_data__{ckpt-uuid-E}  (TTL)
 44        │   step_number: 1                         step_number: 1
 45        │   steps: [ action_0, action_1 ]          steps: [ action_0, action_1 ]
 46
 47        └── __ckpt_data__{ckpt-uuid-C}  (TTL)     __ckpt_data__{ckpt-uuid-F}  (TTL)
 48            step_number: 2                         step_number: 2
 49            steps: [ action_0, action_1, action_2 ] steps: [ ... ]
 50
 51    Each ``__ckpt_data__`` key is a **cumulative snapshot** — step N contains all steps
 52    0..N, so only the last entry in the exec index is needed to fully resume.
 53
 54    All three key types share the same TTL (``default_ttl``).  Index and exec keys
 55    therefore expire together with the data they reference, leaving no stale entries.
 56
 57    Resuming an execution
 58    ---------------------
 59    ::
 60
 61        # 1. Look up all checkpoint IDs for the execution
 62        exec_key  →  __ckpt_exec__{execution_id}  →  [ id_0, id_1, id_2 ]
 63
 64        # 2. Load the last (highest step_number) checkpoint
 65        last_id   =  checkpoint_ids[-1]
 66        checkpoint = await manager.load(last_id)
 67
 68        # 3. The checkpoint.state contains the full steps list — resume from there
 69        steps_so_far = checkpoint.state["steps"]
 70
 71    Listing all checkpoints for an agent
 72    --------------------------------------
 73    ::
 74
 75        agent_key  →  __ckpt_index__{agent_id}  →  [ id_0, id_1, ... ]
 76        checkpoints = await manager.list(agent_id)
 77
 78    API usage
 79    ---------
 80    ``save()`` is called **internally by the agent** after each step — developers
 81    should not call it directly.  The developer-facing methods are:
 82
 83    ``load(checkpoint_id)``
 84        Load a single checkpoint by ID.  Returns ``None`` if expired or not found.
 85
 86    ``list(agent_id)``
 87        All checkpoints for an agent across all executions, oldest first.
 88        Expired entries are silently skipped.
 89
 90    ``list_by_execution(execution_id)``
 91        All checkpoints for a specific execution, oldest first.
 92
 93    ``load_latest_for_execution(execution_id)``
 94        The most recent (highest step_number) checkpoint for an execution —
 95        the primary entry point for resuming a task::
 96
 97            checkpoint = await manager.load_latest_for_execution(execution_id)
 98            if checkpoint:
 99                steps_so_far = checkpoint.state["steps"]
100                # hand steps_so_far back to the agent to continue from
101    """
102
103    def __init__(self, store: BaseStateStore, default_ttl: Optional[int] = None) -> None:
104        self._store = store
105        self._default_ttl = default_ttl
106
107    @property
108    def default_ttl(self) -> Optional[int]:
109        """TTL applied to checkpoint data keys when no explicit ttl is passed to save()."""
110        return self._default_ttl
111
112    # ------------------------------------------------------------------
113    # Public API
114    # ------------------------------------------------------------------
115
116    async def save(
117        self,
118        agent_id: str,
119        state: dict,
120        execution_id: Optional[str] = None,
121        metadata: Optional[dict] = None,
122        ttl: Optional[int] = None,
123    ) -> str:
124        """
125        Persist an agent state snapshot.
126
127        Returns:
128            The generated checkpoint_id.
129        """
130        checkpoint_id = str(uuid.uuid4())
131        checkpoint = Checkpoint(
132            checkpoint_id=checkpoint_id,
133            agent_id=agent_id,
134            state=state,
135            metadata=metadata or {},
136        )
137
138        # Store the checkpoint data
139        effective_ttl = ttl if ttl is not None else self._default_ttl
140        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
141        await self._store.set(data_key, checkpoint.to_dict(), ttl=effective_ttl)
142
143        # Append to agent's checkpoint index
144        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
145        index: List[str] = await self._store.get(index_key) or []
146        index.append(checkpoint_id)
147        await self._store.set(index_key, index, ttl=effective_ttl)
148
149        # Optionally index by execution_id for resume/query flows
150        if execution_id:
151            exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
152            exec_index: List[str] = await self._store.get(exec_key) or []
153            exec_index.append(checkpoint_id)
154            await self._store.set(exec_key, exec_index, ttl=effective_ttl)
155
156        return checkpoint_id
157
158    async def load(self, checkpoint_id: str) -> Optional[Checkpoint]:
159        """
160        Load a checkpoint by ID.
161
162        Returns:
163            The :class:`Checkpoint` or ``None`` if not found / expired.
164        """
165        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
166        raw = await self._store.get(data_key)
167        if raw is None:
168            return None
169        return Checkpoint.from_dict(raw)
170
171    async def list(self, agent_id: str) -> List[Checkpoint]:
172        """
173        Return all checkpoints for an agent, oldest first.
174
175        Checkpoints that have been evicted (TTL expired) are silently skipped.
176        """
177        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
178        checkpoint_ids: List[str] = await self._store.get(index_key) or []
179
180        checkpoints = []
181        for ckpt_id in checkpoint_ids:
182            checkpoint = await self.load(ckpt_id)
183            if checkpoint is not None:
184                checkpoints.append(checkpoint)
185        return checkpoints
186
187    async def list_by_execution(self, execution_id: str) -> List[Checkpoint]:
188        """Return all checkpoints for an execution, oldest first."""
189        exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
190        checkpoint_ids: List[str] = await self._store.get(exec_key) or []
191
192        checkpoints = []
193        for ckpt_id in checkpoint_ids:
194            checkpoint = await self.load(ckpt_id)
195            if checkpoint is not None:
196                checkpoints.append(checkpoint)
197        return checkpoints
198
199    async def load_latest_for_execution(self, execution_id: str) -> Optional[Checkpoint]:
200        """Return the latest checkpoint for an execution, if present."""
201        checkpoints = await self.list_by_execution(execution_id)
202        if not checkpoints:
203            return None
204        return checkpoints[-1]
205
206    async def delete(self, checkpoint_id: str, agent_id: str) -> None:
207        """Remove a specific checkpoint and its index entry."""
208        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
209        await self._store.delete(data_key)
210
211        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
212        index: List[str] = await self._store.get(index_key) or []
213        updated = [cid for cid in index if cid != checkpoint_id]
214        await self._store.set(index_key, updated)

Saves and restores agent execution state checkpoints.

Works with any BaseStateStore backend (in-memory or Redis).

Redis key structure

Three key types are written per agent/execution::

Redis
│
├── __ckpt_index__{agent_id}               (TTL: default_ttl, refreshed on each write)
│   └── [ "ckpt-uuid-A", "ckpt-uuid-B", "ckpt-uuid-C", ... ]
│         │                                  │
│         │ execution-id-1                   │ execution-id-2
│         ▼                                  ▼
├── __ckpt_exec__{execution_id_1}        __ckpt_exec__{execution_id_2}
│   └── [ "ckpt-uuid-A",                 └── [ "ckpt-uuid-D",
│          "ckpt-uuid-B",                       "ckpt-uuid-E",
│          "ckpt-uuid-C" ]                      "ckpt-uuid-F" ]
│         │                                      │
│    step 0, 1, 2                           step 0, 1, 2
│         ▼                                      ▼
├── __ckpt_data__{ckpt-uuid-A}  (TTL)     __ckpt_data__{ckpt-uuid-D}  (TTL)
│   step_number: 0                         step_number: 0
│   steps: [ action_0 ]                    steps: [ action_0 ]
│
├── __ckpt_data__{ckpt-uuid-B}  (TTL)     __ckpt_data__{ckpt-uuid-E}  (TTL)
│   step_number: 1                         step_number: 1
│   steps: [ action_0, action_1 ]          steps: [ action_0, action_1 ]
│
└── __ckpt_data__{ckpt-uuid-C}  (TTL)     __ckpt_data__{ckpt-uuid-F}  (TTL)
    step_number: 2                         step_number: 2
    steps: [ action_0, action_1, action_2 ] steps: [ ... ]

Each __ckpt_data__ key is a cumulative snapshot — step N contains all steps 0..N, so only the last entry in the exec index is needed to fully resume.

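All three key types are written with the same TTL: the ttl passed to save(), or default_ttl when none is given. Index and exec keys are rewritten (and their TTL refreshed) on every save, so they can outlive older data keys; list() and list_by_execution() silently skip any ID whose data key has already expired.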

Resuming an execution

::

# 1. Look up all checkpoint IDs for the execution
exec_key  →  __ckpt_exec__{execution_id}  →  [ id_0, id_1, id_2 ]

# 2. Load the last (highest step_number) checkpoint
last_id   =  checkpoint_ids[-1]
checkpoint = await manager.load(last_id)

# 3. The checkpoint.state contains the full steps list — resume from there
steps_so_far = checkpoint.state["steps"]

Listing all checkpoints for an agent

::

agent_key  →  __ckpt_index__{agent_id}  →  [ id_0, id_1, ... ]
checkpoints = await manager.list(agent_id)

API usage

save() is called internally by the agent after each step — developers should not call it directly. The developer-facing methods are:

load(checkpoint_id) Load a single checkpoint by ID. Returns None if expired or not found.

list(agent_id) All checkpoints for an agent across all executions, oldest first. Expired entries are silently skipped.

list_by_execution(execution_id) All checkpoints for a specific execution, oldest first.

load_latest_for_execution(execution_id) The most recent (highest step_number) checkpoint for an execution — the primary entry point for resuming a task::

    checkpoint = await manager.load_latest_for_execution(execution_id)
    if checkpoint:
        steps_so_far = checkpoint.state["steps"]
        # hand steps_so_far back to the agent to continue from
CheckpointManager( store: BaseStateStore, default_ttl: Optional[int] = None)
103    def __init__(self, store: BaseStateStore, default_ttl: Optional[int] = None) -> None:
104        self._store = store
105        self._default_ttl = default_ttl
default_ttl: Optional[int]
107    @property
108    def default_ttl(self) -> Optional[int]:
109        """TTL applied to checkpoint data keys when no explicit ttl is passed to save()."""
110        return self._default_ttl

TTL applied to checkpoint data keys when no explicit ttl is passed to save().

async def save( self, agent_id: str, state: dict, execution_id: Optional[str] = None, metadata: Optional[dict] = None, ttl: Optional[int] = None) -> str:
116    async def save(
117        self,
118        agent_id: str,
119        state: dict,
120        execution_id: Optional[str] = None,
121        metadata: Optional[dict] = None,
122        ttl: Optional[int] = None,
123    ) -> str:
124        """
125        Persist an agent state snapshot.
126
127        Returns:
128            The generated checkpoint_id.
129        """
130        checkpoint_id = str(uuid.uuid4())
131        checkpoint = Checkpoint(
132            checkpoint_id=checkpoint_id,
133            agent_id=agent_id,
134            state=state,
135            metadata=metadata or {},
136        )
137
138        # Store the checkpoint data
139        effective_ttl = ttl if ttl is not None else self._default_ttl
140        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
141        await self._store.set(data_key, checkpoint.to_dict(), ttl=effective_ttl)
142
143        # Append to agent's checkpoint index
144        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
145        index: List[str] = await self._store.get(index_key) or []
146        index.append(checkpoint_id)
147        await self._store.set(index_key, index, ttl=effective_ttl)
148
149        # Optionally index by execution_id for resume/query flows
150        if execution_id:
151            exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
152            exec_index: List[str] = await self._store.get(exec_key) or []
153            exec_index.append(checkpoint_id)
154            await self._store.set(exec_key, exec_index, ttl=effective_ttl)
155
156        return checkpoint_id

Persist an agent state snapshot.

Returns: The generated checkpoint_id.

async def load( self, checkpoint_id: str) -> Optional[Checkpoint]:
158    async def load(self, checkpoint_id: str) -> Optional[Checkpoint]:
159        """
160        Load a checkpoint by ID.
161
162        Returns:
163            The :class:`Checkpoint` or ``None`` if not found / expired.
164        """
165        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
166        raw = await self._store.get(data_key)
167        if raw is None:
168            return None
169        return Checkpoint.from_dict(raw)

Load a checkpoint by ID.

Returns: The Checkpoint or None if not found / expired.

async def list( self, agent_id: str) -> List[Checkpoint]:
171    async def list(self, agent_id: str) -> List[Checkpoint]:
172        """
173        Return all checkpoints for an agent, oldest first.
174
175        Checkpoints that have been evicted (TTL expired) are silently skipped.
176        """
177        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
178        checkpoint_ids: List[str] = await self._store.get(index_key) or []
179
180        checkpoints = []
181        for ckpt_id in checkpoint_ids:
182            checkpoint = await self.load(ckpt_id)
183            if checkpoint is not None:
184                checkpoints.append(checkpoint)
185        return checkpoints

Return all checkpoints for an agent, oldest first.

Checkpoints that have been evicted (TTL expired) are silently skipped.

async def list_by_execution( self, execution_id: str) -> List[Checkpoint]:
187    async def list_by_execution(self, execution_id: str) -> List[Checkpoint]:
188        """Return all checkpoints for an execution, oldest first."""
189        exec_key = f"{_CHECKPOINT_EXEC_PREFIX}{execution_id}"
190        checkpoint_ids: List[str] = await self._store.get(exec_key) or []
191
192        checkpoints = []
193        for ckpt_id in checkpoint_ids:
194            checkpoint = await self.load(ckpt_id)
195            if checkpoint is not None:
196                checkpoints.append(checkpoint)
197        return checkpoints

Return all checkpoints for an execution, oldest first.

async def load_latest_for_execution( self, execution_id: str) -> Optional[Checkpoint]:
199    async def load_latest_for_execution(self, execution_id: str) -> Optional[Checkpoint]:
200        """Return the latest checkpoint for an execution, if present."""
201        checkpoints = await self.list_by_execution(execution_id)
202        if not checkpoints:
203            return None
204        return checkpoints[-1]

Return the latest checkpoint for an execution, if present.

async def delete(self, checkpoint_id: str, agent_id: str) -> None:
206    async def delete(self, checkpoint_id: str, agent_id: str) -> None:
207        """Remove a specific checkpoint and its index entry."""
208        data_key = f"{_CHECKPOINT_DATA_PREFIX}{checkpoint_id}"
209        await self._store.delete(data_key)
210
211        index_key = f"{_CHECKPOINT_INDEX_PREFIX}{agent_id}"
212        index: List[str] = await self._store.get(index_key) or []
213        updated = [cid for cid in index if cid != checkpoint_id]
214        await self._store.set(index_key, updated)

Remove a specific checkpoint and its index entry.

class Blackboard:
 44class Blackboard:
 45    """
 46    Shared read/write board for multi-agent communication.
 47
 48    Agents can post results, facts, and partial outputs here so other agents
 49    in the same orchestration run can build on them. Backed by any
 50    :class:`BaseStateStore`.
 51
 52    Example::
 53
 54        board = Blackboard(store, namespace="run-abc")
 55        await board.write("search_results", docs, author="search_agent")
 56        results = await board.read("search_results")
 57        all_entries = await board.list_entries()
 58    """
 59
 60    def __init__(self, store: BaseStateStore, namespace: str = "default") -> None:
 61        self._store = store
 62        self._namespace = namespace
 63
 64    # ------------------------------------------------------------------
 65    # Internal helpers
 66    # ------------------------------------------------------------------
 67
 68    def _board_key(self) -> str:
 69        return f"{_BLACKBOARD_KEY}{self._namespace}"
 70
 71    async def _load(self) -> Dict[str, Dict[str, Any]]:
 72        """Load the full board dict from the store."""
 73        return await self._store.get(self._board_key()) or {}
 74
 75    async def _save(self, board: Dict[str, Dict[str, Any]]) -> None:
 76        await self._store.set(self._board_key(), board)
 77
 78    # ------------------------------------------------------------------
 79    # Public API
 80    # ------------------------------------------------------------------
 81
 82    async def write(
 83        self,
 84        key: str,
 85        value: Any,
 86        author: str,
 87        **metadata: Any,
 88    ) -> None:
 89        """Write or overwrite a value on the blackboard."""
 90        board = await self._load()
 91        entry = BlackboardEntry(key=key, value=value, author=author, metadata=dict(metadata))
 92        board[key] = entry.to_dict()
 93        await self._save(board)
 94
 95    async def read(self, key: str) -> Optional[Any]:
 96        """Read a value by key. Returns the raw value (not the full entry)."""
 97        board = await self._load()
 98        entry_dict = board.get(key)
 99        if entry_dict is None:
100            return None
101        return BlackboardEntry.from_dict(entry_dict).value
102
103    async def read_entry(self, key: str) -> Optional[BlackboardEntry]:
104        """Read the full :class:`BlackboardEntry` including author and timestamp."""
105        board = await self._load()
106        entry_dict = board.get(key)
107        return BlackboardEntry.from_dict(entry_dict) if entry_dict else None
108
109    async def list_entries(self) -> Dict[str, BlackboardEntry]:
110        """Return all entries on the board, keyed by their key name."""
111        board = await self._load()
112        return {k: BlackboardEntry.from_dict(v) for k, v in board.items()}
113
114    async def delete(self, key: str) -> None:
115        """Remove one entry from the blackboard."""
116        board = await self._load()
117        board.pop(key, None)
118        await self._save(board)
119
120    async def clear(self) -> None:
121        """Wipe the entire blackboard namespace."""
122        await self._store.delete(self._board_key())
123
124    async def keys(self) -> List[str]:
125        """Return the list of keys currently on the board."""
126        board = await self._load()
127        return list(board.keys())

Shared read/write board for multi-agent communication.

Agents can post results, facts, and partial outputs here so other agents in the same orchestration run can build on them. Backed by any BaseStateStore.

Example::

board = Blackboard(store, namespace="run-abc")
await board.write("search_results", docs, author="search_agent")
results = await board.read("search_results")
all_entries = await board.list_entries()
Blackboard( store: BaseStateStore, namespace: str = 'default')
60    def __init__(self, store: BaseStateStore, namespace: str = "default") -> None:
61        self._store = store
62        self._namespace = namespace
async def write(self, key: str, value: Any, author: str, **metadata: Any) -> None:
82    async def write(
83        self,
84        key: str,
85        value: Any,
86        author: str,
87        **metadata: Any,
88    ) -> None:
89        """Write or overwrite a value on the blackboard."""
90        board = await self._load()
91        entry = BlackboardEntry(key=key, value=value, author=author, metadata=dict(metadata))
92        board[key] = entry.to_dict()
93        await self._save(board)

Write or overwrite a value on the blackboard.

async def read(self, key: str) -> Optional[Any]:
 95    async def read(self, key: str) -> Optional[Any]:
 96        """Read a value by key. Returns the raw value (not the full entry)."""
 97        board = await self._load()
 98        entry_dict = board.get(key)
 99        if entry_dict is None:
100            return None
101        return BlackboardEntry.from_dict(entry_dict).value

Read a value by key. Returns the raw value (not the full entry).

async def read_entry( self, key: str) -> Optional[BlackboardEntry]:
103    async def read_entry(self, key: str) -> Optional[BlackboardEntry]:
104        """Read the full :class:`BlackboardEntry` including author and timestamp."""
105        board = await self._load()
106        entry_dict = board.get(key)
107        return BlackboardEntry.from_dict(entry_dict) if entry_dict else None

Read the full BlackboardEntry including author and timestamp.

async def list_entries( self) -> Dict[str, BlackboardEntry]:
109    async def list_entries(self) -> Dict[str, BlackboardEntry]:
110        """Return all entries on the board, keyed by their key name."""
111        board = await self._load()
112        return {k: BlackboardEntry.from_dict(v) for k, v in board.items()}

Return all entries on the board, keyed by their key name.

async def delete(self, key: str) -> None:
114    async def delete(self, key: str) -> None:
115        """Remove one entry from the blackboard."""
116        board = await self._load()
117        board.pop(key, None)
118        await self._save(board)

Remove one entry from the blackboard.

async def clear(self) -> None:
120    async def clear(self) -> None:
121        """Wipe the entire blackboard namespace."""
122        await self._store.delete(self._board_key())

Wipe the entire blackboard namespace.

async def keys(self) -> List[str]:
124    async def keys(self) -> List[str]:
125        """Return the list of keys currently on the board."""
126        board = await self._load()
127        return list(board.keys())

Return the list of keys currently on the board.

@dataclass
class BlackboardEntry:
@dataclass
class BlackboardEntry:
    """
    A single entry written to the blackboard.

    ``to_dict`` / ``from_dict`` convert to and from a plain dict in which the
    timestamp is an ISO-8601 string. Both copy the metadata dict (shallowly),
    so the entry and its serialised form never share the same dict object.
    """

    # Name of the entry.
    key: str
    # Payload; passed through to_dict()/from_dict() unchanged.
    value: Any
    # Identifier of the writer.
    author: str
    # Creation time; defaults to the current time in UTC.
    written_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Free-form extra data attached by the writer.
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form of the entry with ``written_at`` as an ISO-8601 string."""
        return {
            "key": self.key,
            "value": self.value,
            "author": self.author,
            "written_at": self.written_at.isoformat(),
            # Copy so edits to the returned dict do not leak into the entry.
            "metadata": dict(self.metadata),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BlackboardEntry":
        """
        Rebuild an entry from the dict produced by :meth:`to_dict`.

        A missing or ``None`` ``"metadata"`` value becomes an empty dict.

        Raises:
            KeyError: If ``key``, ``value``, ``author`` or ``written_at`` is missing.
            ValueError: If ``written_at`` is not a valid ISO-8601 string.
        """
        return cls(
            key=data["key"],
            value=data["value"],
            author=data["author"],
            written_at=datetime.fromisoformat(data["written_at"]),
            # Copy so the entry does not alias the caller's (or store's) dict.
            metadata=dict(data.get("metadata") or {}),
        )

A single entry written to the blackboard.

BlackboardEntry( key: str, value: Any, author: str, written_at: datetime.datetime = <factory>, metadata: Dict[str, Any] = <factory>)
key: str
value: Any
author: str
written_at: datetime.datetime
metadata: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
24    def to_dict(self) -> Dict[str, Any]:
25        return {
26            "key": self.key,
27            "value": self.value,
28            "author": self.author,
29            "written_at": self.written_at.isoformat(),
30            "metadata": self.metadata,
31        }
@classmethod
def from_dict( cls, data: Dict[str, Any]) -> BlackboardEntry:
33    @classmethod
34    def from_dict(cls, data: Dict[str, Any]) -> "BlackboardEntry":
35        return cls(
36            key=data["key"],
37            value=data["value"],
38            author=data["author"],
39            written_at=datetime.fromisoformat(data["written_at"]),
40            metadata=data.get("metadata", {}),
41        )