gmf_forge_ai_data.connectors

Data connectors — source ingestion for RAG pipelines.

Modules: base_connector: Abstract base class for all connectors. filesystem_connector: Load documents from local directory trees. sharepoint_connector: Load documents from SharePoint via Graph API. blob_storage_connector: Load documents from Azure Blob Storage. soap_connector: Generic SOAP/WSDL base connector for any web service. azure_devops_wiki_connector: Load pages from Azure DevOps Wiki via REST API.

 1"""
 2Data connectors — source ingestion for RAG pipelines.
 3
 4Modules:
 5    base_connector:                Abstract base class for all connectors.
 6    filesystem_connector:          Load documents from local directory trees.
 7    sharepoint_connector:          Load documents from SharePoint via Graph API.
 8    blob_storage_connector:        Load documents from Azure Blob Storage.
 9    soap_connector:                Generic SOAP/WSDL base connector for any web service.
10    azure_devops_wiki_connector:   Load pages from Azure DevOps Wiki via REST API.
11"""
12
13from .base_connector import BaseConnector
14from .filesystem_connector import FilesystemConnector
15from .sharepoint_connector import SharePointConnector
16from .blob_storage_connector import BlobStorageConnector
17from .soap_connector import SoapConnector
18from .azure_devops_wiki_connector import AzureDevOpsWikiConnector
19
20__all__ = [
21    "BaseConnector",
22    "FilesystemConnector",
23    "SharePointConnector",
24    "BlobStorageConnector",
25    "SoapConnector",
26    "AzureDevOpsWikiConnector",
27]
class BaseConnector(abc.ABC):
20class BaseConnector(ABC):
21    """
22    Abstract base class for all data source connectors.
23
24    Connectors handle one concern only: sourcing raw content and converting
25    it to Documents. They do NOT chunk, embed, or index — those steps follow.
26
27    Contract for all implementations:
28    - Every returned Document must have a non-empty ``id`` and ``content``.
29    - ``embedding`` is always ``None`` — the caller is responsible for embedding.
30    - Source-specific metadata (path, URL, container, etc.) must be stored in
31      ``document.metadata`` under consistent, documented keys.
32    - Files that cannot be read should be skipped with a printed warning rather
33      than crashing the entire load.
34    """
35
36    @abstractmethod
37    def load(self) -> List[Document]:
38        """
39        Load documents from the data source.
40
41        Returns:
42            List of Document objects with ``id``, ``content``, ``timestamp``,
43            and ``metadata`` populated. ``embedding`` is always ``None``.
44        """

Abstract base class for all data source connectors.

Connectors handle one concern only: sourcing raw content and converting it to Documents. They do NOT chunk, embed, or index — those steps follow.

Contract for all implementations:

  • Every returned Document must have a non-empty id and content.
  • embedding is always None — the caller is responsible for embedding.
  • Source-specific metadata (path, URL, container, etc.) must be stored in document.metadata under consistent, documented keys.
  • Files that cannot be read should be skipped with a printed warning rather than crashing the entire load.
@abstractmethod
def load(self) -> List[gmf_forge_ai_data.Document]:
36    @abstractmethod
37    def load(self) -> List[Document]:
38        """
39        Load documents from the data source.
40
41        Returns:
42            List of Document objects with ``id``, ``content``, ``timestamp``,
43            and ``metadata`` populated. ``embedding`` is always ``None``.
44        """

Load documents from the data source.

Returns: List of Document objects with id, content, timestamp, and metadata populated. embedding is always None.

class FilesystemConnector(gmf_forge_ai_data.connectors.BaseConnector):
 59class FilesystemConnector(BaseConnector):
 60    """
 61    Loads documents from a local directory into Document objects.
 62
 63    Scans a root directory (optionally recursively) and converts each matching
 64    file into a Document. The document ``id`` is a stable MD5 hash of the
 65    absolute file path, so re-runs produce consistent IDs for upsert workflows.
 66
 67    Supported formats:
 68    - Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .js, .ts,
 69      .java, .cpp, .c, .h, .cs, .html, .htm, .xml, .toml, .ini, .cfg, .env
 70    - PDF (.pdf) — requires ``pip install pypdf``
 71    - Word Document (.docx) — requires ``pip install python-docx``
 72
 73    Metadata keys set on every returned Document:
 74        ``source``       Absolute file path as a string
 75        ``file_name``    File name including extension
 76        ``extension``    Lowercase extension including dot (e.g. ``".md"``)
 77        ``size_bytes``   File size in bytes
 78        ``modified_at``  Last modification time in ISO 8601 format
 79
 80    Example:
 81        ```python
 82        from gmf_forge_ai_data.connectors import FilesystemConnector
 83
 84        connector = FilesystemConnector(
 85            root_path="/data/docs",
 86            extensions=[".txt", ".md", ".pdf"],
 87            recursive=True,
 88        )
 89        docs = connector.load()
 90        # docs is List[Document] — pass to a chunker next
 91        ```
 92    """
 93
 94    def __init__(
 95        self,
 96        root_path: Union[str, Path],
 97        extensions: Optional[List[str]] = None,
 98        recursive: bool = True,
 99        encoding: str = "utf-8-sig",
100        skip_empty: bool = True,
101    ):
102        """
103        Args:
104            root_path:   Root directory to scan.
105            extensions:  Explicit list of extensions to include (e.g.
106                         ``[".txt", ".md"]``). Include the leading dot.
107                         If ``None``, all supported formats are loaded.
108            recursive:   If ``True`` (default), scan subdirectories recursively.
109            encoding:    Text encoding for native text files (default ``"utf-8"``).
110            skip_empty:  If ``True`` (default), skip files that produce no
111                         text content after stripping whitespace.
112        """
113        self.root_path = Path(root_path).resolve()
114        self.extensions: Optional[Set[str]] = (
115            {ext.lower() for ext in extensions}
116            if extensions is not None
117            else None  # None = all supported formats
118        )
119        self.recursive = recursive
120        self.encoding = encoding
121        self.skip_empty = skip_empty
122        self._logger = BasicLogger(__name__)
123
124    def load(self) -> List[Document]:
125        """
126        Scan the root directory and return one Document per matching file.
127
128        Files that raise an error during reading are skipped with a warning
129        printed to stdout so the rest of the load continues uninterrupted.
130
131        Returns:
132            List of Document objects, one per successfully loaded file,
133            sorted by file path for deterministic ordering.
134
135        Raises:
136            FileNotFoundError: If ``root_path`` does not exist.
137            NotADirectoryError: If ``root_path`` is not a directory.
138        """
139        if not self.root_path.exists():
140            raise FileNotFoundError(
141                f"Root path does not exist: {self.root_path}"
142            )
143        if not self.root_path.is_dir():
144            raise NotADirectoryError(
145                f"Root path is not a directory: {self.root_path}"
146            )
147
148        pattern = "**/*" if self.recursive else "*"
149        all_files = sorted(p for p in self.root_path.glob(pattern) if p.is_file())
150
151        documents: List[Document] = []
152        for file_path in all_files:
153            ext = file_path.suffix.lower()
154            if not self._is_accepted(ext):
155                continue
156
157            try:
158                content = self._read_file(file_path, ext)
159            except Exception as e:
160                self._logger.warning("Skipping file", file=file_path.name, error=str(e))
161                continue
162
163            if self.skip_empty and not content.strip():
164                continue
165
166            stat = file_path.stat()
167            doc_id = "fs_" + hashlib.md5(str(file_path).encode()).hexdigest()[:12]
168            documents.append(Document(
169                id=doc_id,
170                content=content.strip(),
171                timestamp=datetime.fromtimestamp(stat.st_mtime),
172                metadata={
173                    "source": str(file_path),
174                    "file_name": file_path.name,
175                    "extension": ext,
176                    "size_bytes": stat.st_size,
177                    "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
178                },
179            ))
180
181        return documents
182
183    # ── Private helpers ──────────────────────────────────────────────────────
184
185    def _is_accepted(self, ext: str) -> bool:
186        """Return True if this extension should be loaded."""
187        if self.extensions is not None:
188            return ext in self.extensions
189        # No filter specified: accept all natively supported + optional formats
190        return ext in _NATIVE_TEXT_EXTENSIONS or ext in {".pdf", ".docx"}
191
192    def _read_file(self, path: Path, ext: str) -> str:
193        if ext == ".pdf":
194            return _read_pdf(path)
195        if ext == ".docx":
196            return _read_docx(path)
197        return path.read_text(encoding=self.encoding, errors="replace")

Loads documents from a local directory into Document objects.

Scans a root directory (optionally recursively) and converts each matching file into a Document. The document id is a stable MD5 hash of the absolute file path, so re-runs produce consistent IDs for upsert workflows.

Supported formats:

  • Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .js, .ts, .java, .cpp, .c, .h, .cs, .html, .htm, .xml, .toml, .ini, .cfg, .env
  • PDF (.pdf) — requires pip install pypdf
  • Word Document (.docx) — requires pip install python-docx

Metadata keys set on every returned Document: source Absolute file path as a string file_name File name including extension extension Lowercase extension including dot (e.g. ".md") size_bytes File size in bytes modified_at Last modification time in ISO 8601 format

Example:

from gmf_forge_ai_data.connectors import FilesystemConnector

connector = FilesystemConnector(
    root_path="/data/docs",
    extensions=[".txt", ".md", ".pdf"],
    recursive=True,
)
docs = connector.load()
# docs is List[Document] — pass to a chunker next
FilesystemConnector( root_path: Union[str, pathlib._local.Path], extensions: Optional[List[str]] = None, recursive: bool = True, encoding: str = 'utf-8-sig', skip_empty: bool = True)
 94    def __init__(
 95        self,
 96        root_path: Union[str, Path],
 97        extensions: Optional[List[str]] = None,
 98        recursive: bool = True,
 99        encoding: str = "utf-8-sig",
100        skip_empty: bool = True,
101    ):
102        """
103        Args:
104            root_path:   Root directory to scan.
105            extensions:  Explicit list of extensions to include (e.g.
106                         ``[".txt", ".md"]``). Include the leading dot.
107                         If ``None``, all supported formats are loaded.
108            recursive:   If ``True`` (default), scan subdirectories recursively.
109            encoding:    Text encoding for native text files (default ``"utf-8"``).
110            skip_empty:  If ``True`` (default), skip files that produce no
111                         text content after stripping whitespace.
112        """
113        self.root_path = Path(root_path).resolve()
114        self.extensions: Optional[Set[str]] = (
115            {ext.lower() for ext in extensions}
116            if extensions is not None
117            else None  # None = all supported formats
118        )
119        self.recursive = recursive
120        self.encoding = encoding
121        self.skip_empty = skip_empty
122        self._logger = BasicLogger(__name__)

Args: root_path: Root directory to scan. extensions: Explicit list of extensions to include (e.g. [".txt", ".md"]). Include the leading dot. If None, all supported formats are loaded. recursive: If True (default), scan subdirectories recursively. encoding: Text encoding for native text files (default "utf-8"). skip_empty: If True (default), skip files that produce no text content after stripping whitespace.

root_path
extensions: Optional[Set[str]]
recursive
encoding
skip_empty
def load(self) -> List[gmf_forge_ai_data.Document]:
124    def load(self) -> List[Document]:
125        """
126        Scan the root directory and return one Document per matching file.
127
128        Files that raise an error during reading are skipped with a warning
129        printed to stdout so the rest of the load continues uninterrupted.
130
131        Returns:
132            List of Document objects, one per successfully loaded file,
133            sorted by file path for deterministic ordering.
134
135        Raises:
136            FileNotFoundError: If ``root_path`` does not exist.
137            NotADirectoryError: If ``root_path`` is not a directory.
138        """
139        if not self.root_path.exists():
140            raise FileNotFoundError(
141                f"Root path does not exist: {self.root_path}"
142            )
143        if not self.root_path.is_dir():
144            raise NotADirectoryError(
145                f"Root path is not a directory: {self.root_path}"
146            )
147
148        pattern = "**/*" if self.recursive else "*"
149        all_files = sorted(p for p in self.root_path.glob(pattern) if p.is_file())
150
151        documents: List[Document] = []
152        for file_path in all_files:
153            ext = file_path.suffix.lower()
154            if not self._is_accepted(ext):
155                continue
156
157            try:
158                content = self._read_file(file_path, ext)
159            except Exception as e:
160                self._logger.warning("Skipping file", file=file_path.name, error=str(e))
161                continue
162
163            if self.skip_empty and not content.strip():
164                continue
165
166            stat = file_path.stat()
167            doc_id = "fs_" + hashlib.md5(str(file_path).encode()).hexdigest()[:12]
168            documents.append(Document(
169                id=doc_id,
170                content=content.strip(),
171                timestamp=datetime.fromtimestamp(stat.st_mtime),
172                metadata={
173                    "source": str(file_path),
174                    "file_name": file_path.name,
175                    "extension": ext,
176                    "size_bytes": stat.st_size,
177                    "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
178                },
179            ))
180
181        return documents

Scan the root directory and return one Document per matching file.

Files that raise an error during reading are skipped with a warning printed to stdout so the rest of the load continues uninterrupted.

Returns: List of Document objects, one per successfully loaded file, sorted by file path for deterministic ordering.

Raises: FileNotFoundError: If root_path does not exist. NotADirectoryError: If root_path is not a directory.

class SharePointConnector(gmf_forge_ai_data.connectors.BaseConnector):
 34class SharePointConnector(BaseConnector):
 35    """
 36    Loads files from a SharePoint document library via the Microsoft Graph API.
 37
 38    Authenticates using OAuth2 client credentials — no user interaction or
 39    browser required. Recursively lists all files under ``folder_path``,
 40    downloads their content, and extracts text. Unsupported binary formats
 41    are skipped automatically.
 42
 43    Supported file types:
 44    - Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
 45    - PDF (.pdf) — requires ``pip install pypdf``
 46    - Word Document (.docx) — requires ``pip install python-docx``
 47
 48    Metadata keys set on every returned Document:
 49        ``source``         Microsoft Graph download URL
 50        ``file_name``      File name including extension
 51        ``extension``      Lowercase extension including dot
 52        ``size_bytes``     File size in bytes
 53        ``modified_at``    Last modification time (ISO 8601 string)
 54        ``sharepoint_id``  SharePoint item ID
 55
 56    Example:
 57        ```python
 58        from gmf_forge_ai_data.connectors import SharePointConnector
 59
 60        connector = SharePointConnector(
 61            tenant_id="your-tenant-id",
 62            client_id="your-client-id",
 63            client_secret="your-client-secret",
 64            site_id="your-site-id",
 65            folder_path="/Shared Documents/KnowledgeBase",
 66        )
 67        docs = connector.load()
 68        ```
 69    """
 70
 71    _GRAPH_BASE = "https://graph.microsoft.com/v1.0"
 72    _TOKEN_URL = "https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
 73    _SUPPORTED_EXTENSIONS = {
 74        ".txt", ".md", ".rst", ".csv", ".json", ".yaml", ".yml",
 75        ".py", ".html", ".htm", ".xml", ".pdf", ".docx",
 76    }
 77
 78    def __init__(
 79        self,
 80        tenant_id: str,
 81        client_id: str,
 82        client_secret: str,
 83        site_id: str,
 84        folder_path: str = "/",
 85        drive_id: Optional[str] = None,
 86        ssl_cert_path: Optional[str] = None,
 87    ):
 88        """
 89        Args:
 90            tenant_id:      Azure AD tenant ID.
 91            client_id:      App registration (service principal) client ID.
 92            client_secret:  App registration client secret.
 93            site_id:        SharePoint site ID. Use ``"root"`` for the tenant
 94                            root site or provide the full GUID from the Graph API.
 95            folder_path:    Path inside the drive to load from. Use ``"/"``
 96                            (default) for the entire drive root.
 97            drive_id:       Drive ID or ``None`` (default) to use the site's
 98                            default document library (``root`` drive).
 99            ssl_cert_path:  Optional path to a CA bundle PEM file for
100                            environments with corporate SSL inspection.
101        """
102        self.tenant_id = tenant_id
103        self.client_id = client_id
104        self.client_secret = client_secret
105        self.site_id = site_id
106        self.folder_path = folder_path.rstrip("/") or "/"
107        self.drive_id = drive_id or "root"
108        self.ssl_cert_path = ssl_cert_path
109        self._token: Optional[str] = None
110        self._logger = BasicLogger(__name__)
111
112    def load(self) -> List[Document]:
113        """
114        Authenticate and load all supported files from the configured folder.
115
116        Returns:
117            List of Document objects, one per successfully loaded file.
118
119        Raises:
120            ImportError: If the ``requests`` package is not installed.
121            requests.HTTPError: If authentication or a Graph API call fails.
122        """
123        try:
124            import requests  # type: ignore
125        except ImportError:
126            raise ImportError(
127                "requests is required for SharePointConnector. "
128                "Install it with: pip install requests"
129            )
130
131        self._token = self._acquire_token(requests)
132        items = self._list_items(requests, self.folder_path)
133
134        documents: List[Document] = []
135        for item in items:
136            name: str = item.get("name", "")
137            ext = ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""
138            if ext not in self._SUPPORTED_EXTENSIONS:
139                continue
140
141            try:
142                content = self._download_item(requests, item, ext)
143            except Exception as e:
144                self._logger.warning("Skipping file", file=name, error=str(e))
145                continue
146
147            if not content.strip():
148                continue
149
150            modified = item.get("lastModifiedDateTime", "")
151            size = item.get("size", 0)
152            download_url = item.get("@microsoft.graph.downloadUrl", "")
153            item_id = item.get("id", "")
154            doc_id = "sp_" + hashlib.md5(item_id.encode()).hexdigest()[:12]
155
156            documents.append(Document(
157                id=doc_id,
158                content=content.strip(),
159                timestamp=(
160                    datetime.fromisoformat(modified.replace("Z", "+00:00"))
161                    if modified else datetime.now()
162                ),
163                metadata={
164                    "source": download_url,
165                    "file_name": name,
166                    "extension": ext,
167                    "size_bytes": size,
168                    "modified_at": modified,
169                    "sharepoint_id": item_id,
170                },
171            ))
172
173        return documents
174
175    # ── Private helpers ──────────────────────────────────────────────────────
176
177    def _acquire_token(self, requests) -> str:
178        """Acquire an OAuth2 access token via client credentials flow."""
179        url = self._TOKEN_URL.format(tenant_id=self.tenant_id)
180        resp = requests.post(
181            url,
182            data={
183                "grant_type": "client_credentials",
184                "client_id": self.client_id,
185                "client_secret": self.client_secret,
186                "scope": "https://graph.microsoft.com/.default",
187            },
188            verify=self.ssl_cert_path or True,
189            timeout=30,
190        )
191        resp.raise_for_status()
192        return resp.json()["access_token"]
193
194    def _headers(self) -> Dict[str, str]:
195        return {"Authorization": f"Bearer {self._token}"}
196
197    def _list_items(self, requests, folder_path: str) -> List[Dict]:
198        """Recursively list all file items under folder_path."""
199        # Graph API uses /drive/ (singular) for the default document library
200        # and /drives/{id}/ when a specific drive ID is given.
201        drive_segment = (
202            "drive" if self.drive_id == "root"
203            else f"drives/{self.drive_id}"
204        )
205        if folder_path == "/":
206            url = (
207                f"{self._GRAPH_BASE}/sites/{self.site_id}"
208                f"/{drive_segment}/root/children"
209            )
210        else:
211            encoded = folder_path.lstrip("/")
212            url = (
213                f"{self._GRAPH_BASE}/sites/{self.site_id}"
214                f"/{drive_segment}/root:/{encoded}:/children"
215            )
216
217        items: List[Dict] = []
218        while url:
219            resp = requests.get(
220                url,
221                headers=self._headers(),
222                verify=self.ssl_cert_path or True,
223                timeout=30,
224            )
225            resp.raise_for_status()
226            data = resp.json()
227            for item in data.get("value", []):
228                if "folder" in item:
229                    # Recurse into sub-folders
230                    child_path = folder_path.rstrip("/") + "/" + item["name"]
231                    self._logger.info("Scanning subfolder", path=child_path)
232                    try:
233                        items.extend(self._list_items(requests, child_path))
234                    except Exception as e:
235                        self._logger.warning("Skipping subfolder", path=child_path, error=str(e))
236                else:
237                    items.append(item)
238            url = data.get("@odata.nextLink")  # follow pagination if present
239
240        return items
241
242    def _download_item(self, requests, item: Dict, ext: str) -> str:
243        """Download a file item's raw bytes and decode to text."""
244        download_url = item.get("@microsoft.graph.downloadUrl")
245        if not download_url:
246            raise ValueError(f"No download URL for item: {item.get('name')}")
247        resp = requests.get(
248            download_url,
249            verify=self.ssl_cert_path or True,
250            timeout=60,
251        )
252        resp.raise_for_status()
253        raw: bytes = resp.content
254        if ext == ".pdf":
255            return self._extract_pdf(raw)
256        if ext == ".docx":
257            return self._extract_docx(raw)
258        return raw.decode("utf-8-sig", errors="replace")
259
260    @staticmethod
261    def _extract_pdf(raw: bytes) -> str:
262        try:
263            import pypdf  # type: ignore
264        except ImportError:
265            raise ImportError(
266                "pypdf required for PDF support: pip install pypdf"
267            )
268        import logging
269        logging.getLogger("pypdf").setLevel(logging.ERROR)
270        reader = pypdf.PdfReader(io.BytesIO(raw))
271        return "\n".join(page.extract_text() or "" for page in reader.pages)
272
273    @staticmethod
274    def _extract_docx(raw: bytes) -> str:
275        try:
276            import docx  # type: ignore
277        except ImportError:
278            raise ImportError(
279                "python-docx required for DOCX support: pip install python-docx"
280            )
281        doc = docx.Document(io.BytesIO(raw))
282        return "\n".join(p.text for p in doc.paragraphs if p.text.strip())

Loads files from a SharePoint document library via the Microsoft Graph API.

Authenticates using OAuth2 client credentials — no user interaction or browser required. Recursively lists all files under folder_path, downloads their content, and extracts text. Unsupported binary formats are skipped automatically.

Supported file types:

  • Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
  • PDF (.pdf) — requires pip install pypdf
  • Word Document (.docx) — requires pip install python-docx

Metadata keys set on every returned Document: source Microsoft Graph download URL file_name File name including extension extension Lowercase extension including dot size_bytes File size in bytes modified_at Last modification time (ISO 8601 string) sharepoint_id SharePoint item ID

Example:

from gmf_forge_ai_data.connectors import SharePointConnector

connector = SharePointConnector(
    tenant_id="your-tenant-id",
    client_id="your-client-id",
    client_secret="your-client-secret",
    site_id="your-site-id",
    folder_path="/Shared Documents/KnowledgeBase",
)
docs = connector.load()
SharePointConnector( tenant_id: str, client_id: str, client_secret: str, site_id: str, folder_path: str = '/', drive_id: Optional[str] = None, ssl_cert_path: Optional[str] = None)
 78    def __init__(
 79        self,
 80        tenant_id: str,
 81        client_id: str,
 82        client_secret: str,
 83        site_id: str,
 84        folder_path: str = "/",
 85        drive_id: Optional[str] = None,
 86        ssl_cert_path: Optional[str] = None,
 87    ):
 88        """
 89        Args:
 90            tenant_id:      Azure AD tenant ID.
 91            client_id:      App registration (service principal) client ID.
 92            client_secret:  App registration client secret.
 93            site_id:        SharePoint site ID. Use ``"root"`` for the tenant
 94                            root site or provide the full GUID from the Graph API.
 95            folder_path:    Path inside the drive to load from. Use ``"/"``
 96                            (default) for the entire drive root.
 97            drive_id:       Drive ID or ``None`` (default) to use the site's
 98                            default document library (``root`` drive).
 99            ssl_cert_path:  Optional path to a CA bundle PEM file for
100                            environments with corporate SSL inspection.
101        """
102        self.tenant_id = tenant_id
103        self.client_id = client_id
104        self.client_secret = client_secret
105        self.site_id = site_id
106        self.folder_path = folder_path.rstrip("/") or "/"
107        self.drive_id = drive_id or "root"
108        self.ssl_cert_path = ssl_cert_path
109        self._token: Optional[str] = None
110        self._logger = BasicLogger(__name__)

Args: tenant_id: Azure AD tenant ID. client_id: App registration (service principal) client ID. client_secret: App registration client secret. site_id: SharePoint site ID. Use "root" for the tenant root site or provide the full GUID from the Graph API. folder_path: Path inside the drive to load from. Use "/" (default) for the entire drive root. drive_id: Drive ID or None (default) to use the site's default document library (root drive). ssl_cert_path: Optional path to a CA bundle PEM file for environments with corporate SSL inspection.

tenant_id
client_id
client_secret
site_id
folder_path
drive_id
ssl_cert_path
def load(self) -> List[gmf_forge_ai_data.Document]:
112    def load(self) -> List[Document]:
113        """
114        Authenticate and load all supported files from the configured folder.
115
116        Returns:
117            List of Document objects, one per successfully loaded file.
118
119        Raises:
120            ImportError: If the ``requests`` package is not installed.
121            requests.HTTPError: If authentication or a Graph API call fails.
122        """
123        try:
124            import requests  # type: ignore
125        except ImportError:
126            raise ImportError(
127                "requests is required for SharePointConnector. "
128                "Install it with: pip install requests"
129            )
130
131        self._token = self._acquire_token(requests)
132        items = self._list_items(requests, self.folder_path)
133
134        documents: List[Document] = []
135        for item in items:
136            name: str = item.get("name", "")
137            ext = ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""
138            if ext not in self._SUPPORTED_EXTENSIONS:
139                continue
140
141            try:
142                content = self._download_item(requests, item, ext)
143            except Exception as e:
144                self._logger.warning("Skipping file", file=name, error=str(e))
145                continue
146
147            if not content.strip():
148                continue
149
150            modified = item.get("lastModifiedDateTime", "")
151            size = item.get("size", 0)
152            download_url = item.get("@microsoft.graph.downloadUrl", "")
153            item_id = item.get("id", "")
154            doc_id = "sp_" + hashlib.md5(item_id.encode()).hexdigest()[:12]
155
156            documents.append(Document(
157                id=doc_id,
158                content=content.strip(),
159                timestamp=(
160                    datetime.fromisoformat(modified.replace("Z", "+00:00"))
161                    if modified else datetime.now()
162                ),
163                metadata={
164                    "source": download_url,
165                    "file_name": name,
166                    "extension": ext,
167                    "size_bytes": size,
168                    "modified_at": modified,
169                    "sharepoint_id": item_id,
170                },
171            ))
172
173        return documents

Authenticate and load all supported files from the configured folder.

Returns: List of Document objects, one per successfully loaded file.

Raises: ImportError: If the requests package is not installed. requests.HTTPError: If authentication or a Graph API call fails.

class BlobStorageConnector(gmf_forge_ai_data.connectors.BaseConnector):
 35class BlobStorageConnector(BaseConnector):
 36    """
 37    Loads blobs from an Azure Blob Storage container into Document objects.
 38
 39    Lists all blobs in the container (optionally filtered by a path prefix),
 40    downloads their content, and extracts text. Blobs whose extension is not
 41    in the supported set are skipped automatically.
 42
 43    Supported file types:
 44    - Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
 45    - PDF (.pdf) — requires ``pip install pypdf``
 46    - Word Document (.docx) — requires ``pip install python-docx``
 47
 48    Metadata keys set on every returned Document:
 49        ``source``       Full blob URL
 50        ``file_name``    Last path segment of the blob name
 51        ``blob_name``    Full blob path inside the container
 52        ``extension``    Lowercase extension including dot
 53        ``size_bytes``   Blob content length in bytes
 54        ``modified_at``  Last modified time in ISO 8601 format
 55        ``container``    Container name
 56
 57    Example::
 58
 59        from gmf_forge_ai_data.connectors import BlobStorageConnector
 60
 61        connector = BlobStorageConnector(
 62            account_name="myaccount",
 63            access_key="your-storage-account-access-key",
 64            container_name="documents",
 65            prefix="knowledge-base/",
 66        )
 67        docs = connector.load()
 68    """
 69
 70    def __init__(
 71        self,
 72        account_name: str,
 73        access_key: str,
 74        container_name: str,
 75        prefix: str = "",
 76        ssl_cert_path: Optional[str] = None,
 77    ):
 78        """
 79        Args:
 80            account_name:  Storage account name.
 81            access_key:    Storage account access key.
 82            container_name: Name of the blob container to read from.
 83            prefix:        Optional blob name prefix for filtering, acting
 84                           like a folder path (e.g. ``"knowledge-base/"``).
 85                           Pass ``""`` (default) to list the entire container.
 86            ssl_cert_path: Optional path to a CA bundle PEM file for
 87                           environments with corporate SSL inspection.
 88        """
 89        self.account_name = account_name
 90        self.access_key = access_key
 91        self.container_name = container_name
 92        self.connection_string = (
 93            f"DefaultEndpointsProtocol=https;"
 94            f"AccountName={account_name};"
 95            f"AccountKey={access_key};"
 96            f"EndpointSuffix=core.windows.net"
 97        )
 98        self.account_url = f"https://{account_name}.blob.core.windows.net"
 99        self.prefix = prefix
100        self.ssl_cert_path = ssl_cert_path
101        self._logger = BasicLogger(__name__)
102
103    def load(self) -> List[Document]:
104        """
105        List and download all supported blobs in the container under ``prefix``.
106
107        Returns:
108            List of Document objects, one per successfully loaded blob.
109
110        Raises:
111            ImportError: If the ``azure-storage-blob`` package is not installed.
112        """
113        try:
114            from azure.storage.blob import BlobServiceClient  # type: ignore
115        except ImportError:
116            raise ImportError(
117                "azure-storage-blob is required for BlobStorageConnector. "
118                "Install it with: pip install azure-storage-blob"
119            )
120
121        client = BlobServiceClient.from_connection_string(self.connection_string)
122        container_client = client.get_container_client(self.container_name)
123
124        documents: List[Document] = []
125        for blob in container_client.list_blobs(name_starts_with=self.prefix or None):
126            name: str = blob.name
127            ext = ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""
128            if ext not in _SUPPORTED_EXTENSIONS:
129                continue
130
131            try:
132                content = self._download_blob(container_client, name, ext)
133            except Exception as e:
134                self._logger.warning("Skipping blob", blob=name, error=str(e))
135                continue
136
137            if not content.strip():
138                continue
139
140            modified = blob.last_modified
141            size = blob.size or 0
142            blob_url = f"{self.account_url}/{self.container_name}/{name}"
143            doc_id = "blob_" + hashlib.md5(blob_url.encode()).hexdigest()[:12]
144            file_name = name.split("/")[-1]
145
146            documents.append(Document(
147                id=doc_id,
148                content=content.strip(),
149                timestamp=(
150                    modified if isinstance(modified, datetime) else datetime.now()
151                ),
152                metadata={
153                    "source": blob_url,
154                    "file_name": file_name,
155                    "blob_name": name,
156                    "extension": ext,
157                    "size_bytes": size,
158                    "modified_at": modified.isoformat() if modified else "",
159                    "container": self.container_name,
160                },
161            ))
162
163        return documents
164
165    # ── Private helpers ──────────────────────────────────────────────────────
166
167    def _download_blob(self, container_client, name: str, ext: str) -> str:
168        raw: bytes = container_client.download_blob(name).readall()
169        if ext == ".pdf":
170            return self._extract_pdf(raw)
171        if ext == ".docx":
172            return self._extract_docx(raw)
173        return raw.decode("utf-8-sig", errors="replace")
174
175    @staticmethod
176    def _extract_pdf(raw: bytes) -> str:
177        try:
178            import pypdf  # type: ignore
179        except ImportError:
180            raise ImportError(
181                "pypdf required for PDF support: pip install pypdf"
182            )
183        import logging
184        logging.getLogger("pypdf").setLevel(logging.ERROR)
185        reader = pypdf.PdfReader(io.BytesIO(raw))
186        return "\n".join(page.extract_text() or "" for page in reader.pages)
187
188    @staticmethod
189    def _extract_docx(raw: bytes) -> str:
190        try:
191            import docx  # type: ignore
192        except ImportError:
193            raise ImportError(
194                "python-docx required for DOCX support: pip install python-docx"
195            )
196        doc = docx.Document(io.BytesIO(raw))
197        return "\n".join(p.text for p in doc.paragraphs if p.text.strip())

Loads blobs from an Azure Blob Storage container into Document objects.

Lists all blobs in the container (optionally filtered by a path prefix), downloads their content, and extracts text. Blobs whose extension is not in the supported set are skipped automatically.

Supported file types:

  • Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
  • PDF (.pdf) — requires pip install pypdf
  • Word Document (.docx) — requires pip install python-docx

Metadata keys set on every returned Document: source Full blob URL file_name Last path segment of the blob name blob_name Full blob path inside the container extension Lowercase extension including dot size_bytes Blob content length in bytes modified_at Last modified time in ISO 8601 format container Container name

Example::

from gmf_forge_ai_data.connectors import BlobStorageConnector

connector = BlobStorageConnector(
    account_name="myaccount",
    access_key="your-storage-account-access-key",
    container_name="documents",
    prefix="knowledge-base/",
)
docs = connector.load()
BlobStorageConnector( account_name: str, access_key: str, container_name: str, prefix: str = '', ssl_cert_path: Optional[str] = None)
 70    def __init__(
 71        self,
 72        account_name: str,
 73        access_key: str,
 74        container_name: str,
 75        prefix: str = "",
 76        ssl_cert_path: Optional[str] = None,
 77    ):
 78        """
 79        Args:
 80            account_name:  Storage account name.
 81            access_key:    Storage account access key.
 82            container_name: Name of the blob container to read from.
 83            prefix:        Optional blob name prefix for filtering, acting
 84                           like a folder path (e.g. ``"knowledge-base/"``).
 85                           Pass ``""`` (default) to list the entire container.
 86            ssl_cert_path: Optional path to a CA bundle PEM file for
 87                           environments with corporate SSL inspection.
 88        """
 89        self.account_name = account_name
 90        self.access_key = access_key
 91        self.container_name = container_name
 92        self.connection_string = (
 93            f"DefaultEndpointsProtocol=https;"
 94            f"AccountName={account_name};"
 95            f"AccountKey={access_key};"
 96            f"EndpointSuffix=core.windows.net"
 97        )
 98        self.account_url = f"https://{account_name}.blob.core.windows.net"
 99        self.prefix = prefix
100        self.ssl_cert_path = ssl_cert_path
101        self._logger = BasicLogger(__name__)

Args: account_name: Storage account name. access_key: Storage account access key. container_name: Name of the blob container to read from. prefix: Optional blob name prefix for filtering, acting like a folder path (e.g. "knowledge-base/"). Pass "" (default) to list the entire container. ssl_cert_path: Optional path to a CA bundle PEM file for environments with corporate SSL inspection.

account_name
access_key
container_name
connection_string
account_url
prefix
ssl_cert_path
def load(self) -> List[gmf_forge_ai_data.Document]:
103    def load(self) -> List[Document]:
104        """
105        List and download all supported blobs in the container under ``prefix``.
106
107        Returns:
108            List of Document objects, one per successfully loaded blob.
109
110        Raises:
111            ImportError: If the ``azure-storage-blob`` package is not installed.
112        """
113        try:
114            from azure.storage.blob import BlobServiceClient  # type: ignore
115        except ImportError:
116            raise ImportError(
117                "azure-storage-blob is required for BlobStorageConnector. "
118                "Install it with: pip install azure-storage-blob"
119            )
120
121        client = BlobServiceClient.from_connection_string(self.connection_string)
122        container_client = client.get_container_client(self.container_name)
123
124        documents: List[Document] = []
125        for blob in container_client.list_blobs(name_starts_with=self.prefix or None):
126            name: str = blob.name
127            ext = ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""
128            if ext not in _SUPPORTED_EXTENSIONS:
129                continue
130
131            try:
132                content = self._download_blob(container_client, name, ext)
133            except Exception as e:
134                self._logger.warning("Skipping blob", blob=name, error=str(e))
135                continue
136
137            if not content.strip():
138                continue
139
140            modified = blob.last_modified
141            size = blob.size or 0
142            blob_url = f"{self.account_url}/{self.container_name}/{name}"
143            doc_id = "blob_" + hashlib.md5(blob_url.encode()).hexdigest()[:12]
144            file_name = name.split("/")[-1]
145
146            documents.append(Document(
147                id=doc_id,
148                content=content.strip(),
149                timestamp=(
150                    modified if isinstance(modified, datetime) else datetime.now()
151                ),
152                metadata={
153                    "source": blob_url,
154                    "file_name": file_name,
155                    "blob_name": name,
156                    "extension": ext,
157                    "size_bytes": size,
158                    "modified_at": modified.isoformat() if modified else "",
159                    "container": self.container_name,
160                },
161            ))
162
163        return documents

List and download all supported blobs in the container under prefix.

Returns: List of Document objects, one per successfully loaded blob.

Raises: ImportError: If the azure-storage-blob package is not installed.

class SoapConnector(gmf_forge_ai_data.connectors.BaseConnector):
 89class SoapConnector(BaseConnector):
 90    """
 91    Generic SOAP / WSDL connector base class.
 92
 93    Manages the complete connection lifecycle for a single WSDL endpoint:
 94    authentication via HTTP Basic Auth, optional SSL certificate configuration,
 95    lazy connection initialisation, method invocation, and automatic response
 96    serialisation from zeep objects to plain Python dicts.
 97
 98    Subclass this to add service-specific method wrappers and implement
 99    ``load()`` with your document-extraction logic.
100
101    Parameters
102    ----------
103    wsdl_url:
104        Full URL to the WSDL endpoint, e.g.
105        ``"https://example.com/Api/AdminApi.svc?wsdl"``.
106    username:
107        HTTP Basic Auth username for the service.
108    password:
109        HTTP Basic Auth password for the service.
110    credentials_type_qname:
111        Fully qualified name of the SOAP ``Credentials`` complex type as
112        defined in the WSDL, e.g.
113        ``"{http://schemas.datacontract.org/2004/07/My.Api}Credentials"``.
114        When supplied, a typed credentials object is created and injected
115        automatically into every ``call()`` invocation as the
116        ``credentials`` keyword argument.
117        When ``None`` (default), no credentials object is injected — use
118        this when the service authenticates via HTTP headers only.
119    credentials_username_field:
120        Field name for the username inside the Credentials complex type.
121        Default: ``"Username"``.
122    credentials_password_field:
123        Field name for the password inside the Credentials complex type.
124        Default: ``"Password"``.
125    ssl_verify:
126        Whether to verify SSL certificates. Set to ``False`` for services
127        using self-signed certificates (issues a warning).
128        Default: ``True``.
129    ssl_cert_path:
130        Path to a CA certificate bundle PEM file for corporate SSL
131        inspection proxies. Sets ``REQUESTS_CA_BUNDLE`` and
132        ``SSL_CERT_FILE`` environment variables automatically.
133    connect_on_init:
134        If ``True``, call ``connect()`` immediately in ``__init__``.
135        If ``False`` (default), connection is established lazily on the
136        first ``call()`` invocation.
137
138    Raises
139    ------
140    ImportError
141        If the ``zeep`` package is not installed.
142    """
143
144    def __init__(
145        self,
146        wsdl_url: str,
147        username: str,
148        password: str,
149        credentials_type_qname: Optional[str] = None,
150        credentials_username_field: str = "Username",
151        credentials_password_field: str = "Password",
152        ssl_verify: Union[bool, str] = True,
153        ssl_cert_path: Optional[str] = None,
154        connect_on_init: bool = False,
155    ) -> None:
156        try:
157            import zeep  # noqa: F401
158        except ImportError as exc:
159            raise ImportError(
160                "zeep is required for SoapConnector. "
161                "Install it with:  pip install zeep"
162            ) from exc
163
164        self.wsdl_url = wsdl_url
165        self.username = username
166        self.password = password
167        self._credentials_type_qname = credentials_type_qname
168        self._credentials_username_field = credentials_username_field
169        self._credentials_password_field = credentials_password_field
170        self.ssl_verify: Union[bool, str] = ssl_verify
171        self.ssl_cert_path = ssl_cert_path
172
173        self._client: Optional[Any] = None       # zeep.Client
174        self._credentials: Optional[Any] = None  # typed Credentials object
175        self._logger = BasicLogger(__name__)
176
177        if ssl_cert_path:
178            import os as _os
179            _os.environ.setdefault("REQUESTS_CA_BUNDLE", ssl_cert_path)
180            _os.environ.setdefault("SSL_CERT_FILE", ssl_cert_path)
181            self.ssl_verify = ssl_cert_path
182
183        if not ssl_verify:
184            import urllib3
185            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
186
187        if connect_on_init:
188            self.connect()
189
190    # ------------------------------------------------------------------ #
191    # Connection management                                                #
192    # ------------------------------------------------------------------ #
193
194    def connect(self) -> None:
195        """
196        Establish the SOAP connection.
197
198        Creates a ``zeep.Client`` configured with HTTP Basic Auth and the
199        SSL settings supplied at construction. If ``credentials_type_qname``
200        was provided, also builds the typed Credentials object that will be
201        injected into every subsequent ``call()``.
202
203        This method is idempotent — calling it when already connected is a
204        no-op. It is called automatically by ``call()`` on first use, so
205        explicit calls are optional unless you want to fail fast at startup.
206
207        Raises
208        ------
209        Exception
210            Any exception raised by the zeep ``Client`` constructor (e.g.
211            network unreachable, invalid WSDL URL) is logged and re-raised.
212        """
213        if self._client is not None:
214            return
215
216        from zeep import Client
217        from zeep.transports import Transport
218        from requests import Session
219        from requests.auth import HTTPBasicAuth
220
221        try:
222            session = Session()
223            session.auth = HTTPBasicAuth(self.username, self.password)
224            session.verify = self.ssl_verify
225            transport = Transport(session=session)
226            self._client = Client(wsdl=self.wsdl_url, transport=transport)
227            self._logger.info("SOAP client connected", wsdl=self.wsdl_url)
228        except Exception as exc:
229            self._logger.error(
230                "Failed to connect SOAP client",
231                wsdl=self.wsdl_url,
232                error=str(exc),
233            )
234            raise
235
236        # Build the typed Credentials object if the WSDL uses one
237        if self._credentials_type_qname:
238            try:
239                cred_type = self._client.get_type(self._credentials_type_qname)
240                self._credentials = cred_type(**{
241                    self._credentials_username_field: self.username,
242                    self._credentials_password_field: self.password,
243                })
244                self._logger.info(
245                    "SOAP credentials object created",
246                    type_qname=self._credentials_type_qname,
247                )
248            except Exception as exc:
249                self._logger.warning(
250                    "Could not create typed credentials object — "
251                    "credentials will NOT be injected automatically",
252                    type_qname=self._credentials_type_qname,
253                    error=str(exc),
254                )
255
256    def disconnect(self) -> None:
257        """Close the underlying requests session and reset the client."""
258        if self._client is not None:
259            try:
260                self._client.transport.session.close()
261            except Exception:
262                pass
263            self._client = None
264            self._credentials = None
265            self._logger.info("SOAP client disconnected", wsdl=self.wsdl_url)
266
267    # ------------------------------------------------------------------ #
268    # Method invocation                                                    #
269    # ------------------------------------------------------------------ #
270
271    def call(self, method: str, **kwargs: Any) -> Any:
272        """
273        Invoke a SOAP service method and return a serialised plain dict.
274
275        Connects automatically on first use. If a typed Credentials object
276        was built during ``connect()``, it is injected as the ``credentials``
277        keyword argument before the call — no need to pass it manually.
278
279        Parameters
280        ----------
281        method:
282            Name of the SOAP operation as declared in the WSDL, e.g.
283            ``"GetDocumentsByFolder"``.
284        **kwargs:
285            Additional keyword arguments forwarded to the operation, e.g.
286            ``folderId="abc-123"``.
287
288        Returns
289        -------
290        Any
291            Python-native value (dict, list, str, int, etc.) produced by
292            ``zeep.helpers.serialize_object()``. All zeep-specific types
293            are stripped so callers work with plain Python objects.
294
295        Raises
296        ------
297        AttributeError
298            If ``method`` is not found in the WSDL service definition.
299        Exception
300            Any SOAP fault or transport-level error raised by zeep.
301        """
302        self.connect()  # no-op if already connected
303
304        if self._credentials is not None and "credentials" not in kwargs:
305            kwargs["credentials"] = self._credentials
306
307        operation = getattr(self._client.service, method)
308
309        self._logger.debug("Calling SOAP method", method=method)
310        try:
311            raw = operation(**kwargs)
312        except Exception as exc:
313            err = str(exc)
314            # SOAP faults that are navigational noise (folder not found,
315            # invalid id) stay at debug.  All other faults log at warning —
316            # the caller decides whether to escalate further.
317            _navigational = ("not be found", "invalid", "could not be found")
318            if any(e in err.lower() for e in _navigational):
319                self._logger.debug("SOAP method call failed", method=method, error=err)
320            else:
321                self._logger.warning("SOAP method call failed", method=method, error=err)
322            raise
323
324        from zeep import helpers
325        result = helpers.serialize_object(raw, target_cls=dict)
326        self._logger.debug("SOAP method returned", method=method)
327        return result
328
329    # ------------------------------------------------------------------ #
330    # Text extraction utility                                              #
331    # ------------------------------------------------------------------ #
332
333    def extract_text(
334        self,
335        data: Union[bytes, str],
336        filename: str,
337    ) -> str:
338        """
339        Extract plain text from raw file bytes or base64-encoded data.
340
341        Dispatches by file extension:
342        - ``.pdf``  — extracts text via ``pypdf`` (``pip install pypdf``)
343        - ``.docx`` — extracts text via ``python-docx`` (``pip install python-docx``)
344        - anything else — decodes as UTF-8
345
346        Call this inside your ``load()`` implementation after retrieving the
347        raw file content from the service::
348
349            data    = self.call("GetDocumentData", documentId=doc_id)
350            content = self.extract_text(data["Data"], "policy.pdf")
351
352        Parameters
353        ----------
354        data:
355            Raw bytes **or** a base64-encoded string as typically returned by
356            SOAP document download operations.
357        filename:
358            File name (or any string ending with the correct extension) used
359            to determine the extraction strategy, e.g. ``"policy.pdf"``.
360
361        Returns
362        -------
363        str
364            Extracted plain text. Returns an empty string when ``data`` is
365            empty or ``None``.
366        """
367        if not data:
368            return ""
369
370        # Normalise to bytes
371        if isinstance(data, str):
372            try:
373                file_bytes = base64.b64decode(data)
374            except Exception:
375                # Not base64 — treat as raw text
376                return data
377        else:
378            file_bytes = data
379
380        ext = ("." + filename.rsplit(".", 1)[-1]).lower() if "." in filename else ""
381
382        if ext == ".pdf":
383            return self._extract_pdf(file_bytes, filename)
384        if ext == ".docx":
385            return self._extract_docx(file_bytes, filename)
386
387        # Default: decode as UTF-8
388        try:
389            return file_bytes.decode("utf-8", errors="replace")
390        except Exception as exc:
391            self._logger.warning(
392                "Text decode failed, returning empty string",
393                file=filename,
394                error=str(exc),
395            )
396            return ""
397
398    # ------------------------------------------------------------------ #
399    # Abstract interface (developer must implement)                        #
400    # ------------------------------------------------------------------ #
401
402    @abstractmethod
403    def load(self) -> List[Document]:
404        """
405        Load documents from the SOAP service.
406
407        This method is entirely the developer's responsibility. Use
408        ``self.call()`` to invoke service methods and ``self.extract_text()``
409        to convert file bytes to plain text. Assemble and return
410        ``Document`` objects with the ``id``, ``content``, and ``metadata``
411        fields that matter for your use case.
412
413        Returns
414        -------
415        List[Document]
416            One Document per logical document retrieved from the service.
417            ``embedding`` should always be ``None`` — the caller embeds.
418        """
419
420    # ------------------------------------------------------------------ #
421    # Private helpers                                                      #
422    # ------------------------------------------------------------------ #
423
424    def _extract_pdf(self, file_bytes: bytes, filename: str) -> str:
425        try:
426            import pypdf  # type: ignore
427        except ImportError:
428            self._logger.warning(
429                "pypdf not installed — skipping PDF extraction",
430                file=filename,
431            )
432            return f"[PDF extraction skipped — install pypdf: {filename}]"
433        try:
434            reader = pypdf.PdfReader(io.BytesIO(file_bytes))
435            return "\n\n".join(
436                page.extract_text() or "" for page in reader.pages
437            ).strip()
438        except Exception as exc:
439            self._logger.warning(
440                "PDF extraction failed", file=filename, error=str(exc)
441            )
442            return f"[PDF extraction error: {filename}]"
443
444    def _extract_docx(self, file_bytes: bytes, filename: str) -> str:
445        try:
446            import docx  # type: ignore
447        except ImportError:
448            self._logger.warning(
449                "python-docx not installed — skipping DOCX extraction",
450                file=filename,
451            )
452            return f"[DOCX extraction skipped — install python-docx: {filename}]"
453        try:
454            doc = docx.Document(io.BytesIO(file_bytes))
455            return "\n\n".join(
456                p.text for p in doc.paragraphs if p.text.strip()
457            ).strip()
458        except Exception as exc:
459            self._logger.warning(
460                "DOCX extraction failed", file=filename, error=str(exc)
461            )
462            return f"[DOCX extraction error: {filename}]"
463
464    # ------------------------------------------------------------------ #
465    # Convenience                                                          #
466    # ------------------------------------------------------------------ #
467
468    def _make_doc_id(self, *parts: str) -> str:
469        """
470        Generate a stable document ID from one or more string parts.
471
472        Convenience helper for ``load()`` implementations to produce
473        consistent, collision-resistant IDs::
474
475            doc_id = self._make_doc_id(str(doc["DocumentId"]))
476        """
477        combined = "_".join(str(p) for p in parts)
478        return "soap_" + hashlib.md5(combined.encode()).hexdigest()[:12]
479
480    def __enter__(self) -> "SoapConnector":
481        self.connect()
482        return self
483
484    def __exit__(self, *_: Any) -> None:
485        self.disconnect()

Generic SOAP / WSDL connector base class.

Manages the complete connection lifecycle for a single WSDL endpoint: authentication via HTTP Basic Auth, optional SSL certificate configuration, lazy connection initialisation, method invocation, and automatic response serialisation from zeep objects to plain Python dicts.

Subclass this to add service-specific method wrappers and implement load() with your document-extraction logic.

Parameters

wsdl_url: Full URL to the WSDL endpoint, e.g. "https://example.com/Api/AdminApi.svc?wsdl". username: HTTP Basic Auth username for the service. password: HTTP Basic Auth password for the service. credentials_type_qname: Fully qualified name of the SOAP Credentials complex type as defined in the WSDL, e.g. "{http://schemas.datacontract.org/2004/07/My.Api}Credentials". When supplied, a typed credentials object is created and injected automatically into every call() invocation as the credentials keyword argument. When None (default), no credentials object is injected — use this when the service authenticates via HTTP headers only. credentials_username_field: Field name for the username inside the Credentials complex type. Default: "Username". credentials_password_field: Field name for the password inside the Credentials complex type. Default: "Password". ssl_verify: Whether to verify SSL certificates. Set to False for services using self-signed certificates (issues a warning). Default: True. ssl_cert_path: Path to a CA certificate bundle PEM file for corporate SSL inspection proxies. Sets REQUESTS_CA_BUNDLE and SSL_CERT_FILE environment variables automatically. connect_on_init: If True, call connect() immediately in __init__. If False (default), connection is established lazily on the first call() invocation.

Raises

ImportError If the zeep package is not installed.

wsdl_url
username
password
ssl_verify: Union[bool, str]
ssl_cert_path
def connect(self) -> None:
194    def connect(self) -> None:
195        """
196        Establish the SOAP connection.
197
198        Creates a ``zeep.Client`` configured with HTTP Basic Auth and the
199        SSL settings supplied at construction. If ``credentials_type_qname``
200        was provided, also builds the typed Credentials object that will be
201        injected into every subsequent ``call()``.
202
203        This method is idempotent — calling it when already connected is a
204        no-op. It is called automatically by ``call()`` on first use, so
205        explicit calls are optional unless you want to fail fast at startup.
206
207        Raises
208        ------
209        Exception
210            Any exception raised by the zeep ``Client`` constructor (e.g.
211            network unreachable, invalid WSDL URL) is logged and re-raised.
212        """
213        if self._client is not None:
214            return
215
216        from zeep import Client
217        from zeep.transports import Transport
218        from requests import Session
219        from requests.auth import HTTPBasicAuth
220
221        try:
222            session = Session()
223            session.auth = HTTPBasicAuth(self.username, self.password)
224            session.verify = self.ssl_verify
225            transport = Transport(session=session)
226            self._client = Client(wsdl=self.wsdl_url, transport=transport)
227            self._logger.info("SOAP client connected", wsdl=self.wsdl_url)
228        except Exception as exc:
229            self._logger.error(
230                "Failed to connect SOAP client",
231                wsdl=self.wsdl_url,
232                error=str(exc),
233            )
234            raise
235
236        # Build the typed Credentials object if the WSDL uses one
237        if self._credentials_type_qname:
238            try:
239                cred_type = self._client.get_type(self._credentials_type_qname)
240                self._credentials = cred_type(**{
241                    self._credentials_username_field: self.username,
242                    self._credentials_password_field: self.password,
243                })
244                self._logger.info(
245                    "SOAP credentials object created",
246                    type_qname=self._credentials_type_qname,
247                )
248            except Exception as exc:
249                self._logger.warning(
250                    "Could not create typed credentials object — "
251                    "credentials will NOT be injected automatically",
252                    type_qname=self._credentials_type_qname,
253                    error=str(exc),
254                )

Establish the SOAP connection.

Creates a zeep.Client configured with HTTP Basic Auth and the SSL settings supplied at construction. If credentials_type_qname was provided, also builds the typed Credentials object that will be injected into every subsequent call().

This method is idempotent — calling it when already connected is a no-op. It is called automatically by call() on first use, so explicit calls are optional unless you want to fail fast at startup.

Raises

Exception Any exception raised by the zeep Client constructor (e.g. network unreachable, invalid WSDL URL) is logged and re-raised.

def disconnect(self) -> None:
256    def disconnect(self) -> None:
257        """Close the underlying requests session and reset the client."""
258        if self._client is not None:
259            try:
260                self._client.transport.session.close()
261            except Exception:
262                pass
263            self._client = None
264            self._credentials = None
265            self._logger.info("SOAP client disconnected", wsdl=self.wsdl_url)

Close the underlying requests session and reset the client.

def call(self, method: str, **kwargs: Any) -> Any:
271    def call(self, method: str, **kwargs: Any) -> Any:
272        """
273        Invoke a SOAP service method and return a serialised plain dict.
274
275        Connects automatically on first use. If a typed Credentials object
276        was built during ``connect()``, it is injected as the ``credentials``
277        keyword argument before the call — no need to pass it manually.
278
279        Parameters
280        ----------
281        method:
282            Name of the SOAP operation as declared in the WSDL, e.g.
283            ``"GetDocumentsByFolder"``.
284        **kwargs:
285            Additional keyword arguments forwarded to the operation, e.g.
286            ``folderId="abc-123"``.
287
288        Returns
289        -------
290        Any
291            Python-native value (dict, list, str, int, etc.) produced by
292            ``zeep.helpers.serialize_object()``. All zeep-specific types
293            are stripped so callers work with plain Python objects.
294
295        Raises
296        ------
297        AttributeError
298            If ``method`` is not found in the WSDL service definition.
299        Exception
300            Any SOAP fault or transport-level error raised by zeep.
301        """
302        self.connect()  # no-op if already connected
303
304        if self._credentials is not None and "credentials" not in kwargs:
305            kwargs["credentials"] = self._credentials
306
307        operation = getattr(self._client.service, method)
308
309        self._logger.debug("Calling SOAP method", method=method)
310        try:
311            raw = operation(**kwargs)
312        except Exception as exc:
313            err = str(exc)
314            # SOAP faults that are navigational noise (folder not found,
315            # invalid id) stay at debug.  All other faults log at warning —
316            # the caller decides whether to escalate further.
317            _navigational = ("not be found", "invalid", "could not be found")
318            if any(e in err.lower() for e in _navigational):
319                self._logger.debug("SOAP method call failed", method=method, error=err)
320            else:
321                self._logger.warning("SOAP method call failed", method=method, error=err)
322            raise
323
324        from zeep import helpers
325        result = helpers.serialize_object(raw, target_cls=dict)
326        self._logger.debug("SOAP method returned", method=method)
327        return result

Invoke a SOAP service method and return a serialised plain dict.

Connects automatically on first use. If a typed Credentials object was built during connect(), it is injected as the credentials keyword argument before the call — no need to pass it manually.

Parameters

method: Name of the SOAP operation as declared in the WSDL, e.g. "GetDocumentsByFolder". **kwargs: Additional keyword arguments forwarded to the operation, e.g. folderId="abc-123".

Returns

Any Python-native value (dict, list, str, int, etc.) produced by zeep.helpers.serialize_object(). All zeep-specific types are stripped so callers work with plain Python objects.

Raises

AttributeError If method is not found in the WSDL service definition. Exception Any SOAP fault or transport-level error raised by zeep.

def extract_text(self, data: Union[bytes, str], filename: str) -> str:
333    def extract_text(
334        self,
335        data: Union[bytes, str],
336        filename: str,
337    ) -> str:
338        """
339        Extract plain text from raw file bytes or base64-encoded data.
340
341        Dispatches by file extension:
342        - ``.pdf``  — extracts text via ``pypdf`` (``pip install pypdf``)
343        - ``.docx`` — extracts text via ``python-docx`` (``pip install python-docx``)
344        - anything else — decodes as UTF-8
345
346        Call this inside your ``load()`` implementation after retrieving the
347        raw file content from the service::
348
349            data    = self.call("GetDocumentData", documentId=doc_id)
350            content = self.extract_text(data["Data"], "policy.pdf")
351
352        Parameters
353        ----------
354        data:
355            Raw bytes **or** a base64-encoded string as typically returned by
356            SOAP document download operations.
357        filename:
358            File name (or any string ending with the correct extension) used
359            to determine the extraction strategy, e.g. ``"policy.pdf"``.
360
361        Returns
362        -------
363        str
364            Extracted plain text. Returns an empty string when ``data`` is
365            empty or ``None``.
366        """
367        if not data:
368            return ""
369
370        # Normalise to bytes
371        if isinstance(data, str):
372            try:
373                file_bytes = base64.b64decode(data)
374            except Exception:
375                # Not base64 — treat as raw text
376                return data
377        else:
378            file_bytes = data
379
380        ext = ("." + filename.rsplit(".", 1)[-1]).lower() if "." in filename else ""
381
382        if ext == ".pdf":
383            return self._extract_pdf(file_bytes, filename)
384        if ext == ".docx":
385            return self._extract_docx(file_bytes, filename)
386
387        # Default: decode as UTF-8
388        try:
389            return file_bytes.decode("utf-8", errors="replace")
390        except Exception as exc:
391            self._logger.warning(
392                "Text decode failed, returning empty string",
393                file=filename,
394                error=str(exc),
395            )
396            return ""

Extract plain text from raw file bytes or base64-encoded data.

Dispatches by file extension:

  • .pdf — extracts text via pypdf (pip install pypdf)
  • .docx — extracts text via python-docx (pip install python-docx)
  • anything else — decodes as UTF-8

Call this inside your load() implementation after retrieving the raw file content from the service::

data    = self.call("GetDocumentData", documentId=doc_id)
content = self.extract_text(data["Data"], "policy.pdf")

Parameters

data: Raw bytes or a base64-encoded string as typically returned by SOAP document download operations. filename: File name (or any string ending with the correct extension) used to determine the extraction strategy, e.g. "policy.pdf".

Returns

str Extracted plain text. Returns an empty string when data is empty or None.

@abstractmethod
def load(self) -> List[gmf_forge_ai_data.Document]:
402    @abstractmethod
403    def load(self) -> List[Document]:
404        """
405        Load documents from the SOAP service.
406
407        This method is entirely the developer's responsibility. Use
408        ``self.call()`` to invoke service methods and ``self.extract_text()``
409        to convert file bytes to plain text. Assemble and return
410        ``Document`` objects with the ``id``, ``content``, and ``metadata``
411        fields that matter for your use case.
412
413        Returns
414        -------
415        List[Document]
416            One Document per logical document retrieved from the service.
417            ``embedding`` should always be ``None`` — the caller embeds.
418        """

Load documents from the SOAP service.

This method is entirely the developer's responsibility. Use self.call() to invoke service methods and self.extract_text() to convert file bytes to plain text. Assemble and return Document objects with the id, content, and metadata fields that matter for your use case.

Returns

List[Document] One Document per logical document retrieved from the service. embedding should always be None — the caller embeds.

class AzureDevOpsWikiConnector(gmf_forge_ai_data.connectors.BaseConnector):
 35class AzureDevOpsWikiConnector(BaseConnector):
 36    """
 37    Loads pages from one or more Azure DevOps wikis via the REST API.
 38
 39    The developer provides an organization, project, and PAT. The connector
 40    handles everything else: auth, wiki discovery, recursive page-tree
 41    traversal, URL encoding, content fetching, and Document creation.
 42
 43    Content is served as Markdown -- pair with WikiPageChunker.chunk_markdown()
 44    or MarkdownChunker for best results.
 45
 46    There are three loading modes depending on how much of the wiki you need.
 47    See the examples below for guidance on which to use.
 48
 49    Metadata keys set on every returned Document:
 50        ``source``       Browser URL of the page
 51        ``page_path``    Full path within the wiki (e.g. ``/Team Wikis/Overview``)
 52        ``root_path``    Sub-tree root this page was loaded under
 53        ``wiki_id``      Wiki name used as the API identifier
 54        ``wiki_name``    Human-readable wiki name
 55        ``organization`` Azure DevOps organization name
 56        ``project``      Azure DevOps project name
 57
 58    NOTE: ADO REST API paths use the actual page title with spaces
 59    (e.g. ``/Team Wikis``), NOT the hyphenated browser URL slug
 60    (e.g. ``/Team-Wikis``).
 61
 62    Example -- Mode 1: root_path (entire sub-tree from one starting point)
 63        Use when you want everything under a single section. One API call
 64        fetches the full tree; all descendant pages are loaded.
 65
 66        ```python
 67        connector = AzureDevOpsWikiConnector(
 68            organization="MyOrg",
 69            project="MyProject",
 70            pat="your-personal-access-token",
 71            root_path="/Team Wikis",   # loads this page + all its children
 72        )
 73        docs = connector.load()
 74        ```
 75
 76    Example -- Mode 2: page_paths exact (specific pages, no children)
 77        Use when you know the exact pages you need. Tree discovery is skipped;
 78        only the listed paths are fetched (page_paths_recursive=False, the default).
 79
 80        ```python
 81        connector = AzureDevOpsWikiConnector(
 82            organization="MyOrg",
 83            project="MyProject",
 84            pat="your-personal-access-token",
 85            page_paths=["/Team Wikis/Innovation and GenAI Lab"],
 86        )
 87        docs = connector.load()
 88        ```
 89
 90    Example -- Mode 3: page_paths recursive (multiple targeted sub-trees)
 91        Use when you want full sub-trees for several sections without loading
 92        the entire wiki. Each entry is treated as its own root: the page itself
 93        and all its descendants are loaded. page_paths takes precedence over
 94        root_path when both are provided.
 95
 96        ```python
 97        connector = AzureDevOpsWikiConnector(
 98            organization="MyOrg",
 99            project="MyProject",
100            pat="your-personal-access-token",
101            page_paths=[
102                "/Team Wikis/Innovation and GenAI Lab",
103                "/Team Wikis/GMF Bank",
104            ],
105            page_paths_recursive=True,
106        )
107        docs = connector.load()
108        ```
109    """
110
111    _API_BASE = "https://dev.azure.com"
112    _API_VERSION = "7.1"
113
114    def __init__(
115        self,
116        organization: str,
117        project: str,
118        pat: str,
119        wiki_ids: Optional[List[str]] = None,
120        root_path: str = "/",
121        page_paths: Optional[List[str]] = None,
122        page_paths_recursive: bool = False,
123        ssl_cert_path: Optional[str] = None,
124    ):
125        """
126        Args:
127            organization:        Azure DevOps organization name (e.g. ``"MyOrg"``).
128            project:             Azure DevOps project name (e.g. ``"MyProject"``).
129            pat:                 Personal Access Token with Wiki (Read) scope.
130            wiki_ids:            List of wiki identifiers (names or GUIDs) to ingest.
131                                 Pass ``None`` (default) to auto-discover and ingest
132                                 all wikis in the project.
133            root_path:           Starting page path for the tree walk.
134                                 ``"/"`` (default) ingests the entire wiki.
135                                 Any sub-path (e.g. ``"/Team Wikis"``) scopes
136                                 ingestion to that page and all its descendants.
137                                 Ignored when ``page_paths`` is provided.
138            page_paths:          Explicit list of page paths to fetch (e.g.
139                                 ``["/Team Wikis/Innovation and GenAI Lab"]``).
140                                 When set, tree discovery via ``root_path`` is
141                                 skipped.  Each entry is fetched exactly as-is
142                                 unless ``page_paths_recursive=True``.
143            page_paths_recursive: When ``True`` and ``page_paths`` is set, each
144                                 path is treated as a sub-tree root: the page
145                                 itself **and all its descendants** are loaded.
146                                 When ``False`` (default), only the exact pages
147                                 listed are fetched.
148            ssl_cert_path:       Path to a CA bundle PEM file for environments
149                                 with corporate SSL inspection (optional).
150        """
151        self._org = organization
152        self._project = project
153        self._pat = pat
154        self._wiki_ids = wiki_ids
155        self._root_path = root_path.rstrip("/") or "/"
156        self._page_paths = page_paths
157        self._page_paths_recursive = page_paths_recursive
158        self._verify: str | bool = ssl_cert_path or True
159        self._logger = BasicLogger(__name__)
160
161    # ── Public interface ─────────────────────────────────────────────────────
162
163    def load(self) -> List[Document]:
164        """
165        Load all pages from the configured Azure DevOps wiki(s).
166
167        Handles everything automatically:
168        - Discovers all wikis if wiki_ids is not specified
169        - Walks the full page tree from root_path
170        - Fetches Markdown content for each page
171        - Returns one Document per page with complete metadata
172
173        Returns:
174            List of Document objects, one per successfully loaded page.
175
176        Raises:
177            ImportError: If the ``requests`` package is not installed.
178            requests.HTTPError: If authentication or a critical API call fails.
179        """
180        try:
181            import requests  # type: ignore
182        except ImportError:
183            raise ImportError(
184                "requests is required for AzureDevOpsWikiConnector. "
185                "Install it with: pip install requests"
186            )
187
188        session = self._build_session(requests)
189
190        # Auto-discover wikis if not specified
191        wiki_list = self._list_wikis(session)
192        if self._wiki_ids:
193            # Match on GUID *or* name — the ADO list-wikis endpoint returns
194            # a GUID as ``id`` but users typically supply the readable wiki
195            # name (e.g. "MyProject.wiki").  Matching on name covers that case.
196            wiki_list = [
197                (wid, name) for wid, name in wiki_list
198                if wid in self._wiki_ids or name in self._wiki_ids
199            ]
200            # If discovery returned nothing (empty project, auth issue, etc.)
201            # fall back to using the caller-supplied identifiers directly.
202            # The ADO pages API requires the wiki *name* as wikiIdentifier
203            # (GUIDs are only accepted by the wiki-level endpoints, not pages).
204            if not wiki_list:
205                self._logger.warning(
206                    "Wiki discovery returned no matches — using provided wiki_ids directly",
207                    wiki_ids=self._wiki_ids,
208                )
209                wiki_list = [(wid, wid) for wid in self._wiki_ids]
210
211        effective_scope = self._page_paths if self._page_paths is not None else [self._root_path]
212        self._logger.info(
213            "Starting Azure DevOps Wiki ingest",
214            organization=self._org,
215            project=self._project,
216            wikis=[wid for wid, _ in wiki_list],
217            scope=effective_scope,
218        )
219
220        documents: List[Document] = []
221        for wiki_id, wiki_name in wiki_list:
222            # The ADO pages API uses the wiki *name* as the wikiIdentifier in the
223            # URL path — GUIDs work only for the wiki-metadata endpoints, not pages.
224            api_identifier = wiki_name
225            self._logger.info("Ingesting wiki", wiki_id=wiki_id, wiki_name=wiki_name)
226
227            # Build (page_path, origin_root) pairs so _fetch_page can store the
228            # correct root_path in metadata without mutating self._root_path.
229            if self._page_paths is not None:
230                if self._page_paths_recursive:
231                    # Treat each entry as a sub-tree root; collect all descendants.
232                    path_root_pairs: List[Tuple[str, str]] = []
233                    seen: set = set()
234                    for root in self._page_paths:
235                        for p in self._collect_page_paths(session, api_identifier, root_path=root):
236                            if p not in seen:
237                                seen.add(p)
238                                path_root_pairs.append((p, root))
239                    self._logger.info(
240                        "Using recursive page list",
241                        wiki_id=wiki_id,
242                        roots=self._page_paths,
243                        count=len(path_root_pairs),
244                    )
245                else:
246                    # Exact pages only — each path is its own root for metadata.
247                    path_root_pairs = [(p, p) for p in self._page_paths]
248                    self._logger.info(
249                        "Using explicit page list",
250                        wiki_id=wiki_id,
251                        count=len(path_root_pairs),
252                    )
253            else:
254                collected = self._collect_page_paths(session, api_identifier)
255                path_root_pairs = [(p, self._root_path) for p in collected]
256                self._logger.info("Pages found", wiki_id=wiki_id, count=len(path_root_pairs))
257
258            for path, origin_root in path_root_pairs:
259                doc = self._fetch_page(session, api_identifier, wiki_name, path, root_path=origin_root)
260                if doc:
261                    documents.append(doc)
262
263        self._logger.info(
264            "Azure DevOps Wiki ingest complete",
265            total_documents=len(documents),
266            scope=effective_scope,
267        )
268        return documents
269
270    # ── Private helpers ──────────────────────────────────────────────────────
271
272    def _build_session(self, requests):
273        """Create a requests.Session with PAT Basic Auth pre-configured."""
274        token = base64.b64encode(f":{self._pat}".encode()).decode()
275        session = requests.Session()
276        session.headers.update({
277            "Authorization": f"Basic {token}",
278            "Accept": "application/json",
279        })
280        session.verify = self._verify
281        return session
282
283    def _api_url(self, path: str) -> str:
284        return (
285            f"{self._API_BASE}/{self._org}/{self._project}"
286            f"/_apis/wiki/{path}"
287        )
288
289    def _get(self, session, path: str, params: Optional[Dict] = None) -> dict:
290        url = self._api_url(path)
291        all_params: Dict = {"api-version": self._API_VERSION}
292        if params:
293            all_params.update(params)
294        resp = session.get(url, params=all_params, timeout=30)
295        resp.raise_for_status()
296        return resp.json()
297
298    def _list_wikis(self, session) -> List[Tuple[str, str]]:
299        """
300        Return (id, name) pairs for all wikis in the project.
301        Reading name here avoids a separate API call per wiki.
302        """
303        try:
304            data = self._get(session, "wikis")
305        except Exception as exc:
306            self._logger.warning(
307                "Failed to list wikis — check PAT permissions (Wiki Read scope required)",
308                error=str(exc),
309            )
310            return []
311        return [(w["id"], w.get("name", w["id"])) for w in data.get("value", [])]
312
313    def _collect_page_paths(self, session, wiki_id: str, root_path=None) -> List[str]:
314        """
315        Walk the page tree from the given root_path and return a flat list of paths.
316
317        Uses recursionLevel=full to retrieve the entire sub-tree in a single
318        API call, then flattens it. The root "/" is never returned as a page
319        (it is a virtual container); all other paths -- including root_path
320        itself if it is not "/" -- are included.
321
322        Args:
323            root_path: Starting path for this call. Falls back to
324                       ``self._root_path`` when not provided.
325        """
326        effective_root = root_path if root_path is not None else self._root_path
327        try:
328            tree = self._get(
329                session,
330                f"wikis/{wiki_id}/pages",
331                params={
332                    "path": effective_root,
333                    "recursionLevel": "full",
334                    "includeContent": "false",
335                },
336            )
337        except Exception as exc:
338            self._logger.warning(
339                "Could not list pages for wiki",
340                wiki_id=wiki_id,
341                root_path=effective_root,
342                error=str(exc),
343            )
344            return []
345
346        paths: List[str] = []
347        self._walk_tree(tree, paths)
348        return paths
349
350    def _walk_tree(self, node: dict, paths: List[str]) -> None:
351        """
352        Recursively flatten the page tree into a list of paths.
353
354        Rule: include every path except the virtual root "/".
355        The API response is already scoped to root_path, so no
356        filtering beyond skipping "/" is needed.
357        """
358        path = node.get("path")
359        if path and path != "/":
360            paths.append(path)
361        for child in node.get("subPages", []):
362            self._walk_tree(child, paths)
363
364    def _fetch_page(
365        self,
366        session,
367        wiki_id: str,
368        wiki_name: str,
369        page_path: str,
370        root_path=None,
371    ) -> Optional[Document]:
372        """Fetch the Markdown content of a single page and return a Document."""
373        try:
374            data = self._get(
375                session,
376                f"wikis/{wiki_id}/pages",
377                params={
378                    "path": page_path,
379                    "includeContent": "true",
380                },
381            )
382        except Exception as exc:
383            self._logger.warning(
384                "Skipping page — failed to fetch",
385                wiki_id=wiki_id,
386                path=page_path,
387                error=str(exc),
388            )
389            return None
390
391        content: str = data.get("content", "").strip()
392        if not content:
393            self._logger.warning("Skipping empty page", wiki_id=wiki_id, path=page_path)
394            return None
395
396        doc_id = "adowiki_" + hashlib.md5(f"{wiki_id}{page_path}".encode()).hexdigest()[:12]
397
398        return Document(
399            id=doc_id,
400            content=content,
401            timestamp=datetime.now(),
402            metadata={
403                "source":       data.get("remoteUrl", ""),
404                "page_path":    page_path,
405                "root_path":    root_path if root_path is not None else self._root_path,
406                "wiki_id":      wiki_id,
407                "wiki_name":    wiki_name,
408                "organization": self._org,
409                "project":      self._project,
410            },
411        )

Loads pages from one or more Azure DevOps wikis via the REST API.

The developer provides an organization, project, and PAT. The connector handles everything else: auth, wiki discovery, recursive page-tree traversal, URL encoding, content fetching, and Document creation.

Content is served as Markdown -- pair with WikiPageChunker.chunk_markdown() or MarkdownChunker for best results.

There are three loading modes depending on how much of the wiki you need. See the examples below for guidance on which to use.

Metadata keys set on every returned Document: source Browser URL of the page page_path Full path within the wiki (e.g. /Team Wikis/Overview) root_path Sub-tree root this page was loaded under wiki_id Wiki name used as the API identifier wiki_name Human-readable wiki name organization Azure DevOps organization name project Azure DevOps project name

NOTE: ADO REST API paths use the actual page title with spaces (e.g. /Team Wikis), NOT the hyphenated browser URL slug (e.g. /Team-Wikis).

Example -- Mode 1: root_path (entire sub-tree from one starting point) Use when you want everything under a single section. One API call fetches the full tree; all descendant pages are loaded.

connector = AzureDevOpsWikiConnector(
    organization="MyOrg",
    project="MyProject",
    pat="your-personal-access-token",
    root_path="/Team Wikis",   # loads this page + all its children
)
docs = connector.load()

Example -- Mode 2: page_paths exact (specific pages, no children) Use when you know the exact pages you need. Tree discovery is skipped; only the listed paths are fetched (page_paths_recursive=False, the default).

connector = AzureDevOpsWikiConnector(
    organization="MyOrg",
    project="MyProject",
    pat="your-personal-access-token",
    page_paths=["/Team Wikis/Innovation and GenAI Lab"],
)
docs = connector.load()

Example -- Mode 3: page_paths recursive (multiple targeted sub-trees) Use when you want full sub-trees for several sections without loading the entire wiki. Each entry is treated as its own root: the page itself and all its descendants are loaded. page_paths takes precedence over root_path when both are provided.

connector = AzureDevOpsWikiConnector(
    organization="MyOrg",
    project="MyProject",
    pat="your-personal-access-token",
    page_paths=[
        "/Team Wikis/Innovation and GenAI Lab",
        "/Team Wikis/GMF Bank",
    ],
    page_paths_recursive=True,
)
docs = connector.load()
AzureDevOpsWikiConnector( organization: str, project: str, pat: str, wiki_ids: Optional[List[str]] = None, root_path: str = '/', page_paths: Optional[List[str]] = None, page_paths_recursive: bool = False, ssl_cert_path: Optional[str] = None)
114    def __init__(
115        self,
116        organization: str,
117        project: str,
118        pat: str,
119        wiki_ids: Optional[List[str]] = None,
120        root_path: str = "/",
121        page_paths: Optional[List[str]] = None,
122        page_paths_recursive: bool = False,
123        ssl_cert_path: Optional[str] = None,
124    ):
125        """
126        Args:
127            organization:        Azure DevOps organization name (e.g. ``"MyOrg"``).
128            project:             Azure DevOps project name (e.g. ``"MyProject"``).
129            pat:                 Personal Access Token with Wiki (Read) scope.
130            wiki_ids:            List of wiki identifiers (names or GUIDs) to ingest.
131                                 Pass ``None`` (default) to auto-discover and ingest
132                                 all wikis in the project.
133            root_path:           Starting page path for the tree walk.
134                                 ``"/"`` (default) ingests the entire wiki.
135                                 Any sub-path (e.g. ``"/Team Wikis"``) scopes
136                                 ingestion to that page and all its descendants.
137                                 Ignored when ``page_paths`` is provided.
138            page_paths:          Explicit list of page paths to fetch (e.g.
139                                 ``["/Team Wikis/Innovation and GenAI Lab"]``).
140                                 When set, tree discovery via ``root_path`` is
141                                 skipped.  Each entry is fetched exactly as-is
142                                 unless ``page_paths_recursive=True``.
143            page_paths_recursive: When ``True`` and ``page_paths`` is set, each
144                                 path is treated as a sub-tree root: the page
145                                 itself **and all its descendants** are loaded.
146                                 When ``False`` (default), only the exact pages
147                                 listed are fetched.
148            ssl_cert_path:       Path to a CA bundle PEM file for environments
149                                 with corporate SSL inspection (optional).
150        """
151        self._org = organization
152        self._project = project
153        self._pat = pat
154        self._wiki_ids = wiki_ids
155        self._root_path = root_path.rstrip("/") or "/"
156        self._page_paths = page_paths
157        self._page_paths_recursive = page_paths_recursive
158        self._verify: str | bool = ssl_cert_path or True
159        self._logger = BasicLogger(__name__)

Args: organization: Azure DevOps organization name (e.g. "MyOrg"). project: Azure DevOps project name (e.g. "MyProject"). pat: Personal Access Token with Wiki (Read) scope. wiki_ids: List of wiki identifiers (names or GUIDs) to ingest. Pass None (default) to auto-discover and ingest all wikis in the project. root_path: Starting page path for the tree walk. "/" (default) ingests the entire wiki. Any sub-path (e.g. "/Team Wikis") scopes ingestion to that page and all its descendants. Ignored when page_paths is provided. page_paths: Explicit list of page paths to fetch (e.g. ["/Team Wikis/Innovation and GenAI Lab"]). When set, tree discovery via root_path is skipped. Each entry is fetched exactly as-is unless page_paths_recursive=True. page_paths_recursive: When True and page_paths is set, each path is treated as a sub-tree root: the page itself and all its descendants are loaded. When False (default), only the exact pages listed are fetched. ssl_cert_path: Path to a CA bundle PEM file for environments with corporate SSL inspection (optional).

def load(self) -> List[gmf_forge_ai_data.Document]:
163    def load(self) -> List[Document]:
164        """
165        Load all pages from the configured Azure DevOps wiki(s).
166
167        Handles everything automatically:
168        - Discovers all wikis if wiki_ids is not specified
169        - Walks the full page tree from root_path
170        - Fetches Markdown content for each page
171        - Returns one Document per page with complete metadata
172
173        Returns:
174            List of Document objects, one per successfully loaded page.
175
176        Raises:
177            ImportError: If the ``requests`` package is not installed.
178            requests.HTTPError: If authentication or a critical API call fails.
179        """
180        try:
181            import requests  # type: ignore
182        except ImportError:
183            raise ImportError(
184                "requests is required for AzureDevOpsWikiConnector. "
185                "Install it with: pip install requests"
186            )
187
188        session = self._build_session(requests)
189
190        # Auto-discover wikis if not specified
191        wiki_list = self._list_wikis(session)
192        if self._wiki_ids:
193            # Match on GUID *or* name — the ADO list-wikis endpoint returns
194            # a GUID as ``id`` but users typically supply the readable wiki
195            # name (e.g. "MyProject.wiki").  Matching on name covers that case.
196            wiki_list = [
197                (wid, name) for wid, name in wiki_list
198                if wid in self._wiki_ids or name in self._wiki_ids
199            ]
200            # If discovery returned nothing (empty project, auth issue, etc.)
201            # fall back to using the caller-supplied identifiers directly.
202            # The ADO pages API requires the wiki *name* as wikiIdentifier
203            # (GUIDs are only accepted by the wiki-level endpoints, not pages).
204            if not wiki_list:
205                self._logger.warning(
206                    "Wiki discovery returned no matches — using provided wiki_ids directly",
207                    wiki_ids=self._wiki_ids,
208                )
209                wiki_list = [(wid, wid) for wid in self._wiki_ids]
210
211        effective_scope = self._page_paths if self._page_paths is not None else [self._root_path]
212        self._logger.info(
213            "Starting Azure DevOps Wiki ingest",
214            organization=self._org,
215            project=self._project,
216            wikis=[wid for wid, _ in wiki_list],
217            scope=effective_scope,
218        )
219
220        documents: List[Document] = []
221        for wiki_id, wiki_name in wiki_list:
222            # The ADO pages API uses the wiki *name* as the wikiIdentifier in the
223            # URL path — GUIDs work only for the wiki-metadata endpoints, not pages.
224            api_identifier = wiki_name
225            self._logger.info("Ingesting wiki", wiki_id=wiki_id, wiki_name=wiki_name)
226
227            # Build (page_path, origin_root) pairs so _fetch_page can store the
228            # correct root_path in metadata without mutating self._root_path.
229            if self._page_paths is not None:
230                if self._page_paths_recursive:
231                    # Treat each entry as a sub-tree root; collect all descendants.
232                    path_root_pairs: List[Tuple[str, str]] = []
233                    seen: set = set()
234                    for root in self._page_paths:
235                        for p in self._collect_page_paths(session, api_identifier, root_path=root):
236                            if p not in seen:
237                                seen.add(p)
238                                path_root_pairs.append((p, root))
239                    self._logger.info(
240                        "Using recursive page list",
241                        wiki_id=wiki_id,
242                        roots=self._page_paths,
243                        count=len(path_root_pairs),
244                    )
245                else:
246                    # Exact pages only — each path is its own root for metadata.
247                    path_root_pairs = [(p, p) for p in self._page_paths]
248                    self._logger.info(
249                        "Using explicit page list",
250                        wiki_id=wiki_id,
251                        count=len(path_root_pairs),
252                    )
253            else:
254                collected = self._collect_page_paths(session, api_identifier)
255                path_root_pairs = [(p, self._root_path) for p in collected]
256                self._logger.info("Pages found", wiki_id=wiki_id, count=len(path_root_pairs))
257
258            for path, origin_root in path_root_pairs:
259                doc = self._fetch_page(session, api_identifier, wiki_name, path, root_path=origin_root)
260                if doc:
261                    documents.append(doc)
262
263        self._logger.info(
264            "Azure DevOps Wiki ingest complete",
265            total_documents=len(documents),
266            scope=effective_scope,
267        )
268        return documents

Load all pages from the configured Azure DevOps wiki(s).

Handles everything automatically:

  • Discovers all wikis if wiki_ids is not specified
  • Walks the full page tree from root_path
  • Fetches Markdown content for each page
  • Returns one Document per page with complete metadata

Returns: List of Document objects, one per successfully loaded page.

Raises: ImportError: If the requests package is not installed. requests.HTTPError: If authentication or a critical API call fails.