gmf_forge_ai_data.connectors
Data connectors — source ingestion for RAG pipelines.
Modules:
- base_connector: Abstract base class for all connectors.
- filesystem_connector: Load documents from local directory trees.
- sharepoint_connector: Load documents from SharePoint via Graph API.
- blob_storage_connector: Load documents from Azure Blob Storage.
- soap_connector: Generic SOAP/WSDL base connector for any web service.
- azure_devops_wiki_connector: Load pages from Azure DevOps Wiki via REST API.
```python
"""
Data connectors — source ingestion for RAG pipelines.

Modules:
    base_connector: Abstract base class for all connectors.
    filesystem_connector: Load documents from local directory trees.
    sharepoint_connector: Load documents from SharePoint via Graph API.
    blob_storage_connector: Load documents from Azure Blob Storage.
    soap_connector: Generic SOAP/WSDL base connector for any web service.
    azure_devops_wiki_connector: Load pages from Azure DevOps Wiki via REST API.
"""

from .base_connector import BaseConnector
from .filesystem_connector import FilesystemConnector
from .sharepoint_connector import SharePointConnector
from .blob_storage_connector import BlobStorageConnector
from .soap_connector import SoapConnector
from .azure_devops_wiki_connector import AzureDevOpsWikiConnector

__all__ = [
    "BaseConnector",
    "FilesystemConnector",
    "SharePointConnector",
    "BlobStorageConnector",
    "SoapConnector",
    "AzureDevOpsWikiConnector",
]
```
```python
from abc import ABC, abstractmethod
from typing import List

# Document is the package's document model; its import is outside this excerpt.


class BaseConnector(ABC):
    """
    Abstract base class for all data source connectors.

    Connectors handle one concern only: sourcing raw content and converting
    it to Documents. They do NOT chunk, embed, or index — those steps follow.

    Contract for all implementations:
    - Every returned Document must have a non-empty ``id`` and ``content``.
    - ``embedding`` is always ``None`` — the caller is responsible for embedding.
    - Source-specific metadata (path, URL, container, etc.) must be stored in
      ``document.metadata`` under consistent, documented keys.
    - Files that cannot be read should be skipped with a logged warning rather
      than crashing the entire load.
    """

    @abstractmethod
    def load(self) -> List[Document]:
        """
        Load documents from the data source.

        Returns:
            List of Document objects with ``id``, ``content``, ``timestamp``,
            and ``metadata`` populated. ``embedding`` is always ``None``.
        """
```
Abstract base class for all data source connectors.
Connectors handle one concern only: sourcing raw content and converting it to Documents. They do NOT chunk, embed, or index — those steps follow.
Contract for all implementations:
- Every returned Document must have a non-empty id and content.
- embedding is always None; the caller is responsible for embedding.
- Source-specific metadata (path, URL, container, etc.) must be stored in document.metadata under consistent, documented keys.
- Files that cannot be read should be skipped with a logged warning rather than crashing the entire load.
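The following minimal sketch shows a connector that satisfies this contract. `Document` here is a hypothetical stand-in dataclass, because the package's real `Document` type is not shown in this section and may carry more fields. `InMemoryConnector` is an illustrative name and is not part of the package.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class Document:
    """Hypothetical stand-in for the package's Document type."""
    id: str
    content: str
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None


class BaseConnector(ABC):
    """Reduced copy of the abstract base: a single abstract load() method."""

    @abstractmethod
    def load(self) -> List[Document]:
        ...


class InMemoryConnector(BaseConnector):
    """Hypothetical connector: turns {name: text} pairs into Documents."""

    def __init__(self, items: Dict[str, str]) -> None:
        self.items = items

    def load(self) -> List[Document]:
        docs: List[Document] = []
        for name, text in sorted(self.items.items()):
            if not text.strip():
                continue  # contract: never return empty content
            docs.append(Document(
                id=f"mem_{name}",           # contract: non-empty, stable id
                content=text.strip(),
                timestamp=datetime.now(),
                metadata={"source": name},  # documented, source-specific key
                # embedding stays None; embedding happens downstream
            ))
        return docs
```

Skipping blank inputs inside `load()` keeps the "non-empty content" rule enforced at the source rather than in every downstream stage.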
load() -> List[Document]
Load documents from the data source.
Returns:
List of Document objects with id, content, timestamp,
and metadata populated. embedding is always None.
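A caller or test suite can check the contract against whatever `load()` returns. The helper below is a hypothetical sketch. Both `contract_violations` and the stand-in `Document` dataclass are assumptions, not package APIs.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class Document:
    """Hypothetical stand-in for the package's Document type."""
    id: str
    content: str
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None


def contract_violations(docs: List[Document]) -> List[str]:
    """Return readable problems; an empty list means the batch conforms."""
    problems: List[str] = []
    for i, doc in enumerate(docs):
        if not doc.id:
            problems.append(f"doc {i}: empty id")
        if not doc.content or not doc.content.strip():
            problems.append(f"doc {i}: empty content")
        if doc.embedding is not None:
            problems.append(f"doc {i}: embedding should be None")
        if not isinstance(doc.timestamp, datetime):
            problems.append(f"doc {i}: timestamp is not a datetime")
    return problems
```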
````python
import hashlib
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Set, Union

# BaseConnector, Document, BasicLogger, _NATIVE_TEXT_EXTENSIONS, _read_pdf and
# _read_docx are defined or imported earlier in the module (not shown here).


class FilesystemConnector(BaseConnector):
    """
    Loads documents from a local directory into Document objects.

    Scans a root directory (optionally recursively) and converts each matching
    file into a Document. The document ``id`` is ``"fs_"`` followed by the first
    12 hex digits of the MD5 hash of the absolute file path, so re-runs produce
    consistent IDs for upsert workflows.

    Supported formats:
    - Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .js, .ts,
      .java, .cpp, .c, .h, .cs, .html, .htm, .xml, .toml, .ini, .cfg, .env
    - PDF (.pdf) — requires ``pip install pypdf``
    - Word Document (.docx) — requires ``pip install python-docx``

    Metadata keys set on every returned Document:
        ``source``       Absolute file path as a string
        ``file_name``    File name including extension
        ``extension``    Lowercase extension including dot (e.g. ``".md"``)
        ``size_bytes``   File size in bytes
        ``modified_at``  Last modification time in ISO 8601 format

    Example:
        ```python
        from gmf_forge_ai_data.connectors import FilesystemConnector

        connector = FilesystemConnector(
            root_path="/data/docs",
            extensions=[".txt", ".md", ".pdf"],
            recursive=True,
        )
        docs = connector.load()
        # docs is List[Document] — pass to a chunker next
        ```
    """

    def __init__(
        self,
        root_path: Union[str, Path],
        extensions: Optional[List[str]] = None,
        recursive: bool = True,
        encoding: str = "utf-8-sig",
        skip_empty: bool = True,
    ):
        """
        Args:
            root_path: Root directory to scan.
            extensions: Explicit list of extensions to include (e.g.
                ``[".txt", ".md"]``). Include the leading dot.
                If ``None``, all supported formats are loaded.
            recursive: If ``True`` (default), scan subdirectories recursively.
            encoding: Text encoding for native text files (default ``"utf-8-sig"``).
            skip_empty: If ``True`` (default), skip files that produce no
                text content after stripping whitespace.
        """
        self.root_path = Path(root_path).resolve()
        self.extensions: Optional[Set[str]] = (
            {ext.lower() for ext in extensions}
            if extensions is not None
            else None  # None = all supported formats
        )
        self.recursive = recursive
        self.encoding = encoding
        self.skip_empty = skip_empty
        self._logger = BasicLogger(__name__)

    def load(self) -> List[Document]:
        """
        Scan the root directory and return one Document per matching file.

        Files that raise an error during reading are skipped with a logged
        warning, so the rest of the load continues uninterrupted.

        Returns:
            List of Document objects, one per successfully loaded file,
            sorted by file path for deterministic ordering.

        Raises:
            FileNotFoundError: If ``root_path`` does not exist.
            NotADirectoryError: If ``root_path`` is not a directory.
        """
        if not self.root_path.exists():
            raise FileNotFoundError(
                f"Root path does not exist: {self.root_path}"
            )
        if not self.root_path.is_dir():
            raise NotADirectoryError(
                f"Root path is not a directory: {self.root_path}"
            )

        pattern = "**/*" if self.recursive else "*"
        all_files = sorted(p for p in self.root_path.glob(pattern) if p.is_file())

        documents: List[Document] = []
        for file_path in all_files:
            ext = file_path.suffix.lower()
            if not self._is_accepted(ext):
                continue

            try:
                content = self._read_file(file_path, ext)
            except Exception as e:
                # One unreadable file must not abort the whole load.
                self._logger.warning("Skipping file", file=file_path.name, error=str(e))
                continue

            if self.skip_empty and not content.strip():
                continue

            stat = file_path.stat()
            modified = datetime.fromtimestamp(stat.st_mtime)
            # Stable ID derived from the absolute path only, not the content.
            doc_id = "fs_" + hashlib.md5(str(file_path).encode()).hexdigest()[:12]
            documents.append(Document(
                id=doc_id,
                content=content.strip(),
                timestamp=modified,
                metadata={
                    "source": str(file_path),
                    "file_name": file_path.name,
                    "extension": ext,
                    "size_bytes": stat.st_size,
                    "modified_at": modified.isoformat(),
                },
            ))

        return documents

    # ── Private helpers ──────────────────────────────────────────────────────

    def _is_accepted(self, ext: str) -> bool:
        """Return True if this extension should be loaded."""
        if self.extensions is not None:
            return ext in self.extensions
        # No filter specified: accept all natively supported + optional formats
        return ext in _NATIVE_TEXT_EXTENSIONS or ext in {".pdf", ".docx"}

    def _read_file(self, path: Path, ext: str) -> str:
        """Dispatch PDF/DOCX to their extractors; read everything else as text."""
        if ext == ".pdf":
            return _read_pdf(path)
        if ext == ".docx":
            return _read_docx(path)
        return path.read_text(encoding=self.encoding, errors="replace")
````
Loads documents from a local directory into Document objects.
Scans a root directory (optionally recursively) and converts each matching
file into a Document. The document id is "fs_" followed by the first 12 hex
digits of the MD5 hash of the absolute file path, so re-runs produce
consistent IDs for upsert workflows.
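You can reproduce the ID scheme outside the connector, for example to look up or delete a file's document in a vector store before re-indexing. This sketch copies the expression used in `load()`. `filesystem_doc_id` is a hypothetical helper name.

```python
import hashlib
from pathlib import PurePath, PurePosixPath


def filesystem_doc_id(path: PurePath) -> str:
    """Hypothetical helper: "fs_" + first 12 hex digits of MD5(str(path))."""
    return "fs_" + hashlib.md5(str(path).encode()).hexdigest()[:12]


example_id = filesystem_doc_id(PurePosixPath("/data/docs/guide.md"))
```

Only the path is hashed, with several consequences:
- Editing a file keeps its ID, so an upsert replaces the old content.
- Renaming or moving the file yields a new ID, and so does moving the whole root directory.
- In those cases, the stale entry has to be removed separately.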
Supported formats:
- Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .js, .ts, .java, .cpp, .c, .h, .cs, .html, .htm, .xml, .toml, .ini, .cfg, .env
- PDF (.pdf): requires pip install pypdf
- Word Document (.docx): requires pip install python-docx
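When `extensions` is `None`, acceptance falls back to the native text list plus `.pdf` and `.docx`. The sketch below mirrors `_is_accepted()`. The `NATIVE_TEXT` set is copied from the list above, because the module's `_NATIVE_TEXT_EXTENSIONS` constant is not shown. `make_filter` and `suffix_of` are hypothetical names.

```python
from pathlib import Path
from typing import Callable, Iterable, Optional, Set

# Assumption: copied from the docstring's "Native text" list; the module's
# real _NATIVE_TEXT_EXTENSIONS constant is not shown in this section.
NATIVE_TEXT: Set[str] = {
    ".txt", ".md", ".rst", ".csv", ".json", ".yaml", ".yml", ".py", ".js",
    ".ts", ".java", ".cpp", ".c", ".h", ".cs", ".html", ".htm", ".xml",
    ".toml", ".ini", ".cfg", ".env",
}
OPTIONAL_FORMATS: Set[str] = {".pdf", ".docx"}  # need pypdf / python-docx


def make_filter(extensions: Optional[Iterable[str]] = None) -> Callable[[str], bool]:
    """Hypothetical helper mirroring _is_accepted(): an explicit list wins."""
    allowed = {e.lower() for e in extensions} if extensions is not None else None

    def is_accepted(ext: str) -> bool:
        ext = ext.lower()  # load() lowercases Path.suffix before checking
        if allowed is not None:
            return ext in allowed
        return ext in NATIVE_TEXT or ext in OPTIONAL_FORMATS

    return is_accepted


def suffix_of(name: str) -> str:
    """Lowercased suffix, computed the same way load() does."""
    return Path(name).suffix.lower()
```

Because the filter compares `Path.suffix` values, two things follow:
- An extension passed without its leading dot (`"md"`) never matches.
- A file named exactly `.env` has an empty suffix in `pathlib`. The `.env` entry therefore only picks up files such as `prod.env`.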
Metadata keys set on every returned Document:
- source: Absolute file path as a string
- file_name: File name including extension
- extension: Lowercase extension including dot (e.g. ".md")
- size_bytes: File size in bytes
- modified_at: Last modification time in ISO 8601 format
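The metadata values come straight from `pathlib` and `os.stat`. The hypothetical helper `filesystem_metadata` rebuilds the same dictionary for a single file, which can help when writing tests against the connector's output.

```python
from datetime import datetime
from pathlib import Path
from typing import Any, Dict


def filesystem_metadata(file_path: Path) -> Dict[str, Any]:
    """Hypothetical helper: rebuild the metadata dict load() attaches to a file."""
    stat = file_path.stat()
    return {
        "source": str(file_path),
        "file_name": file_path.name,
        "extension": file_path.suffix.lower(),
        "size_bytes": stat.st_size,
        "modified_at": datetime.fromtimestamp(stat.st_mtime).isoformat(),
    }
```

`datetime.fromtimestamp()` returns naive local time, so `modified_at` carries no UTC offset. Convert it explicitly if you merge documents from machines in different time zones.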
Example:
```python
from gmf_forge_ai_data.connectors import FilesystemConnector

connector = FilesystemConnector(
    root_path="/data/docs",
    extensions=[".txt", ".md", ".pdf"],
    recursive=True,
)
docs = connector.load()
# docs is List[Document]; pass them to a chunker next
```
__init__(root_path, extensions=None, recursive=True, encoding="utf-8-sig", skip_empty=True)
Args:
root_path: Root directory to scan.
extensions: Explicit list of extensions to include (e.g.
[".txt", ".md"]). Include the leading dot.
If None, all supported formats are loaded.
recursive: If True (default), scan subdirectories recursively.
encoding: Text encoding for native text files (default "utf-8-sig").
skip_empty: If True (default), skip files that produce no
text content after stripping whitespace.
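The default `encoding` in the signature is `"utf-8-sig"`. It decodes UTF-8 and drops a leading byte-order mark if one is present. The hypothetical `decode_like_loader` below approximates how native text files are read (`read_text(..., errors="replace")`). It omits the newline translation that text-mode reading also applies.

```python
def decode_like_loader(raw: bytes, encoding: str = "utf-8-sig") -> str:
    """Hypothetical helper: decode bytes with the loader's error policy."""
    # errors="replace" turns undecodable bytes into U+FFFD instead of raising.
    return raw.decode(encoding, errors="replace")


with_bom = decode_like_loader(b"\xef\xbb\xbfTitle")   # BOM dropped
plain_utf8 = decode_like_loader(b"\xef\xbb\xbfTitle", "utf-8")  # BOM kept
```

Because of the replacement policy, a mis-encoded file is loaded with replacement characters rather than being skipped.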
load() -> List[Document]
Scan the root directory and return one Document per matching file.
Files that raise an error during reading are skipped with a logged warning, so the rest of the load continues uninterrupted.
Returns:
List of Document objects, one per successfully loaded file, sorted by file path for deterministic ordering.
Raises:
FileNotFoundError: If root_path does not exist.
NotADirectoryError: If root_path is not a directory.
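The scan step of `load()` can be isolated:
1. Validate the root.
2. Glob with `**/*` or `*`.
3. Keep only regular files.
4. Sort.

`list_candidate_files` is a hypothetical name for this excerpt of the logic.

```python
from pathlib import Path
from typing import List


def list_candidate_files(root: Path, recursive: bool = True) -> List[Path]:
    """Hypothetical helper mirroring the scan step of load()."""
    root = root.resolve()
    if not root.exists():
        raise FileNotFoundError(f"Root path does not exist: {root}")
    if not root.is_dir():
        raise NotADirectoryError(f"Root path is not a directory: {root}")
    pattern = "**/*" if recursive else "*"
    # Sorting makes the order identical across runs for the same tree.
    return sorted(p for p in root.glob(pattern) if p.is_file())
```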
```python
import hashlib
import io
from datetime import datetime
from typing import List, Optional

# BaseConnector, Document, BasicLogger and _SUPPORTED_EXTENSIONS are defined
# or imported earlier in the module (not shown here).


class BlobStorageConnector(BaseConnector):
    """
    Loads blobs from an Azure Blob Storage container into Document objects.

    Lists all blobs in the container (optionally filtered by a path prefix),
    downloads their content, and extracts text. Blobs whose extension is not
    in the supported set are skipped automatically.

    Supported file types:
    - Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
    - PDF (.pdf) — requires ``pip install pypdf``
    - Word Document (.docx) — requires ``pip install python-docx``

    Metadata keys set on every returned Document:
        ``source``       Full blob URL
        ``file_name``    Last path segment of the blob name
        ``blob_name``    Full blob path inside the container
        ``extension``    Lowercase extension including dot
        ``size_bytes``   Blob content length in bytes
        ``modified_at``  Last modified time in ISO 8601 format
        ``container``    Container name

    Example::

        from gmf_forge_ai_data.connectors import BlobStorageConnector

        connector = BlobStorageConnector(
            account_name="myaccount",
            access_key="your-storage-account-access-key",
            container_name="documents",
            prefix="knowledge-base/",
        )
        docs = connector.load()
    """

    def __init__(
        self,
        account_name: str,
        access_key: str,
        container_name: str,
        prefix: str = "",
        ssl_cert_path: Optional[str] = None,
    ):
        """
        Args:
            account_name: Storage account name.
            access_key: Storage account access key.
            container_name: Name of the blob container to read from.
            prefix: Optional blob name prefix for filtering, acting
                like a folder path (e.g. ``"knowledge-base/"``).
                Pass ``""`` (default) to list the entire container.
            ssl_cert_path: Optional path to a CA bundle PEM file for
                environments with corporate SSL inspection.
        """
        self.account_name = account_name
        self.access_key = access_key
        self.container_name = container_name
        self.connection_string = (
            f"DefaultEndpointsProtocol=https;"
            f"AccountName={account_name};"
            f"AccountKey={access_key};"
            f"EndpointSuffix=core.windows.net"
        )
        self.account_url = f"https://{account_name}.blob.core.windows.net"
        self.prefix = prefix
        self.ssl_cert_path = ssl_cert_path
        self._logger = BasicLogger(__name__)

    def load(self) -> List[Document]:
        """
        List and download all supported blobs in the container under ``prefix``.

        Returns:
            List of Document objects, one per successfully loaded blob.

        Raises:
            ImportError: If the ``azure-storage-blob`` package is not installed.
        """
        try:
            from azure.storage.blob import BlobServiceClient  # type: ignore
        except ImportError:
            raise ImportError(
                "azure-storage-blob is required for BlobStorageConnector. "
                "Install it with: pip install azure-storage-blob"
            )

        client = BlobServiceClient.from_connection_string(
            self.connection_string,
            # Apply the optional CA bundle: azure-core's connection_verify
            # accepts True (default verification) or a path to a CA bundle.
            connection_verify=self.ssl_cert_path or True,
        )
        container_client = client.get_container_client(self.container_name)

        documents: List[Document] = []
        for blob in container_client.list_blobs(name_starts_with=self.prefix or None):
            name: str = blob.name
            ext = ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""
            if ext not in _SUPPORTED_EXTENSIONS:
                continue

            try:
                content = self._download_blob(container_client, name, ext)
            except Exception as e:
                # One unreadable blob must not abort the whole load.
                self._logger.warning("Skipping blob", blob=name, error=str(e))
                continue

            if not content.strip():
                continue

            modified = blob.last_modified
            size = blob.size or 0
            blob_url = f"{self.account_url}/{self.container_name}/{name}"
            doc_id = "blob_" + hashlib.md5(blob_url.encode()).hexdigest()[:12]
            file_name = name.split("/")[-1]

            documents.append(Document(
                id=doc_id,
                content=content.strip(),
                timestamp=(
                    modified if isinstance(modified, datetime) else datetime.now()
                ),
                metadata={
                    "source": blob_url,
                    "file_name": file_name,
                    "blob_name": name,
                    "extension": ext,
                    "size_bytes": size,
                    "modified_at": modified.isoformat() if modified else "",
                    "container": self.container_name,
                },
            ))

        return documents

    # ── Private helpers ──────────────────────────────────────────────────────

    def _download_blob(self, container_client, name: str, ext: str) -> str:
        """Download a blob and extract its text according to the extension."""
        raw: bytes = container_client.download_blob(name).readall()
        if ext == ".pdf":
            return self._extract_pdf(raw)
        if ext == ".docx":
            return self._extract_docx(raw)
        return raw.decode("utf-8-sig", errors="replace")

    @staticmethod
    def _extract_pdf(raw: bytes) -> str:
        try:
            import pypdf  # type: ignore
        except ImportError:
            raise ImportError(
                "pypdf required for PDF support: pip install pypdf"
            )
        import logging
        logging.getLogger("pypdf").setLevel(logging.ERROR)
        reader = pypdf.PdfReader(io.BytesIO(raw))
        return "\n".join(page.extract_text() or "" for page in reader.pages)

    @staticmethod
    def _extract_docx(raw: bytes) -> str:
        try:
            import docx  # type: ignore
        except ImportError:
            raise ImportError(
                "python-docx required for DOCX support: pip install python-docx"
            )
        doc = docx.Document(io.BytesIO(raw))
        return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
```
Loads blobs from an Azure Blob Storage container into Document objects.
Lists all blobs in the container (optionally filtered by a path prefix), downloads their content, and extracts text. Blobs whose extension is not in the supported set are skipped automatically.
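Blob "folders" are only name prefixes, so extension detection works on the blob name string. This sketch reuses the expression from `load()`. The `SUPPORTED` set is an assumption taken from the supported file types listed for this connector, and stands in for the module's unshown `_SUPPORTED_EXTENSIONS`. `blob_extension` and `is_supported_blob` are hypothetical names.

```python
from typing import Set

# Assumption: native text types plus .pdf/.docx, as listed in the docstring.
SUPPORTED: Set[str] = {
    ".txt", ".md", ".rst", ".csv", ".json", ".yaml", ".yml", ".py",
    ".html", ".xml", ".pdf", ".docx",
}


def blob_extension(name: str) -> str:
    """Same expression as load(): text after the last dot, lowercased."""
    return ("." + name.rsplit(".", 1)[-1]).lower() if "." in name else ""


def is_supported_blob(name: str) -> bool:
    return blob_extension(name) in SUPPORTED
```

A dot inside a virtual folder name (for example `kb.v2/README`) produces a value containing `/`. Such a value can never match, so those blobs are simply skipped.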
Supported file types:
- Native text: .txt, .md, .rst, .csv, .json, .yaml, .yml, .py, .html, .xml
- PDF (.pdf): requires pip install pypdf
- Word Document (.docx): requires pip install python-docx
Metadata keys set on every returned Document:
- source: Full blob URL
- file_name: Last path segment of the blob name
- blob_name: Full blob path inside the container
- extension: Lowercase extension including dot
- size_bytes: Blob content length in bytes
- modified_at: Last modified time in ISO 8601 format
- container: Container name
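The `source` URL and the document ID are derived only from the account, container, and blob name. The hypothetical helper `blob_identity` reproduces those expressions from `load()`.

```python
import hashlib
from typing import Dict


def blob_identity(account_name: str, container: str, blob_name: str) -> Dict[str, str]:
    """Hypothetical helper: source URL, file name and ID as load() builds them."""
    account_url = f"https://{account_name}.blob.core.windows.net"
    blob_url = f"{account_url}/{container}/{blob_name}"
    return {
        "source": blob_url,
        "file_name": blob_name.split("/")[-1],
        "blob_name": blob_name,
        # "blob_" + first 12 hex digits of MD5(blob URL)
        "id": "blob_" + hashlib.md5(blob_url.encode()).hexdigest()[:12],
    }
```

The blob name is inserted into `source` as-is, without percent-encoding. Names containing spaces or `#` therefore produce a string that works as an identifier but not necessarily as a directly usable URL.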
Example:
```python
from gmf_forge_ai_data.connectors import BlobStorageConnector

connector = BlobStorageConnector(
    account_name="myaccount",
    access_key="your-storage-account-access-key",
    container_name="documents",
    prefix="knowledge-base/",
)
docs = connector.load()
```
__init__(account_name, access_key, container_name, prefix="", ssl_cert_path=None)
Args:
account_name: Storage account name.
access_key: Storage account access key.
container_name: Name of the blob container to read from.
prefix: Optional blob name prefix for filtering, acting
like a folder path (e.g. "knowledge-base/").
Pass "" (default) to list the entire container.
ssl_cert_path: Optional path to a CA bundle PEM file for
environments with corporate SSL inspection.
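The constructor assembles a standard Azure Storage connection string from the account name and key. The sketch below builds the same string. Its `endpoint_suffix` parameter is a hypothetical addition, since the connector itself hard-codes `core.windows.net`.

```python
def blob_connection_string(
    account_name: str,
    access_key: str,
    endpoint_suffix: str = "core.windows.net",  # hypothetical parameter
) -> str:
    """Build the same semicolon-separated connection string as __init__."""
    return (
        "DefaultEndpointsProtocol=https;"
        f"AccountName={account_name};"
        f"AccountKey={access_key};"
        f"EndpointSuffix={endpoint_suffix}"
    )
```

Because the suffix is fixed in the connector, accounts in sovereign clouds would need a code change. Azure China, for example, uses `core.chinacloudapi.cn`.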
load() -> List[Document]
List and download all supported blobs in the container under prefix.
Returns:
List of Document objects, one per successfully loaded blob.
Raises:
ImportError: If the azure-storage-blob package is not installed.
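`azure-storage-blob` is imported inside `load()`. The package can therefore be imported without it, and the error only surfaces when the connector is actually used. The following is a generic version of that deferred-import pattern, with a hypothetical `require` helper:

```python
import importlib
from types import ModuleType


def require(module: str, pip_name: str, feature: str) -> ModuleType:
    """Import an optional dependency, or raise ImportError with an install hint."""
    try:
        return importlib.import_module(module)
    except ImportError as exc:  # ModuleNotFoundError is a subclass
        raise ImportError(
            f"{pip_name} is required for {feature}. "
            f"Install it with: pip install {pip_name}"
        ) from exc
```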
89class SoapConnector(BaseConnector): 90 """ 91 Generic SOAP / WSDL connector base class. 92 93 Manages the complete connection lifecycle for a single WSDL endpoint: 94 authentication via HTTP Basic Auth, optional SSL certificate configuration, 95 lazy connection initialisation, method invocation, and automatic response 96 serialisation from zeep objects to plain Python dicts. 97 98 Subclass this to add service-specific method wrappers and implement 99 ``load()`` with your document-extraction logic. 100 101 Parameters 102 ---------- 103 wsdl_url: 104 Full URL to the WSDL endpoint, e.g. 105 ``"https://example.com/Api/AdminApi.svc?wsdl"``. 106 username: 107 HTTP Basic Auth username for the service. 108 password: 109 HTTP Basic Auth password for the service. 110 credentials_type_qname: 111 Fully qualified name of the SOAP ``Credentials`` complex type as 112 defined in the WSDL, e.g. 113 ``"{http://schemas.datacontract.org/2004/07/My.Api}Credentials"``. 114 When supplied, a typed credentials object is created and injected 115 automatically into every ``call()`` invocation as the 116 ``credentials`` keyword argument. 117 When ``None`` (default), no credentials object is injected — use 118 this when the service authenticates via HTTP headers only. 119 credentials_username_field: 120 Field name for the username inside the Credentials complex type. 121 Default: ``"Username"``. 122 credentials_password_field: 123 Field name for the password inside the Credentials complex type. 124 Default: ``"Password"``. 125 ssl_verify: 126 Whether to verify SSL certificates. Set to ``False`` for services 127 using self-signed certificates (issues a warning). 128 Default: ``True``. 129 ssl_cert_path: 130 Path to a CA certificate bundle PEM file for corporate SSL 131 inspection proxies. Sets ``REQUESTS_CA_BUNDLE`` and 132 ``SSL_CERT_FILE`` environment variables automatically. 133 connect_on_init: 134 If ``True``, call ``connect()`` immediately in ``__init__``. 
135 If ``False`` (default), connection is established lazily on the 136 first ``call()`` invocation. 137 138 Raises 139 ------ 140 ImportError 141 If the ``zeep`` package is not installed. 142 """ 143 144 def __init__( 145 self, 146 wsdl_url: str, 147 username: str, 148 password: str, 149 credentials_type_qname: Optional[str] = None, 150 credentials_username_field: str = "Username", 151 credentials_password_field: str = "Password", 152 ssl_verify: Union[bool, str] = True, 153 ssl_cert_path: Optional[str] = None, 154 connect_on_init: bool = False, 155 ) -> None: 156 try: 157 import zeep # noqa: F401 158 except ImportError as exc: 159 raise ImportError( 160 "zeep is required for SoapConnector. " 161 "Install it with: pip install zeep" 162 ) from exc 163 164 self.wsdl_url = wsdl_url 165 self.username = username 166 self.password = password 167 self._credentials_type_qname = credentials_type_qname 168 self._credentials_username_field = credentials_username_field 169 self._credentials_password_field = credentials_password_field 170 self.ssl_verify: Union[bool, str] = ssl_verify 171 self.ssl_cert_path = ssl_cert_path 172 173 self._client: Optional[Any] = None # zeep.Client 174 self._credentials: Optional[Any] = None # typed Credentials object 175 self._logger = BasicLogger(__name__) 176 177 if ssl_cert_path: 178 import os as _os 179 _os.environ.setdefault("REQUESTS_CA_BUNDLE", ssl_cert_path) 180 _os.environ.setdefault("SSL_CERT_FILE", ssl_cert_path) 181 self.ssl_verify = ssl_cert_path 182 183 if not ssl_verify: 184 import urllib3 185 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) 186 187 if connect_on_init: 188 self.connect() 189 190 # ------------------------------------------------------------------ # 191 # Connection management # 192 # ------------------------------------------------------------------ # 193 194 def connect(self) -> None: 195 """ 196 Establish the SOAP connection. 
197 198 Creates a ``zeep.Client`` configured with HTTP Basic Auth and the 199 SSL settings supplied at construction. If ``credentials_type_qname`` 200 was provided, also builds the typed Credentials object that will be 201 injected into every subsequent ``call()``. 202 203 This method is idempotent — calling it when already connected is a 204 no-op. It is called automatically by ``call()`` on first use, so 205 explicit calls are optional unless you want to fail fast at startup. 206 207 Raises 208 ------ 209 Exception 210 Any exception raised by the zeep ``Client`` constructor (e.g. 211 network unreachable, invalid WSDL URL) is logged and re-raised. 212 """ 213 if self._client is not None: 214 return 215 216 from zeep import Client 217 from zeep.transports import Transport 218 from requests import Session 219 from requests.auth import HTTPBasicAuth 220 221 try: 222 session = Session() 223 session.auth = HTTPBasicAuth(self.username, self.password) 224 session.verify = self.ssl_verify 225 transport = Transport(session=session) 226 self._client = Client(wsdl=self.wsdl_url, transport=transport) 227 self._logger.info("SOAP client connected", wsdl=self.wsdl_url) 228 except Exception as exc: 229 self._logger.error( 230 "Failed to connect SOAP client", 231 wsdl=self.wsdl_url, 232 error=str(exc), 233 ) 234 raise 235 236 # Build the typed Credentials object if the WSDL uses one 237 if self._credentials_type_qname: 238 try: 239 cred_type = self._client.get_type(self._credentials_type_qname) 240 self._credentials = cred_type(**{ 241 self._credentials_username_field: self.username, 242 self._credentials_password_field: self.password, 243 }) 244 self._logger.info( 245 "SOAP credentials object created", 246 type_qname=self._credentials_type_qname, 247 ) 248 except Exception as exc: 249 self._logger.warning( 250 "Could not create typed credentials object — " 251 "credentials will NOT be injected automatically", 252 type_qname=self._credentials_type_qname, 253 error=str(exc), 254 ) 
255 256 def disconnect(self) -> None: 257 """Close the underlying requests session and reset the client.""" 258 if self._client is not None: 259 try: 260 self._client.transport.session.close() 261 except Exception: 262 pass 263 self._client = None 264 self._credentials = None 265 self._logger.info("SOAP client disconnected", wsdl=self.wsdl_url) 266 267 # ------------------------------------------------------------------ # 268 # Method invocation # 269 # ------------------------------------------------------------------ # 270 271 def call(self, method: str, **kwargs: Any) -> Any: 272 """ 273 Invoke a SOAP service method and return a serialised plain dict. 274 275 Connects automatically on first use. If a typed Credentials object 276 was built during ``connect()``, it is injected as the ``credentials`` 277 keyword argument before the call — no need to pass it manually. 278 279 Parameters 280 ---------- 281 method: 282 Name of the SOAP operation as declared in the WSDL, e.g. 283 ``"GetDocumentsByFolder"``. 284 **kwargs: 285 Additional keyword arguments forwarded to the operation, e.g. 286 ``folderId="abc-123"``. 287 288 Returns 289 ------- 290 Any 291 Python-native value (dict, list, str, int, etc.) produced by 292 ``zeep.helpers.serialize_object()``. All zeep-specific types 293 are stripped so callers work with plain Python objects. 294 295 Raises 296 ------ 297 AttributeError 298 If ``method`` is not found in the WSDL service definition. 299 Exception 300 Any SOAP fault or transport-level error raised by zeep. 
```python
        """
        self.connect()  # no-op if already connected

        if self._credentials is not None and "credentials" not in kwargs:
            kwargs["credentials"] = self._credentials

        operation = getattr(self._client.service, method)

        self._logger.debug("Calling SOAP method", method=method)
        try:
            raw = operation(**kwargs)
        except Exception as exc:
            err = str(exc)
            # SOAP faults that are navigational noise (folder not found,
            # invalid id) stay at debug. All other faults log at warning —
            # the caller decides whether to escalate further.
            _navigational = ("not be found", "invalid", "could not be found")
            if any(e in err.lower() for e in _navigational):
                self._logger.debug("SOAP method call failed", method=method, error=err)
            else:
                self._logger.warning("SOAP method call failed", method=method, error=err)
            raise

        from zeep import helpers
        result = helpers.serialize_object(raw, target_cls=dict)
        self._logger.debug("SOAP method returned", method=method)
        return result

    # ------------------------------------------------------------------ #
    #  Text extraction utility                                           #
    # ------------------------------------------------------------------ #

    def extract_text(
        self,
        data: Union[bytes, str],
        filename: str,
    ) -> str:
        """
        Extract plain text from raw file bytes or base64-encoded data.

        Dispatches by file extension:
          - ``.pdf``  — extracts text via ``pypdf`` (``pip install pypdf``)
          - ``.docx`` — extracts text via ``python-docx`` (``pip install python-docx``)
          - anything else — decodes as UTF-8

        Call this inside your ``load()`` implementation after retrieving the
        raw file content from the service::

            data = self.call("GetDocumentData", documentId=doc_id)
            content = self.extract_text(data["Data"], "policy.pdf")

        Parameters
        ----------
        data:
            Raw bytes **or** a base64-encoded string as typically returned by
            SOAP document download operations.
        filename:
            File name (or any string ending with the correct extension) used
            to determine the extraction strategy, e.g. ``"policy.pdf"``.

        Returns
        -------
        str
            Extracted plain text. Returns an empty string when ``data`` is
            empty or ``None``.
        """
        if not data:
            return ""

        # Normalise to bytes
        if isinstance(data, str):
            try:
                file_bytes = base64.b64decode(data)
            except Exception:
                # Not base64 — treat as raw text
                return data
        else:
            file_bytes = data

        ext = ("." + filename.rsplit(".", 1)[-1]).lower() if "." in filename else ""

        if ext == ".pdf":
            return self._extract_pdf(file_bytes, filename)
        if ext == ".docx":
            return self._extract_docx(file_bytes, filename)

        # Default: decode as UTF-8
        try:
            return file_bytes.decode("utf-8", errors="replace")
        except Exception as exc:
            self._logger.warning(
                "Text decode failed, returning empty string",
                file=filename,
                error=str(exc),
            )
            return ""

    # ------------------------------------------------------------------ #
    #  Abstract interface (developer must implement)                     #
    # ------------------------------------------------------------------ #

    @abstractmethod
    def load(self) -> List[Document]:
        """
        Load documents from the SOAP service.

        This method is entirely the developer's responsibility. Use
        ``self.call()`` to invoke service methods and ``self.extract_text()``
        to convert file bytes to plain text. Assemble and return
        ``Document`` objects with the ``id``, ``content``, and ``metadata``
        fields that matter for your use case.

        Returns
        -------
        List[Document]
            One Document per logical document retrieved from the service.
            ``embedding`` should always be ``None`` — the caller embeds.
        """

    # ------------------------------------------------------------------ #
    #  Private helpers                                                   #
    # ------------------------------------------------------------------ #

    def _extract_pdf(self, file_bytes: bytes, filename: str) -> str:
        try:
            import pypdf  # type: ignore
        except ImportError:
            self._logger.warning(
                "pypdf not installed — skipping PDF extraction",
                file=filename,
            )
            return f"[PDF extraction skipped — install pypdf: {filename}]"
        try:
            reader = pypdf.PdfReader(io.BytesIO(file_bytes))
            return "\n\n".join(
                page.extract_text() or "" for page in reader.pages
            ).strip()
        except Exception as exc:
            self._logger.warning(
                "PDF extraction failed", file=filename, error=str(exc)
            )
            return f"[PDF extraction error: {filename}]"

    def _extract_docx(self, file_bytes: bytes, filename: str) -> str:
        try:
            import docx  # type: ignore
        except ImportError:
            self._logger.warning(
                "python-docx not installed — skipping DOCX extraction",
                file=filename,
            )
            return f"[DOCX extraction skipped — install python-docx: {filename}]"
        try:
            doc = docx.Document(io.BytesIO(file_bytes))
            return "\n\n".join(
                p.text for p in doc.paragraphs if p.text.strip()
            ).strip()
        except Exception as exc:
            self._logger.warning(
                "DOCX extraction failed", file=filename, error=str(exc)
            )
            return f"[DOCX extraction error: {filename}]"

    # ------------------------------------------------------------------ #
    #  Convenience                                                       #
    # ------------------------------------------------------------------ #

    def _make_doc_id(self, *parts: str) -> str:
        """
        Generate a stable document ID from one or more string parts.

        Convenience helper for ``load()`` implementations to produce
        consistent, collision-resistant IDs::

            doc_id = self._make_doc_id(str(doc["DocumentId"]))
        """
        combined = "_".join(str(p) for p in parts)
        return "soap_" + hashlib.md5(combined.encode()).hexdigest()[:12]

    def __enter__(self) -> "SoapConnector":
        self.connect()
        return self

    def __exit__(self, *_: Any) -> None:
        self.disconnect()
```
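The ID helper at the end of the listing joins its parts with an underscore, hashes the result with MD5, and keeps the first 12 hex characters behind a `soap_` prefix. The standalone sketch below reproduces that scheme so you can confirm that IDs stay stable across runs. The free function name `make_doc_id` is hypothetical, and the `prefix` parameter is added only for illustration.

```python
import hashlib


def make_doc_id(*parts: str, prefix: str = "soap_") -> str:
    """Same scheme as SoapConnector._make_doc_id: join with "_", MD5, keep 12 hex chars."""
    combined = "_".join(str(p) for p in parts)
    return prefix + hashlib.md5(combined.encode()).hexdigest()[:12]


first = make_doc_id("4711")
second = make_doc_id("4711")
other = make_doc_id("4712")
print(first, first == second, first == other)
```

Identical inputs always produce the same ID. A downstream index can therefore treat a re-ingested document as the same record, provided it upserts by `id`. Because the separator is `_`, the parts `("a_b",)` and `("a", "b")` produce the same ID. If your identifiers can contain underscores, prefer a single, already-unique part.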
Generic SOAP / WSDL connector base class.
Manages the complete connection lifecycle for a single WSDL endpoint: authentication via HTTP Basic Auth, optional SSL certificate configuration, lazy connection initialisation, method invocation, and automatic response serialisation from zeep objects to plain Python dicts.
Subclass this to add service-specific method wrappers and implement
load() with your document-extraction logic.
Parameters
wsdl_url:
Full URL to the WSDL endpoint, e.g.
"https://example.com/Api/AdminApi.svc?wsdl".
username:
HTTP Basic Auth username for the service.
password:
HTTP Basic Auth password for the service.
credentials_type_qname:
Fully qualified name of the SOAP Credentials complex type as
defined in the WSDL, e.g.
"{http://schemas.datacontract.org/2004/07/My.Api}Credentials".
When supplied, a typed credentials object is created and injected
automatically into every call() invocation as the
credentials keyword argument.
When None (default), no credentials object is injected — use
this when the service authenticates via HTTP headers only.
credentials_username_field:
Field name for the username inside the Credentials complex type.
Default: "Username".
credentials_password_field:
Field name for the password inside the Credentials complex type.
Default: "Password".
ssl_verify:
Whether to verify SSL certificates. Set to False for services
using self-signed certificates (issues a warning).
Default: True.
ssl_cert_path:
Path to a CA certificate bundle PEM file for corporate SSL
inspection proxies. Sets REQUESTS_CA_BUNDLE and
SSL_CERT_FILE environment variables automatically.
connect_on_init:
If True, call connect() immediately in __init__.
If False (default), connection is established lazily on the
first call() invocation.
Raises
ImportError
If the zeep package is not installed.
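`credentials_type_qname` is written in Clark notation, `{namespace}LocalName`, which is the form shown in the example above. `connect()` then instantiates that type, placing the username and password under the configured field names. The sketch below splits such a name and builds the keyword arguments that `connect()` passes to the type. `split_qname` and `credential_kwargs` are hypothetical helpers, and a plain dict stands in for the zeep type so that no WSDL is needed.

```python
from typing import Dict, Tuple


def split_qname(qname: str) -> Tuple[str, str]:
    """Split a Clark-notation name '{namespace}Local' into (namespace, local)."""
    if not qname.startswith("{") or "}" not in qname:
        raise ValueError(f"not a Clark-notation QName: {qname!r}")
    namespace, local = qname[1:].split("}", 1)
    return namespace, local


def credential_kwargs(
    username: str,
    password: str,
    username_field: str = "Username",
    password_field: str = "Password",
) -> Dict[str, str]:
    """Keyword arguments that connect() passes to the Credentials type."""
    return {username_field: username, password_field: password}


ns, local = split_qname("{http://schemas.datacontract.org/2004/07/My.Api}Credentials")
print(ns, local)
print(credential_kwargs("svc-user", "s3cret", username_field="UserName"))
```

If the field names in the WSDL differ from `Username`/`Password`, constructing the type fails. In that case `connect()` logs a warning, and no credentials are injected.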
```python
    def connect(self) -> None:
        """
        Establish the SOAP connection.

        Creates a ``zeep.Client`` configured with HTTP Basic Auth and the
        SSL settings supplied at construction. If ``credentials_type_qname``
        was provided, also builds the typed Credentials object that will be
        injected into every subsequent ``call()``.

        This method is idempotent — calling it when already connected is a
        no-op. It is called automatically by ``call()`` on first use, so
        explicit calls are optional unless you want to fail fast at startup.

        Raises
        ------
        Exception
            Any exception raised by the zeep ``Client`` constructor (e.g.
            network unreachable, invalid WSDL URL) is logged and re-raised.
        """
        if self._client is not None:
            return

        from zeep import Client
        from zeep.transports import Transport
        from requests import Session
        from requests.auth import HTTPBasicAuth

        try:
            session = Session()
            session.auth = HTTPBasicAuth(self.username, self.password)
            session.verify = self.ssl_verify
            transport = Transport(session=session)
            self._client = Client(wsdl=self.wsdl_url, transport=transport)
            self._logger.info("SOAP client connected", wsdl=self.wsdl_url)
        except Exception as exc:
            self._logger.error(
                "Failed to connect SOAP client",
                wsdl=self.wsdl_url,
                error=str(exc),
            )
            raise

        # Build the typed Credentials object if the WSDL uses one
        if self._credentials_type_qname:
            try:
                cred_type = self._client.get_type(self._credentials_type_qname)
                self._credentials = cred_type(**{
                    self._credentials_username_field: self.username,
                    self._credentials_password_field: self.password,
                })
                self._logger.info(
                    "SOAP credentials object created",
                    type_qname=self._credentials_type_qname,
                )
            except Exception as exc:
                self._logger.warning(
                    "Could not create typed credentials object — "
                    "credentials will NOT be injected automatically",
                    type_qname=self._credentials_type_qname,
                    error=str(exc),
                )
```
Establish the SOAP connection.
Creates a zeep.Client configured with HTTP Basic Auth and the
SSL settings supplied at construction. If credentials_type_qname
was provided, also builds the typed Credentials object that will be
injected into every subsequent call().
This method is idempotent — calling it when already connected is a
no-op. It is called automatically by call() on first use, so
explicit calls are optional unless you want to fail fast at startup.
Raises
Exception
Any exception raised by the zeep Client constructor (e.g.
network unreachable, invalid WSDL URL) is logged and re-raised.
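The idempotent, lazy connection described above comes down to a guard on the cached client. The stdlib-only sketch below shows that pattern. A hypothetical factory stands in for the `zeep.Client` constructor, which lets the sketch count how many real connections are made.

```python
from typing import Any, Callable, List, Optional


class LazyClient:
    """Connect on first use; later connect() calls are no-ops."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory          # stand-in for building zeep.Client
        self._client: Optional[Any] = None

    def connect(self) -> None:
        if self._client is not None:     # already connected: nothing to do
            return
        self._client = self._factory()   # errors propagate, as in connect()

    def call(self) -> Any:
        self.connect()                   # lazy: the first call() opens the connection
        return self._client


created: List[int] = []


def make_client() -> object:
    created.append(1)                    # count real constructions
    return object()


lazy = LazyClient(make_client)
lazy.call()
lazy.call()
lazy.connect()
print(len(created))
```

Calling `connect()` explicitly right after construction is the "fail fast" option. An unreachable WSDL then surfaces at startup rather than on the first `call()`.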
```python
    def disconnect(self) -> None:
        """Close the underlying requests session and reset the client."""
        if self._client is not None:
            try:
                self._client.transport.session.close()
            except Exception:
                pass
            self._client = None
            self._credentials = None
            self._logger.info("SOAP client disconnected", wsdl=self.wsdl_url)
```
Close the underlying requests session and reset the client.
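The class source defines `__enter__` and `__exit__` as thin wrappers around `connect()` and `disconnect()`. A connector can therefore be scoped with a `with` block, and its session is closed even when `load()` raises. The stand-in below is a hypothetical class without zeep that records the lifecycle events.

```python
from typing import Any, List


class ScopedConnector:
    """Mimics SoapConnector's context-manager protocol."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.connected = False

    def connect(self) -> None:
        if not self.connected:
            self.connected = True
            self.events.append("connect")

    def disconnect(self) -> None:
        if self.connected:
            self.connected = False
            self.events.append("disconnect")

    def __enter__(self) -> "ScopedConnector":
        self.connect()
        return self

    def __exit__(self, *_: Any) -> None:
        self.disconnect()  # returns None, so exceptions still propagate


conn = ScopedConnector()
try:
    with conn:
        raise RuntimeError("load() failed")
except RuntimeError:
    pass
print(conn.events)
```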
call(method: str, **kwargs: Any) -> Any
Invoke a SOAP service method and return its result serialised to plain Python objects.
Connects automatically on first use. If a typed Credentials object
was built during connect(), it is injected as the credentials
keyword argument before the call — no need to pass it manually.
Parameters
method:
Name of the SOAP operation as declared in the WSDL, e.g.
"GetDocumentsByFolder".
**kwargs:
Additional keyword arguments forwarded to the operation, e.g.
folderId="abc-123".
Returns
Any
Python-native value (dict, list, str, int, etc.) produced by
zeep.helpers.serialize_object(). All zeep-specific types
are stripped so callers work with plain Python objects.
Raises
AttributeError
If method is not found in the WSDL service definition.
Exception
Any SOAP fault or transport-level error raised by zeep.
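`call()` has two behaviours that are easy to miss. First, the typed credentials object is injected only when the caller has not passed `credentials` explicitly. Second, a fault whose message looks navigational is logged at debug rather than warning before it is re-raised. The sketch below reproduces both rules with a fake service object. `FakeService`, its `GetFolder` operation, and the free `call` function are hypothetical. Serialisation through `zeep.helpers.serialize_object` is left out.

```python
import logging
from typing import Any, Dict, Optional

# Same markers as in call(); "could not be found" is already covered by "not be found".
NAVIGATIONAL = ("not be found", "invalid", "could not be found")


def fault_log_level(message: str) -> int:
    """Level used for a failed operation before it is re-raised."""
    lowered = message.lower()
    if any(marker in lowered for marker in NAVIGATIONAL):
        return logging.DEBUG
    return logging.WARNING


class FakeService:
    """Stand-in for zeep's client.service: operations are attributes."""

    def GetFolder(self, folderId: str, credentials: Any = None) -> Dict[str, Any]:
        if folderId == "missing":
            raise RuntimeError("Folder could not be found")
        return {"FolderId": folderId, "Credentials": credentials}


def call(service: Any, method: str, default_credentials: Optional[Any], **kwargs: Any) -> Any:
    if default_credentials is not None and "credentials" not in kwargs:
        kwargs["credentials"] = default_credentials   # inject, never override
    operation = getattr(service, method)              # AttributeError if unknown
    try:
        return operation(**kwargs)
    except Exception as exc:
        logging.getLogger(__name__).log(fault_log_level(str(exc)), "SOAP call failed: %s", exc)
        raise


svc = FakeService()
print(call(svc, "GetFolder", "default-creds", folderId="abc-123"))
print(call(svc, "GetFolder", "default-creds", folderId="abc-123", credentials="override"))
```

The marker match is a plain substring test. A fault such as "Invalid username or password" therefore also matches `"invalid"` and is logged only at debug. Keep this in mind when an authentication problem seems to fail silently.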
extract_text(data: Union[bytes, str], filename: str) -> str
Extract plain text from raw file bytes or base64-encoded data.
Dispatches by file extension:
- .pdf: extracts text via pypdf (pip install pypdf)
- .docx: extracts text via python-docx (pip install python-docx)
- anything else: decodes as UTF-8
Call this inside your load() implementation after retrieving the
raw file content from the service:
data = self.call("GetDocumentData", documentId=doc_id)
content = self.extract_text(data["Data"], "policy.pdf")
Parameters
data:
Raw bytes or a base64-encoded string as typically returned by
SOAP document download operations.
filename:
File name (or any string ending with the correct extension) used
to determine the extraction strategy, e.g. "policy.pdf".
Returns
str
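Extracted plain text. Returns an empty string when data is
empty or None, and returns a string unchanged if it cannot be
base64-decoded. When pypdf or python-docx is missing, or when extraction
fails, it returns a bracketed placeholder such as
"[PDF extraction error: policy.pdf]" instead of raising.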
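The first two steps of `extract_text()` can be exercised without pypdf or python-docx: normalising the input to bytes, then choosing a strategy from the extension. The sketch below mirrors that logic and stops at the strategy name. `normalise` and `choose_strategy` are hypothetical names.

```python
import base64
from typing import Tuple, Union


def choose_strategy(filename: str) -> str:
    """Same extension rule as extract_text(): text after the last dot, lower-cased."""
    ext = ("." + filename.rsplit(".", 1)[-1]).lower() if "." in filename else ""
    return {".pdf": "pypdf", ".docx": "python-docx"}.get(ext, "utf-8")


def normalise(data: Union[bytes, str]) -> Tuple[str, Union[bytes, str]]:
    """('bytes', payload) after base64-decoding str input, or ('text', data) if decoding fails."""
    if isinstance(data, str):
        try:
            return "bytes", base64.b64decode(data)
        except ValueError:  # binascii.Error is a subclass of ValueError
            return "text", data  # extract_text() returns such strings unchanged
    return "bytes", data


encoded = base64.b64encode("Policy text".encode("utf-8")).decode("ascii")
kind, payload = normalise(encoded)
print(kind, payload, choose_strategy("Policy.PDF"), choose_strategy("README"))
```

Without `validate=True`, `base64.b64decode` silently drops characters outside the base64 alphabet. As a result, some ordinary strings (for example `"abcd"`) decode to arbitrary bytes instead of taking the raw-text path. Legacy `.doc` files fall through to UTF-8 decoding.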
load() -> List[Document] (abstract)
Load documents from the SOAP service.
This method is entirely the developer's responsibility. Use
self.call() to invoke service methods and self.extract_text()
to convert file bytes to plain text. Assemble and return
Document objects with the id, content, and metadata
fields that matter for your use case.
Returns
List[Document]
One Document per logical document retrieved from the service.
embedding should always be None — the caller embeds.
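A `load()` implementation typically lists documents, downloads each one, and wraps the extracted text in a Document. The sketch below keeps that shape but replaces the connector with an in-memory stand-in, so it runs without zeep or a live WSDL. The operation names `GetDocumentsByFolder` and `GetDocumentData`, the response fields, and the `Doc` dataclass are all hypothetical and will differ for a real service.

```python
import base64
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for the package's Document class."""
    id: str
    content: str
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None  # the caller embeds later


class FakePolicyConnector:
    """Shape of a SoapConnector subclass; call() returns canned responses."""

    _RESPONSES: Dict[str, Any] = {
        "GetDocumentsByFolder": [{"DocumentId": 1, "Name": "policy.txt"},
                                 {"DocumentId": 2, "Name": "empty.txt"}],
        "GetDocumentData": {1: base64.b64encode(b"Claims must be filed within 30 days.").decode(),
                            2: ""},
    }

    def call(self, method: str, **kwargs: Any) -> Any:
        response = self._RESPONSES[method]
        return response[kwargs["documentId"]] if "documentId" in kwargs else response

    def extract_text(self, data: str, filename: str) -> str:
        # Simplified: the real helper also handles .pdf/.docx and raw bytes.
        return base64.b64decode(data).decode("utf-8") if data else ""

    def _make_doc_id(self, *parts: str) -> str:
        return "soap_" + hashlib.md5("_".join(parts).encode()).hexdigest()[:12]

    def load(self) -> List[Doc]:
        documents: List[Doc] = []
        for item in self.call("GetDocumentsByFolder", folderId="abc-123"):
            data = self.call("GetDocumentData", documentId=item["DocumentId"])
            content = self.extract_text(data, item["Name"])
            if not content.strip():
                print(f"warning: skipping empty document {item['Name']}")  # skip, don't crash
                continue
            documents.append(Doc(
                id=self._make_doc_id(str(item["DocumentId"])),
                content=content,
                timestamp=datetime.now(),
                metadata={"source": item["Name"], "document_id": item["DocumentId"]},
            ))
        return documents


docs = FakePolicyConnector().load()
print(len(docs), docs[0].metadata)
```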
````python
class AzureDevOpsWikiConnector(BaseConnector):
    """
    Loads pages from one or more Azure DevOps wikis via the REST API.

    The developer provides an organization, project, and PAT. The connector
    handles everything else: auth, wiki discovery, recursive page-tree
    traversal, URL encoding, content fetching, and Document creation.

    Content is served as Markdown -- pair with WikiPageChunker.chunk_markdown()
    or MarkdownChunker for best results.

    There are three loading modes depending on how much of the wiki you need.
    See the examples below for guidance on which to use.

    Metadata keys set on every returned Document:
        ``source``        Browser URL of the page
        ``page_path``     Full path within the wiki (e.g. ``/Team Wikis/Overview``)
        ``root_path``     Sub-tree root this page was loaded under
        ``wiki_id``       Wiki name used as the API identifier
        ``wiki_name``     Human-readable wiki name
        ``organization``  Azure DevOps organization name
        ``project``       Azure DevOps project name

    NOTE: ADO REST API paths use the actual page title with spaces
    (e.g. ``/Team Wikis``), NOT the hyphenated browser URL slug
    (e.g. ``/Team-Wikis``).

    Example -- Mode 1: root_path (entire sub-tree from one starting point)
        Use when you want everything under a single section. One API call
        fetches the full tree; all descendant pages are loaded.

        ```python
        connector = AzureDevOpsWikiConnector(
            organization="MyOrg",
            project="MyProject",
            pat="your-personal-access-token",
            root_path="/Team Wikis",   # loads this page + all its children
        )
        docs = connector.load()
        ```

    Example -- Mode 2: page_paths exact (specific pages, no children)
        Use when you know the exact pages you need. Tree discovery is skipped;
        only the listed paths are fetched (page_paths_recursive=False, the default).

        ```python
        connector = AzureDevOpsWikiConnector(
            organization="MyOrg",
            project="MyProject",
            pat="your-personal-access-token",
            page_paths=["/Team Wikis/Innovation and GenAI Lab"],
        )
        docs = connector.load()
        ```

    Example -- Mode 3: page_paths recursive (multiple targeted sub-trees)
        Use when you want full sub-trees for several sections without loading
        the entire wiki. Each entry is treated as its own root: the page itself
        and all its descendants are loaded. page_paths takes precedence over
        root_path when both are provided.

        ```python
        connector = AzureDevOpsWikiConnector(
            organization="MyOrg",
            project="MyProject",
            pat="your-personal-access-token",
            page_paths=[
                "/Team Wikis/Innovation and GenAI Lab",
                "/Team Wikis/GMF Bank",
            ],
            page_paths_recursive=True,
        )
        docs = connector.load()
        ```
    """

    _API_BASE = "https://dev.azure.com"
    _API_VERSION = "7.1"

    def __init__(
        self,
        organization: str,
        project: str,
        pat: str,
        wiki_ids: Optional[List[str]] = None,
        root_path: str = "/",
        page_paths: Optional[List[str]] = None,
        page_paths_recursive: bool = False,
        ssl_cert_path: Optional[str] = None,
    ):
        """
        Args:
            organization:         Azure DevOps organization name (e.g. ``"MyOrg"``).
            project:              Azure DevOps project name (e.g. ``"MyProject"``).
            pat:                  Personal Access Token with Wiki (Read) scope.
            wiki_ids:             List of wiki identifiers (names or GUIDs) to ingest.
                                  Pass ``None`` (default) to auto-discover and ingest
                                  all wikis in the project.
            root_path:            Starting page path for the tree walk.
                                  ``"/"`` (default) ingests the entire wiki.
                                  Any sub-path (e.g. ``"/Team Wikis"``) scopes
                                  ingestion to that page and all its descendants.
                                  Ignored when ``page_paths`` is provided.
            page_paths:           Explicit list of page paths to fetch (e.g.
                                  ``["/Team Wikis/Innovation and GenAI Lab"]``).
                                  When set, tree discovery via ``root_path`` is
                                  skipped. Each entry is fetched exactly as-is
                                  unless ``page_paths_recursive=True``.
            page_paths_recursive: When ``True`` and ``page_paths`` is set, each
                                  path is treated as a sub-tree root: the page
                                  itself **and all its descendants** are loaded.
                                  When ``False`` (default), only the exact pages
                                  listed are fetched.
            ssl_cert_path:        Path to a CA bundle PEM file for environments
                                  with corporate SSL inspection (optional).
        """
        self._org = organization
        self._project = project
        self._pat = pat
        self._wiki_ids = wiki_ids
        self._root_path = root_path.rstrip("/") or "/"
        self._page_paths = page_paths
        self._page_paths_recursive = page_paths_recursive
        self._verify: str | bool = ssl_cert_path or True
        self._logger = BasicLogger(__name__)

    # ── Public interface ─────────────────────────────────────────────────────

    def load(self) -> List[Document]:
        """
        Load all pages from the configured Azure DevOps wiki(s).

        Handles everything automatically:
          - Discovers all wikis if wiki_ids is not specified
          - Walks the full page tree from root_path
          - Fetches Markdown content for each page
          - Returns one Document per page with complete metadata

        Returns:
            List of Document objects, one per successfully loaded page.

        Raises:
            ImportError: If the ``requests`` package is not installed.
            requests.HTTPError: If authentication or a critical API call fails.
        """
        try:
            import requests  # type: ignore
        except ImportError:
            raise ImportError(
                "requests is required for AzureDevOpsWikiConnector. "
                "Install it with: pip install requests"
            )

        session = self._build_session(requests)

        # Auto-discover wikis if not specified
        wiki_list = self._list_wikis(session)
        if self._wiki_ids:
            # Match on GUID *or* name — the ADO list-wikis endpoint returns
            # a GUID as ``id`` but users typically supply the readable wiki
            # name (e.g. "MyProject.wiki"). Matching on name covers that case.
            wiki_list = [
                (wid, name) for wid, name in wiki_list
                if wid in self._wiki_ids or name in self._wiki_ids
            ]
            # If discovery returned nothing (empty project, auth issue, etc.)
            # fall back to using the caller-supplied identifiers directly.
            # The ADO pages API requires the wiki *name* as wikiIdentifier
            # (GUIDs are only accepted by the wiki-level endpoints, not pages).
            if not wiki_list:
                self._logger.warning(
                    "Wiki discovery returned no matches — using provided wiki_ids directly",
                    wiki_ids=self._wiki_ids,
                )
                wiki_list = [(wid, wid) for wid in self._wiki_ids]

        effective_scope = self._page_paths if self._page_paths is not None else [self._root_path]
        self._logger.info(
            "Starting Azure DevOps Wiki ingest",
            organization=self._org,
            project=self._project,
            wikis=[wid for wid, _ in wiki_list],
            scope=effective_scope,
        )

        documents: List[Document] = []
        for wiki_id, wiki_name in wiki_list:
            # The ADO pages API uses the wiki *name* as the wikiIdentifier in the
            # URL path — GUIDs work only for the wiki-metadata endpoints, not pages.
            api_identifier = wiki_name
            self._logger.info("Ingesting wiki", wiki_id=wiki_id, wiki_name=wiki_name)

            # Build (page_path, origin_root) pairs so _fetch_page can store the
            # correct root_path in metadata without mutating self._root_path.
            if self._page_paths is not None:
                if self._page_paths_recursive:
                    # Treat each entry as a sub-tree root; collect all descendants.
                    path_root_pairs: List[Tuple[str, str]] = []
                    seen: set = set()
                    for root in self._page_paths:
                        for p in self._collect_page_paths(session, api_identifier, root_path=root):
                            if p not in seen:
                                seen.add(p)
                                path_root_pairs.append((p, root))
                    self._logger.info(
                        "Using recursive page list",
                        wiki_id=wiki_id,
                        roots=self._page_paths,
                        count=len(path_root_pairs),
                    )
                else:
                    # Exact pages only — each path is its own root for metadata.
                    path_root_pairs = [(p, p) for p in self._page_paths]
                    self._logger.info(
                        "Using explicit page list",
                        wiki_id=wiki_id,
                        count=len(path_root_pairs),
                    )
            else:
                collected = self._collect_page_paths(session, api_identifier)
                path_root_pairs = [(p, self._root_path) for p in collected]
                self._logger.info("Pages found", wiki_id=wiki_id, count=len(path_root_pairs))

            for path, origin_root in path_root_pairs:
                doc = self._fetch_page(session, api_identifier, wiki_name, path, root_path=origin_root)
                if doc:
                    documents.append(doc)

        self._logger.info(
            "Azure DevOps Wiki ingest complete",
            total_documents=len(documents),
            scope=effective_scope,
        )
        return documents

    # ── Private helpers ──────────────────────────────────────────────────────

    def _build_session(self, requests):
        """Create a requests.Session with PAT Basic Auth pre-configured."""
        token = base64.b64encode(f":{self._pat}".encode()).decode()
        session = requests.Session()
        session.headers.update({
            "Authorization": f"Basic {token}",
            "Accept": "application/json",
        })
        session.verify = self._verify
        return session

    def _api_url(self, path: str) -> str:
        return (
            f"{self._API_BASE}/{self._org}/{self._project}"
            f"/_apis/wiki/{path}"
        )

    def _get(self, session, path: str, params: Optional[Dict] = None) -> dict:
        url = self._api_url(path)
        all_params: Dict = {"api-version": self._API_VERSION}
        if params:
            all_params.update(params)
        resp = session.get(url, params=all_params, timeout=30)
        resp.raise_for_status()
        return resp.json()

    def _list_wikis(self, session) -> List[Tuple[str, str]]:
        """
        Return (id, name) pairs for all wikis in the project.
        Reading name here avoids a separate API call per wiki.
        """
        try:
            data = self._get(session, "wikis")
        except Exception as exc:
            self._logger.warning(
                "Failed to list wikis — check PAT permissions (Wiki Read scope required)",
                error=str(exc),
            )
            return []
        return [(w["id"], w.get("name", w["id"])) for w in data.get("value", [])]

    def _collect_page_paths(self, session, wiki_id: str, root_path=None) -> List[str]:
        """
        Walk the page tree from the given root_path and return a flat list of paths.

        Uses recursionLevel=full to retrieve the entire sub-tree in a single
        API call, then flattens it. The root "/" is never returned as a page
        (it is a virtual container); all other paths -- including root_path
        itself if it is not "/" -- are included.

        Args:
            root_path: Starting path for this call. Falls back to
                       ``self._root_path`` when not provided.
        """
        effective_root = root_path if root_path is not None else self._root_path
        try:
            tree = self._get(
                session,
                f"wikis/{wiki_id}/pages",
                params={
                    "path": effective_root,
                    "recursionLevel": "full",
                    "includeContent": "false",
                },
            )
        except Exception as exc:
            self._logger.warning(
                "Could not list pages for wiki",
                wiki_id=wiki_id,
                root_path=effective_root,
                error=str(exc),
            )
            return []

        paths: List[str] = []
        self._walk_tree(tree, paths)
        return paths

    def _walk_tree(self, node: dict, paths: List[str]) -> None:
        """
        Recursively flatten the page tree into a list of paths.

        Rule: include every path except the virtual root "/".
        The API response is already scoped to root_path, so no
        filtering beyond skipping "/" is needed.
        """
        path = node.get("path")
        if path and path != "/":
            paths.append(path)
        for child in node.get("subPages", []):
            self._walk_tree(child, paths)

    def _fetch_page(
        self,
        session,
        wiki_id: str,
        wiki_name: str,
        page_path: str,
        root_path=None,
    ) -> Optional[Document]:
        """Fetch the Markdown content of a single page and return a Document."""
        try:
            data = self._get(
                session,
                f"wikis/{wiki_id}/pages",
                params={
                    "path": page_path,
                    "includeContent": "true",
                },
            )
        except Exception as exc:
            self._logger.warning(
                "Skipping page — failed to fetch",
                wiki_id=wiki_id,
                path=page_path,
                error=str(exc),
            )
            return None

        content: str = data.get("content", "").strip()
        if not content:
            self._logger.warning("Skipping empty page", wiki_id=wiki_id, path=page_path)
            return None

        doc_id = "adowiki_" + hashlib.md5(f"{wiki_id}{page_path}".encode()).hexdigest()[:12]

        return Document(
            id=doc_id,
            content=content,
            timestamp=datetime.now(),
            metadata={
                "source": data.get("remoteUrl", ""),
                "page_path": page_path,
                "root_path": root_path if root_path is not None else self._root_path,
                "wiki_id": wiki_id,
                "wiki_name": wiki_name,
                "organization": self._org,
                "project": self._project,
            },
        )
````
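With `recursionLevel=full`, the pages endpoint returns a nested node for each page, and each node carries a `path` and a `subPages` list. `_walk_tree` flattens this structure depth-first and drops the virtual root `/`. The standalone sketch below runs the same walk on a hand-written response. The tree is invented for illustration, and real responses carry more fields.

```python
from typing import Any, Dict, List


def walk_tree(node: Dict[str, Any], paths: List[str]) -> None:
    """Depth-first flatten that skips the virtual root "/"."""
    path = node.get("path")
    if path and path != "/":
        paths.append(path)
    for child in node.get("subPages", []):
        walk_tree(child, paths)


tree = {
    "path": "/",
    "subPages": [
        {"path": "/Team Wikis", "subPages": [
            {"path": "/Team Wikis/Overview", "subPages": []},
            {"path": "/Team Wikis/GMF Bank"},
        ]},
        {"path": "/Onboarding"},
    ],
}
paths: List[str] = []
walk_tree(tree, paths)
print(paths)
```

The recursion depth equals the wiki's nesting depth, which in practice sits far below Python's default recursion limit.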
Loads pages from one or more Azure DevOps wikis via the REST API.
The developer provides an organization, project, and PAT. The connector handles everything else: auth, wiki discovery, recursive page-tree traversal, URL encoding, content fetching, and Document creation.
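For authentication, `_build_session` sends the PAT as HTTP Basic credentials with an empty username. It base64-encodes `":" + pat` and places the result in the `Authorization` header. The stdlib sketch below builds the same header values. The helper name and the PAT are dummies.

```python
import base64


def pat_auth_header(pat: str) -> dict:
    """Basic auth with an empty username, as built in _build_session()."""
    token = base64.b64encode(f":{pat}".encode()).decode()
    return {"Authorization": f"Basic {token}", "Accept": "application/json"}


headers = pat_auth_header("abc")
print(headers)
```

Setting `session.auth = requests.auth.HTTPBasicAuth("", pat)` should produce the same `Authorization` value. Building the header by hand simply avoids the dependency until `load()` imports requests.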
Content is served as Markdown -- pair with WikiPageChunker.chunk_markdown() or MarkdownChunker for best results.
There are three loading modes depending on how much of the wiki you need. See the examples below for guidance on which to use.
Metadata keys set on every returned Document:
- source: Browser URL of the page
- page_path: Full path within the wiki (e.g. /Team Wikis/Overview)
- root_path: Sub-tree root this page was loaded under
- wiki_id: Wiki name used as the API identifier
- wiki_name: Human-readable wiki name
- organization: Azure DevOps organization name
- project: Azure DevOps project name
NOTE: ADO REST API paths use the actual page title with spaces
(e.g. /Team Wikis), NOT the hyphenated browser URL slug
(e.g. /Team-Wikis).
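Because the REST API expects page titles rather than browser slugs, a path copied from the address bar has to be converted first. The rough sketch below assumes the usual Azure DevOps wiki convention, in which spaces become hyphens in the slug and a literal hyphen in a title is escaped as `%2D`. That convention is an assumption here, not something this connector checks. `slug_to_api_path` is a hypothetical helper, and titles with other special characters may need more care.

```python
from urllib.parse import unquote


def slug_to_api_path(slug: str) -> str:
    """Convert '/Team-Wikis/Q%2D3-Plan' style slugs to '/Team Wikis/Q-3 Plan' (assumed convention)."""
    # Replace hyphens (which stand for spaces) before percent-decoding,
    # so that an escaped %2D survives as a literal hyphen.
    return unquote(slug.replace("-", " "))


print(slug_to_api_path("/Team-Wikis/Innovation-and-GenAI-Lab"))
```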
Example -- Mode 1: root_path (entire sub-tree from one starting point)
Use when you want everything under a single section. One API call fetches the full tree; all descendant pages are loaded.
connector = AzureDevOpsWikiConnector(
organization="MyOrg",
project="MyProject",
pat="your-personal-access-token",
root_path="/Team Wikis", # loads this page + all its children
)
docs = connector.load()
Example -- Mode 2: page_paths exact (specific pages, no children)
Use when you know the exact pages you need. Tree discovery is skipped; only the listed paths are fetched (page_paths_recursive=False, the default).
connector = AzureDevOpsWikiConnector(
organization="MyOrg",
project="MyProject",
pat="your-personal-access-token",
page_paths=["/Team Wikis/Innovation and GenAI Lab"],
)
docs = connector.load()
Example -- Mode 3: page_paths recursive (multiple targeted sub-trees)
Use when you want full sub-trees for several sections without loading the entire wiki. Each entry is treated as its own root: the page itself and all its descendants are loaded. page_paths takes precedence over root_path when both are provided.
connector = AzureDevOpsWikiConnector(
organization="MyOrg",
project="MyProject",
pat="your-personal-access-token",
page_paths=[
"/Team Wikis/Innovation and GenAI Lab",
"/Team Wikis/GMF Bank",
],
page_paths_recursive=True,
)
docs = connector.load()
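All three modes reduce to a list of (page_path, root_path) pairs. `load()` fetches these one by one, and each root ends up in the corresponding Document's `root_path` metadata. The sketch below mirrors that selection logic against an in-memory page list instead of the REST API. `plan_pages`, `subtree`, and the sample paths are hypothetical. In recursive mode, a page reachable from two roots is kept once and credited to the first root that reaches it.

```python
from typing import List, Optional, Tuple

WIKI = ["/Team Wikis", "/Team Wikis/Innovation and GenAI Lab",
        "/Team Wikis/Innovation and GenAI Lab/Roadmap", "/Team Wikis/GMF Bank"]


def subtree(root: str) -> List[str]:
    """Stand-in for _collect_page_paths(): the root page plus its descendants."""
    return [p for p in WIKI if p == root or p.startswith(root.rstrip("/") + "/")]


def plan_pages(root_path: str = "/", page_paths: Optional[List[str]] = None,
               recursive: bool = False) -> List[Tuple[str, str]]:
    if page_paths is None:                        # Mode 1: one sub-tree
        return [(p, root_path) for p in subtree(root_path)]
    if not recursive:                             # Mode 2: exact pages, no lookup
        return [(p, p) for p in page_paths]
    pairs: List[Tuple[str, str]] = []             # Mode 3: several sub-trees
    seen = set()
    for root in page_paths:
        for p in subtree(root):
            if p not in seen:                     # first root to reach a page wins
                seen.add(p)
                pairs.append((p, root))
    return pairs


print(plan_pages(page_paths=["/Team Wikis", "/Team Wikis/GMF Bank"], recursive=True))
```

In Mode 2, no existence check runs up front. A misspelled path only surfaces later, as a "Skipping page" warning when the fetch fails.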
__init__(organization, project, pat, wiki_ids=None, root_path="/", page_paths=None, page_paths_recursive=False, ssl_cert_path=None)
Args:
organization: Azure DevOps organization name (e.g. "MyOrg").
project: Azure DevOps project name (e.g. "MyProject").
pat: Personal Access Token with Wiki (Read) scope.
wiki_ids: List of wiki identifiers (names or GUIDs) to ingest. Pass None (default) to auto-discover and ingest all wikis in the project.
root_path: Starting page path for the tree walk. "/" (default) ingests the entire wiki. Any sub-path (e.g. "/Team Wikis") scopes ingestion to that page and all its descendants. Ignored when page_paths is provided.
page_paths: Explicit list of page paths to fetch (e.g. ["/Team Wikis/Innovation and GenAI Lab"]). When set, tree discovery via root_path is skipped. Each entry is fetched exactly as-is unless page_paths_recursive=True.
page_paths_recursive: When True and page_paths is set, each path is treated as a sub-tree root: the page itself and all its descendants are loaded. When False (default), only the exact pages listed are fetched.
ssl_cert_path: Path to a CA bundle PEM file for environments with corporate SSL inspection (optional).
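How `root_path`, `page_paths` and `page_paths_recursive` combine can be summarized as a small decision function. `plan_scope` below is hypothetical: it only mirrors the branching visible in `load()` and performs no HTTP calls.

```python
from typing import List, Optional, Tuple


def plan_scope(
    root_path: str = "/",
    page_paths: Optional[List[str]] = None,
    page_paths_recursive: bool = False,
) -> Tuple[str, List[str]]:
    """Hypothetical helper: return (mode, roots) for a set of constructor args.

    mode is "tree" (walk below root_path), "explicit" (exact pages only) or
    "recursive" (each page_paths entry plus its descendants).
    """
    if page_paths is not None:
        # page_paths takes precedence; root_path is ignored in this branch.
        mode = "recursive" if page_paths_recursive else "explicit"
        return mode, list(page_paths)
    # No explicit list: page_paths_recursive has no effect here.
    return "tree", [root_path.rstrip("/") or "/"]


print(plan_scope())                          # ('tree', ['/'])
print(plan_scope(root_path="/Team Wikis/"))  # ('tree', ['/Team Wikis'])
print(plan_scope(root_path="/Ignored",
                 page_paths=["/Team Wikis/Innovation and GenAI Lab"],
                 page_paths_recursive=True))
# ('recursive', ['/Team Wikis/Innovation and GenAI Lab'])
print(plan_scope(page_paths=[]))             # ('explicit', [])
```

Because `load()` tests `page_paths is not None` rather than truthiness, passing `page_paths=[]` selects the explicit mode with nothing to fetch; it does not fall back to `root_path`.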
def load(self) -> List[Document]:
    """
    Load all pages from the configured Azure DevOps wiki(s).

    Handles the full ingest automatically:
    - Discovers the project's wikis, keeping only those that match
      ``wiki_ids`` (by GUID or name) when it is specified
    - Walks the page tree from ``root_path`` or, when ``page_paths`` is
      provided, fetches those pages (plus their sub-trees if
      ``page_paths_recursive=True``)
    - Fetches Markdown content for each page
    - Returns one Document per page with complete metadata

    Returns:
        List of Document objects, one per successfully loaded page.

    Raises:
        ImportError: If the ``requests`` package is not installed.
        requests.HTTPError: If authentication or a critical API call fails.
    """
    try:
        import requests  # type: ignore
    except ImportError as exc:
        raise ImportError(
            "requests is required for AzureDevOpsWikiConnector. "
            "Install it with: pip install requests"
        ) from exc

    session = self._build_session(requests)

    # Discover the project's wikis, then narrow to wiki_ids if specified
    wiki_list = self._list_wikis(session)
    if self._wiki_ids:
        # Match on GUID *or* name: the ADO list-wikis endpoint returns
        # a GUID as ``id`` but users typically supply the readable wiki
        # name (e.g. "MyProject.wiki"). Matching on name covers that case.
        wiki_list = [
            (wid, name) for wid, name in wiki_list
            if wid in self._wiki_ids or name in self._wiki_ids
        ]
        # If discovery produced no matches (empty project, auth issue,
        # unmatched identifier, etc.) fall back to using the caller-supplied
        # identifiers directly.
        # The ADO pages API requires the wiki *name* as wikiIdentifier
        # (GUIDs are only accepted by the wiki-level endpoints, not pages).
        if not wiki_list:
            self._logger.warning(
                "Wiki discovery returned no matches — using provided wiki_ids directly",
                wiki_ids=self._wiki_ids,
            )
            wiki_list = [(wid, wid) for wid in self._wiki_ids]

    effective_scope = self._page_paths if self._page_paths is not None else [self._root_path]
    self._logger.info(
        "Starting Azure DevOps Wiki ingest",
        organization=self._org,
        project=self._project,
        wikis=[wid for wid, _ in wiki_list],
        scope=effective_scope,
    )

    documents: List[Document] = []
    for wiki_id, wiki_name in wiki_list:
        # The ADO pages API uses the wiki *name* as the wikiIdentifier in the
        # URL path; GUIDs work only for the wiki-metadata endpoints, not pages.
        api_identifier = wiki_name
        self._logger.info("Ingesting wiki", wiki_id=wiki_id, wiki_name=wiki_name)

        # Build (page_path, origin_root) pairs so _fetch_page can store the
        # correct root_path in metadata without mutating self._root_path.
        if self._page_paths is not None:
            if self._page_paths_recursive:
                # Treat each entry as a sub-tree root; collect all descendants.
                path_root_pairs: List[Tuple[str, str]] = []
                seen: set[str] = set()
                for root in self._page_paths:
                    for p in self._collect_page_paths(session, api_identifier, root_path=root):
                        if p not in seen:
                            seen.add(p)
                            path_root_pairs.append((p, root))
                self._logger.info(
                    "Using recursive page list",
                    wiki_id=wiki_id,
                    roots=self._page_paths,
                    count=len(path_root_pairs),
                )
            else:
                # Exact pages only: each path is its own root for metadata.
                path_root_pairs = [(p, p) for p in self._page_paths]
                self._logger.info(
                    "Using explicit page list",
                    wiki_id=wiki_id,
                    count=len(path_root_pairs),
                )
        else:
            collected = self._collect_page_paths(session, api_identifier)
            path_root_pairs = [(p, self._root_path) for p in collected]
            self._logger.info("Pages found", wiki_id=wiki_id, count=len(path_root_pairs))

        for path, origin_root in path_root_pairs:
            doc = self._fetch_page(session, api_identifier, wiki_name, path, root_path=origin_root)
            if doc:
                documents.append(doc)

    self._logger.info(
        "Azure DevOps Wiki ingest complete",
        total_documents=len(documents),
        scope=effective_scope,
    )
    return documents
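The wiki-selection step at the top of `load()` can be exercised without a network connection. `select_wikis` below is a hypothetical stand-alone copy of that logic; it assumes, as `load()` does when it unpacks them, that discovery yields `(GUID, name)` pairs. The GUIDs and wiki names in the example are made up.

```python
from typing import List, Optional, Tuple

# (wiki GUID, wiki name) pairs, as load() unpacks them from _list_wikis().
WikiRef = Tuple[str, str]


def select_wikis(discovered: List[WikiRef], wiki_ids: Optional[List[str]]) -> List[WikiRef]:
    """Hypothetical stand-alone copy of the wiki-selection step in load()."""
    if not wiki_ids:
        # No filter given: every discovered wiki is ingested.
        return discovered
    # Keep a wiki if the caller named it by GUID or by readable name.
    selected = [
        (wid, name) for wid, name in discovered
        if wid in wiki_ids or name in wiki_ids
    ]
    if not selected:
        # Nothing matched: trust the caller's identifiers, using each one
        # as both the id and the name (the name is what the pages API gets).
        selected = [(wid, wid) for wid in wiki_ids]
    return selected


discovered = [("guid-1", "MyProject.wiki"), ("guid-2", "Team.wiki")]
print(select_wikis(discovered, None))
# [('guid-1', 'MyProject.wiki'), ('guid-2', 'Team.wiki')]
print(select_wikis(discovered, ["Team.wiki"]))  # [('guid-2', 'Team.wiki')]
print(select_wikis(discovered, ["guid-1"]))     # [('guid-1', 'MyProject.wiki')]
print(select_wikis([], ["Other.wiki"]))         # [('Other.wiki', 'Other.wiki')]
```

Note the asymmetry this makes visible: with `wiki_ids=None` and an empty discovery result, nothing is ingested, whereas with explicit `wiki_ids` the caller's identifiers are still tried.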
Load all pages from the configured Azure DevOps wiki(s).
Handles the full ingest automatically:
- Discovers the project's wikis, keeping only those that match wiki_ids (by GUID or name) when it is specified
- Walks the page tree from root_path or, when page_paths is provided, fetches those pages (plus their sub-trees if page_paths_recursive=True)
- Fetches Markdown content for each page
- Returns one Document per page with complete metadata
Returns:
List of Document objects, one per successfully loaded page.
Raises:
ImportError: If the requests package is not installed.
requests.HTTPError: If authentication or a critical API call fails.
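The `(page_path, origin_root)` pairing that `load()` builds before fetching pages is easiest to see against a fake page tree. In the sketch below, `build_path_root_pairs` and the `collect` callable are hypothetical stand-ins: `collect` plays the role of `_collect_page_paths`, assumed (per the `page_paths_recursive` description) to return the root page itself plus its descendants. The tree is invented for illustration.

```python
from typing import Callable, Dict, List, Optional, Set, Tuple

# Stand-in for _collect_page_paths(session, wiki, root_path=...): maps a
# sub-tree root to the page paths under it, including the root itself.
Collect = Callable[[str], List[str]]


def build_path_root_pairs(
    collect: Collect,
    root_path: str = "/",
    page_paths: Optional[List[str]] = None,
    page_paths_recursive: bool = False,
) -> List[Tuple[str, str]]:
    """Hypothetical copy of the (page_path, origin_root) logic in load()."""
    if page_paths is None:
        # Default mode: one tree walk, every page tagged with root_path.
        return [(p, root_path) for p in collect(root_path)]
    if not page_paths_recursive:
        # Exact pages: each path is its own origin root.
        return [(p, p) for p in page_paths]
    pairs: List[Tuple[str, str]] = []
    seen: Set[str] = set()
    for root in page_paths:
        for p in collect(root):
            # Overlapping roots: the first root that reaches a page keeps it.
            if p not in seen:
                seen.add(p)
                pairs.append((p, root))
    return pairs


# Hypothetical wiki tree used only for illustration.
TREE: Dict[str, List[str]] = {
    "/": ["/", "/A", "/A/B", "/C"],
    "/A": ["/A", "/A/B"],
    "/A/B": ["/A/B"],
    "/C": ["/C"],
}


def collect(root: str) -> List[str]:
    return TREE[root]


print(build_path_root_pairs(collect, page_paths=["/A", "/A/B"], page_paths_recursive=True))
# [('/A', '/A'), ('/A/B', '/A')]
print(build_path_root_pairs(collect, page_paths=["/A", "/C"]))
# [('/A', '/A'), ('/C', '/C')]
```

Under this reading of `load()`, the deduplication means a page reachable from several `page_paths` roots is fetched once and attributed to whichever root appears first in the list, so listing a narrower root before a broader one keeps the narrower `root_path` in that page's metadata.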