gmf_forge_ai_data.chunkers
Text processing and chunking module for RAG applications.
This module provides various text chunking strategies for splitting documents into manageable pieces for embedding and retrieval.
"""
Text processing and chunking module for RAG applications.

This module provides various text chunking strategies for splitting documents
into manageable pieces for embedding and retrieval.
"""

from .base_chunker import Chunk, BaseChunker
from .fixed_size_chunker import FixedSizeChunker
from .semantic_chunker import SemanticChunker
from .recursive_chunker import RecursiveChunker
from .sentence_chunker import SentenceChunker
from .markdown_chunker import MarkdownChunker
from .code_chunker import CodeChunker
from .wiki_chunker import WikiPageChunker

__all__ = [
    # Data structures
    "Chunk",

    # Base class
    "BaseChunker",

    # Chunking strategies
    "FixedSizeChunker",
    "SemanticChunker",
    "RecursiveChunker",
    "SentenceChunker",
    "MarkdownChunker",
    "CodeChunker",
    "WikiPageChunker",
]
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Chunk:
    """
    Represents a chunk of text with associated metadata.

    Attributes:
        text: The actual text content of the chunk
        metadata: Dictionary of metadata (source, page number, etc.)
        start_pos: Character position where chunk starts in original text
        end_pos: Character position where chunk ends in original text
        chunk_id: Unique identifier for the chunk
    """
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    start_pos: int = 0
    end_pos: int = 0
    chunk_id: str = ""

    def __post_init__(self):
        """Validate chunk after initialization."""
        if not self.text:
            raise ValueError("Chunk text cannot be empty")
        if self.end_pos < self.start_pos:
            raise ValueError(f"end_pos ({self.end_pos}) cannot be less than start_pos ({self.start_pos})")

    def __len__(self) -> int:
        """Return the length of the chunk text."""
        return len(self.text)

    def __str__(self) -> str:
        """Return a string representation of the chunk."""
        preview = self.text[:50] + "..." if len(self.text) > 50 else self.text
        return f"Chunk(id={self.chunk_id}, len={len(self.text)}, text='{preview}')"
Represents a chunk of text with associated metadata.
Attributes:
- text: The actual text content of the chunk
- metadata: Dictionary of metadata (source, page number, etc.)
- start_pos: Character position where the chunk starts in the original text
- end_pos: Character position where the chunk ends in the original text
- chunk_id: Unique identifier for the chunk
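The validation performed in `__post_init__` can be illustrated with a minimal sketch. It uses a local stand-in dataclass that mirrors the documented fields and checks instead of importing the package, and `try_build` is a hypothetical helper added only for the demonstration.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Chunk:
    """Local stand-in mirroring the documented Chunk fields and checks."""
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    start_pos: int = 0
    end_pos: int = 0
    chunk_id: str = ""

    def __post_init__(self):
        if not self.text:
            raise ValueError("Chunk text cannot be empty")
        if self.end_pos < self.start_pos:
            raise ValueError("end_pos cannot be less than start_pos")

    def __len__(self) -> int:
        return len(self.text)


def try_build(**kwargs):
    """Hypothetical helper: return the Chunk, or the error message if validation fails."""
    try:
        return Chunk(**kwargs)
    except ValueError as exc:
        return str(exc)


ok = Chunk(text="hello world", metadata={"source": "doc.txt"}, start_pos=0, end_pos=11)
empty = try_build(text="")                                # rejected: empty text
inverted = try_build(text="abc", start_pos=5, end_pos=2)  # rejected: end before start
```

Because `metadata` uses `default_factory=dict`, each chunk gets its own dictionary rather than sharing one mutable default.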
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class BaseChunker(ABC):
    """
    Abstract base class for all text chunking strategies.

    All chunking implementations must inherit from this class and implement
    the chunk() method.
    """

    def __init__(self, logger=None):
        """
        Initialize the base chunker.

        Args:
            logger: Optional BasicLogger instance for structured logging
        """
        self.logger = logger

    @abstractmethod
    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split text into chunks according to the chunking strategy.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is empty or None
        """
        pass

    def validate_text(self, text: str) -> None:
        """
        Validate input text before chunking.

        Args:
            text: The text to validate

        Raises:
            ValueError: If text is None, not a string, or empty/whitespace only
        """
        if text is None:
            raise ValueError("Text cannot be None")
        if not isinstance(text, str):
            raise ValueError(f"Text must be a string, got {type(text)}")
        if not text.strip():
            raise ValueError("Text cannot be empty or whitespace only")

    def _generate_chunk_id(self, index: int, metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Generate a unique chunk ID.

        Args:
            index: The index of the chunk in the sequence
            metadata: Optional metadata that may contain source information

        Returns:
            A unique chunk identifier
        """
        if metadata and "source" in metadata:
            return f"{metadata['source']}_chunk_{index}"
        return f"chunk_{index}"
Abstract base class for all text chunking strategies.
All chunking implementations must inherit from this class and implement the chunk() method.
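As a rough sketch of what inheriting from the base class involves, the example below defines a hypothetical `ParagraphChunker` (not part of this module). `Chunk` and `BaseChunker` are simplified local stand-ins so the snippet runs on its own; the real classes live in the package.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Chunk:  # simplified stand-in for the documented dataclass
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    start_pos: int = 0
    end_pos: int = 0
    chunk_id: str = ""


class BaseChunker(ABC):  # simplified stand-in for the documented base class
    def __init__(self, logger=None):
        self.logger = logger

    @abstractmethod
    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        ...

    def validate_text(self, text: str) -> None:
        if not isinstance(text, str) or not text.strip():
            raise ValueError("Text must be a non-empty string")

    def _generate_chunk_id(self, index: int, metadata: Optional[Dict[str, Any]] = None) -> str:
        if metadata and "source" in metadata:
            return f"{metadata['source']}_chunk_{index}"
        return f"chunk_{index}"


class ParagraphChunker(BaseChunker):
    """Hypothetical strategy: one chunk per blank-line-separated paragraph."""

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        self.validate_text(text)
        metadata = metadata or {}
        chunks: List[Chunk] = []
        pos = 0
        for para in text.split("\n\n"):
            start = pos
            pos += len(para) + 2  # advance past the paragraph and its "\n\n" separator
            if not para.strip():
                continue  # skip empty paragraphs
            chunks.append(Chunk(
                text=para,
                metadata={**metadata, "chunking_strategy": "paragraph"},
                start_pos=start,
                end_pos=start + len(para),
                chunk_id=self._generate_chunk_id(len(chunks), metadata),
            ))
        return chunks


text = "First paragraph.\n\nSecond paragraph."
chunks = ParagraphChunker().chunk(text, {"source": "notes.md"})
```

Instantiating `BaseChunker` directly raises `TypeError` because `chunk()` is abstract; only concrete subclasses can be constructed.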
Initialize the base chunker.
Args: logger: Optional BasicLogger instance for structured logging
Split text into chunks according to the chunking strategy.
Args:
- text: The input text to chunk
- metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is empty or None
Validate input text before chunking.
Args: text: The text to validate
Raises: ValueError: If text is None, not a string, or empty or whitespace-only
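The three rejection cases can be seen in a small sketch. `validate_text` here is a standalone copy of the documented checks (no class required), and `rejection_reason` is a hypothetical helper used only for the demonstration.

```python
def validate_text(text) -> None:
    """Standalone copy of the documented checks."""
    if text is None:
        raise ValueError("Text cannot be None")
    if not isinstance(text, str):
        raise ValueError(f"Text must be a string, got {type(text)}")
    if not text.strip():
        raise ValueError("Text cannot be empty or whitespace only")


def rejection_reason(value):
    """Hypothetical helper: the error message for a rejected value, or None if accepted."""
    try:
        validate_text(value)
    except ValueError as exc:
        return str(exc)
    return None


reasons = [rejection_reason(v) for v in (None, 42, "   \n", "ok")]
```

Note that a non-string input raises `ValueError` rather than `TypeError`, so callers only need to handle one exception type.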
from typing import Any, Dict, List, Optional

import tiktoken

from .base_chunker import BaseChunker, Chunk


class FixedSizeChunker(BaseChunker):
    """
    Chunks text into fixed-size token-based segments with optional overlap.

    This is one of the most common chunking strategies for LLM applications,
    ensuring chunks don't exceed model context windows or embedding limits.
    """

    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        encoding_name: str = "cl100k_base",
        logger=None
    ):
        """
        Initialize the fixed-size chunker.

        Args:
            chunk_size: Maximum number of tokens per chunk (default: 512)
            chunk_overlap: Number of tokens to overlap between chunks (default: 50)
            encoding_name: tiktoken encoding name (default: "cl100k_base" for GPT-3.5/4)
                Options: "cl100k_base" (GPT-3.5, GPT-4)
                         "p50k_base" (Codex, GPT-3)
                         "r50k_base" (GPT-2)
            logger: Optional BasicLogger instance

        Raises:
            ValueError: If chunk_size <= 0, chunk_overlap < 0,
                chunk_overlap >= chunk_size, or the encoding cannot be loaded
        """
        super().__init__(logger)

        if chunk_size <= 0:
            raise ValueError("chunk_size must be greater than 0")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.encoding_name = encoding_name

        try:
            self.encoding = tiktoken.get_encoding(encoding_name)
        except Exception as e:
            # Chain the original exception so the root cause stays visible
            raise ValueError(f"Failed to load tiktoken encoding '{encoding_name}': {e}") from e

        if self.logger:
            self.logger.info(
                "Initialized FixedSizeChunker",
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                encoding=encoding_name
            )

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split text into fixed-size token-based chunks with overlap.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking text of length {len(text)} characters")

        # Encode text to tokens
        tokens = self.encoding.encode(text)
        total_tokens = len(tokens)

        if self.logger:
            self.logger.debug(f"Text tokenized to {total_tokens} tokens")

        chunks = []
        start_idx = 0
        chunk_index = 0

        while start_idx < total_tokens:
            # Calculate end index for this chunk
            end_idx = min(start_idx + self.chunk_size, total_tokens)

            # Extract token slice
            chunk_tokens = tokens[start_idx:end_idx]

            # Decode tokens back to text
            chunk_text = self.encoding.decode(chunk_tokens)

            # Find character positions in original text
            # This is approximate since token boundaries don't always align with char boundaries
            char_start = len(self.encoding.decode(tokens[:start_idx]))
            char_end = char_start + len(chunk_text)

            # Create chunk
            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=char_start,
                end_pos=char_end,
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )

            # Add token count to metadata
            chunk.metadata["token_count"] = len(chunk_tokens)
            chunk.metadata["chunking_strategy"] = "fixed_size"

            chunks.append(chunk)
            chunk_index += 1

            # Move start index forward, accounting for overlap
            start_idx += self.chunk_size - self.chunk_overlap

            # Break if we've reached the end
            if end_idx >= total_tokens:
                break

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} chunks",
                total_tokens=total_tokens,
                avg_tokens_per_chunk=total_tokens / len(chunks) if chunks else 0
            )

        return chunks

    def count_tokens(self, text: str) -> int:
        """
        Count the number of tokens in a text string.

        Args:
            text: The text to count tokens for

        Returns:
            Number of tokens
        """
        return len(self.encoding.encode(text))
Chunks text into fixed-size token-based segments with optional overlap.
This is one of the most common chunking strategies for LLM applications, ensuring chunks don't exceed model context windows or embedding limits.
Initialize the fixed-size chunker.
Args:
- chunk_size: Maximum number of tokens per chunk (default: 512)
- chunk_overlap: Number of tokens to overlap between chunks (default: 50)
- encoding_name: tiktoken encoding name (default: "cl100k_base" for GPT-3.5/4). Options: "cl100k_base" (GPT-3.5, GPT-4), "p50k_base" (Codex, GPT-3), "r50k_base" (GPT-2)
- logger: Optional BasicLogger instance
Raises: ValueError: If chunk_size <= 0, chunk_overlap < 0, chunk_overlap >= chunk_size, or the tiktoken encoding cannot be loaded
Split text into fixed-size token-based chunks with overlap.
Args:
- text: The input text to chunk
- metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
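The overlap arithmetic can be checked in isolation. The sketch below reproduces only the window stepping (each window advances by `chunk_size - chunk_overlap` tokens) on plain indices, without tiktoken; `window_bounds` is a hypothetical helper, not part of the class.

```python
from typing import List, Tuple


def window_bounds(total_tokens: int, chunk_size: int, chunk_overlap: int) -> List[Tuple[int, int]]:
    """Return (start, end) token index pairs for a sliding window with overlap."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    bounds = []
    start = 0
    stride = chunk_size - chunk_overlap  # how far each window advances
    while start < total_tokens:
        end = min(start + chunk_size, total_tokens)
        bounds.append((start, end))
        if end >= total_tokens:  # the last window reached the end of the input
            break
        start += stride
    return bounds


# 1200 tokens with the documented defaults: windows advance by 512 - 50 = 462 tokens
bounds = window_bounds(total_tokens=1200, chunk_size=512, chunk_overlap=50)
# bounds == [(0, 512), (462, 974), (924, 1200)]
```

Requiring `chunk_overlap < chunk_size` keeps the stride positive, which is what guarantees the loop terminates.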
Count the number of tokens in a text string.
Args: text: The text to count tokens for
Returns: Number of tokens
from typing import Any, Callable, Dict, List, Optional

from .base_chunker import BaseChunker, Chunk


class SemanticChunker(BaseChunker):
    """
    Chunks text based on sentence boundaries.

    This chunker respects sentence boundaries and packs consecutive sentences
    into chunks of up to max_chunk_size characters. similarity_threshold and
    min_chunk_size are validated and stored but not used by this basic
    implementation.

    Sentence detection is delegated to the required sentence_tokenizer
    callable (e.g., nltk.sent_tokenize).
    """

    def __init__(
        self,
        sentence_tokenizer: Callable[[str], List[str]],
        max_chunk_size: int = 1000,
        min_chunk_size: int = 100,
        similarity_threshold: float = 0.5,
        logger=None
    ):
        """
        Initialize the semantic chunker.

        Args:
            sentence_tokenizer: REQUIRED callable that splits text into sentences.
                Use nltk.sent_tokenize (recommended), spacy, or custom function.
                Example: nltk.sent_tokenize
                Must return List[str] of sentences.
            max_chunk_size: Maximum characters per chunk (default: 1000)
            min_chunk_size: Minimum characters per chunk (default: 100)
            similarity_threshold: Threshold for semantic similarity (0.0-1.0, default: 0.5)
                Not used in basic implementation
            logger: Optional BasicLogger instance

        Raises:
            ValueError: If sentence_tokenizer is None or a size/threshold
                argument is out of range
        """
        super().__init__(logger)

        if sentence_tokenizer is None:
            raise ValueError(
                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
                "Example: SemanticChunker(sentence_tokenizer=nltk.sent_tokenize)"
            )

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")
        if min_chunk_size < 0:
            raise ValueError("min_chunk_size must be non-negative")
        if min_chunk_size >= max_chunk_size:
            raise ValueError("min_chunk_size must be less than max_chunk_size")
        if not (0.0 <= similarity_threshold <= 1.0):
            raise ValueError("similarity_threshold must be between 0.0 and 1.0")

        self.sentence_tokenizer = sentence_tokenizer
        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size
        self.similarity_threshold = similarity_threshold

        if self.logger:
            self.logger.info(
                "Initialized SemanticChunker",
                max_chunk_size=max_chunk_size,
                min_chunk_size=min_chunk_size,
                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
            )

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split text into chunks along sentence boundaries.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking text of length {len(text)} characters")

        # Split into sentences
        sentences = self._split_into_sentences(text)

        if self.logger:
            self.logger.debug(f"Split text into {len(sentences)} sentences")

        # Group sentences into chunks
        chunks = []
        current_chunk_sentences = []
        current_chunk_size = 0
        chunk_index = 0
        chunk_start = 0
        char_position = 0

        for sentence_text, start, end in sentences:
            sentence_len = len(sentence_text)

            # Check if adding this sentence would exceed max size
            if current_chunk_size + sentence_len > self.max_chunk_size and current_chunk_sentences:
                # Create chunk from accumulated sentences
                chunk_text = " ".join(current_chunk_sentences)

                chunk = Chunk(
                    text=chunk_text,
                    metadata=metadata.copy(),
                    start_pos=chunk_start,
                    end_pos=char_position,
                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
                )
                chunk.metadata["sentence_count"] = len(current_chunk_sentences)
                chunk.metadata["chunking_strategy"] = "semantic"

                chunks.append(chunk)
                chunk_index += 1

                # Start new chunk
                current_chunk_sentences = []
                current_chunk_size = 0

            # Record where the chunk begins. Deriving the start from
            # char_position - current_chunk_size can go negative, because the
            # size counter includes one joining space per sentence.
            if not current_chunk_sentences:
                chunk_start = start

            # Add sentence to current chunk
            current_chunk_sentences.append(sentence_text)
            current_chunk_size += sentence_len + 1  # +1 for space
            char_position = end

        # Create final chunk if there are remaining sentences
        if current_chunk_sentences:
            chunk_text = " ".join(current_chunk_sentences)

            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=chunk_start,
                end_pos=char_position,
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )
            chunk.metadata["sentence_count"] = len(current_chunk_sentences)
            chunk.metadata["chunking_strategy"] = "semantic"

            chunks.append(chunk)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} semantic chunks",
                total_sentences=len(sentences),
                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
            )

        return chunks

    def _split_into_sentences(self, text: str) -> List[tuple]:
        """
        Split text into sentences using provided tokenizer.

        Args:
            text: The text to split

        Returns:
            List of tuples (sentence_text, start_pos, end_pos)

        Raises:
            RuntimeError: If sentence tokenizer fails
        """
        try:
            sentence_texts = self.sentence_tokenizer(text)

            # Calculate positions for each sentence
            sentences = []
            pos = 0
            for sent_text in sentence_texts:
                # Find the sentence in the original text
                idx = text.find(sent_text, pos)
                if idx != -1:
                    start = idx
                    end = idx + len(sent_text)
                    sentences.append((sent_text, start, end))
                    pos = end
                else:
                    # Fallback: estimate position
                    sentences.append((sent_text, pos, pos + len(sent_text)))
                    pos += len(sent_text)

            return sentences if sentences else [(text.strip(), 0, len(text))]

        except Exception as e:
            raise RuntimeError(
                f"Sentence tokenizer failed: {e}\n"
                f"Please ensure your tokenizer function is working correctly."
            ) from e
Chunks text based on semantic similarity and sentence boundaries.
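This chunker respects sentence boundaries. The implementation shown packs consecutive sentences into chunks of up to max_chunk_size characters; similarity_threshold is validated and stored but is not used to group sentences by embedding similarity.
Sentence detection is delegated to a required sentence_tokenizer callable (e.g., nltk.sent_tokenize); there is no built-in default tokenizer.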
Initialize the semantic chunker.
Args:
- sentence_tokenizer: REQUIRED callable that splits text into sentences. Use nltk.sent_tokenize (recommended), spacy, or a custom function. Must return List[str] of sentences.
- max_chunk_size: Maximum characters per chunk (default: 1000)
- min_chunk_size: Minimum characters per chunk (default: 100)
- similarity_threshold: Threshold for semantic similarity (0.0-1.0, default: 0.5). Not used in the basic implementation.
- logger: Optional BasicLogger instance
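Raises: ValueError: If sentence_tokenizer is None, max_chunk_size <= 0, min_chunk_size < 0, min_chunk_size >= max_chunk_size, or similarity_threshold is outside 0.0-1.0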
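Any callable that maps a string to a list of sentence strings satisfies the `sentence_tokenizer` contract. As a minimal sketch, the hypothetical `simple_sent_tokenize` below is a dependency-free regex stand-in for nltk.sent_tokenize; it is an illustration only and is not shipped with this module.

```python
import re
from typing import List

# Split at whitespace that follows sentence-ending punctuation.
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def simple_sent_tokenize(text: str) -> List[str]:
    """Hypothetical tokenizer: split after '.', '!' or '?' followed by whitespace."""
    return [s for s in _SENTENCE_END.split(text.strip()) if s]


sentences = simple_sent_tokenize("Chunking matters. Does size matter? Yes!  It does.")
# sentences == ["Chunking matters.", "Does size matter?", "Yes!", "It does."]
```

A regex like this also splits after abbreviations such as "Dr." or "e.g.", which is one reason a trained tokenizer is the recommended choice.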
Split text into chunks along sentence boundaries, packing consecutive sentences up to max_chunk_size characters.
Args:
- text: The input text to chunk
- metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
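The grouping step is a greedy pack: sentences are appended until the next one would push the running size past the limit. The sketch below isolates that logic on plain strings; `pack_sentences` is a hypothetical helper and omits the position and metadata bookkeeping.

```python
from typing import List


def pack_sentences(sentences: List[str], max_chunk_size: int) -> List[str]:
    """Greedily join consecutive sentences while the running size stays within the limit."""
    chunks: List[str] = []
    current: List[str] = []
    size = 0
    for sentence in sentences:
        # Flush when the next sentence would overflow a non-empty chunk.
        if current and size + len(sentence) > max_chunk_size:
            chunks.append(" ".join(current))
            current, size = [], 0
        current.append(sentence)
        size += len(sentence) + 1  # +1 for the joining space
    if current:
        chunks.append(" ".join(current))
    return chunks


packed = pack_sentences(["One two.", "Three four.", "Five."], max_chunk_size=20)
# packed == ["One two. Three four.", "Five."]
```

Because a flush only happens when the current chunk is non-empty, a single sentence longer than the limit is emitted whole rather than being cut.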
from typing import Any, Dict, List, Optional

from .base_chunker import BaseChunker, Chunk


class RecursiveChunker(BaseChunker):
    """
    Recursively chunks text using a hierarchy of separators.

    This chunker attempts to split at natural boundaries in order of preference:
    1. Double newlines (paragraphs)
    2. Single newlines (lines)
    3. Sentence boundaries
    4. Word boundaries
    5. Character boundaries (last resort)

    This preserves document structure while ensuring chunks don't exceed the maximum size.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        separators: Optional[List[str]] = None,
        logger=None
    ):
        """
        Initialize the recursive chunker.

        Args:
            chunk_size: Target maximum characters per chunk (default: 1000)
            chunk_overlap: Characters to overlap between chunks (default: 100)
            separators: List of separators in priority order (default: standard hierarchy)
            logger: Optional BasicLogger instance
        """
        super().__init__(logger)

        if chunk_size <= 0:
            raise ValueError("chunk_size must be greater than 0")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Default separator hierarchy if not provided
        if separators is None:
            self.separators = [
                "\n\n",  # Paragraphs
                "\n",    # Lines
                ". ",    # Sentences
                " ",     # Words
                ""       # Characters (no separator)
            ]
        else:
            self.separators = separators

        if self.logger:
            self.logger.info(
                "Initialized RecursiveChunker",
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                num_separators=len(self.separators)
            )

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Recursively split text into chunks using hierarchical separators.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Recursively chunking text of length {len(text)} characters")

        # Perform recursive splitting
        text_chunks = self._split_text_recursively(text)

        # Convert text chunks to Chunk objects with metadata
        chunks = []
        char_position = 0

        for i, chunk_text in enumerate(text_chunks):
            # _split_text() does not repeat text between chunks, so subtracting
            # chunk_overlap here would skew positions. Locate each chunk in the
            # original text instead, falling back to the running position.
            idx = text.find(chunk_text, char_position)
            start = idx if idx != -1 else char_position

            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=start,
                end_pos=start + len(chunk_text),
                chunk_id=self._generate_chunk_id(i, metadata)
            )
            chunk.metadata["chunking_strategy"] = "recursive"
            chunks.append(chunk)

            char_position = start + len(chunk_text)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} recursive chunks",
                avg_chunk_size=sum(len(c.text) for c in chunks) / len(chunks) if chunks else 0
            )

        return chunks

    def _split_text_recursively(self, text: str) -> List[str]:
        """
        Recursively split text using the separator hierarchy.

        Args:
            text: Text to split

        Returns:
            List of text chunks
        """
        return self._split_text(text, self.separators)

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """
        Split text using the given separators recursively.

        Args:
            text: Text to split
            separators: Remaining separators to try

        Returns:
            List of text chunks
        """
        final_chunks = []

        # Use the highest-priority remaining separator. Taking separators[-1]
        # would always select "" (character level) with the default hierarchy.
        separator = separators[0] if separators else ""

        # Split by current separator
        if separator:
            splits = text.split(separator)
        else:
            # No separator: split by characters
            splits = list(text)

        # Process each split segment
        current_chunk = []
        for split in splits:
            # Add separator back (except for character-level splitting)
            if separator and current_chunk:
                split = separator + split

            # If this split is small enough, accumulate it
            current_size = sum(len(s) for s in current_chunk)

            if current_size + len(split) <= self.chunk_size:
                current_chunk.append(split)
            else:
                # Current chunk is ready
                if current_chunk:
                    # Pieces already carry their separator, so join without adding it again
                    merged = "".join(current_chunk)
                    if separator == " ":
                        merged = " ".join(c.strip() for c in current_chunk if c.strip())

                    if merged.strip():
                        final_chunks.append(merged)
                    current_chunk = []

                # Check if this split itself needs to be broken down
                if len(split) > self.chunk_size:
                    if len(separators) > 1:
                        # Try next separator in hierarchy
                        sub_chunks = self._split_text(split, separators[1:])
                        final_chunks.extend(sub_chunks)
                    else:
                        # Force split at chunk_size boundaries
                        for i in range(0, len(split), self.chunk_size):
                            sub_chunk = split[i:i + self.chunk_size]
                            if sub_chunk.strip():
                                final_chunks.append(sub_chunk)
                else:
                    current_chunk.append(split)

        # Add remaining chunk
        if current_chunk:
            merged = "".join(current_chunk)
            if separator == " ":
                merged = " ".join(c.strip() for c in current_chunk if c.strip())

            if merged.strip():
                final_chunks.append(merged)

        return final_chunks
Recursively chunks text using a hierarchy of separators.
This chunker attempts to split at natural boundaries in order of preference:
- Double newlines (paragraphs)
- Single newlines (lines)
- Sentence boundaries
- Word boundaries
- Character boundaries (last resort)
This preserves document structure while ensuring chunks don't exceed the maximum size.
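The separator hierarchy described above can be sketched as a standalone function. This is a minimal illustration under stated assumptions, not the module's implementation: `recursive_split` and `DEFAULT_SEPARATORS` are hypothetical names, and the sketch ignores overlap and metadata entirely.

```python
from typing import List, Optional

# Hypothetical default hierarchy, mirroring the order listed above.
DEFAULT_SEPARATORS = ["\n\n", "\n", ". ", " ", ""]


def recursive_split(text: str, chunk_size: int,
                    separators: Optional[List[str]] = None) -> List[str]:
    """Split text at the coarsest separator that keeps pieces within chunk_size."""
    if separators is None:
        separators = DEFAULT_SEPARATORS
    if len(text) <= chunk_size:
        return [text] if text.strip() else []

    separator = separators[0] if separators else ""
    remaining = separators[1:]
    if separator == "":
        # Last resort: hard cuts at fixed character offsets.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    chunks: List[str] = []
    current = ""
    for piece in text.split(separator):
        candidate = current + separator + piece if current else piece
        if len(candidate) <= chunk_size:
            current = candidate  # still fits: keep accumulating
            continue
        if current:
            chunks.append(current)
            current = ""
        if len(piece) <= chunk_size:
            current = piece
        else:
            # The piece alone is too large: retry with finer separators.
            chunks.extend(recursive_split(piece, chunk_size, remaining))
    if current:
        chunks.append(current)
    return [c for c in chunks if c.strip()]
```

With a 30-character limit, short paragraphs survive intact and only the oversized paragraph is broken at word boundaries.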
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 100,
        separators: Optional[List[str]] = None,
        logger=None
    ):
        """
        Initialize the recursive chunker.

        Args:
            chunk_size: Target maximum characters per chunk (default: 1000)
            chunk_overlap: Characters to overlap between chunks (default: 100)
            separators: List of separators in priority order (default: standard hierarchy)
            logger: Optional BasicLogger instance
        """
        super().__init__(logger)

        if chunk_size <= 0:
            raise ValueError("chunk_size must be greater than 0")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must be non-negative")
        if chunk_overlap >= chunk_size:
            raise ValueError("chunk_overlap must be less than chunk_size")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Default separator hierarchy if not provided
        if separators is None:
            self.separators = [
                "\n\n",  # Paragraphs
                "\n",    # Lines
                ". ",    # Sentences
                " ",     # Words
                ""       # Characters (no separator)
            ]
        else:
            self.separators = separators

        if self.logger:
            self.logger.info(
                "Initialized RecursiveChunker",
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
                num_separators=len(self.separators)
            )
Initialize the recursive chunker.
Args:
    chunk_size: Target maximum characters per chunk (default: 1000)
    chunk_overlap: Characters to overlap between chunks (default: 100)
    separators: List of separators in priority order (default: standard hierarchy)
    logger: Optional BasicLogger instance
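The source shown on this page uses chunk_overlap when advancing positions, but the splitting routine itself does not visibly copy text between neighbouring chunks. One common way to realise an overlap, sketched here as an assumption rather than as this library's behaviour, is to prefix each chunk with the tail of its predecessor. The helper name `add_overlap` is hypothetical.

```python
from typing import List


def add_overlap(chunks: List[str], chunk_overlap: int) -> List[str]:
    """Prefix each chunk (except the first) with the tail of the previous chunk."""
    if chunk_overlap <= 0:
        return list(chunks)
    overlapped = []
    for i, chunk in enumerate(chunks):
        # Take the last chunk_overlap characters of the preceding chunk, if any.
        tail = chunks[i - 1][-chunk_overlap:] if i > 0 else ""
        overlapped.append(tail + chunk)
    return overlapped
```

Note that with this approach a chunk can exceed chunk_size by up to chunk_overlap characters, which is one reason the constructor requires chunk_overlap to be smaller than chunk_size.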
    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Recursively split text into chunks using hierarchical separators.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Recursively chunking text of length {len(text)} characters")

        # Perform recursive splitting
        text_chunks = self._split_text_recursively(text)

        # Convert text chunks to Chunk objects with metadata
        chunks = []
        char_position = 0

        for i, chunk_text in enumerate(text_chunks):
            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=char_position,
                end_pos=char_position + len(chunk_text),
                chunk_id=self._generate_chunk_id(i, metadata)
            )
            chunk.metadata["chunking_strategy"] = "recursive"
            chunks.append(chunk)

            # Update position accounting for overlap
            char_position += len(chunk_text) - self.chunk_overlap

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} recursive chunks",
                avg_chunk_size=sum(len(c.text) for c in chunks) / len(chunks) if chunks else 0
            )

        return chunks
Recursively split text into chunks using hierarchical separators.
Args:
    text: The input text to chunk
    metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
class SentenceChunker(BaseChunker):
    """
    Chunks text by grouping sentences together.

    This chunker preserves sentence boundaries while grouping multiple
    sentences into chunks based on character count or sentence count limits.

    A sentence_tokenizer callable is required; there is no built-in fallback.
    Pass a function such as nltk.sent_tokenize.
    """

    def __init__(
        self,
        sentence_tokenizer: Callable[[str], List[str]],
        max_chunk_size: int = 1000,
        sentences_per_chunk: Optional[int] = None,
        logger=None
    ):
        """
        Initialize the sentence chunker.

        Args:
            sentence_tokenizer: REQUIRED callable that splits text into sentences.
                               Use nltk.sent_tokenize (recommended), spacy, or custom function.
                               Example: nltk.sent_tokenize
                               Must return List[str] of sentences.
            max_chunk_size: Maximum characters per chunk (default: 1000)
            sentences_per_chunk: Optional fixed number of sentences per chunk
                               If set, this takes priority over max_chunk_size
            logger: Optional BasicLogger instance

        Raises:
            ValueError: If sentence_tokenizer is not provided
        """
        super().__init__(logger)

        if sentence_tokenizer is None:
            raise ValueError(
                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
                "Example: SentenceChunker(sentence_tokenizer=nltk.sent_tokenize)"
            )

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")
        if sentences_per_chunk is not None and sentences_per_chunk <= 0:
            raise ValueError("sentences_per_chunk must be greater than 0")

        self.sentence_tokenizer = sentence_tokenizer
        self.max_chunk_size = max_chunk_size
        self.sentences_per_chunk = sentences_per_chunk

        if self.logger:
            self.logger.info(
                "Initialized SentenceChunker",
                max_chunk_size=max_chunk_size,
                sentences_per_chunk=sentences_per_chunk,
                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
            )

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split text into chunks based on sentence boundaries.

        Args:
            text: The input text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking text of length {len(text)} characters by sentences")

        # Split text into sentences
        sentences = self._split_sentences(text)

        if self.logger:
            self.logger.debug(f"Found {len(sentences)} sentences")

        chunks = []
        current_sentences = []
        current_size = 0
        chunk_index = 0
        char_position = 0
        chunk_start = 0  # Offset of the first sentence in the chunk being built

        for sentence, start, end in sentences:
            sentence_len = len(sentence)

            # Check if we should create a new chunk
            should_chunk = False

            if self.sentences_per_chunk:
                # Fixed sentence count mode
                should_chunk = len(current_sentences) >= self.sentences_per_chunk
            else:
                # Size-based mode
                should_chunk = (
                    current_size + sentence_len > self.max_chunk_size
                    and current_sentences  # Don't create empty chunks
                )

            if should_chunk:
                # Create chunk from accumulated sentences
                chunk_text = " ".join(current_sentences)

                chunk = Chunk(
                    text=chunk_text,
                    metadata=metadata.copy(),
                    start_pos=chunk_start,
                    end_pos=char_position,
                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
                )
                chunk.metadata["sentence_count"] = len(current_sentences)
                chunk.metadata["chunking_strategy"] = "sentence"

                chunks.append(chunk)
                chunk_index += 1

                # Reset for new chunk
                current_sentences = []
                current_size = 0

            # Add sentence to current chunk
            if not current_sentences:
                # Use the sentence's real offset; char_position - current_size
                # can go negative because current_size counts joining spaces
                chunk_start = start
            current_sentences.append(sentence)
            current_size += sentence_len + 1  # +1 for space
            char_position = end

        # Create final chunk if there are remaining sentences
        if current_sentences:
            chunk_text = " ".join(current_sentences)

            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=chunk_start,
                end_pos=char_position,
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )
            chunk.metadata["sentence_count"] = len(current_sentences)
            chunk.metadata["chunking_strategy"] = "sentence"

            chunks.append(chunk)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} sentence-based chunks",
                total_sentences=len(sentences),
                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
            )

        return chunks

    def _split_sentences(self, text: str) -> List[tuple]:
        """
        Split text into sentences with position tracking using provided tokenizer.

        Args:
            text: Text to split into sentences

        Returns:
            List of tuples (sentence_text, start_pos, end_pos)

        Raises:
            RuntimeError: If sentence tokenizer fails
        """
        try:
            sentence_texts = self.sentence_tokenizer(text)

            # Calculate positions for each sentence
            sentences = []
            pos = 0
            for sent_text in sentence_texts:
                # Find the sentence in the original text
                idx = text.find(sent_text, pos)
                if idx != -1:
                    start = idx
                    end = idx + len(sent_text)
                    sentences.append((sent_text, start, end))
                    pos = end
                else:
                    # Fallback: estimate position
                    sentences.append((sent_text, pos, pos + len(sent_text)))
                    pos += len(sent_text)

            return sentences if sentences else [(text.strip(), 0, len(text))]

        except Exception as e:
            raise RuntimeError(
                f"Sentence tokenizer failed: {e}\n"
                f"Please ensure your tokenizer function is working correctly."
            ) from e
Chunks text by grouping sentences together.
This chunker preserves sentence boundaries while grouping multiple sentences into chunks based on character count or sentence count limits.
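A sentence tokenizer is required: the constructor raises ValueError when sentence_tokenizer is None, and there is no built-in regex fallback. Pass any callable that maps a string to a list of sentence strings (e.g., nltk.sent_tokenize).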
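Any callable with the signature `Callable[[str], List[str]]` satisfies the tokenizer requirement. The following is a minimal regex-based sketch for cases where NLTK is not available; the name `simple_sentence_tokenizer` is hypothetical, and the pattern will mis-split abbreviations such as "e.g." that are followed by a space.

```python
import re
from typing import List

# Split on whitespace that directly follows ., ! or ? (fixed-width lookbehind).
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def simple_sentence_tokenizer(text: str) -> List[str]:
    """Return the non-empty sentences found in text, stripped of outer whitespace."""
    return [s.strip() for s in _SENTENCE_END.split(text) if s.strip()]
```

Based on the constructor signature shown on this page, such a function would be passed as `SentenceChunker(sentence_tokenizer=simple_sentence_tokenizer)`.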
Initialize the sentence chunker.
Args:
    sentence_tokenizer: REQUIRED callable that splits text into sentences. Use nltk.sent_tokenize (recommended), spacy, or a custom function. Must return List[str] of sentences.
    max_chunk_size: Maximum characters per chunk (default: 1000)
    sentences_per_chunk: Optional fixed number of sentences per chunk. If set, this takes priority over max_chunk_size.
    logger: Optional BasicLogger instance
Raises: ValueError: If sentence_tokenizer is not provided
Split text into chunks based on sentence boundaries.
Args:
    text: The input text to chunk
    metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
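The size-based grouping can be illustrated on character spans alone. This is a sketch under assumptions, not the library's code: `locate_sentences` and `group_spans` are hypothetical helpers, and unlike the method documented above the sketch measures a chunk by its span in the original text rather than by summed sentence lengths.

```python
from typing import List, Tuple

Span = Tuple[int, int]  # (start, end) offsets into the original text


def locate_sentences(text: str, sentences: List[str]) -> List[Span]:
    """Find each sentence in text, searching forward from the previous match."""
    spans: List[Span] = []
    cursor = 0
    for sentence in sentences:
        idx = text.find(sentence, cursor)
        if idx == -1:
            continue  # the tokenizer altered the text; skipped in this sketch
        spans.append((idx, idx + len(sentence)))
        cursor = idx + len(sentence)
    return spans


def group_spans(spans: List[Span], max_chunk_size: int) -> List[Span]:
    """Merge consecutive sentence spans while the merged span fits the limit."""
    groups: List[Span] = []
    start = end = None
    for s, e in spans:
        if start is not None and e - start > max_chunk_size:
            groups.append((start, end))  # close the current group
            start = None
        if start is None:
            start = s
        end = e
    if start is not None:
        groups.append((start, end))
    return groups
```

Because each group is a plain (start, end) pair, `text[start:end]` recovers the chunk exactly as it appears in the source, including the original whitespace between sentences.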
class MarkdownChunker(BaseChunker):
    """
    Chunks markdown text while respecting document structure.

    This chunker identifies markdown headers (# ## ###, etc.) and uses them
    as natural boundaries for chunking, preserving the document hierarchy.

    Uses regex-based parsing which works well for standard markdown. For complex
    markdown with extensions, consider pre-processing with mistune or markdown-it-py.
    """

    def __init__(
        self,
        max_chunk_size: int = 1500,
        combine_headers: bool = True,
        min_header_level: int = 1,
        logger=None
    ):
        """
        Initialize the markdown chunker.

        Args:
            max_chunk_size: Maximum characters per chunk (default: 1500)
            combine_headers: Whether to combine small sections under headers (default: True)
            min_header_level: Minimum header level to split at (1-6, default: 1)
            logger: Optional BasicLogger instance
        """
        super().__init__(logger)

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")
        if not (1 <= min_header_level <= 6):
            raise ValueError("min_header_level must be between 1 and 6")

        self.max_chunk_size = max_chunk_size
        self.combine_headers = combine_headers
        self.min_header_level = min_header_level

        # Regex pattern for ATX-style markdown headers (# through ######);
        # Setext (underlined) headers are not matched by this pattern
        self.header_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)

        if self.logger:
            self.logger.info(
                "Initialized MarkdownChunker",
                max_chunk_size=max_chunk_size,
                combine_headers=combine_headers,
                min_header_level=min_header_level
            )

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split markdown text into chunks respecting header boundaries.

        Args:
            text: The markdown text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking markdown text of length {len(text)} characters")

        # Find all headers and their positions
        sections = self._parse_sections(text)

        if self.logger:
            self.logger.debug(f"Found {len(sections)} markdown sections")

        # Create chunks from sections
        chunks = []
        current_chunk_parts = []
        current_size = 0
        chunk_index = 0
        current_headers = []
        chunk_start = 0  # Offset of the first section in the chunk being built

        for level, header_text, content, start, end in sections:
            section_text = content
            section_size = len(section_text)

            # Check if we should start a new chunk
            if (current_size + section_size > self.max_chunk_size
                    and current_chunk_parts
                    and level <= self.min_header_level):

                # Create chunk from accumulated sections
                chunk_text = "\n\n".join(current_chunk_parts)

                chunk = Chunk(
                    text=chunk_text,
                    metadata=metadata.copy(),
                    start_pos=chunk_start,
                    end_pos=start,
                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
                )
                chunk.metadata["headers"] = current_headers.copy()
                chunk.metadata["chunking_strategy"] = "markdown"

                chunks.append(chunk)
                chunk_index += 1

                # Reset for new chunk
                current_chunk_parts = []
                current_size = 0
                current_headers = []

            # Add section to current chunk
            if not current_chunk_parts:
                # Use the section's real offset; subtracting current_size
                # can go negative because it counts the joining "\n\n"
                chunk_start = start
            current_chunk_parts.append(section_text)
            current_size += section_size + 2  # +2 for \n\n

            # Track header hierarchy
            if header_text:
                current_headers.append({
                    "level": level,
                    "text": header_text
                })

        # Create final chunk
        if current_chunk_parts:
            chunk_text = "\n\n".join(current_chunk_parts)

            chunk = Chunk(
                text=chunk_text,
                metadata=metadata.copy(),
                start_pos=chunk_start,
                end_pos=len(text),
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )
            chunk.metadata["headers"] = current_headers
            chunk.metadata["chunking_strategy"] = "markdown"

            chunks.append(chunk)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} markdown chunks",
                total_sections=len(sections),
                avg_sections_per_chunk=len(sections) / len(chunks) if chunks else 0
            )

        return chunks

    def _parse_sections(self, text: str) -> List[Tuple[int, str, str, int, int]]:
        """
        Parse markdown text into sections based on headers.

        Args:
            text: Markdown text to parse

        Returns:
            List of tuples (header_level, header_text, content, start_pos, end_pos)
        """
        sections = []
        lines = text.split('\n')
        current_content = []
        current_header_level = 0
        current_header_text = ""
        section_start = 0
        char_position = 0

        for i, line in enumerate(lines):
            # Check if line is a header
            header_match = self.header_pattern.match(line)

            if header_match:
                # Save previous section if it exists
                if current_content or current_header_text:
                    content = '\n'.join(current_content)
                    sections.append((
                        current_header_level,
                        current_header_text,
                        content,
                        section_start,
                        char_position
                    ))

                # Start new section
                current_header_level = len(header_match.group(1))
                current_header_text = header_match.group(2).strip()
                current_content = [line]  # Include header in content
                section_start = char_position
            else:
                current_content.append(line)

            char_position += len(line) + 1  # +1 for newline

        # Add final section
        if current_content:
            content = '\n'.join(current_content)
            sections.append((
                current_header_level,
                current_header_text,
                content,
                section_start,
                char_position
            ))

        # If no sections found, treat entire text as one section
        if not sections:
            sections = [(0, "", text, 0, len(text))]

        return sections
Chunks markdown text while respecting document structure.
This chunker identifies markdown headers (# ## ###, etc.) and uses them as natural boundaries for chunking, preserving the document hierarchy.
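Uses regex-based parsing that recognizes ATX-style headers only (a line starting with one to six # characters followed by whitespace); Setext-style underlined headers are not detected. For complex markdown with extensions, consider pre-processing with mistune or markdown-it-py.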
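To make "preserving the document hierarchy" concrete, the sketch below derives a breadcrumb path for every ATX header using a stack of open headers. It is an illustration only: `header_paths` is a hypothetical helper, not part of this module, and like any line-based regex it would also treat a `# comment` line inside a fenced code block as a header.

```python
import re
from typing import List, Tuple

ATX_HEADER = re.compile(r"^(#{1,6})\s+(.+)$")


def header_paths(markdown: str) -> List[str]:
    """Return one 'Parent > Child' path per ATX header, in document order."""
    stack: List[Tuple[int, str]] = []  # currently open headers as (level, title)
    paths: List[str] = []
    for line in markdown.split("\n"):
        match = ATX_HEADER.match(line)
        if not match:
            continue
        level, title = len(match.group(1)), match.group(2).strip()
        # Close every header at the same or a deeper level.
        while stack and stack[-1][0] >= level:
            stack.pop()
        stack.append((level, title))
        paths.append(" > ".join(t for _, t in stack))
    return paths
```

A path like this could be stored alongside the flat headers list in chunk metadata so that a retrieved chunk can be traced back to its place in the document.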
Initialize the markdown chunker.
Args:
    max_chunk_size: Maximum characters per chunk (default: 1500)
    combine_headers: Whether to combine small sections under headers (default: True)
    min_header_level: Deepest header level at which a new chunk may start (1-6, default: 1). A chunk boundary is only placed before a section whose header level is less than or equal to this value.
    logger: Optional BasicLogger instance
Split markdown text into chunks respecting header boundaries.
Args:
    text: The markdown text to chunk
    metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
class CodeChunker(BaseChunker):
    """
    Chunks code while respecting function and class boundaries.

    This chunker identifies code structures (functions, classes, methods)
    and uses them as natural boundaries for chunking, preserving code context.
    Supports Python, JavaScript, TypeScript, Java, C#, and similar languages.
    """

    def __init__(
        self,
        max_chunk_size: int = 2000,
        language: str = "python",
        include_imports: bool = True,
        logger=None
    ):
        """
        Initialize the code chunker.

        Args:
            max_chunk_size: Maximum characters per chunk (default: 2000)
            language: Programming language ("python", "javascript", "java", etc.)
            include_imports: Whether to include imports/using statements in chunks
            logger: Optional BasicLogger instance
        """
        super().__init__(logger)

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")

        self.max_chunk_size = max_chunk_size
        self.language = language.lower()
        self.include_imports = include_imports

        # Define patterns for different languages
        self._setup_patterns()

        if self.logger:
            self.logger.info(
                "Initialized CodeChunker",
                max_chunk_size=max_chunk_size,
                language=language
            )

    def _setup_patterns(self):
        """Setup regex patterns based on language."""
        if self.language == "python":
            self.function_pattern = re.compile(
                r'^(async\s+)?def\s+\w+\s*\([^)]*\)\s*(->\s*[^:]+)?:',
                re.MULTILINE
            )
            self.class_pattern = re.compile(
                r'^class\s+\w+(\([^)]*\))?:\s*$',
                re.MULTILINE
            )
            self.import_pattern = re.compile(
                r'^(import\s+\S+|from\s+\S+\s+import\s+.+)$',
                re.MULTILINE
            )
        elif self.language in ["javascript", "typescript", "java", "csharp", "c++"]:
            self.function_pattern = re.compile(
                r'(public|private|protected|static|async)?\s*(function|void|int|string|bool|var|let|const)?\s+\w+\s*\([^)]*\)\s*\{',
                re.MULTILINE
            )
            self.class_pattern = re.compile(
                r'(export\s+)?(public\s+)?class\s+\w+(\s+extends\s+\w+)?(\s+implements\s+[\w,\s]+)?\s*\{',
                re.MULTILINE
            )
            self.import_pattern = re.compile(
                r'^(import\s+.*from\s+["\'].*["\'];?|using\s+\S+;|#include\s+<.*>)$',
                re.MULTILINE
            )
        else:
            # Generic patterns for unknown languages
            self.function_pattern = re.compile(r'^\s*(def|function|func|fun)\s+\w+', re.MULTILINE)
            self.class_pattern = re.compile(r'^\s*class\s+\w+', re.MULTILINE)
            self.import_pattern = re.compile(r'^(import|using|include)\s+', re.MULTILINE)

    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split code into chunks respecting function and class boundaries.

        Args:
            text: The code text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking code of length {len(text)} characters")

        # Extract imports if needed
        imports_text = ""
        if self.include_imports:
            imports_text = self._extract_imports(text)

        # Find all code blocks (functions and classes)
        code_blocks = self._parse_code_blocks(text)

        if self.logger:
            self.logger.debug(f"Found {len(code_blocks)} code blocks")

        # Create chunks from code blocks
        chunks = []
        current_chunk_parts = []
        current_size = len(imports_text)
        chunk_index = 0
        chunk_start = 0  # Offset of the first block in the chunk being built

        for block_type, block_name, block_content, start, end in code_blocks:
            block_size = len(block_content)

            # Check if we should start a new chunk
            if current_size + block_size > self.max_chunk_size and current_chunk_parts:
                # Create chunk from accumulated blocks
                chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)

                chunk = Chunk(
                    text=chunk_text.strip(),
                    metadata=metadata.copy(),
                    start_pos=chunk_start,
                    end_pos=start,
                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
                )
                chunk.metadata["code_blocks"] = len(current_chunk_parts)
                chunk.metadata["chunking_strategy"] = "code"
                chunk.metadata["language"] = self.language

                chunks.append(chunk)
                chunk_index += 1

                # Reset for new chunk
                current_chunk_parts = []
                current_size = len(imports_text)

            # Add block to current chunk
            if not current_chunk_parts:
                # Use the block's real offset; subtracting current_size can go
                # negative because it includes the prepended imports
                chunk_start = start
            current_chunk_parts.append(block_content)
            current_size += block_size + 2  # +2 for \n\n

        # Create final chunk
        if current_chunk_parts:
            chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)

            chunk = Chunk(
                text=chunk_text.strip(),
                metadata=metadata.copy(),
                start_pos=chunk_start,
                end_pos=len(text),
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )
            chunk.metadata["code_blocks"] = len(current_chunk_parts)
            chunk.metadata["chunking_strategy"] = "code"
            chunk.metadata["language"] = self.language

            chunks.append(chunk)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} code chunks",
                total_blocks=len(code_blocks),
                avg_blocks_per_chunk=len(code_blocks) / len(chunks) if chunks else 0
            )

        return chunks

    def _extract_imports(self, text: str) -> str:
        """Extract import/using statements from the code."""
        imports = []
        for match in self.import_pattern.finditer(text):
            imports.append(match.group().strip())

        return "\n".join(imports) if imports else ""

    def _parse_code_blocks(self, text: str) -> List[Tuple[str, str, str, int, int]]:
        """
        Parse code into blocks (functions, classes, etc.).

        Args:
            text: Code text to parse

        Returns:
            List of tuples (block_type, block_name, content, start_pos, end_pos)
        """
        blocks = []

        # Find all function and class definitions
        all_matches = []

        for match in self.function_pattern.finditer(text):
            all_matches.append(('function', match.start(), match.group()))

        for match in self.class_pattern.finditer(text):
            all_matches.append(('class', match.start(), match.group()))

        # Sort by position
        all_matches.sort(key=lambda x: x[1])

        # Extract code blocks with their content
        for i, (block_type, start, match_text) in enumerate(all_matches):
            # Extract block name
            block_name = self._extract_name(match_text)

            # Find end of block (next block start or end of file)
            if i + 1 < len(all_matches):
                end = all_matches[i + 1][1]
            else:
                end = len(text)

            # Extract block content
            block_content = text[start:end].strip()

            blocks.append((block_type, block_name, block_content, start, end))

        # If no blocks found, treat entire text as one block
        if not blocks:
            blocks = [('code', 'main', text, 0, len(text))]

        return blocks

    def _extract_name(self, definition: str) -> str:
        """Extract function or class name from definition."""
        # Try to find the name after 'def', 'function', 'class', etc.
        name_match = re.search(r'\b(def|function|class|func|fun)\s+(\w+)', definition)
        if name_match:
            return name_match.group(2)

        # Fallback: try to find any word after space
        words = definition.split()
        for word in words:
            if re.match(r'^\w+$', word) and word not in ['def', 'function', 'class', 'public', 'private', 'static', 'async']:
                return word

        return 'unknown'
Chunks code while respecting function and class boundaries.
This chunker identifies code structures (functions, classes, methods) and uses them as natural boundaries for chunking, preserving code context. Supports Python, JavaScript, TypeScript, Java, C#, and similar languages.
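A minimal sketch of the boundary detection described above, assuming a single hypothetical Python-only pattern (`DEFINITION`) and a hypothetical helper (`split_at_definitions`). The chunker's real per-language patterns are configured in `_setup_patterns()`, which is not shown here, so treat this as an illustration rather than the library's implementation.

```python
import re
from typing import List, Tuple

# Hypothetical pattern for illustration only: matches top-level Python
# "def", "async def", and "class" definitions at the start of a line.
DEFINITION = re.compile(r"^(?:async\s+)?(?:def|class)\s+(\w+)", re.MULTILINE)


def split_at_definitions(source: str) -> List[Tuple[str, str]]:
    """Return (name, block_text) pairs, one per top-level definition."""
    matches = list(DEFINITION.finditer(source))
    blocks = []
    for i, match in enumerate(matches):
        # A block runs from its definition to the start of the next one,
        # or to the end of the file for the last definition.
        end = matches[i + 1].start() if i + 1 < len(matches) else len(source)
        blocks.append((match.group(1), source[match.start():end].strip()))
    return blocks


sample = (
    "import os\n\n"
    "def load(path):\n    return open(path).read()\n\n"
    "class Parser:\n    pass\n"
)
blocks = split_at_definitions(sample)
for name, body in blocks:
    print(name, len(body))
```

Text before the first definition (here the `import os` line) belongs to no block, which is why the chunker extracts imports separately.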
    def __init__(
        self,
        max_chunk_size: int = 2000,
        language: str = "python",
        include_imports: bool = True,
        logger=None
    ):
        """
        Initialize the code chunker.

        Args:
            max_chunk_size: Maximum characters per chunk (default: 2000)
            language: Programming language ("python", "javascript", "java", etc.)
            include_imports: Whether to include imports/using statements in chunks
            logger: Optional BasicLogger instance
        """
        super().__init__(logger)

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")

        self.max_chunk_size = max_chunk_size
        self.language = language.lower()
        self.include_imports = include_imports

        # Define patterns for different languages
        self._setup_patterns()

        if self.logger:
            self.logger.info(
                "Initialized CodeChunker",
                max_chunk_size=max_chunk_size,
                language=language
            )
Initialize the code chunker.
Args:
    max_chunk_size: Maximum characters per chunk (default: 2000)
    language: Programming language ("python", "javascript", "java", etc.)
    include_imports: Whether to include imports/using statements in chunks
    logger: Optional BasicLogger instance
    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
        """
        Split code into chunks respecting function and class boundaries.

        Args:
            text: The code text to chunk
            metadata: Optional metadata to attach to each chunk

        Returns:
            List of Chunk objects

        Raises:
            ValueError: If text is invalid
        """
        self.validate_text(text)

        if metadata is None:
            metadata = {}

        if self.logger:
            self.logger.debug(f"Chunking code of length {len(text)} characters")

        # Extract imports if needed
        imports_text = ""
        if self.include_imports:
            imports_text = self._extract_imports(text)

        # Find all code blocks (functions and classes)
        code_blocks = self._parse_code_blocks(text)

        if self.logger:
            self.logger.debug(f"Found {len(code_blocks)} code blocks")

        # Create chunks from code blocks
        chunks = []
        current_chunk_parts = []
        current_size = len(imports_text)
        chunk_index = 0

        for block_type, block_name, block_content, start, end in code_blocks:
            block_size = len(block_content)

            # Check if we should start a new chunk
            if current_size + block_size > self.max_chunk_size and current_chunk_parts:
                # Create chunk from accumulated blocks
                chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)

                chunk = Chunk(
                    text=chunk_text.strip(),
                    metadata=metadata.copy(),
                    start_pos=start - current_size,
                    end_pos=start,
                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
                )
                chunk.metadata["code_blocks"] = len(current_chunk_parts)
                chunk.metadata["chunking_strategy"] = "code"
                chunk.metadata["language"] = self.language

                chunks.append(chunk)
                chunk_index += 1

                # Reset for new chunk
                current_chunk_parts = []
                current_size = len(imports_text)

            # Add block to current chunk
            current_chunk_parts.append(block_content)
            current_size += block_size + 2  # +2 for \n\n

        # Create final chunk
        if current_chunk_parts:
            chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)

            chunk = Chunk(
                text=chunk_text.strip(),
                metadata=metadata.copy(),
                start_pos=len(text) - current_size,
                end_pos=len(text),
                chunk_id=self._generate_chunk_id(chunk_index, metadata)
            )
            chunk.metadata["code_blocks"] = len(current_chunk_parts)
            chunk.metadata["chunking_strategy"] = "code"
            chunk.metadata["language"] = self.language

            chunks.append(chunk)

        if self.logger:
            self.logger.info(
                f"Created {len(chunks)} code chunks",
                total_blocks=len(code_blocks),
                avg_blocks_per_chunk=len(code_blocks) / len(chunks) if chunks else 0
            )

        return chunks
Split code into chunks respecting function and class boundaries.
Args:
    text: The code text to chunk
    metadata: Optional metadata to attach to each chunk
Returns: List of Chunk objects
Raises: ValueError: If text is invalid
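The grouping step inside `chunk()` can be sketched in isolation as a greedy packing loop. The helper name `pack_blocks` is hypothetical and the sketch works on plain strings rather than `Chunk` objects, so it only illustrates the size accounting, not the metadata or position handling.

```python
from typing import List


def pack_blocks(blocks: List[str], max_chunk_size: int, imports_text: str = "") -> List[str]:
    """Greedily group code blocks, prefixing every chunk with the imports."""
    chunks: List[str] = []
    current: List[str] = []
    current_size = len(imports_text)

    def flush() -> None:
        body = "\n\n".join(current)
        chunks.append((imports_text + "\n\n" + body).strip())

    for block in blocks:
        # Start a new chunk when adding this block would exceed the budget
        # and the current chunk already holds at least one block.
        if current and current_size + len(block) > max_chunk_size:
            flush()
            current.clear()
            current_size = len(imports_text)
        current.append(block)
        current_size += len(block) + 2  # +2 for the "\n\n" separator

    if current:
        flush()
    return chunks


packed = pack_blocks(["a" * 40, "b" * 40, "c" * 40], max_chunk_size=100, imports_text="import os")
for piece in packed:
    print(len(piece))
```

Because a new chunk is only started when the current one is non-empty, a single block larger than `max_chunk_size` still becomes its own oversized chunk instead of being split.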
class WikiPageChunker(BaseChunker):
    """
    Chunks wiki pages by splitting on heading boundaries.

    Supports three content formats:
    - ``chunk_markdown(text)`` — Azure DevOps Wiki pages (Markdown ``#`` headings)
    - ``chunk_html(html)`` — Confluence storage-format HTML (``<h1>``–``<h6>``)
    - ``chunk(text)`` — plain text with heuristic heading detection

    The heading text is preserved as the first line of each chunk and also
    stored in ``chunk.metadata["section_heading"]`` and
    ``chunk.metadata["heading_level"]`` for downstream use.

    Example (Azure DevOps Wiki):
        ```python
        from gmf_forge_ai_data.chunkers import WikiPageChunker

        chunker = WikiPageChunker(max_chunk_size=1500, min_chunk_size=200)

        # content is the Markdown string from AzureDevOpsWikiConnector
        chunks = chunker.chunk_markdown(content, metadata={"page_path": "/Overview"})
        ```

    Example (Confluence):
        ```python
        # raw_html is Confluence storage-format body
        chunks = chunker.chunk_html(raw_html, metadata={"page_id": "12345"})
        ```
    """

    # Matches <h1> … <h6> tags in Confluence storage-format HTML
    _HTML_HEADING = re.compile(
        r"<h([1-6])[^>]*>(.*?)</h\1>",
        re.IGNORECASE | re.DOTALL,
    )
    # Matches any remaining HTML tag so we can strip them
    _HTML_TAG = re.compile(r"<[^>]+>")
    # Decode the most common HTML entities
    _ENTITIES: Dict[str, str] = {
        "&amp;": "&", "&lt;": "<", "&gt;": ">",
        "&quot;": '"', "&#39;": "'", "&nbsp;": " ",
        "&apos;": "'",
    }

    def __init__(
        self,
        max_chunk_size: int = 1500,
        min_chunk_size: int = 200,
        logger=None,
    ):
        """
        Args:
            max_chunk_size: Maximum characters per chunk (default: 1500).
                When a section exceeds this, it is split further
                at paragraph boundaries.
            min_chunk_size: Sections smaller than this are merged with the
                next section before splitting (default: 200).
            logger: Optional BasicLogger instance.
        """
        super().__init__(logger)

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")
        if min_chunk_size < 0:
            raise ValueError("min_chunk_size must be >= 0")

        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size

    # Matches ATX Markdown headings: # … through ###### …
    _MD_HEADING = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)

    # ── Public interface ─────────────────────────────────────────────────────

    def chunk_markdown(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).

        Splits at ATX heading boundaries (``#`` through ``######``). This is
        the preferred entry point for content from AzureDevOpsWikiConnector.

        Args:
            text: Markdown content of the wiki page.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects with section heading metadata attached.
        """
        self.validate_text(text)
        sections = self._parse_markdown_sections(text)
        return self._sections_to_chunks(sections, metadata or {})

    def chunk_html(
        self,
        html: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk raw Confluence storage-format HTML directly.

        This is the preferred entry point when working with content returned
        by ConfluenceConnector before HTML stripping has been applied.
        Headings are extracted from the HTML tags before stripping, which
        gives more reliable section boundaries than heuristic detection.

        Args:
            html: Confluence storage-format HTML body.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects with section heading metadata attached.
        """
        sections = self._parse_html_sections(html)
        return self._sections_to_chunks(sections, metadata or {})

    def chunk(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk plain text that has already been HTML-stripped.

        Falls back to heuristic heading detection (short lines followed by
        a blank line). Prefer ``chunk_html()`` when the original HTML is
        still available.

        Args:
            text: Plain-text content of a wiki page.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects.
        """
        self.validate_text(text)
        sections = self._parse_text_sections(text)
        return self._sections_to_chunks(sections, metadata or {})

    # ── Section parsing ──────────────────────────────────────────────────────

    def _parse_html_sections(
        self, html: str
    ) -> List[Tuple[int, str, str]]:
        """
        Split HTML into (level, heading_text, body_text) tuples.

        Splits at each <h1>–<h6> tag boundary. Content before the first
        heading is emitted as a level-0 section with an empty heading.
        """
        sections: List[Tuple[int, str, str]] = []
        last_end = 0
        current_level = 0
        current_heading = ""
        current_body_parts: List[str] = []

        for match in self._HTML_HEADING.finditer(html):
            # Flush whatever came before this heading
            body_html = html[last_end : match.start()]
            current_body_parts.append(body_html)
            body_text = self._clean_html("".join(current_body_parts))
            if body_text.strip() or current_heading:
                sections.append((current_level, current_heading, body_text))

            current_level = int(match.group(1))
            current_heading = self._clean_html(match.group(2))
            current_body_parts = []
            last_end = match.end()

        # Flush trailing content after the last heading
        body_html = html[last_end:]
        current_body_parts.append(body_html)
        body_text = self._clean_html("".join(current_body_parts))
        if body_text.strip() or current_heading:
            sections.append((current_level, current_heading, body_text))

        return sections

    def _parse_markdown_sections(
        self, text: str
    ) -> List[Tuple[int, str, str]]:
        """
        Split Markdown text into (level, heading_text, body_text) tuples.

        Splits at ATX heading boundaries (``# …`` through ``###### …``).
        Content before the first heading is emitted as a level-0 section.
        Heading markers are stripped from the heading text stored in metadata.
        """
        sections: List[Tuple[int, str, str]] = []
        last_end = 0
        current_level = 0
        current_heading = ""
        current_body_parts: List[str] = []

        for match in self._MD_HEADING.finditer(text):
            body = text[last_end : match.start()]
            current_body_parts.append(body)
            body_text = "".join(current_body_parts).strip()
            if body_text or current_heading:
                sections.append((current_level, current_heading, body_text))

            current_level = len(match.group(1))
            current_heading = match.group(2).strip()
            current_body_parts = []
            last_end = match.end()

        # Flush trailing content after the last heading
        trailing = text[last_end:].strip()
        if trailing or current_heading:
            sections.append((current_level, current_heading, trailing))

        return sections

    def _parse_text_sections(
        self, text: str
    ) -> List[Tuple[int, str, str]]:
        """
        Heuristic heading detection for already-stripped plain text.

        A line is treated as a heading if it:
        - Is non-empty and ≤ 80 characters
        - Does not end with sentence-ending punctuation (. ! ? , ; :)
        - Is followed by a blank line or is the first line of the text

        All detected headings are emitted at level 1 (no depth information
        is available in plain text).
        """
        sections: List[Tuple[int, str, str]] = []
        lines = text.splitlines()
        current_heading = ""
        current_level = 0
        current_body: List[str] = []

        def _flush():
            body = "\n".join(current_body).strip()
            if body or current_heading:
                sections.append((current_level, current_heading, body))

        for i, line in enumerate(lines):
            stripped = line.strip()
            next_line = lines[i + 1].strip() if i + 1 < len(lines) else ""

            is_heading = (
                stripped
                and len(stripped) <= 80
                and stripped[-1] not in ".!?,;:"
                and (not next_line or i == 0)
            )

            if is_heading:
                _flush()
                current_heading = stripped
                current_level = 1
                current_body = []
            else:
                current_body.append(line)

        _flush()
        return sections

    # ── Chunk assembly ───────────────────────────────────────────────────────

    def _sections_to_chunks(
        self,
        sections: List[Tuple[int, str, str]],
        metadata: Dict[str, Any],
    ) -> List[Chunk]:
        """
        Merge small sections, split large ones, and produce final Chunk list.
        """
        if not sections:
            return []

        # Merge sections smaller than min_chunk_size into the next one
        merged = self._merge_small_sections(sections)

        chunks: List[Chunk] = []
        chunk_index = 0

        for level, heading, body in merged:
            section_text = f"{heading}\n{body}".strip() if heading else body.strip()
            if not section_text:
                continue

            # Split large sections at paragraph boundaries
            parts = self._split_on_paragraphs(section_text)

            for part in parts:
                if not part.strip():
                    continue
                chunk = Chunk(
                    text=part.strip(),
                    metadata={
                        **metadata,
                        "section_heading": heading,
                        "heading_level": level,
                        "chunking_strategy": "wiki",
                    },
                    start_pos=0,
                    end_pos=len(part),
                    chunk_id=self._generate_chunk_id(chunk_index, metadata),
                )
                chunks.append(chunk)
                chunk_index += 1

        if self.logger:
            self.logger.info(
                "WikiPageChunker: created chunks",
                chunk_count=len(chunks),
                section_count=len(sections),
            )

        return chunks

    def _merge_small_sections(
        self, sections: List[Tuple[int, str, str]]
    ) -> List[Tuple[int, str, str]]:
        """Merge sections whose body is shorter than min_chunk_size into the next."""
        if not sections:
            return sections

        merged: List[Tuple[int, str, str]] = []
        pending_level, pending_heading, pending_body = sections[0]

        for level, heading, body in sections[1:]:
            if len(pending_body.strip()) < self.min_chunk_size:
                # Append this section to the pending one
                separator = "\n\n" if pending_body.strip() else ""
                if heading:
                    pending_body = (
                        pending_body + separator + heading + "\n" + body
                    )
                else:
                    pending_body = pending_body + separator + body
            else:
                merged.append((pending_level, pending_heading, pending_body))
                pending_level, pending_heading, pending_body = level, heading, body

        merged.append((pending_level, pending_heading, pending_body))
        return merged

    def _split_on_paragraphs(self, text: str) -> List[str]:
        """
        Split text that exceeds max_chunk_size at paragraph (double newline) boundaries.
        Paragraphs that individually exceed max_chunk_size are further split at
        single-newline boundaries, then at fixed character boundaries as a last
        resort (handles code blocks and tables with no blank lines).
        Returns the original text as a single-element list if it fits.
        """
        if len(text) <= self.max_chunk_size:
            return [text]

        paragraphs = re.split(r"\n\n+", text)
        parts: List[str] = []
        current_parts: List[str] = []
        current_size = 0

        for para in paragraphs:
            para_size = len(para)
            if current_size + para_size > self.max_chunk_size and current_parts:
                parts.append("\n\n".join(current_parts))
                current_parts = []
                current_size = 0
            # A single paragraph larger than the limit must be split further
            if para_size > self.max_chunk_size:
                if current_parts:
                    parts.append("\n\n".join(current_parts))
                    current_parts = []
                    current_size = 0
                parts.extend(self._split_long_paragraph(para))
            else:
                current_parts.append(para)
                current_size += para_size + 2  # +2 for \n\n

        if current_parts:
            parts.append("\n\n".join(current_parts))

        return parts

    def _split_long_paragraph(self, text: str) -> List[str]:
        """
        Split a paragraph that exceeds max_chunk_size.

        Tries single-newline boundaries first (handles tables, code blocks, etc.).
        Falls back to fixed character boundaries for lines that are still too long.
        """
        if len(text) <= self.max_chunk_size:
            return [text]

        lines = text.split("\n")
        parts: List[str] = []
        current_lines: List[str] = []
        current_size = 0

        for line in lines:
            line_size = len(line)
            if current_size + line_size > self.max_chunk_size and current_lines:
                parts.append("\n".join(current_lines))
                current_lines = []
                current_size = 0
            # A single line still over the limit: split at character boundaries
            if line_size > self.max_chunk_size:
                if current_lines:
                    parts.append("\n".join(current_lines))
                    current_lines = []
                    current_size = 0
                for i in range(0, line_size, self.max_chunk_size):
                    parts.append(line[i : i + self.max_chunk_size])
            else:
                current_lines.append(line)
                current_size += line_size + 1  # +1 for \n

        if current_lines:
            parts.append("\n".join(current_lines))

        return parts

    # ── HTML utility ─────────────────────────────────────────────────────────

    def _clean_html(self, html: str) -> str:
        """Strip HTML tags and decode common entities to produce plain text."""
        text = self._HTML_TAG.sub(" ", html)
        for entity, char in self._ENTITIES.items():
            text = text.replace(entity, char)
        # Collapse runs of whitespace but preserve newlines
        text = re.sub(r"[ \t]+", " ", text)
        text = re.sub(r"\n{3,}", "\n\n", text)
        return text
Chunks wiki pages by splitting on heading boundaries.
Supports three content formats:
- chunk_markdown(text): Azure DevOps Wiki pages (Markdown # headings)
- chunk_html(html): Confluence storage-format HTML (<h1>–<h6>)
- chunk(text): plain text with heuristic heading detection
The heading text is preserved as the first line of each chunk and also
stored in chunk.metadata["section_heading"] and
chunk.metadata["heading_level"] for downstream use.
Example (Azure DevOps Wiki):
from gmf_forge_ai_data.chunkers import WikiPageChunker
chunker = WikiPageChunker(max_chunk_size=1500, min_chunk_size=200)
# content is the Markdown string from AzureDevOpsWikiConnector
chunks = chunker.chunk_markdown(content, metadata={"page_path": "/Overview"})
Example (Confluence):
# raw_html is Confluence storage-format body
chunks = chunker.chunk_html(raw_html, metadata={"page_id": "12345"})
    def __init__(
        self,
        max_chunk_size: int = 1500,
        min_chunk_size: int = 200,
        logger=None,
    ):
        """
        Args:
            max_chunk_size: Maximum characters per chunk (default: 1500).
                When a section exceeds this, it is split further
                at paragraph boundaries.
            min_chunk_size: Sections smaller than this are merged with the
                next section before splitting (default: 200).
            logger: Optional BasicLogger instance.
        """
        super().__init__(logger)

        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be greater than 0")
        if min_chunk_size < 0:
            raise ValueError("min_chunk_size must be >= 0")

        self.max_chunk_size = max_chunk_size
        self.min_chunk_size = min_chunk_size
Args:
    max_chunk_size: Maximum characters per chunk (default: 1500). When a section exceeds this, it is split further at paragraph boundaries.
    min_chunk_size: Sections smaller than this are merged with the next section before splitting (default: 200).
    logger: Optional BasicLogger instance.
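To illustrate how `min_chunk_size` affects the result, here is a simplified, standalone sketch of the merge step. The names `Section` and `merge_small` are hypothetical and the function works on plain tuples; it mirrors the idea of `_merge_small_sections` without claiming to match it in every edge case.

```python
from typing import List, Tuple

Section = Tuple[int, str, str]  # (heading_level, heading, body)


def merge_small(sections: List[Section], min_chunk_size: int) -> List[Section]:
    """Fold following sections into a pending one until its body is large enough."""
    if not sections:
        return []
    merged: List[Section] = []
    level, heading, body = sections[0]
    for next_level, next_heading, next_body in sections[1:]:
        if len(body.strip()) < min_chunk_size:
            # Pending body is too small: absorb the next section and keep
            # its heading inline so the text stays readable.
            extra = f"{next_heading}\n{next_body}" if next_heading else next_body
            body = f"{body}\n\n{extra}" if body.strip() else extra
        else:
            merged.append((level, heading, body))
            level, heading, body = next_level, next_heading, next_body
    merged.append((level, heading, body))
    return merged


sections = [(1, "Intro", "Short."), (2, "Setup", "x" * 30), (2, "Usage", "y" * 30)]
result = merge_small(sections, min_chunk_size=20)
for lvl, head, text in result:
    print(lvl, head, len(text))
```

The merged section keeps the level and heading of the first (small) section, so in this sketch the "Setup" content ends up under the "Intro" heading. With `min_chunk_size=0` nothing is merged.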
    def chunk_markdown(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).

        Splits at ATX heading boundaries (``#`` through ``######``). This is
        the preferred entry point for content from AzureDevOpsWikiConnector.

        Args:
            text: Markdown content of the wiki page.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects with section heading metadata attached.
        """
        self.validate_text(text)
        sections = self._parse_markdown_sections(text)
        return self._sections_to_chunks(sections, metadata or {})
Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).
Splits at ATX heading boundaries (# through ######). This is
the preferred entry point for content from AzureDevOpsWikiConnector.
Args:
    text: Markdown content of the wiki page.
    metadata: Optional metadata to propagate to every chunk.
Returns: List of Chunk objects with section heading metadata attached.
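The ATX heading split can be tried in isolation with the standalone sketch below. It reuses the heading regex from the class source; the function name `markdown_sections` is hypothetical, and the sketch returns plain tuples rather than `Chunk` objects.

```python
import re
from typing import List, Tuple

# Same pattern as the class-level _MD_HEADING attribute.
ATX_HEADING = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)


def markdown_sections(text: str) -> List[Tuple[int, str, str]]:
    """Split Markdown into (level, heading, body) tuples at ATX headings."""
    sections = []
    level, heading, last_end = 0, "", 0
    for match in ATX_HEADING.finditer(text):
        # Everything since the previous heading is that heading's body.
        body = text[last_end:match.start()].strip()
        if body or heading:
            sections.append((level, heading, body))
        level, heading = len(match.group(1)), match.group(2).strip()
        last_end = match.end()
    trailing = text[last_end:].strip()
    if trailing or heading:
        sections.append((level, heading, trailing))
    return sections


page = "Preamble text.\n\n# Overview\nWhat this is.\n\n## Install\nRun the installer.\n"
parsed = markdown_sections(page)
for section in parsed:
    print(section)
```

Content before the first heading comes back as a level-0 section with an empty heading, matching the behaviour described for `_parse_markdown_sections`.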
    def chunk_html(
        self,
        html: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk raw Confluence storage-format HTML directly.

        This is the preferred entry point when working with content returned
        by ConfluenceConnector before HTML stripping has been applied.
        Headings are extracted from the HTML tags before stripping, which
        gives more reliable section boundaries than heuristic detection.

        Args:
            html: Confluence storage-format HTML body.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects with section heading metadata attached.
        """
        sections = self._parse_html_sections(html)
        return self._sections_to_chunks(sections, metadata or {})
Chunk raw Confluence storage-format HTML directly.
This is the preferred entry point when working with content returned by ConfluenceConnector before HTML stripping has been applied. Headings are extracted from the HTML tags before stripping, which gives more reliable section boundaries than heuristic detection.
Args:
    html: Confluence storage-format HTML body.
    metadata: Optional metadata to propagate to every chunk.
Returns: List of Chunk objects with section heading metadata attached.
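A rough standalone sketch of heading-based HTML sectioning follows. It reuses the two tag regexes from the class source, but it decodes entities with the standard library's `html.unescape` instead of the class's own entity table, and the names `clean` and `html_sections` are hypothetical.

```python
import html
import re
from typing import List, Tuple

HTML_HEADING = re.compile(r"<h([1-6])[^>]*>(.*?)</h\1>", re.IGNORECASE | re.DOTALL)
HTML_TAG = re.compile(r"<[^>]+>")


def clean(fragment: str) -> str:
    """Strip tags first, then decode entities and collapse runs of spaces."""
    text = html.unescape(HTML_TAG.sub(" ", fragment))
    return re.sub(r"[ \t\xa0]+", " ", text).strip()


def html_sections(markup: str) -> List[Tuple[int, str, str]]:
    """Split HTML into (level, heading, body) tuples at <h1>..<h6> tags."""
    sections = []
    level, heading, last_end = 0, "", 0
    for match in HTML_HEADING.finditer(markup):
        body = clean(markup[last_end:match.start()])
        if body or heading:
            sections.append((level, heading, body))
        level, heading = int(match.group(1)), clean(match.group(2))
        last_end = match.end()
    body = clean(markup[last_end:])
    if body or heading:
        sections.append((level, heading, body))
    return sections


doc = (
    "<h1>Release &amp; Deploy</h1><p>Ship it.</p>"
    "<h2 id='x'>Rollback</h2><p>Undo &lt;safely&gt;.</p>"
)
parsed_html = html_sections(doc)
for section in parsed_html:
    print(section)
```

Tags are stripped before entities are decoded so that an escaped `&lt;safely&gt;` in the body is not mistaken for markup and removed.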
    def chunk(
        self,
        text: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> List[Chunk]:
        """
        Chunk plain text that has already been HTML-stripped.

        Falls back to heuristic heading detection (short lines followed by
        a blank line). Prefer ``chunk_html()`` when the original HTML is
        still available.

        Args:
            text: Plain-text content of a wiki page.
            metadata: Optional metadata to propagate to every chunk.

        Returns:
            List of Chunk objects.
        """
        self.validate_text(text)
        sections = self._parse_text_sections(text)
        return self._sections_to_chunks(sections, metadata or {})
Chunk plain text that has already been HTML-stripped.
Falls back to heuristic heading detection (short lines followed by
a blank line). Prefer chunk_html() when the original HTML is
still available.
Args:
    text: Plain-text content of a wiki page.
    metadata: Optional metadata to propagate to every chunk.
Returns: List of Chunk objects.
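The heading heuristic used for plain text can be checked on its own with a small sketch. The predicate below follows the rules stated in `_parse_text_sections` (non-empty, at most 80 characters, no trailing sentence punctuation, followed by a blank line or first in the text); the function name `looks_like_heading` is hypothetical.

```python
from typing import List


def looks_like_heading(lines: List[str], i: int) -> bool:
    """Apply the plain-text heading heuristic to line i."""
    stripped = lines[i].strip()
    next_line = lines[i + 1].strip() if i + 1 < len(lines) else ""
    return bool(
        stripped
        and len(stripped) <= 80
        and stripped[-1] not in ".!?,;:"
        and (not next_line or i == 0)
    )


text = (
    "Deployment Guide\n\n"
    "This page explains the release process.\n"
    "Follow each step in order.\n\n"
    "Rollback\n\n"
    "Revert the last release."
)
lines = text.splitlines()
headings = [lines[i] for i in range(len(lines)) if looks_like_heading(lines, i)]
print(headings)
```

One consequence of these rules is that a short final line without closing punctuation also counts as a heading, because the end of the text is treated like a following blank line.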