gmf_forge_ai_data.indexing

Indexing module — schema provisioning for vector stores.

This module separates infrastructure concerns (index / container creation, HNSW tuning, partition key configuration) from application concerns (document CRUD and similarity search, handled by the vector_stores module).

Typical developer workflow

  1. Infrastructure step (run once per environment):

    Use a builder to create the backend index with the right schema and performance parameters::

    from gmf_forge_ai_data.indexing import AzureAISearchIndexBuilder
    
    builder = AzureAISearchIndexBuilder(
        endpoint="https://my-search.search.windows.net",
        api_key="...",
        index_name="policy_docs",
        embedding_dimension=1536,
        hnsw_m=4,
        hnsw_ef_construction=400,
        hnsw_ef_search=500,
        metric="cosine",
    )
    builder.create_index()
    
  2. Application step (each run):

    Construct the corresponding vector store and perform document operations::

    from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore
    
    store = AzureAISearchVectorStore(
        endpoint="https://my-search.search.windows.net",
        api_key="...",
        index_name="policy_docs",
        embedding_dimension=1536,
    )
    store.add_documents(chunks)
    results = store.search(query_embedding=embedding, top_k=5)
    

Available builders

 1"""
 2Indexing module — schema provisioning for vector stores.
 3
 4This module separates *infrastructure* concerns (index / container creation,
 5HNSW tuning, partition key configuration) from *application* concerns
 6(document CRUD and similarity search, handled by the ``vector_stores`` module).
 7
 8Typical developer workflow
 9--------------------------
10
111. **Infrastructure step** (run once per environment):
12
13   Use a builder to create the backend index with the right schema and
14   performance parameters::
15
16       from gmf_forge_ai_data.indexing import AzureAISearchIndexBuilder
17
18       builder = AzureAISearchIndexBuilder(
19           endpoint="https://my-search.search.windows.net",
20           api_key="...",
21           index_name="policy_docs",
22           embedding_dimension=1536,
23           hnsw_m=4,
24           hnsw_ef_construction=400,
25           hnsw_ef_search=500,
26           metric="cosine",
27       )
28       builder.create_index()
29
302. **Application step** (each run):
31
32   Construct the corresponding vector store and perform document operations::
33
34       from gmf_forge_ai_data.vector_stores import AzureAISearchVectorStore
35
36       store = AzureAISearchVectorStore(
37           endpoint="https://my-search.search.windows.net",
38           api_key="...",
39           index_name="policy_docs",
40           embedding_dimension=1536,
41       )
42       store.add_documents(chunks)
43       results = store.search(query_embedding=embedding, top_k=5)
44
45Available builders
46------------------
47- ``AzureAISearchIndexBuilder`` — Azure AI Search with full HNSW control
48- ``CosmosDBIndexBuilder``      — Cosmos DB NoSQL with vector embedding policy
49- ``MongoDBIndexBuilder``       — MongoDB Atlas with vector search + text index
50"""
51
52from .base_index_builder import BaseIndexBuilder
53from .azure_ai_search_index_builder import AzureAISearchIndexBuilder
54from .cosmos_db_index_builder import CosmosDBIndexBuilder
55from .mongodb_index_builder import MongoDBIndexBuilder
56
57__all__ = [
58    "BaseIndexBuilder",
59    "AzureAISearchIndexBuilder",
60    "CosmosDBIndexBuilder",
61    "MongoDBIndexBuilder",
62]
class BaseIndexBuilder(abc.ABC):
47class BaseIndexBuilder(ABC):
48    """
49    Abstract base class for index builders.
50
51    Each backend (Azure AI Search, Cosmos DB, MongoDB) provides a concrete
52    subclass that exposes backend-specific tuning parameters while sharing
53    the same management interface.
54    """
55
56    # ------------------------------------------------------------------ #
57    # Core lifecycle                                                       #
58    # ------------------------------------------------------------------ #
59
60    @abstractmethod
61    def create_index(self) -> None:
62        """Create the index if it does not already exist.
63
64        Safe to call multiple times — must be a no-op when the index exists.
65        Use this for idempotent provisioning (CI/CD pipelines, first-run
66        setup scripts).
67        """
68
69    @abstractmethod
70    def create_or_replace_index(self) -> None:
71        """Delete the index if it exists, then create it fresh.
72
73        Use this when you need to apply schema changes that cannot be done
74        via an in-place update (e.g. changing HNSW parameters or adding a
75        new vector field).
76
77        Warning: All documents are lost.  Only use in dev/staging or after a
78        full re-ingestion has been planned.
79        """
80
81    @abstractmethod
82    def delete_index(self) -> None:
83        """Permanently delete the index and all its documents.
84
85        Raises:
86            RuntimeError: If the index does not exist.
87        """
88
89    @abstractmethod
90    def index_exists(self) -> bool:
91        """Return True if the index currently exists, False otherwise."""
92
93    @abstractmethod
94    def list_indexes(self) -> List[str]:
95        """Return the names of all indexes on this backend/service."""

Abstract base class for index builders.

Each backend (Azure AI Search, Cosmos DB, MongoDB) provides a concrete subclass that exposes backend-specific tuning parameters while sharing the same management interface.

@abstractmethod
def create_index(self) -> None:
60    @abstractmethod
61    def create_index(self) -> None:
62        """Create the index if it does not already exist.
63
64        Safe to call multiple times — must be a no-op when the index exists.
65        Use this for idempotent provisioning (CI/CD pipelines, first-run
66        setup scripts).
67        """

Create the index if it does not already exist.

Safe to call multiple times — must be a no-op when the index exists. Use this for idempotent provisioning (CI/CD pipelines, first-run setup scripts).

@abstractmethod
def create_or_replace_index(self) -> None:
69    @abstractmethod
70    def create_or_replace_index(self) -> None:
71        """Delete the index if it exists, then create it fresh.
72
73        Use this when you need to apply schema changes that cannot be done
74        via an in-place update (e.g. changing HNSW parameters or adding a
75        new vector field).
76
77        Warning: All documents are lost.  Only use in dev/staging or after a
78        full re-ingestion has been planned.
79        """

Delete the index if it exists, then create it fresh.

Use this when you need to apply schema changes that cannot be done via an in-place update (e.g. changing HNSW parameters or adding a new vector field).

Warning: All documents are lost. Only use in dev/staging or after a full re-ingestion has been planned.

@abstractmethod
def delete_index(self) -> None:
81    @abstractmethod
82    def delete_index(self) -> None:
83        """Permanently delete the index and all its documents.
84
85        Raises:
86            RuntimeError: If the index does not exist.
87        """

Permanently delete the index and all its documents.

Raises: RuntimeError: If the index does not exist.

@abstractmethod
def index_exists(self) -> bool:
89    @abstractmethod
90    def index_exists(self) -> bool:
91        """Return True if the index currently exists, False otherwise."""

Return True if the index currently exists, False otherwise.

@abstractmethod
def list_indexes(self) -> List[str]:
93    @abstractmethod
94    def list_indexes(self) -> List[str]:
95        """Return the names of all indexes on this backend/service."""

Return the names of all indexes on this backend/service.

class AzureAISearchIndexBuilder(gmf_forge_ai_data.indexing.BaseIndexBuilder):
 91class AzureAISearchIndexBuilder(BaseIndexBuilder):
 92    """
 93    Builds and manages Azure AI Search indexes with full developer control.
 94
 95    The builder owns *schema* concerns only.  Document operations (add,
 96    search, delete) belong to ``AzureAISearchVectorStore``.
 97
 98    Parameters
 99    ----------
100    endpoint:
101        Azure AI Search service endpoint URL.
102    api_key:
103        Azure AI Search admin API key. Use for local development or
104        when managed identity is not available.
105    token_provider:
106        Zero-argument callable that returns a bearer token string.
107        Use for managed identity / workload identity scenarios.
108        The callable must request the **Azure AI Search** scope::
109
110            from azure.identity import DefaultAzureCredential, get_bearer_token_provider
111            token_provider = get_bearer_token_provider(
112                DefaultAzureCredential(),
113                "https://search.azure.com/.default"
114            )
115
116        Note: this scope is different from Azure OpenAI / Cognitive Services
117        (``https://cognitiveservices.azure.com/.default``) — each service
118        requires its own token_provider.
119    index_name:
120        Name of the index to create / manage.
121    embedding_dimension:
122        Number of dimensions in the embedding vectors (must match the
123        embedding model — e.g. 1536 for text-embedding-ada-002, 3072 for
124        text-embedding-3-large).
125    document_type:
126        Optional Document subclass.  When provided, all dataclass fields
127        not in the base Document are automatically added as indexed fields
128        (filterable, sortable, facetable where appropriate).
129    hnsw_m:
130        Number of bi-directional links created per node.  Higher = better
131        recall but more memory.  Typical range 4–16.  Default: 4.
132    hnsw_ef_construction:
133        Size of the candidate list during index construction.  Higher =
134        better recall, slower build time.  Typical range 100–800.
135        Default: 400.
136    hnsw_ef_search:
137        Size of the candidate list during search.  Higher = better recall,
138        slower queries.  Typical range 100–1000.  Default: 500.
139    metric:
140        Similarity metric.  One of ``"cosine"``, ``"euclidean"``,
141        ``"dotProduct"``.  Default: ``"cosine"``.
142    ssl_cert_path:
143        Optional path to a PEM certificate bundle for corporate SSL
144        inspection proxies.  Sets ``REQUESTS_CA_BUNDLE`` and
145        ``SSL_CERT_FILE`` environment variables before building the client.
146    semantic_config:
147        Optional semantic search configuration.  When provided the index is
148        provisioned with a ``SemanticSearch`` configuration that enables
149        Azure AI semantic reranking (``BoostedRerankerScore``).
150
151        Expected keys:
152
153        - ``name`` (str) — semantic config name (default
154          ``"default-semantic-config"``)
155        - ``title_field`` (str, optional) — field used as the document title
156        - ``content_fields`` (list[str]) — primary body content fields
157        - ``keyword_fields`` (list[str], optional) — keyword/facet fields
158
159        Example::
160
161            {
162                "name": "policyhub-semantic-config",
163                "title_field": "document_name",
164                "content_fields": ["content"],
165                "keyword_fields": ["language", "locale", "source"],
166            }
167    """
168
169    def __init__(
170        self,
171        endpoint: str,
172        index_name: str,
173        api_key: Optional[str] = None,
174        token_provider: Optional[Callable[[], str]] = None,
175        embedding_dimension: int = 1536,
176        document_type: Type[Document] = Document,
177        hnsw_m: int = 4,
178        hnsw_ef_construction: int = 400,
179        hnsw_ef_search: int = 500,
180        metric: str = "cosine",
181        ssl_cert_path: Optional[str] = None,
182        semantic_config: Optional[dict] = None,
183    ) -> None:
184        self.index_name = index_name
185        self.embedding_dimension = embedding_dimension
186        self.document_type = document_type
187        self.hnsw_m = hnsw_m
188        self.hnsw_ef_construction = hnsw_ef_construction
189        self.hnsw_ef_search = hnsw_ef_search
190        self.metric = metric
191        self.semantic_config = semantic_config
192
193        if not api_key and not token_provider:
194            raise ValueError(
195                "Either api_key or token_provider must be supplied to AzureAISearchIndexBuilder."
196            )
197
198        if ssl_cert_path:
199            import os as _os
200            _os.environ.setdefault("REQUESTS_CA_BUNDLE", ssl_cert_path)
201            _os.environ.setdefault("SSL_CERT_FILE", ssl_cert_path)
202
203        if token_provider:
204            credential = _TokenProviderCredential(token_provider)
205        else:
206            credential = AzureKeyCredential(api_key)
207        self._index_client = SearchIndexClient(
208            endpoint=endpoint,
209            credential=credential,
210        )
211
212    # ------------------------------------------------------------------ #
213    # BaseIndexBuilder interface                                           #
214    # ------------------------------------------------------------------ #
215
216    def create_index(self) -> None:
217        """Create the index if it does not already exist (idempotent)."""
218        if self.index_exists():
219            logger.info("Index already exists — skipping creation", index=self.index_name)
220            return
221        self._create(self.index_name)
222        logger.info("Index created successfully", index=self.index_name)
223
224    def create_or_replace_index(self) -> None:
225        """Delete the existing index (if any) then create it fresh.
226
227        Warning: All documents are permanently lost.
228        """
229        if self.index_exists():
230            self._index_client.delete_index(self.index_name)
231            logger.info("Index deleted for replacement", index=self.index_name)
232        self._create(self.index_name)
233        logger.info("Index created (replaced)", index=self.index_name)
234
235    def delete_index(self) -> None:
236        """Permanently delete the index and all its documents.
237
238        Raises:
239            RuntimeError: If the index does not exist.
240        """
241        if not self.index_exists():
242            raise RuntimeError(
243                f"Cannot delete index '{self.index_name}': it does not exist."
244            )
245        self._index_client.delete_index(self.index_name)
246        logger.info("Index deleted", index=self.index_name)
247
248    def index_exists(self) -> bool:
249        """Return True if the index currently exists."""
250        try:
251            self._index_client.get_index(self.index_name)
252            return True
253        except ResourceNotFoundError:
254            return False
255        except Exception:
256            # Treat any other error as non-existence to keep callers safe
257            return False
258
259    def list_indexes(self) -> List[str]:
260        """Return the names of all indexes on this Azure AI Search service."""
261        return [idx.name for idx in self._index_client.list_indexes()]
262
263    # ------------------------------------------------------------------ #
264    # Internal helpers                                                     #
265    # ------------------------------------------------------------------ #
266
267    def _build_fields(self) -> list:
268        """Build the Azure Search field list from base + document_type fields."""
269        fields = [
270            SimpleField(
271                name="id",
272                type=SearchFieldDataType.String,
273                key=True,
274                filterable=True,
275            ),
276            SearchableField(
277                name="content",
278                type=SearchFieldDataType.String,
279                searchable=True,
280            ),
281            SearchField(
282                name="embedding",
283                type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
284                searchable=True,
285                vector_search_dimensions=self.embedding_dimension,
286                vector_search_profile_name="default-vector-profile",
287            ),
288            SimpleField(
289                name="timestamp",
290                type=SearchFieldDataType.DateTimeOffset,
291                filterable=True,
292                sortable=True,
293            ),
294            # Stores serialised metadata dict and any non-indexed custom fields
295            SimpleField(
296                name="document_data",
297                type=SearchFieldDataType.String,
298                filterable=False,
299            ),
300        ]
301
302        # Infer custom fields from the document_type dataclass
303        if dataclasses.is_dataclass(self.document_type):
304            base_field_names = {"id", "content", "embedding", "timestamp", "metadata"}
305            for field in dataclasses.fields(self.document_type):
306                if field.name in base_field_names:
307                    continue
308                azure_type = self._map_python_type(field.type)
309                if azure_type is None:
310                    continue
311                scalar_types = {
312                    SearchFieldDataType.String,
313                    SearchFieldDataType.Int32,
314                    SearchFieldDataType.Int64,
315                    SearchFieldDataType.Double,
316                    SearchFieldDataType.DateTimeOffset,
317                    SearchFieldDataType.Boolean,
318                }
319                # Per-field overrides from dataclass field metadata.
320                # Falls back to the original defaults when not specified, so
321                # existing document types without metadata are unaffected.
322                meta = field.metadata
323                is_searchable = meta.get("searchable", False)
324                is_filterable = meta.get("filterable", True)
325                is_sortable = meta.get("sortable", azure_type in scalar_types)
326                is_facetable = meta.get(
327                    "facetable",
328                    azure_type in {SearchFieldDataType.String, SearchFieldDataType.Boolean},
329                )
330                if is_searchable:
331                    fields.append(
332                        SearchableField(
333                            name=field.name,
334                            filterable=is_filterable,
335                            sortable=is_sortable,
336                            facetable=is_facetable,
337                        )
338                    )
339                else:
340                    fields.append(
341                        SimpleField(
342                            name=field.name,
343                            type=azure_type,
344                            filterable=is_filterable,
345                            sortable=is_sortable,
346                            facetable=is_facetable,
347                        )
348                    )
349                logger.info(
350                    "Added indexed field",
351                    field=field.name,
352                    azure_type=str(azure_type),
353                    searchable=is_searchable,
354                )
355
356        return fields
357
358    def _create(self, index_name: str) -> None:
359        """Internal: build and submit the index definition to Azure."""
360        fields = self._build_fields()
361
362        vector_search = VectorSearch(
363            algorithms=[
364                HnswAlgorithmConfiguration(
365                    name="default-hnsw",
366                    parameters={
367                        "m": self.hnsw_m,
368                        "efConstruction": self.hnsw_ef_construction,
369                        "efSearch": self.hnsw_ef_search,
370                        "metric": self.metric,
371                    },
372                )
373            ],
374            profiles=[
375                VectorSearchProfile(
376                    name="default-vector-profile",
377                    algorithm_configuration_name="default-hnsw",
378                )
379            ],
380        )
381
382        semantic_search = None
383        if self.semantic_config:
384            sc = self.semantic_config
385            title_field = (
386                SemanticField(field_name=sc["title_field"])
387                if sc.get("title_field") else None
388            )
389            semantic_search = SemanticSearch(
390                configurations=[
391                    SemanticConfiguration(
392                        name=sc.get("name", "default-semantic-config"),
393                        prioritized_fields=SemanticPrioritizedFields(
394                            title_field=title_field,
395                            content_fields=[
396                                SemanticField(field_name=f)
397                                for f in sc.get("content_fields", [])
398                            ],
399                            keywords_fields=[
400                                SemanticField(field_name=f)
401                                for f in sc.get("keyword_fields", [])
402                            ],
403                        ),
404                    )
405                ]
406            )
407
408        index = SearchIndex(
409            name=index_name,
410            fields=fields,
411            vector_search=vector_search,
412            semantic_search=semantic_search,
413        )
414
415        self._index_client.create_index(index)
416        logger.info(
417            "Azure AI Search index provisioned",
418            index=index_name,
419            dim=self.embedding_dimension,
420            metric=self.metric,
421            hnsw_m=self.hnsw_m,
422            ef_construction=self.hnsw_ef_construction,
423            ef_search=self.hnsw_ef_search,
424            fields=len(fields),
425        )
426
427    @staticmethod
428    def _map_python_type(python_type) -> Optional[SearchFieldDataType]:
429        """Map a Python / dataclass field type to an Azure Search field type."""
430        _map = {
431            str: SearchFieldDataType.String,
432            int: SearchFieldDataType.Int64,
433            float: SearchFieldDataType.Double,
434            bool: SearchFieldDataType.Boolean,
435            datetime: SearchFieldDataType.DateTimeOffset,
436        }
437
438        # Handle Optional[X]  →  extract X
439        if hasattr(python_type, "__origin__"):
440            args = getattr(python_type, "__args__", ())
441            for arg in args:
442                if arg is type(None):
443                    continue
444                return _map.get(arg, SearchFieldDataType.String)
445
446        # Handle string annotations
447        if isinstance(python_type, str):
448            s = python_type.lower()
449            if "datetime" in s:
450                return SearchFieldDataType.DateTimeOffset
451            if "int" in s:
452                return SearchFieldDataType.Int64
453            if "float" in s or "double" in s:
454                return SearchFieldDataType.Double
455            if "bool" in s:
456                return SearchFieldDataType.Boolean
457            return SearchFieldDataType.String
458
459        return _map.get(python_type, SearchFieldDataType.String)

Builds and manages Azure AI Search indexes with full developer control.

The builder owns schema concerns only. Document operations (add, search, delete) belong to AzureAISearchVectorStore.

Parameters

endpoint: Azure AI Search service endpoint URL. api_key: Azure AI Search admin API key. Use for local development or when managed identity is not available. token_provider: Zero-argument callable that returns a bearer token string. Use for managed identity / workload identity scenarios. The callable must request the Azure AI Search scope::

    from azure.identity import DefaultAzureCredential, get_bearer_token_provider
    token_provider = get_bearer_token_provider(
        DefaultAzureCredential(),
        "https://search.azure.com/.default"
    )

Note: this scope is different from Azure OpenAI / Cognitive Services
(``https://cognitiveservices.azure.com/.default``) — each service
requires its own token_provider.

index_name: Name of the index to create / manage. embedding_dimension: Number of dimensions in the embedding vectors (must match the embedding model — e.g. 1536 for text-embedding-ada-002, 3072 for text-embedding-3-large). document_type: Optional Document subclass. When provided, all dataclass fields not in the base Document are automatically added as indexed fields (filterable, sortable, facetable where appropriate). hnsw_m: Number of bi-directional links created per node. Higher = better recall but more memory. Typical range 4–16. Default: 4. hnsw_ef_construction: Size of the candidate list during index construction. Higher = better recall, slower build time. Typical range 100–800. Default: 400. hnsw_ef_search: Size of the candidate list during search. Higher = better recall, slower queries. Typical range 100–1000. Default: 500. metric: Similarity metric. One of "cosine", "euclidean", "dotProduct". Default: "cosine". ssl_cert_path: Optional path to a PEM certificate bundle for corporate SSL inspection proxies. Sets REQUESTS_CA_BUNDLE and SSL_CERT_FILE environment variables before building the client. semantic_config: Optional semantic search configuration. When provided the index is provisioned with a SemanticSearch configuration that enables Azure AI semantic reranking (BoostedRerankerScore).

Expected keys:

- ``name`` (str) — semantic config name (default
  ``"default-semantic-config"``)
- ``title_field`` (str, optional) — field used as the document title
- ``content_fields`` (list[str]) — primary body content fields
- ``keyword_fields`` (list[str], optional) — keyword/facet fields

Example::

    {
        "name": "policyhub-semantic-config",
        "title_field": "document_name",
        "content_fields": ["content"],
        "keyword_fields": ["language", "locale", "source"],
    }
AzureAISearchIndexBuilder( endpoint: str, index_name: str, api_key: Optional[str] = None, token_provider: Optional[Callable[[], str]] = None, embedding_dimension: int = 1536, document_type: Type[gmf_forge_ai_data.Document] = <class 'gmf_forge_ai_data.Document'>, hnsw_m: int = 4, hnsw_ef_construction: int = 400, hnsw_ef_search: int = 500, metric: str = 'cosine', ssl_cert_path: Optional[str] = None, semantic_config: Optional[dict] = None)
169    def __init__(
170        self,
171        endpoint: str,
172        index_name: str,
173        api_key: Optional[str] = None,
174        token_provider: Optional[Callable[[], str]] = None,
175        embedding_dimension: int = 1536,
176        document_type: Type[Document] = Document,
177        hnsw_m: int = 4,
178        hnsw_ef_construction: int = 400,
179        hnsw_ef_search: int = 500,
180        metric: str = "cosine",
181        ssl_cert_path: Optional[str] = None,
182        semantic_config: Optional[dict] = None,
183    ) -> None:
184        self.index_name = index_name
185        self.embedding_dimension = embedding_dimension
186        self.document_type = document_type
187        self.hnsw_m = hnsw_m
188        self.hnsw_ef_construction = hnsw_ef_construction
189        self.hnsw_ef_search = hnsw_ef_search
190        self.metric = metric
191        self.semantic_config = semantic_config
192
193        if not api_key and not token_provider:
194            raise ValueError(
195                "Either api_key or token_provider must be supplied to AzureAISearchIndexBuilder."
196            )
197
198        if ssl_cert_path:
199            import os as _os
200            _os.environ.setdefault("REQUESTS_CA_BUNDLE", ssl_cert_path)
201            _os.environ.setdefault("SSL_CERT_FILE", ssl_cert_path)
202
203        if token_provider:
204            credential = _TokenProviderCredential(token_provider)
205        else:
206            credential = AzureKeyCredential(api_key)
207        self._index_client = SearchIndexClient(
208            endpoint=endpoint,
209            credential=credential,
210        )
index_name
embedding_dimension
document_type
hnsw_m
hnsw_ef_construction
metric
semantic_config
def create_index(self) -> None:
216    def create_index(self) -> None:
217        """Create the index if it does not already exist (idempotent)."""
218        if self.index_exists():
219            logger.info("Index already exists — skipping creation", index=self.index_name)
220            return
221        self._create(self.index_name)
222        logger.info("Index created successfully", index=self.index_name)

Create the index if it does not already exist (idempotent).

def create_or_replace_index(self) -> None:
224    def create_or_replace_index(self) -> None:
225        """Delete the existing index (if any) then create it fresh.
226
227        Warning: All documents are permanently lost.
228        """
229        if self.index_exists():
230            self._index_client.delete_index(self.index_name)
231            logger.info("Index deleted for replacement", index=self.index_name)
232        self._create(self.index_name)
233        logger.info("Index created (replaced)", index=self.index_name)

Delete the existing index (if any) then create it fresh.

Warning: All documents are permanently lost.

def delete_index(self) -> None:
235    def delete_index(self) -> None:
236        """Permanently delete the index and all its documents.
237
238        Raises:
239            RuntimeError: If the index does not exist.
240        """
241        if not self.index_exists():
242            raise RuntimeError(
243                f"Cannot delete index '{self.index_name}': it does not exist."
244            )
245        self._index_client.delete_index(self.index_name)
246        logger.info("Index deleted", index=self.index_name)

Permanently delete the index and all its documents.

Raises: RuntimeError: If the index does not exist.

def index_exists(self) -> bool:
248    def index_exists(self) -> bool:
249        """Return True if the index currently exists."""
250        try:
251            self._index_client.get_index(self.index_name)
252            return True
253        except ResourceNotFoundError:
254            return False
255        except Exception:
256            # Treat any other error as non-existence to keep callers safe
257            return False

Return True if the index currently exists.

def list_indexes(self) -> List[str]:
259    def list_indexes(self) -> List[str]:
260        """Return the names of all indexes on this Azure AI Search service."""
261        return [idx.name for idx in self._index_client.list_indexes()]

Return the names of all indexes on this Azure AI Search service.

class CosmosDBIndexBuilder(gmf_forge_ai_data.indexing.BaseIndexBuilder):
 67class CosmosDBIndexBuilder(BaseIndexBuilder):
 68    """
 69    Builds and manages Cosmos DB databases and containers for vector search.
 70
 71    The builder owns *schema / provisioning* concerns only.  Document
 72    operations (add, search, delete) belong to ``AzureCosmosDBVectorStore``.
 73
 74    Parameters
 75    ----------
 76    endpoint:
 77        Cosmos DB account endpoint URL.
 78    api_key:
 79        Cosmos DB account primary or secondary key.
 80    database_name:
 81        Name of the Cosmos DB database to create / manage.
 82    container_name:
 83        Name of the container to create / manage.
 84    embedding_dimension:
 85        Number of dimensions in the embedding vectors.
 86    distance_function:
 87        Vector similarity function.  ``"cosine"`` (default), ``"euclidean"``,
 88        or ``"dotproduct"``.
 89    vector_index_type:
 90        Index structure for vector search.  ``"quantizedFlat"`` (default,
 91        lower memory) or ``"diskANN"`` (higher recall on large datasets).
 92    partition_key:
 93        Cosmos DB partition key path.  Default: ``"/id"``.
 94    throughput:
 95        Manual RU/s throughput for the container.  ``None`` uses the Cosmos
 96        DB account default.  Ignored if the container already exists.
 97    ssl_cert_path:
 98        Optional path to a PEM certificate bundle for corporate SSL
 99        inspection proxies.
100    """
101
102    def __init__(
103        self,
104        endpoint: str,
105        api_key: str,
106        database_name: str,
107        container_name: str,
108        embedding_dimension: int = 1536,
109        distance_function: DistanceFunction = "cosine",
110        vector_index_type: VectorIndexType = "quantizedFlat",
111        partition_key: str = "/id",
112        throughput: Optional[int] = None,
113        ssl_cert_path: Optional[str] = None,
114    ) -> None:
115        self.database_name = database_name
116        self.container_name = container_name
117        self.embedding_dimension = embedding_dimension
118        self.distance_function = distance_function
119        self.vector_index_type = vector_index_type
120        self.partition_key = partition_key
121        self.throughput = throughput
122        self._ssl_cert_path = ssl_cert_path
123        self._endpoint = endpoint
124        self._api_key = api_key
125
126        self._client = self._build_client(endpoint, api_key, ssl_cert_path)
127
128    # ------------------------------------------------------------------ #
129    # BaseIndexBuilder interface                                           #
130    # ------------------------------------------------------------------ #
131
132    def create_index(self) -> None:
133        """Create the Cosmos DB database and container if they don't exist.
134
135        Safe to call multiple times — no-op if both already exist.
136        """
137        self._ensure_database()
138        if self.index_exists():
139            logger.info(
140                "Cosmos DB container already exists — skipping creation",
141                database=self.database_name,
142                container=self.container_name,
143            )
144            return
145        self._create_container()
146        logger.info(
147            "Cosmos DB container created",
148            database=self.database_name,
149            container=self.container_name,
150            dim=self.embedding_dimension,
151            distance_function=self.distance_function,
152            vector_index_type=self.vector_index_type,
153        )
154
155    def create_or_replace_index(self) -> None:
156        """Delete the container if it exists then create it fresh.
157
158        Warning: All documents are permanently lost.
159        """
160        self._ensure_database()
161        if self.index_exists():
162            db = self._client.get_database_client(self.database_name)
163            db.delete_container(self.container_name)
164            logger.info(
165                "Cosmos DB container deleted for replacement",
166                database=self.database_name,
167                container=self.container_name,
168            )
169        self._create_container()
170        logger.info(
171            "Cosmos DB container created (replaced)",
172            database=self.database_name,
173            container=self.container_name,
174        )
175
176    def delete_index(self) -> None:
177        """Permanently delete the container and all its documents.
178
179        Raises:
180            RuntimeError: If the container does not exist.
181        """
182        if not self.index_exists():
183            raise RuntimeError(
184                f"Cannot delete container '{self.database_name}/{self.container_name}': "
185                "it does not exist."
186            )
187        db = self._client.get_database_client(self.database_name)
188        db.delete_container(self.container_name)
189        logger.info(
190            "Cosmos DB container deleted",
191            database=self.database_name,
192            container=self.container_name,
193        )
194
195    def index_exists(self) -> bool:
196        """Return True if the container currently exists."""
197        try:
198            db = self._client.get_database_client(self.database_name)
199            db.get_container_client(self.container_name).read()
200            return True
201        except Exception:
202            return False
203
204    def list_indexes(self) -> List[str]:
205        """Return the names of all containers in the database."""
206        try:
207            db = self._client.get_database_client(self.database_name)
208            return [c["id"] for c in db.list_containers()]
209        except Exception:
210            return []
211
212    # ------------------------------------------------------------------ #
213    # Internal helpers                                                     #
214    # ------------------------------------------------------------------ #
215
216    @staticmethod
217    def _build_client(endpoint: str, api_key: str, ssl_cert_path: Optional[str]):
218        """Build a CosmosClient, optionally with a corporate SSL bundle."""
219        from azure.cosmos import CosmosClient
220        kwargs = {"url": endpoint, "credential": api_key}
221        if ssl_cert_path:
222            import ssl, os
223            os.environ.setdefault("REQUESTS_CA_BUNDLE", ssl_cert_path)
224            os.environ.setdefault("SSL_CERT_FILE", ssl_cert_path)
225        return CosmosClient(**kwargs)
226
227    def _ensure_database(self) -> None:
228        """Create the database if it does not already exist."""
229        self._client.create_database_if_not_exists(self.database_name)
230
231    def _create_container(self) -> None:
232        """Create the container with vector embedding and indexing policies."""
233        from azure.cosmos import PartitionKey
234        from azure.cosmos.exceptions import CosmosHttpResponseError
235
236        db = self._client.get_database_client(self.database_name)
237
238        vector_embedding_policy = {
239            "vectorEmbeddings": [
240                {
241                    "path": "/embedding",
242                    "dataType": "float32",
243                    "distanceFunction": self.distance_function,
244                    "dimensions": self.embedding_dimension,
245                }
246            ]
247        }
248
249        indexing_policy = {
250            "includedPaths": [{"path": "/*"}],
251            "excludedPaths": [{"path": "/embedding/*"}],
252            "vectorIndexes": [
253                {"path": "/embedding", "type": self.vector_index_type}
254            ],
255        }
256
257        kwargs = dict(
258            id=self.container_name,
259            partition_key=PartitionKey(path=self.partition_key),
260            vector_embedding_policy=vector_embedding_policy,
261            indexing_policy=indexing_policy,
262        )
263        if self.throughput is not None:
264            kwargs["offer_throughput"] = self.throughput
265
266        try:
267            db.create_container(**kwargs)
268        except CosmosHttpResponseError as exc:
269            if "Vector Policy" in str(exc) or "capability" in str(exc):
270                raise RuntimeError(
271                    "Vector Search capability is not enabled on this Cosmos DB account. "
272                    "Enable via: az cosmosdb update --resource-group <RG> "
273                    "--name <ACCOUNT> --capabilities EnableNoSQLVectorSearch"
274                ) from exc
275            raise

Builds and manages Cosmos DB databases and containers for vector search.

The builder owns schema / provisioning concerns only. Document operations (add, search, delete) belong to AzureCosmosDBVectorStore.

Parameters

endpoint: Cosmos DB account endpoint URL. api_key: Cosmos DB account primary or secondary key. database_name: Name of the Cosmos DB database to create / manage. container_name: Name of the container to create / manage. embedding_dimension: Number of dimensions in the embedding vectors. distance_function: Vector similarity function. "cosine" (default), "euclidean", or "dotproduct". vector_index_type: Index structure for vector search. "quantizedFlat" (default, lower memory) or "diskANN" (higher recall on large datasets). partition_key: Cosmos DB partition key path. Default: "/id". throughput: Manual RU/s throughput for the container. None uses the Cosmos DB account default. Ignored if the container already exists. ssl_cert_path: Optional path to a PEM certificate bundle for corporate SSL inspection proxies.

CosmosDBIndexBuilder( endpoint: str, api_key: str, database_name: str, container_name: str, embedding_dimension: int = 1536, distance_function: Literal['cosine', 'euclidean', 'dotproduct'] = 'cosine', vector_index_type: Literal['quantizedFlat', 'diskANN'] = 'quantizedFlat', partition_key: str = '/id', throughput: Optional[int] = None, ssl_cert_path: Optional[str] = None)
102    def __init__(
103        self,
104        endpoint: str,
105        api_key: str,
106        database_name: str,
107        container_name: str,
108        embedding_dimension: int = 1536,
109        distance_function: DistanceFunction = "cosine",
110        vector_index_type: VectorIndexType = "quantizedFlat",
111        partition_key: str = "/id",
112        throughput: Optional[int] = None,
113        ssl_cert_path: Optional[str] = None,
114    ) -> None:
115        self.database_name = database_name
116        self.container_name = container_name
117        self.embedding_dimension = embedding_dimension
118        self.distance_function = distance_function
119        self.vector_index_type = vector_index_type
120        self.partition_key = partition_key
121        self.throughput = throughput
122        self._ssl_cert_path = ssl_cert_path
123        self._endpoint = endpoint
124        self._api_key = api_key
125
126        self._client = self._build_client(endpoint, api_key, ssl_cert_path)
database_name
container_name
embedding_dimension
distance_function
vector_index_type
partition_key
throughput
def create_index(self) -> None:
132    def create_index(self) -> None:
133        """Create the Cosmos DB database and container if they don't exist.
134
135        Safe to call multiple times — no-op if both already exist.
136        """
137        self._ensure_database()
138        if self.index_exists():
139            logger.info(
140                "Cosmos DB container already exists — skipping creation",
141                database=self.database_name,
142                container=self.container_name,
143            )
144            return
145        self._create_container()
146        logger.info(
147            "Cosmos DB container created",
148            database=self.database_name,
149            container=self.container_name,
150            dim=self.embedding_dimension,
151            distance_function=self.distance_function,
152            vector_index_type=self.vector_index_type,
153        )

Create the Cosmos DB database and container if they don't exist.

Safe to call multiple times — no-op if both already exist.

def create_or_replace_index(self) -> None:
155    def create_or_replace_index(self) -> None:
156        """Delete the container if it exists then create it fresh.
157
158        Warning: All documents are permanently lost.
159        """
160        self._ensure_database()
161        if self.index_exists():
162            db = self._client.get_database_client(self.database_name)
163            db.delete_container(self.container_name)
164            logger.info(
165                "Cosmos DB container deleted for replacement",
166                database=self.database_name,
167                container=self.container_name,
168            )
169        self._create_container()
170        logger.info(
171            "Cosmos DB container created (replaced)",
172            database=self.database_name,
173            container=self.container_name,
174        )

Delete the container if it exists then create it fresh.

Warning: All documents are permanently lost.

def delete_index(self) -> None:
176    def delete_index(self) -> None:
177        """Permanently delete the container and all its documents.
178
179        Raises:
180            RuntimeError: If the container does not exist.
181        """
182        if not self.index_exists():
183            raise RuntimeError(
184                f"Cannot delete container '{self.database_name}/{self.container_name}': "
185                "it does not exist."
186            )
187        db = self._client.get_database_client(self.database_name)
188        db.delete_container(self.container_name)
189        logger.info(
190            "Cosmos DB container deleted",
191            database=self.database_name,
192            container=self.container_name,
193        )

Permanently delete the container and all its documents.

Raises: RuntimeError: If the container does not exist.

def index_exists(self) -> bool:
195    def index_exists(self) -> bool:
196        """Return True if the container currently exists."""
197        try:
198            db = self._client.get_database_client(self.database_name)
199            db.get_container_client(self.container_name).read()
200            return True
201        except Exception:
202            return False

Return True if the container currently exists.

def list_indexes(self) -> List[str]:
204    def list_indexes(self) -> List[str]:
205        """Return the names of all containers in the database."""
206        try:
207            db = self._client.get_database_client(self.database_name)
208            return [c["id"] for c in db.list_containers()]
209        except Exception:
210            return []

Return the names of all containers in the database.

class MongoDBIndexBuilder(gmf_forge_ai_data.indexing.BaseIndexBuilder):
 63class MongoDBIndexBuilder(BaseIndexBuilder):
 64    """
 65    Builds and manages Atlas Vector Search and text indexes for a MongoDB
 66    collection.
 67
 68    The builder owns *schema / provisioning* concerns only.  Document
 69    operations belong to ``MongoDBVectorStore``.
 70
 71    Parameters
 72    ----------
 73    connection_string:
 74        MongoDB Atlas connection string, e.g.
 75        ``"mongodb+srv://user:pass@cluster.mongodb.net/"``.
 76    database_name:
 77        Name of the MongoDB database.
 78    collection_name:
 79        Name of the collection to index.
 80    embedding_dimension:
 81        Number of dimensions in the embedding vectors.
 82    document_type:
 83        Document dataclass whose *extra* fields will be added as Atlas
 84        filter fields (fields other than ``id``, ``content``, ``embedding``,
 85        ``timestamp``, and ``metadata``).
 86    vector_index_name:
 87        Name of the Atlas Vector Search index.  Must match the
 88        ``vector_index_name`` used when constructing ``MongoDBVectorStore``.
 89        Default: ``"vector_index"``.
 90    similarity:
 91        Similarity metric for the vector index.  ``"cosine"`` (default),
 92        ``"euclidean"``, or ``"dotProduct"``.
 93    extra_filter_paths:
 94        Additional document paths to register as Atlas filter fields beyond
 95        those inferred from *document_type*.  Useful for arbitrary metadata
 96        fields stored in the ``metadata`` sub-document.
 97    text_index_fields:
 98        Fields to include in the MongoDB ``$text`` full-text index.
 99        Default: ``["content"]``.
100    ssl_cert_path:
101        Path to a CA certificate bundle (PEM) for TLS verification in
102        corporate environments with custom certificate authorities.
103    """
104
105    _BASE_KEYS = frozenset({"id", "content", "embedding", "timestamp", "metadata"})
106
107    def __init__(
108        self,
109        connection_string: str,
110        database_name: str,
111        collection_name: str,
112        embedding_dimension: int = 1536,
113        document_type: Type[Document] = Document,
114        vector_index_name: str = "vector_index",
115        similarity: Similarity = "cosine",
116        extra_filter_paths: Optional[List[str]] = None,
117        text_index_fields: Optional[List[str]] = None,
118        ssl_cert_path: Optional[str] = None,
119    ) -> None:
120        try:
121            import pymongo  # noqa: F401
122        except ImportError as exc:
123            raise ImportError(
124                "pymongo is required for MongoDBIndexBuilder. "
125                "Install it with:  pip install pymongo"
126            ) from exc
127
128        import pymongo
129
130        self.database_name = database_name
131        self.collection_name = collection_name
132        self.embedding_dimension = embedding_dimension
133        self.document_type = document_type
134        self.vector_index_name = vector_index_name
135        self.similarity = similarity
136        self.extra_filter_paths: List[str] = extra_filter_paths or []
137        self.text_index_fields: List[str] = text_index_fields or ["content"]
138
139        client_kwargs: Dict[str, Any] = {}
140        if ssl_cert_path:
141            client_kwargs["tlsCAFile"] = ssl_cert_path
142
143        self._client = pymongo.MongoClient(connection_string, **client_kwargs)
144        self._db = self._client[database_name]
145        self._collection = self._db[collection_name]
146
147    # ------------------------------------------------------------------ #
148    # BaseIndexBuilder interface                                           #
149    # ------------------------------------------------------------------ #
150
151    def create_index(self) -> None:
152        """Create the Atlas Vector Search index and the text index if they
153        don't already exist.
154
155        Safe to call multiple times — each component is idempotent.
156        """
157        self._create_vector_index(replace=False)
158        self._create_text_index()
159
160    def create_or_replace_index(self) -> None:
161        """Drop the Atlas Vector Search index if it exists, then create it
162        fresh alongside the text index.
163
164        The text index is not recreated if it already exists (text indexes
165        are schema-agnostic and need no replacement).
166
167        Warning: Existing vector index data is lost.
168        """
169        import time
170
171        if self.index_exists():
172            self._collection.drop_search_index(self.vector_index_name)
173            logger.info(
174                "Atlas Vector Search index dropped for replacement",
175                index=self.vector_index_name,
176            )
177            # Atlas drops are asynchronous — poll until the index is gone
178            # before submitting the creation request with the same name.
179            deadline = time.monotonic() + 60
180            while time.monotonic() < deadline:
181                if self.vector_index_name not in [
182                    idx["name"] for idx in self._collection.list_search_indexes()
183                ]:
184                    break
185                time.sleep(2)
186            else:
187                raise RuntimeError(
188                    f"Timed out waiting for Atlas to finish dropping index "
189                    f"'{self.vector_index_name}'. Try again in a moment."
190                )
191        self._create_vector_index(replace=True)
192        self._create_text_index()
193
194    def delete_index(self) -> None:
195        """Drop the Atlas Vector Search index.
196
197        The MongoDB text index (``content_text``) is left in place because
198        it is independent of vector dimensionality.
199
200        Raises:
201            RuntimeError: If the vector index does not exist.
202        """
203        if not self.index_exists():
204            raise RuntimeError(
205                f"Cannot delete vector index '{self.vector_index_name}' on "
206                f"'{self.database_name}.{self.collection_name}': it does not exist."
207            )
208        self._collection.drop_search_index(self.vector_index_name)
209        logger.info(
210            "Atlas Vector Search index deleted",
211            index=self.vector_index_name,
212            database=self.database_name,
213            collection=self.collection_name,
214        )
215
216    def index_exists(self) -> bool:
217        """Return True if the Atlas Vector Search index currently exists."""
218        existing = [idx["name"] for idx in self._collection.list_search_indexes()]
219        return self.vector_index_name in existing
220
221    def list_indexes(self) -> List[str]:
222        """Return the names of all Atlas Vector Search indexes on the collection."""
223        return [idx["name"] for idx in self._collection.list_search_indexes()]
224
225    # ------------------------------------------------------------------ #
226    # Additional helpers                                                   #
227    # ------------------------------------------------------------------ #
228
229    def list_text_indexes(self) -> List[str]:
230        """Return the names of all standard MongoDB indexes on the collection."""
231        return [idx["name"] for idx in self._collection.list_indexes()]
232
233    # ------------------------------------------------------------------ #
234    # Internal helpers                                                     #
235    # ------------------------------------------------------------------ #
236
237    def _build_filter_fields(self) -> List[Dict[str, str]]:
238        """Build the list of Atlas filter field definitions.
239
240        Includes custom fields inferred from *document_type* plus any
241        *extra_filter_paths* provided at construction.
242        """
243        paths = set(self.extra_filter_paths)
244
245        if dataclasses.is_dataclass(self.document_type):
246            for field in dataclasses.fields(self.document_type):
247                if field.name not in self._BASE_KEYS:
248                    paths.add(field.name)
249
250        return [{"type": "filter", "path": p} for p in sorted(paths)]
251
252    def _create_vector_index(self, replace: bool = False) -> None:
253        """Submit the Atlas Vector Search index creation request."""
254        if not replace and self.index_exists():
255            logger.info(
256                "Atlas Vector Search index already exists — skipping",
257                index=self.vector_index_name,
258                database=self.database_name,
259                collection=self.collection_name,
260            )
261            return
262
263        filter_fields = self._build_filter_fields()
264
265        index_spec: Dict[str, Any] = {
266            "name": self.vector_index_name,
267            "type": "vectorSearch",
268            "definition": {
269                "fields": [
270                    {
271                        "type": "vector",
272                        "path": "embedding",
273                        "numDimensions": self.embedding_dimension,
274                        "similarity": self.similarity,
275                    },
276                    *filter_fields,
277                ]
278            },
279        }
280
281        self._collection.create_search_index(index_spec)
282        logger.info(
283            "Atlas Vector Search index created",
284            index=self.vector_index_name,
285            database=self.database_name,
286            collection=self.collection_name,
287            dim=self.embedding_dimension,
288            similarity=self.similarity,
289            filter_fields=len(filter_fields),
290        )
291
292    def _create_text_index(self) -> None:
293        """Create a MongoDB ``$text`` index if it does not already exist."""
294        existing = [idx["name"] for idx in self._collection.list_indexes()]
295        if "content_text" in existing:
296            logger.info(
297                "Text index already exists — skipping",
298                index="content_text",
299                database=self.database_name,
300                collection=self.collection_name,
301            )
302            return
303
304        keys = [(field, "text") for field in self.text_index_fields]
305        self._collection.create_index(keys, name="content_text")
306        logger.info(
307            "Text index created",
308            index="content_text",
309            database=self.database_name,
310            collection=self.collection_name,
311            fields=self.text_index_fields,
312        )

Builds and manages Atlas Vector Search and text indexes for a MongoDB collection.

The builder owns schema / provisioning concerns only. Document operations belong to MongoDBVectorStore.

Parameters

connection_string: MongoDB Atlas connection string, e.g. "mongodb+srv://user:pass@cluster.mongodb.net/". database_name: Name of the MongoDB database. collection_name: Name of the collection to index. embedding_dimension: Number of dimensions in the embedding vectors. document_type: Document dataclass whose extra fields will be added as Atlas filter fields (fields other than id, content, embedding, timestamp, and metadata). vector_index_name: Name of the Atlas Vector Search index. Must match the vector_index_name used when constructing MongoDBVectorStore. Default: "vector_index". similarity: Similarity metric for the vector index. "cosine" (default), "euclidean", or "dotProduct". extra_filter_paths: Additional document paths to register as Atlas filter fields beyond those inferred from document_type. Useful for arbitrary metadata fields stored in the metadata sub-document. text_index_fields: Fields to include in the MongoDB $text full-text index. Default: ["content"]. ssl_cert_path: Path to a CA certificate bundle (PEM) for TLS verification in corporate environments with custom certificate authorities.

MongoDBIndexBuilder( connection_string: str, database_name: str, collection_name: str, embedding_dimension: int = 1536, document_type: Type[gmf_forge_ai_data.Document] = <class 'gmf_forge_ai_data.Document'>, vector_index_name: str = 'vector_index', similarity: Literal['cosine', 'euclidean', 'dotProduct'] = 'cosine', extra_filter_paths: Optional[List[str]] = None, text_index_fields: Optional[List[str]] = None, ssl_cert_path: Optional[str] = None)
107    def __init__(
108        self,
109        connection_string: str,
110        database_name: str,
111        collection_name: str,
112        embedding_dimension: int = 1536,
113        document_type: Type[Document] = Document,
114        vector_index_name: str = "vector_index",
115        similarity: Similarity = "cosine",
116        extra_filter_paths: Optional[List[str]] = None,
117        text_index_fields: Optional[List[str]] = None,
118        ssl_cert_path: Optional[str] = None,
119    ) -> None:
120        try:
121            import pymongo  # noqa: F401
122        except ImportError as exc:
123            raise ImportError(
124                "pymongo is required for MongoDBIndexBuilder. "
125                "Install it with:  pip install pymongo"
126            ) from exc
127
128        import pymongo
129
130        self.database_name = database_name
131        self.collection_name = collection_name
132        self.embedding_dimension = embedding_dimension
133        self.document_type = document_type
134        self.vector_index_name = vector_index_name
135        self.similarity = similarity
136        self.extra_filter_paths: List[str] = extra_filter_paths or []
137        self.text_index_fields: List[str] = text_index_fields or ["content"]
138
139        client_kwargs: Dict[str, Any] = {}
140        if ssl_cert_path:
141            client_kwargs["tlsCAFile"] = ssl_cert_path
142
143        self._client = pymongo.MongoClient(connection_string, **client_kwargs)
144        self._db = self._client[database_name]
145        self._collection = self._db[collection_name]
database_name
collection_name
embedding_dimension
document_type
vector_index_name
similarity
extra_filter_paths: List[str]
text_index_fields: List[str]
def create_index(self) -> None:
151    def create_index(self) -> None:
152        """Create the Atlas Vector Search index and the text index if they
153        don't already exist.
154
155        Safe to call multiple times — each component is idempotent.
156        """
157        self._create_vector_index(replace=False)
158        self._create_text_index()

Create the Atlas Vector Search index and the text index if they don't already exist.

Safe to call multiple times — each component is idempotent.

def create_or_replace_index(self) -> None:
160    def create_or_replace_index(self) -> None:
161        """Drop the Atlas Vector Search index if it exists, then create it
162        fresh alongside the text index.
163
164        The text index is not recreated if it already exists (text indexes
165        are schema-agnostic and need no replacement).
166
167        Warning: Existing vector index data is lost.
168        """
169        import time
170
171        if self.index_exists():
172            self._collection.drop_search_index(self.vector_index_name)
173            logger.info(
174                "Atlas Vector Search index dropped for replacement",
175                index=self.vector_index_name,
176            )
177            # Atlas drops are asynchronous — poll until the index is gone
178            # before submitting the creation request with the same name.
179            deadline = time.monotonic() + 60
180            while time.monotonic() < deadline:
181                if self.vector_index_name not in [
182                    idx["name"] for idx in self._collection.list_search_indexes()
183                ]:
184                    break
185                time.sleep(2)
186            else:
187                raise RuntimeError(
188                    f"Timed out waiting for Atlas to finish dropping index "
189                    f"'{self.vector_index_name}'. Try again in a moment."
190                )
191        self._create_vector_index(replace=True)
192        self._create_text_index()

Drop the Atlas Vector Search index if it exists, then create it fresh alongside the text index.

The text index is not recreated if it already exists (text indexes are schema-agnostic and need no replacement).

Warning: Existing vector index data is lost.

def delete_index(self) -> None:
194    def delete_index(self) -> None:
195        """Drop the Atlas Vector Search index.
196
197        The MongoDB text index (``content_text``) is left in place because
198        it is independent of vector dimensionality.
199
200        Raises:
201            RuntimeError: If the vector index does not exist.
202        """
203        if not self.index_exists():
204            raise RuntimeError(
205                f"Cannot delete vector index '{self.vector_index_name}' on "
206                f"'{self.database_name}.{self.collection_name}': it does not exist."
207            )
208        self._collection.drop_search_index(self.vector_index_name)
209        logger.info(
210            "Atlas Vector Search index deleted",
211            index=self.vector_index_name,
212            database=self.database_name,
213            collection=self.collection_name,
214        )

Drop the Atlas Vector Search index.

The MongoDB text index (content_text) is left in place because it is independent of vector dimensionality.

Raises: RuntimeError: If the vector index does not exist.

def index_exists(self) -> bool:
216    def index_exists(self) -> bool:
217        """Return True if the Atlas Vector Search index currently exists."""
218        existing = [idx["name"] for idx in self._collection.list_search_indexes()]
219        return self.vector_index_name in existing

Return True if the Atlas Vector Search index currently exists.

def list_indexes(self) -> List[str]:
221    def list_indexes(self) -> List[str]:
222        """Return the names of all Atlas Vector Search indexes on the collection."""
223        return [idx["name"] for idx in self._collection.list_search_indexes()]

Return the names of all Atlas Vector Search indexes on the collection.

def list_text_indexes(self) -> List[str]:
229    def list_text_indexes(self) -> List[str]:
230        """Return the names of all standard MongoDB indexes on the collection."""
231        return [idx["name"] for idx in self._collection.list_indexes()]

Return the names of all standard MongoDB indexes on the collection.