gmf_forge_ai_data.chunkers

Text processing and chunking module for RAG applications.

This module provides various text chunking strategies for splitting documents into manageable pieces for embedding and retrieval.

 1"""
 2Text processing and chunking module for RAG applications.
 3
 4This module provides various text chunking strategies for splitting documents
 5into manageable pieces for embedding and retrieval.
 6"""
 7
 8from .base_chunker import Chunk, BaseChunker
 9from .fixed_size_chunker import FixedSizeChunker
10from .semantic_chunker import SemanticChunker
11from .recursive_chunker import RecursiveChunker
12from .sentence_chunker import SentenceChunker
13from .markdown_chunker import MarkdownChunker
14from .code_chunker import CodeChunker
15from .wiki_chunker import WikiPageChunker
16
17__all__ = [
18    # Data structures
19    "Chunk",
20    
21    # Base class
22    "BaseChunker",
23    
24    # Chunking strategies
25    "FixedSizeChunker",
26    "SemanticChunker",
27    "RecursiveChunker",
28    "SentenceChunker",
29    "MarkdownChunker",
30    "CodeChunker",
31    "WikiPageChunker",
32]
@dataclass
class Chunk:
14@dataclass
15class Chunk:
16    """
17    Represents a chunk of text with associated metadata.
18    
19    Attributes:
20        text: The actual text content of the chunk
21        metadata: Dictionary of metadata (source, page number, etc.)
22        start_pos: Character position where chunk starts in original text
23        end_pos: Character position where chunk ends in original text
24        chunk_id: Unique identifier for the chunk
25    """
26    text: str
27    metadata: Dict[str, Any] = field(default_factory=dict)
28    start_pos: int = 0
29    end_pos: int = 0
30    chunk_id: str = ""
31    
32    def __post_init__(self):
33        """Validate chunk after initialization."""
34        if not self.text:
35            raise ValueError("Chunk text cannot be empty")
36        if self.end_pos < self.start_pos:
37            raise ValueError(f"end_pos ({self.end_pos}) cannot be less than start_pos ({self.start_pos})")
38    
39    def __len__(self) -> int:
40        """Return the length of the chunk text."""
41        return len(self.text)
42    
43    def __str__(self) -> str:
44        """Return a string representation of the chunk."""
45        preview = self.text[:50] + "..." if len(self.text) > 50 else self.text
46        return f"Chunk(id={self.chunk_id}, len={len(self.text)}, text='{preview}')"

Represents a chunk of text with associated metadata.

Attributes: text: The actual text content of the chunk metadata: Dictionary of metadata (source, page number, etc.) start_pos: Character position where chunk starts in original text end_pos: Character position where chunk ends in original text chunk_id: Unique identifier for the chunk

Chunk( text: str, metadata: Dict[str, Any] = <factory>, start_pos: int = 0, end_pos: int = 0, chunk_id: str = '')
text: str
metadata: Dict[str, Any]
start_pos: int = 0
end_pos: int = 0
chunk_id: str = ''
class BaseChunker(abc.ABC):
 49class BaseChunker(ABC):
 50    """
 51    Abstract base class for all text chunking strategies.
 52    
 53    All chunking implementations must inherit from this class and implement
 54    the chunk() method.
 55    """
 56    
 57    def __init__(self, logger=None):
 58        """
 59        Initialize the base chunker.
 60        
 61        Args:
 62            logger: Optional BasicLogger instance for structured logging
 63        """
 64        self.logger = logger
 65    
 66    @abstractmethod
 67    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 68        """
 69        Split text into chunks according to the chunking strategy.
 70        
 71        Args:
 72            text: The input text to chunk
 73            metadata: Optional metadata to attach to each chunk
 74            
 75        Returns:
 76            List of Chunk objects
 77            
 78        Raises:
 79            ValueError: If text is empty or None
 80        """
 81        pass
 82    
 83    def validate_text(self, text: str) -> None:
 84        """
 85        Validate input text before chunking.
 86        
 87        Args:
 88            text: The text to validate
 89            
 90        Raises:
 91            ValueError: If text is None, empty, or not a string
 92        """
 93        if text is None:
 94            raise ValueError("Text cannot be None")
 95        if not isinstance(text, str):
 96            raise ValueError(f"Text must be a string, got {type(text)}")
 97        if not text.strip():
 98            raise ValueError("Text cannot be empty or whitespace only")
 99    
100    def _generate_chunk_id(self, index: int, metadata: Optional[Dict[str, Any]] = None) -> str:
101        """
102        Generate a unique chunk ID.
103        
104        Args:
105            index: The index of the chunk in the sequence
106            metadata: Optional metadata that may contain source information
107            
108        Returns:
109            A unique chunk identifier
110        """
111        if metadata and "source" in metadata:
112            return f"{metadata['source']}_chunk_{index}"
113        return f"chunk_{index}"

Abstract base class for all text chunking strategies.

All chunking implementations must inherit from this class and implement the chunk() method.

BaseChunker(logger=None)
57    def __init__(self, logger=None):
58        """
59        Initialize the base chunker.
60        
61        Args:
62            logger: Optional BasicLogger instance for structured logging
63        """
64        self.logger = logger

Initialize the base chunker.

Args: logger: Optional BasicLogger instance for structured logging

logger
@abstractmethod
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
66    @abstractmethod
67    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
68        """
69        Split text into chunks according to the chunking strategy.
70        
71        Args:
72            text: The input text to chunk
73            metadata: Optional metadata to attach to each chunk
74            
75        Returns:
76            List of Chunk objects
77            
78        Raises:
79            ValueError: If text is empty or None
80        """
81        pass

Split text into chunks according to the chunking strategy.

Args: text: The input text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is empty or None

def validate_text(self, text: str) -> None:
83    def validate_text(self, text: str) -> None:
84        """
85        Validate input text before chunking.
86        
87        Args:
88            text: The text to validate
89            
90        Raises:
91            ValueError: If text is None, empty, or not a string
92        """
93        if text is None:
94            raise ValueError("Text cannot be None")
95        if not isinstance(text, str):
96            raise ValueError(f"Text must be a string, got {type(text)}")
97        if not text.strip():
98            raise ValueError("Text cannot be empty or whitespace only")

Validate input text before chunking.

Args: text: The text to validate

Raises: ValueError: If text is None, empty, or not a string

class FixedSizeChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 15class FixedSizeChunker(BaseChunker):
 16    """
 17    Chunks text into fixed-size token-based segments with optional overlap.
 18    
 19    This is one of the most common chunking strategies for LLM applications,
 20    ensuring chunks don't exceed model context windows or embedding limits.
 21    """
 22    
 23    def __init__(
 24        self,
 25        chunk_size: int = 512,
 26        chunk_overlap: int = 50,
 27        encoding_name: str = "cl100k_base",
 28        logger=None
 29    ):
 30        """
 31        Initialize the fixed-size chunker.
 32        
 33        Args:
 34            chunk_size: Maximum number of tokens per chunk (default: 512)
 35            chunk_overlap: Number of tokens to overlap between chunks (default: 50)
 36            encoding_name: tiktoken encoding name (default: "cl100k_base" for GPT-3.5/4)
 37                          Options: "cl100k_base" (GPT-3.5, GPT-4)
 38                                   "p50k_base" (CodeX, GPT-3)
 39                                   "r50k_base" (GPT-2)
 40            logger: Optional BasicLogger instance
 41            
 42        Raises:
 43            ValueError: If chunk_size <= 0 or chunk_overlap >= chunk_size
 44        """
 45        super().__init__(logger)
 46        
 47        if chunk_size <= 0:
 48            raise ValueError("chunk_size must be greater than 0")
 49        if chunk_overlap < 0:
 50            raise ValueError("chunk_overlap must be non-negative")
 51        if chunk_overlap >= chunk_size:
 52            raise ValueError("chunk_overlap must be less than chunk_size")
 53        
 54        self.chunk_size = chunk_size
 55        self.chunk_overlap = chunk_overlap
 56        self.encoding_name = encoding_name
 57        
 58        try:
 59            self.encoding = tiktoken.get_encoding(encoding_name)
 60        except Exception as e:
 61            raise ValueError(f"Failed to load tiktoken encoding '{encoding_name}': {e}")
 62        
 63        if self.logger:
 64            self.logger.info(
 65                "Initialized FixedSizeChunker",
 66                chunk_size=chunk_size,
 67                chunk_overlap=chunk_overlap,
 68                encoding=encoding_name
 69            )
 70    
 71    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 72        """
 73        Split text into fixed-size token-based chunks with overlap.
 74        
 75        Args:
 76            text: The input text to chunk
 77            metadata: Optional metadata to attach to each chunk
 78            
 79        Returns:
 80            List of Chunk objects
 81            
 82        Raises:
 83            ValueError: If text is invalid
 84        """
 85        self.validate_text(text)
 86        
 87        if metadata is None:
 88            metadata = {}
 89        
 90        if self.logger:
 91            self.logger.debug(f"Chunking text of length {len(text)} characters")
 92        
 93        # Encode text to tokens
 94        tokens = self.encoding.encode(text)
 95        total_tokens = len(tokens)
 96        
 97        if self.logger:
 98            self.logger.debug(f"Text tokenized to {total_tokens} tokens")
 99        
100        chunks = []
101        start_idx = 0
102        chunk_index = 0
103        
104        while start_idx < total_tokens:
105            # Calculate end index for this chunk
106            end_idx = min(start_idx + self.chunk_size, total_tokens)
107            
108            # Extract token slice
109            chunk_tokens = tokens[start_idx:end_idx]
110            
111            # Decode tokens back to text
112            chunk_text = self.encoding.decode(chunk_tokens)
113            
114            # Find character positions in original text
115            # This is approximate since token boundaries don't always align with char boundaries
116            char_start = len(self.encoding.decode(tokens[:start_idx]))
117            char_end = char_start + len(chunk_text)
118            
119            # Create chunk
120            chunk = Chunk(
121                text=chunk_text,
122                metadata=metadata.copy(),
123                start_pos=char_start,
124                end_pos=char_end,
125                chunk_id=self._generate_chunk_id(chunk_index, metadata)
126            )
127            
128            # Add token count to metadata
129            chunk.metadata["token_count"] = len(chunk_tokens)
130            chunk.metadata["chunking_strategy"] = "fixed_size"
131            
132            chunks.append(chunk)
133            chunk_index += 1
134            
135            # Move start index forward, accounting for overlap
136            start_idx += self.chunk_size - self.chunk_overlap
137            
138            # Break if we've reached the end
139            if end_idx >= total_tokens:
140                break
141        
142        if self.logger:
143            self.logger.info(
144                f"Created {len(chunks)} chunks",
145                total_tokens=total_tokens,
146                avg_tokens_per_chunk=total_tokens / len(chunks) if chunks else 0
147            )
148        
149        return chunks
150    
151    def count_tokens(self, text: str) -> int:
152        """
153        Count the number of tokens in a text string.
154        
155        Args:
156            text: The text to count tokens for
157            
158        Returns:
159            Number of tokens
160        """
161        return len(self.encoding.encode(text))

Chunks text into fixed-size token-based segments with optional overlap.

This is one of the most common chunking strategies for LLM applications, ensuring chunks don't exceed model context windows or embedding limits.

FixedSizeChunker( chunk_size: int = 512, chunk_overlap: int = 50, encoding_name: str = 'cl100k_base', logger=None)
23    def __init__(
24        self,
25        chunk_size: int = 512,
26        chunk_overlap: int = 50,
27        encoding_name: str = "cl100k_base",
28        logger=None
29    ):
30        """
31        Initialize the fixed-size chunker.
32        
33        Args:
34            chunk_size: Maximum number of tokens per chunk (default: 512)
35            chunk_overlap: Number of tokens to overlap between chunks (default: 50)
36            encoding_name: tiktoken encoding name (default: "cl100k_base" for GPT-3.5/4)
37                          Options: "cl100k_base" (GPT-3.5, GPT-4)
38                                   "p50k_base" (CodeX, GPT-3)
39                                   "r50k_base" (GPT-2)
40            logger: Optional BasicLogger instance
41            
42        Raises:
43            ValueError: If chunk_size <= 0 or chunk_overlap >= chunk_size
44        """
45        super().__init__(logger)
46        
47        if chunk_size <= 0:
48            raise ValueError("chunk_size must be greater than 0")
49        if chunk_overlap < 0:
50            raise ValueError("chunk_overlap must be non-negative")
51        if chunk_overlap >= chunk_size:
52            raise ValueError("chunk_overlap must be less than chunk_size")
53        
54        self.chunk_size = chunk_size
55        self.chunk_overlap = chunk_overlap
56        self.encoding_name = encoding_name
57        
58        try:
59            self.encoding = tiktoken.get_encoding(encoding_name)
60        except Exception as e:
61            raise ValueError(f"Failed to load tiktoken encoding '{encoding_name}': {e}")
62        
63        if self.logger:
64            self.logger.info(
65                "Initialized FixedSizeChunker",
66                chunk_size=chunk_size,
67                chunk_overlap=chunk_overlap,
68                encoding=encoding_name
69            )

Initialize the fixed-size chunker.

Args: chunk_size: Maximum number of tokens per chunk (default: 512) chunk_overlap: Number of tokens to overlap between chunks (default: 50) encoding_name: tiktoken encoding name (default: "cl100k_base" for GPT-3.5/4) Options: "cl100k_base" (GPT-3.5, GPT-4) "p50k_base" (CodeX, GPT-3) "r50k_base" (GPT-2) logger: Optional BasicLogger instance

Raises: ValueError: If chunk_size <= 0 or chunk_overlap >= chunk_size

chunk_size
chunk_overlap
encoding_name
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 71    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 72        """
 73        Split text into fixed-size token-based chunks with overlap.
 74        
 75        Args:
 76            text: The input text to chunk
 77            metadata: Optional metadata to attach to each chunk
 78            
 79        Returns:
 80            List of Chunk objects
 81            
 82        Raises:
 83            ValueError: If text is invalid
 84        """
 85        self.validate_text(text)
 86        
 87        if metadata is None:
 88            metadata = {}
 89        
 90        if self.logger:
 91            self.logger.debug(f"Chunking text of length {len(text)} characters")
 92        
 93        # Encode text to tokens
 94        tokens = self.encoding.encode(text)
 95        total_tokens = len(tokens)
 96        
 97        if self.logger:
 98            self.logger.debug(f"Text tokenized to {total_tokens} tokens")
 99        
100        chunks = []
101        start_idx = 0
102        chunk_index = 0
103        
104        while start_idx < total_tokens:
105            # Calculate end index for this chunk
106            end_idx = min(start_idx + self.chunk_size, total_tokens)
107            
108            # Extract token slice
109            chunk_tokens = tokens[start_idx:end_idx]
110            
111            # Decode tokens back to text
112            chunk_text = self.encoding.decode(chunk_tokens)
113            
114            # Find character positions in original text
115            # This is approximate since token boundaries don't always align with char boundaries
116            char_start = len(self.encoding.decode(tokens[:start_idx]))
117            char_end = char_start + len(chunk_text)
118            
119            # Create chunk
120            chunk = Chunk(
121                text=chunk_text,
122                metadata=metadata.copy(),
123                start_pos=char_start,
124                end_pos=char_end,
125                chunk_id=self._generate_chunk_id(chunk_index, metadata)
126            )
127            
128            # Add token count to metadata
129            chunk.metadata["token_count"] = len(chunk_tokens)
130            chunk.metadata["chunking_strategy"] = "fixed_size"
131            
132            chunks.append(chunk)
133            chunk_index += 1
134            
135            # Move start index forward, accounting for overlap
136            start_idx += self.chunk_size - self.chunk_overlap
137            
138            # Break if we've reached the end
139            if end_idx >= total_tokens:
140                break
141        
142        if self.logger:
143            self.logger.info(
144                f"Created {len(chunks)} chunks",
145                total_tokens=total_tokens,
146                avg_tokens_per_chunk=total_tokens / len(chunks) if chunks else 0
147            )
148        
149        return chunks

Split text into fixed-size token-based chunks with overlap.

Args: text: The input text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

def count_tokens(self, text: str) -> int:
151    def count_tokens(self, text: str) -> int:
152        """
153        Count the number of tokens in a text string.
154        
155        Args:
156            text: The text to count tokens for
157            
158        Returns:
159            Number of tokens
160        """
161        return len(self.encoding.encode(text))

Count the number of tokens in a text string.

Args: text: The text to count tokens for

Returns: Number of tokens

class SemanticChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 15class SemanticChunker(BaseChunker):
 16    """
 17    Chunks text based on semantic similarity and sentence boundaries.
 18    
 19    This chunker respects sentence boundaries and can optionally group
 20    semantically related sentences together using embeddings.
 21    
 22    By default uses simple regex for sentence detection. For better accuracy,
 23    pass a custom sentence_tokenizer (e.g., nltk.sent_tokenize).
 24    """
 25    
 26    def __init__(
 27        self,
 28        sentence_tokenizer: Callable[[str], List[str]],
 29        max_chunk_size: int = 1000,
 30        min_chunk_size: int = 100,
 31        similarity_threshold: float = 0.5,
 32        logger=None
 33    ):
 34        """
 35        Initialize the semantic chunker.
 36        
 37        Args:
 38            sentence_tokenizer: REQUIRED callable that splits text into sentences.
 39                              Use nltk.sent_tokenize (recommended), spacy, or custom function.
 40                              Example: nltk.sent_tokenize
 41                              Must return List[str] of sentences.
 42            max_chunk_size: Maximum characters per chunk (default: 1000)
 43            min_chunk_size: Minimum characters per chunk (default: 100)
 44            similarity_threshold: Threshold for semantic similarity (0.0-1.0, default: 0.5)
 45                                 Not used in basic implementation
 46            logger: Optional BasicLogger instance
 47            
 48        Raises:
 49            ValueError: If sentence_tokenizer is not provided
 50        """
 51        super().__init__(logger)
 52        
 53        if sentence_tokenizer is None:
 54            raise ValueError(
 55                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
 56                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
 57                "Example: SemanticChunker(sentence_tokenizer=nltk.sent_tokenize)"
 58            )
 59        
 60        if max_chunk_size <= 0:
 61            raise ValueError("max_chunk_size must be greater than 0")
 62        if min_chunk_size < 0:
 63            raise ValueError("min_chunk_size must be non-negative")
 64        if min_chunk_size >= max_chunk_size:
 65            raise ValueError("min_chunk_size must be less than max_chunk_size")
 66        if not (0.0 <= similarity_threshold <= 1.0):
 67            raise ValueError("similarity_threshold must be between 0.0 and 1.0")
 68        
 69        self.sentence_tokenizer = sentence_tokenizer
 70        self.max_chunk_size = max_chunk_size
 71        self.min_chunk_size = min_chunk_size
 72        self.similarity_threshold = similarity_threshold
 73        
 74        if self.logger:
 75            self.logger.info(
 76                "Initialized SemanticChunker",
 77                max_chunk_size=max_chunk_size,
 78                min_chunk_size=min_chunk_size,
 79                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
 80            )
 81    
 82    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 83        """
 84        Split text into chunks based on sentence boundaries and semantic similarity.
 85        
 86        Args:
 87            text: The input text to chunk
 88            metadata: Optional metadata to attach to each chunk
 89            
 90        Returns:
 91            List of Chunk objects
 92            
 93        Raises:
 94            ValueError: If text is invalid
 95        """
 96        self.validate_text(text)
 97        
 98        if metadata is None:
 99            metadata = {}
100        
101        if self.logger:
102            self.logger.debug(f"Chunking text of length {len(text)} characters")
103        
104        # Split into sentences
105        sentences = self._split_into_sentences(text)
106        
107        if self.logger:
108            self.logger.debug(f"Split text into {len(sentences)} sentences")
109        
110        # Group sentences into chunks
111        chunks = []
112        current_chunk_sentences = []
113        current_chunk_size = 0
114        chunk_index = 0
115        char_position = 0
116        
117        for sentence_text, start, end in sentences:
118            sentence_len = len(sentence_text)
119            
120            # Check if adding this sentence would exceed max size
121            if current_chunk_size + sentence_len > self.max_chunk_size and current_chunk_sentences:
122                # Create chunk from accumulated sentences
123                chunk_text = " ".join(current_chunk_sentences)
124                chunk_start = char_position - current_chunk_size
125                
126                chunk = Chunk(
127                    text=chunk_text,
128                    metadata=metadata.copy(),
129                    start_pos=chunk_start,
130                    end_pos=char_position,
131                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
132                )
133                chunk.metadata["sentence_count"] = len(current_chunk_sentences)
134                chunk.metadata["chunking_strategy"] = "semantic"
135                
136                chunks.append(chunk)
137                chunk_index += 1
138                
139                # Start new chunk
140                current_chunk_sentences = []
141                current_chunk_size = 0
142            
143            # Add sentence to current chunk
144            current_chunk_sentences.append(sentence_text)
145            current_chunk_size += sentence_len + 1  # +1 for space
146            char_position = end
147        
148       # Create final chunk if there are remaining sentences
149        if current_chunk_sentences:
150            chunk_text = " ".join(current_chunk_sentences)
151            chunk_start = char_position - current_chunk_size
152            
153            chunk = Chunk(
154                text=chunk_text,
155                metadata=metadata.copy(),
156                start_pos=chunk_start,
157                end_pos=char_position,
158                chunk_id=self._generate_chunk_id(chunk_index, metadata)
159            )
160            chunk.metadata["sentence_count"] = len(current_chunk_sentences)
161            chunk.metadata["chunking_strategy"] = "semantic"
162            
163            chunks.append(chunk)
164        
165        if self.logger:
166            self.logger.info(
167                f"Created {len(chunks)} semantic chunks",
168                total_sentences=len(sentences),
169                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
170            )
171        
172        return chunks
173    
174    def _split_into_sentences(self, text: str) -> List[tuple]:
175        """
176        Split text into sentences using provided tokenizer.
177        
178        Args:
179            text: The text to split
180            
181        Returns:
182            List of tuples (sentence_text, start_pos, end_pos)
183            
184        Raises:
185            RuntimeError: If sentence tokenizer fails
186        """
187        try:
188            sentence_texts = self.sentence_tokenizer(text)
189            
190            # Calculate positions for each sentence
191            sentences = []
192            pos = 0
193            for sent_text in sentence_texts:
194                # Find the sentence in the original text
195                idx = text.find(sent_text, pos)
196                if idx != -1:
197                    start = idx
198                    end = idx + len(sent_text)
199                    sentences.append((sent_text, start, end))
200                    pos = end
201                else:
202                    # Fallback: estimate position
203                    sentences.append((sent_text, pos, pos + len(sent_text)))
204                    pos += len(sent_text)
205            
206            return sentences if sentences else [(text.strip(), 0, len(text))]
207            
208        except Exception as e:
209            raise RuntimeError(
210                f"Sentence tokenizer failed: {e}\n"
211                f"Please ensure your tokenizer function is working correctly."
212            ) from e

Chunks text based on semantic similarity and sentence boundaries.

This chunker respects sentence boundaries and can optionally group semantically related sentences together using embeddings.

By default uses simple regex for sentence detection. For better accuracy, pass a custom sentence_tokenizer (e.g., nltk.sent_tokenize).

SemanticChunker( sentence_tokenizer: Callable[[str], List[str]], max_chunk_size: int = 1000, min_chunk_size: int = 100, similarity_threshold: float = 0.5, logger=None)
26    def __init__(
27        self,
28        sentence_tokenizer: Callable[[str], List[str]],
29        max_chunk_size: int = 1000,
30        min_chunk_size: int = 100,
31        similarity_threshold: float = 0.5,
32        logger=None
33    ):
34        """
35        Initialize the semantic chunker.
36        
37        Args:
38            sentence_tokenizer: REQUIRED callable that splits text into sentences.
39                              Use nltk.sent_tokenize (recommended), spacy, or custom function.
40                              Example: nltk.sent_tokenize
41                              Must return List[str] of sentences.
42            max_chunk_size: Maximum characters per chunk (default: 1000)
43            min_chunk_size: Minimum characters per chunk (default: 100)
44            similarity_threshold: Threshold for semantic similarity (0.0-1.0, default: 0.5)
45                                 Not used in basic implementation
46            logger: Optional BasicLogger instance
47            
48        Raises:
49            ValueError: If sentence_tokenizer is not provided
50        """
51        super().__init__(logger)
52        
53        if sentence_tokenizer is None:
54            raise ValueError(
55                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
56                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
57                "Example: SemanticChunker(sentence_tokenizer=nltk.sent_tokenize)"
58            )
59        
60        if max_chunk_size <= 0:
61            raise ValueError("max_chunk_size must be greater than 0")
62        if min_chunk_size < 0:
63            raise ValueError("min_chunk_size must be non-negative")
64        if min_chunk_size >= max_chunk_size:
65            raise ValueError("min_chunk_size must be less than max_chunk_size")
66        if not (0.0 <= similarity_threshold <= 1.0):
67            raise ValueError("similarity_threshold must be between 0.0 and 1.0")
68        
69        self.sentence_tokenizer = sentence_tokenizer
70        self.max_chunk_size = max_chunk_size
71        self.min_chunk_size = min_chunk_size
72        self.similarity_threshold = similarity_threshold
73        
74        if self.logger:
75            self.logger.info(
76                "Initialized SemanticChunker",
77                max_chunk_size=max_chunk_size,
78                min_chunk_size=min_chunk_size,
79                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
80            )

Initialize the semantic chunker.

Args: sentence_tokenizer: REQUIRED callable that splits text into sentences. Use nltk.sent_tokenize (recommended), spacy, or custom function. Example: nltk.sent_tokenize Must return List[str] of sentences. max_chunk_size: Maximum characters per chunk (default: 1000) min_chunk_size: Minimum characters per chunk (default: 100) similarity_threshold: Threshold for semantic similarity (0.0-1.0, default: 0.5) Not used in basic implementation logger: Optional BasicLogger instance

Raises: ValueError: If sentence_tokenizer is not provided

sentence_tokenizer
max_chunk_size
min_chunk_size
similarity_threshold
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 82    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 83        """
 84        Split text into chunks based on sentence boundaries and semantic similarity.
 85        
 86        Args:
 87            text: The input text to chunk
 88            metadata: Optional metadata to attach to each chunk
 89            
 90        Returns:
 91            List of Chunk objects
 92            
 93        Raises:
 94            ValueError: If text is invalid
 95        """
 96        self.validate_text(text)
 97        
 98        if metadata is None:
 99            metadata = {}
100        
101        if self.logger:
102            self.logger.debug(f"Chunking text of length {len(text)} characters")
103        
104        # Split into sentences
105        sentences = self._split_into_sentences(text)
106        
107        if self.logger:
108            self.logger.debug(f"Split text into {len(sentences)} sentences")
109        
110        # Group sentences into chunks
111        chunks = []
112        current_chunk_sentences = []
113        current_chunk_size = 0
114        chunk_index = 0
115        char_position = 0
116        
117        for sentence_text, start, end in sentences:
118            sentence_len = len(sentence_text)
119            
120            # Check if adding this sentence would exceed max size
121            if current_chunk_size + sentence_len > self.max_chunk_size and current_chunk_sentences:
122                # Create chunk from accumulated sentences
123                chunk_text = " ".join(current_chunk_sentences)
124                chunk_start = char_position - current_chunk_size
125                
126                chunk = Chunk(
127                    text=chunk_text,
128                    metadata=metadata.copy(),
129                    start_pos=chunk_start,
130                    end_pos=char_position,
131                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
132                )
133                chunk.metadata["sentence_count"] = len(current_chunk_sentences)
134                chunk.metadata["chunking_strategy"] = "semantic"
135                
136                chunks.append(chunk)
137                chunk_index += 1
138                
139                # Start new chunk
140                current_chunk_sentences = []
141                current_chunk_size = 0
142            
143            # Add sentence to current chunk
144            current_chunk_sentences.append(sentence_text)
145            current_chunk_size += sentence_len + 1  # +1 for space
146            char_position = end
147        
148       # Create final chunk if there are remaining sentences
149        if current_chunk_sentences:
150            chunk_text = " ".join(current_chunk_sentences)
151            chunk_start = char_position - current_chunk_size
152            
153            chunk = Chunk(
154                text=chunk_text,
155                metadata=metadata.copy(),
156                start_pos=chunk_start,
157                end_pos=char_position,
158                chunk_id=self._generate_chunk_id(chunk_index, metadata)
159            )
160            chunk.metadata["sentence_count"] = len(current_chunk_sentences)
161            chunk.metadata["chunking_strategy"] = "semantic"
162            
163            chunks.append(chunk)
164        
165        if self.logger:
166            self.logger.info(
167                f"Created {len(chunks)} semantic chunks",
168                total_sentences=len(sentences),
169                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
170            )
171        
172        return chunks

Split text into chunks based on sentence boundaries and semantic similarity.

Args: text: The input text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

class RecursiveChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 16class RecursiveChunker(BaseChunker):
 17    """
 18    Recursively chunks text using a hierarchy of separators.
 19    
 20    This chunker attempts to split at natural boundaries in order of preference:
 21    1. Double newlines (paragraphs)
 22    2. Single newlines (lines)
 23    3. Sentence boundaries
 24    4. Word boundaries
 25    5. Character boundaries (last resort)
 26    
 27    This preserves document structure while ensuring chunks don't exceed the maximum size.
 28    """
 29    
 30    def __init__(
 31        self,
 32        chunk_size: int = 1000,
 33        chunk_overlap: int = 100,
 34        separators: Optional[List[str]] = None,
 35        logger=None
 36    ):
 37        """
 38        Initialize the recursive chunker.
 39        
 40        Args:
 41            chunk_size: Target maximum characters per chunk (default: 1000)
 42            chunk_overlap: Characters to overlap between chunks (default: 100)
 43            separators: List of separators in priority order (default: standard hierarchy)
 44            logger: Optional BasicLogger instance
 45        """
 46        super().__init__(logger)
 47        
 48        if chunk_size <= 0:
 49            raise ValueError("chunk_size must be greater than 0")
 50        if chunk_overlap < 0:
 51            raise ValueError("chunk_overlap must be non-negative")
 52        if chunk_overlap >= chunk_size:
 53            raise ValueError("chunk_overlap must be less than chunk_size")
 54        
 55        self.chunk_size = chunk_size
 56        self.chunk_overlap = chunk_overlap
 57        
 58        # Default separator hierarchy if not provided
 59        if separators is None:
 60            self.separators = [
 61                "\n\n",  # Paragraphs
 62                "\n",    # Lines
 63                ". ",    # Sentences
 64                " ",     # Words
 65                ""       # Characters (no separator)
 66            ]
 67        else:
 68            self.separators = separators
 69        
 70        if self.logger:
 71            self.logger.info(
 72                "Initialized RecursiveChunker",
 73                chunk_size=chunk_size,
 74                chunk_overlap=chunk_overlap,
 75                num_separators=len(self.separators)
 76            )
 77    
 78    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 79        """
 80        Recursively split text into chunks using hierarchical separators.
 81        
 82        Args:
 83            text: The input text to chunk
 84            metadata: Optional metadata to attach to each chunk
 85            
 86        Returns:
 87            List of Chunk objects
 88            
 89        Raises:
 90            ValueError: If text is invalid
 91        """
 92        self.validate_text(text)
 93        
 94        if metadata is None:
 95            metadata = {}
 96        
 97        if self.logger:
 98            self.logger.debug(f"Recursively chunking text of length {len(text)} characters")
 99        
100        # Perform recursive splitting
101        text_chunks = self._split_text_recursively(text)
102        
103        # Convert text chunks to Chunk objects with metadata
104        chunks = []
105        char_position = 0
106        
107        for i, chunk_text in enumerate(text_chunks):
108            chunk = Chunk(
109                text=chunk_text,
110                metadata=metadata.copy(),
111                start_pos=char_position,
112                end_pos=char_position + len(chunk_text),
113                chunk_id=self._generate_chunk_id(i, metadata)
114            )
115            chunk.metadata["chunking_strategy"] = "recursive"
116            chunks.append(chunk)
117            
118            # Update position accounting for overlap
119            char_position += len(chunk_text) - self.chunk_overlap
120        
121        if self.logger:
122            self.logger.info(
123                f"Created {len(chunks)} recursive chunks",
124                avg_chunk_size=sum(len(c.text) for c in chunks) / len(chunks) if chunks else 0
125            )
126        
127        return chunks
128    
129    def _split_text_recursively(self, text: str) -> List[str]:
130        """
131        Recursively split text using the separator hierarchy.
132        
133        Args:
134            text: Text to split
135            
136        Returns:
137            List of text chunks
138        """
139        return self._split_text(text, self.separators)
140    
141    def _split_text(self, text: str, separators: List[str]) -> List[str]:
142        """
143        Split text using the given separators recursively.
144        
145        Args:
146            text: Text to split
147            separators: Remaining separators to try
148            
149        Returns:
150            List of text chunks
151        """
152        final_chunks = []
153        
154        # Choose separator (last one if list is exhausted)
155        separator = separators[-1] if separators else ""
156        
157        # Split by current separator
158        if separator:
159            splits = text.split(separator)
160        else:
161            # No separator: split by characters
162            splits = list(text)
163        
164        # Process each split segment
165        current_chunk = []
166        for split in splits:
167            # Add separator back (except for character-level splitting)
168            if separator and current_chunk:
169                split = separator + split
170            
171            # If this split is small enough, accumulate it
172            current_size = sum(len(s) for s in current_chunk)
173            
174            if current_size + len(split) <= self.chunk_size:
175                current_chunk.append(split)
176            else:
177                # Current chunk is ready
178                if current_chunk:
179                    merged = "".join(current_chunk) if not separator else separator.join(current_chunk)
180                    if separator == " ":
181                        merged = " ".join(c.strip() for c in current_chunk if c.strip())
182                    
183                    if merged.strip():
184                        final_chunks.append(merged)
185                    current_chunk = []
186                
187                # Check if this split itself needs to be broken down
188                if len(split) > self.chunk_size:
189                    if len(separators) > 1:
190                        # Try next separator in hierarchy
191                        sub_chunks = self._split_text(split, separators[1:])
192                        final_chunks.extend(sub_chunks)
193                    else:
194                        # Force split at chunk_size boundaries
195                        for i in range(0, len(split), self.chunk_size):
196                            sub_chunk = split[i:i + self.chunk_size]
197                            if sub_chunk.strip():
198                                final_chunks.append(sub_chunk)
199                else:
200                    current_chunk.append(split)
201        
202        # Add remaining chunk
203        if current_chunk:
204            merged = "".join(current_chunk) if not separator else separator.join(current_chunk)
205            if separator == " ":
206                merged = " ".join(c.strip() for c in current_chunk if c.strip())
207            
208            if merged.strip():
209                final_chunks.append(merged)
210        
211        return final_chunks

Recursively chunks text using a hierarchy of separators.

This chunker attempts to split at natural boundaries in order of preference:

  1. Double newlines (paragraphs)
  2. Single newlines (lines)
  3. Sentence boundaries
  4. Word boundaries
  5. Character boundaries (last resort)

This preserves document structure while ensuring chunks don't exceed the maximum size.

RecursiveChunker( chunk_size: int = 1000, chunk_overlap: int = 100, separators: Optional[List[str]] = None, logger=None)
30    def __init__(
31        self,
32        chunk_size: int = 1000,
33        chunk_overlap: int = 100,
34        separators: Optional[List[str]] = None,
35        logger=None
36    ):
37        """
38        Initialize the recursive chunker.
39        
40        Args:
41            chunk_size: Target maximum characters per chunk (default: 1000)
42            chunk_overlap: Characters to overlap between chunks (default: 100)
43            separators: List of separators in priority order (default: standard hierarchy)
44            logger: Optional BasicLogger instance
45        """
46        super().__init__(logger)
47        
48        if chunk_size <= 0:
49            raise ValueError("chunk_size must be greater than 0")
50        if chunk_overlap < 0:
51            raise ValueError("chunk_overlap must be non-negative")
52        if chunk_overlap >= chunk_size:
53            raise ValueError("chunk_overlap must be less than chunk_size")
54        
55        self.chunk_size = chunk_size
56        self.chunk_overlap = chunk_overlap
57        
58        # Default separator hierarchy if not provided
59        if separators is None:
60            self.separators = [
61                "\n\n",  # Paragraphs
62                "\n",    # Lines
63                ". ",    # Sentences
64                " ",     # Words
65                ""       # Characters (no separator)
66            ]
67        else:
68            self.separators = separators
69        
70        if self.logger:
71            self.logger.info(
72                "Initialized RecursiveChunker",
73                chunk_size=chunk_size,
74                chunk_overlap=chunk_overlap,
75                num_separators=len(self.separators)
76            )

Initialize the recursive chunker.

Args: chunk_size: Target maximum characters per chunk (default: 1000) chunk_overlap: Characters to overlap between chunks (default: 100) separators: List of separators in priority order (default: standard hierarchy) logger: Optional BasicLogger instance

chunk_size
chunk_overlap
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 78    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 79        """
 80        Recursively split text into chunks using hierarchical separators.
 81        
 82        Args:
 83            text: The input text to chunk
 84            metadata: Optional metadata to attach to each chunk
 85            
 86        Returns:
 87            List of Chunk objects
 88            
 89        Raises:
 90            ValueError: If text is invalid
 91        """
 92        self.validate_text(text)
 93        
 94        if metadata is None:
 95            metadata = {}
 96        
 97        if self.logger:
 98            self.logger.debug(f"Recursively chunking text of length {len(text)} characters")
 99        
100        # Perform recursive splitting
101        text_chunks = self._split_text_recursively(text)
102        
103        # Convert text chunks to Chunk objects with metadata
104        chunks = []
105        char_position = 0
106        
107        for i, chunk_text in enumerate(text_chunks):
108            chunk = Chunk(
109                text=chunk_text,
110                metadata=metadata.copy(),
111                start_pos=char_position,
112                end_pos=char_position + len(chunk_text),
113                chunk_id=self._generate_chunk_id(i, metadata)
114            )
115            chunk.metadata["chunking_strategy"] = "recursive"
116            chunks.append(chunk)
117            
118            # Update position accounting for overlap
119            char_position += len(chunk_text) - self.chunk_overlap
120        
121        if self.logger:
122            self.logger.info(
123                f"Created {len(chunks)} recursive chunks",
124                avg_chunk_size=sum(len(c.text) for c in chunks) / len(chunks) if chunks else 0
125            )
126        
127        return chunks

Recursively split text into chunks using hierarchical separators.

Args: text: The input text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

class SentenceChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 15class SentenceChunker(BaseChunker):
 16    """
 17    Chunks text by grouping sentences together.
 18    
 19    This chunker preserves sentence boundaries while grouping multiple
 20    sentences into chunks based on character count or sentence count limits.
 21    
 22    By default uses simple regex for sentence detection. For better accuracy,
 23    pass a custom sentence_tokenizer (e.g., nltk.sent_tokenize).
 24    """
 25    
 26    def __init__(
 27        self,
 28        sentence_tokenizer: Callable[[str], List[str]],
 29        max_chunk_size: int = 1000,
 30        sentences_per_chunk: Optional[int] = None,
 31        logger=None
 32    ):
 33        """
 34        Initialize the sentence chunker.
 35        
 36        Args:
 37            sentence_tokenizer: REQUIRED callable that splits text into sentences.
 38                              Use nltk.sent_tokenize (recommended), spacy, or custom function.
 39                              Example: nltk.sent_tokenize
 40                              Must return List[str] of sentences.
 41            max_chunk_size: Maximum characters per chunk (default: 1000)
 42            sentences_per_chunk: Optional fixed number of sentences per chunk
 43                                If set, this takes priority over max_chunk_size
 44            logger: Optional BasicLogger instance
 45            
 46        Raises:
 47            ValueError: If sentence_tokenizer is not provided
 48        """
 49        super().__init__(logger)
 50        
 51        if sentence_tokenizer is None:
 52            raise ValueError(
 53                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
 54                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
 55                "Example: SentenceChunker(sentence_tokenizer=nltk.sent_tokenize)"
 56            )
 57        
 58        if max_chunk_size <= 0:
 59            raise ValueError("max_chunk_size must be greater than 0")
 60        if sentences_per_chunk is not None and sentences_per_chunk <= 0:
 61            raise ValueError("sentences_per_chunk must be greater than 0")
 62        
 63        self.sentence_tokenizer = sentence_tokenizer
 64        self.max_chunk_size = max_chunk_size
 65        self.sentences_per_chunk = sentences_per_chunk
 66        
 67        if self.logger:
 68            self.logger.info(
 69                "Initialized SentenceChunker",
 70                max_chunk_size=max_chunk_size,
 71                sentences_per_chunk=sentences_per_chunk,
 72                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
 73            )
 74    
 75    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 76        """
 77        Split text into chunks based on sentence boundaries.
 78        
 79        Args:
 80            text: The input text to chunk
 81            metadata: Optional metadata to attach to each chunk
 82            
 83        Returns:
 84            List of Chunk objects
 85            
 86        Raises:
 87            ValueError: If text is invalid
 88        """
 89        self.validate_text(text)
 90        
 91        if metadata is None:
 92            metadata = {}
 93        
 94        if self.logger:
 95            self.logger.debug(f"Chunking text of length {len(text)} characters by sentences")
 96        
 97        # Split text into sentences
 98        sentences = self._split_sentences(text)
 99        
100        if self.logger:
101            self.logger.debug(f"Found {len(sentences)} sentences")
102        
103        chunks = []
104        current_sentences = []
105        current_size = 0
106        chunk_index = 0
107        char_position = 0
108        
109        for sentence, start, end in sentences:
110            sentence_len = len(sentence)
111            
112            # Check if we should create a new chunk
113            should_chunk = False
114            
115            if self.sentences_per_chunk:
116                # Fixed sentence count mode
117                should_chunk = len(current_sentences) >= self.sentences_per_chunk
118            else:
119                # Size-based mode
120                should_chunk = (
121                    current_size + sentence_len > self.max_chunk_size 
122                    and current_sentences  # Don't create empty chunks
123                )
124            
125            if should_chunk:
126                # Create chunk from accumulated sentences
127                chunk_text = " ".join(current_sentences)
128                chunk_start = char_position - current_size
129                
130                chunk = Chunk(
131                    text=chunk_text,
132                    metadata=metadata.copy(),
133                    start_pos=chunk_start,
134                    end_pos=char_position,
135                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
136                )
137                chunk.metadata["sentence_count"] = len(current_sentences)
138                chunk.metadata["chunking_strategy"] = "sentence"
139                
140                chunks.append(chunk)
141                chunk_index += 1
142                
143                # Reset for new chunk
144                current_sentences = []
145                current_size = 0
146            
147            # Add sentence to current chunk
148            current_sentences.append(sentence)
149            current_size += sentence_len + 1  # +1 for space
150            char_position = end
151        
152        # Create final chunk if there are remaining sentences
153        if current_sentences:
154            chunk_text = " ".join(current_sentences)
155            chunk_start = char_position - current_size
156            
157            chunk = Chunk(
158                text=chunk_text,
159                metadata=metadata.copy(),
160                start_pos=chunk_start,
161                end_pos=char_position,
162                chunk_id=self._generate_chunk_id(chunk_index, metadata)
163            )
164            chunk.metadata["sentence_count"] = len(current_sentences)
165            chunk.metadata["chunking_strategy"] = "sentence"
166            
167            chunks.append(chunk)
168        
169        if self.logger:
170            self.logger.info(
171                f"Created {len(chunks)} sentence-based chunks",
172                total_sentences=len(sentences),
173                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
174            )
175        
176        return chunks
177    
178    def _split_sentences(self, text: str) -> List[tuple]:
179        """
180        Split text into sentences with position tracking using provided tokenizer.
181        
182        Args:
183            text: Text to split into sentences
184            
185        Returns:
186            List of tuples (sentence_text, start_pos, end_pos)
187            
188        Raises:
189            RuntimeError: If sentence tokenizer fails
190        """
191        try:
192            sentence_texts = self.sentence_tokenizer(text)
193            
194            # Calculate positions for each sentence
195            sentences = []
196            pos = 0
197            for sent_text in sentence_texts:
198                # Find the sentence in the original text
199                idx = text.find(sent_text, pos)
200                if idx != -1:
201                    start = idx
202                    end = idx + len(sent_text)
203                    sentences.append((sent_text, start, end))
204                    pos = end
205                else:
206                    # Fallback: estimate position
207                    sentences.append((sent_text, pos, pos + len(sent_text)))
208                    pos += len(sent_text)
209            
210            return sentences if sentences else [(text.strip(), 0, len(text))]
211            
212        except Exception as e:
213            raise RuntimeError(
214                f"Sentence tokenizer failed: {e}\n"
215                f"Please ensure your tokenizer function is working correctly."
216            ) from e

Chunks text by grouping sentences together.

This chunker preserves sentence boundaries while grouping multiple sentences into chunks based on character count or sentence count limits.

By default uses simple regex for sentence detection. For better accuracy, pass a custom sentence_tokenizer (e.g., nltk.sent_tokenize).

SentenceChunker( sentence_tokenizer: Callable[[str], List[str]], max_chunk_size: int = 1000, sentences_per_chunk: Optional[int] = None, logger=None)
26    def __init__(
27        self,
28        sentence_tokenizer: Callable[[str], List[str]],
29        max_chunk_size: int = 1000,
30        sentences_per_chunk: Optional[int] = None,
31        logger=None
32    ):
33        """
34        Initialize the sentence chunker.
35        
36        Args:
37            sentence_tokenizer: REQUIRED callable that splits text into sentences.
38                              Use nltk.sent_tokenize (recommended), spacy, or custom function.
39                              Example: nltk.sent_tokenize
40                              Must return List[str] of sentences.
41            max_chunk_size: Maximum characters per chunk (default: 1000)
42            sentences_per_chunk: Optional fixed number of sentences per chunk
43                                If set, this takes priority over max_chunk_size
44            logger: Optional BasicLogger instance
45            
46        Raises:
47            ValueError: If sentence_tokenizer is not provided
48        """
49        super().__init__(logger)
50        
51        if sentence_tokenizer is None:
52            raise ValueError(
53                "sentence_tokenizer is required. Please provide a sentence tokenization function.\n"
54                "Recommended: import nltk; nltk.download('punkt'); use nltk.sent_tokenize\n"
55                "Example: SentenceChunker(sentence_tokenizer=nltk.sent_tokenize)"
56            )
57        
58        if max_chunk_size <= 0:
59            raise ValueError("max_chunk_size must be greater than 0")
60        if sentences_per_chunk is not None and sentences_per_chunk <= 0:
61            raise ValueError("sentences_per_chunk must be greater than 0")
62        
63        self.sentence_tokenizer = sentence_tokenizer
64        self.max_chunk_size = max_chunk_size
65        self.sentences_per_chunk = sentences_per_chunk
66        
67        if self.logger:
68            self.logger.info(
69                "Initialized SentenceChunker",
70                max_chunk_size=max_chunk_size,
71                sentences_per_chunk=sentences_per_chunk,
72                tokenizer=sentence_tokenizer.__name__ if hasattr(sentence_tokenizer, '__name__') else 'custom'
73            )

Initialize the sentence chunker.

Args: sentence_tokenizer: REQUIRED callable that splits text into sentences. Use nltk.sent_tokenize (recommended), spacy, or custom function. Example: nltk.sent_tokenize Must return List[str] of sentences. max_chunk_size: Maximum characters per chunk (default: 1000) sentences_per_chunk: Optional fixed number of sentences per chunk If set, this takes priority over max_chunk_size logger: Optional BasicLogger instance

Raises: ValueError: If sentence_tokenizer is not provided

sentence_tokenizer
max_chunk_size
sentences_per_chunk
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 75    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 76        """
 77        Split text into chunks based on sentence boundaries.
 78        
 79        Args:
 80            text: The input text to chunk
 81            metadata: Optional metadata to attach to each chunk
 82            
 83        Returns:
 84            List of Chunk objects
 85            
 86        Raises:
 87            ValueError: If text is invalid
 88        """
 89        self.validate_text(text)
 90        
 91        if metadata is None:
 92            metadata = {}
 93        
 94        if self.logger:
 95            self.logger.debug(f"Chunking text of length {len(text)} characters by sentences")
 96        
 97        # Split text into sentences
 98        sentences = self._split_sentences(text)
 99        
100        if self.logger:
101            self.logger.debug(f"Found {len(sentences)} sentences")
102        
103        chunks = []
104        current_sentences = []
105        current_size = 0
106        chunk_index = 0
107        char_position = 0
108        
109        for sentence, start, end in sentences:
110            sentence_len = len(sentence)
111            
112            # Check if we should create a new chunk
113            should_chunk = False
114            
115            if self.sentences_per_chunk:
116                # Fixed sentence count mode
117                should_chunk = len(current_sentences) >= self.sentences_per_chunk
118            else:
119                # Size-based mode
120                should_chunk = (
121                    current_size + sentence_len > self.max_chunk_size 
122                    and current_sentences  # Don't create empty chunks
123                )
124            
125            if should_chunk:
126                # Create chunk from accumulated sentences
127                chunk_text = " ".join(current_sentences)
128                chunk_start = char_position - current_size
129                
130                chunk = Chunk(
131                    text=chunk_text,
132                    metadata=metadata.copy(),
133                    start_pos=chunk_start,
134                    end_pos=char_position,
135                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
136                )
137                chunk.metadata["sentence_count"] = len(current_sentences)
138                chunk.metadata["chunking_strategy"] = "sentence"
139                
140                chunks.append(chunk)
141                chunk_index += 1
142                
143                # Reset for new chunk
144                current_sentences = []
145                current_size = 0
146            
147            # Add sentence to current chunk
148            current_sentences.append(sentence)
149            current_size += sentence_len + 1  # +1 for space
150            char_position = end
151        
152        # Create final chunk if there are remaining sentences
153        if current_sentences:
154            chunk_text = " ".join(current_sentences)
155            chunk_start = char_position - current_size
156            
157            chunk = Chunk(
158                text=chunk_text,
159                metadata=metadata.copy(),
160                start_pos=chunk_start,
161                end_pos=char_position,
162                chunk_id=self._generate_chunk_id(chunk_index, metadata)
163            )
164            chunk.metadata["sentence_count"] = len(current_sentences)
165            chunk.metadata["chunking_strategy"] = "sentence"
166            
167            chunks.append(chunk)
168        
169        if self.logger:
170            self.logger.info(
171                f"Created {len(chunks)} sentence-based chunks",
172                total_sentences=len(sentences),
173                avg_sentences_per_chunk=len(sentences) / len(chunks) if chunks else 0
174            )
175        
176        return chunks

Split text into chunks based on sentence boundaries.

Args: text: The input text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

class MarkdownChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 15class MarkdownChunker(BaseChunker):
 16    """
 17    Chunks markdown text while respecting document structure.
 18    
 19    This chunker identifies markdown headers (# ## ###, etc.) and uses them
 20    as natural boundaries for chunking, preserving the document hierarchy.
 21    
 22    Uses regex-based parsing which works well for standard markdown. For complex
 23    markdown with extensions, consider pre-processing with mistune or markdown-it-py.
 24    """
 25    
 26    def __init__(
 27        self,
 28        max_chunk_size: int = 1500,
 29        combine_headers: bool = True,
 30        min_header_level: int = 1,
 31        logger=None
 32    ):
 33        """
 34        Initialize the markdown chunker.
 35        
 36        Args:
 37            max_chunk_size: Maximum characters per chunk (default: 1500)
 38            combine_headers: Whether to combine small sections under headers (default: True)
 39            min_header_level: Minimum header level to split at (1-6, default: 1)
 40            logger: Optional BasicLogger instance
 41        """
 42        super().__init__(logger)
 43        
 44        if max_chunk_size <= 0:
 45            raise ValueError("max_chunk_size must be greater than 0")
 46        if not (1 <= min_header_level <= 6):
 47            raise ValueError("min_header_level must be between 1 and 6")
 48        
 49        self.max_chunk_size = max_chunk_size
 50        self.combine_headers = combine_headers
 51        self.min_header_level = min_header_level
 52        
 53        # Regex pattern for markdown headers (both ATX and Setext styles)
 54        self.header_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
 55        
 56        if self.logger:
 57            self.logger.info(
 58                "Initialized MarkdownChunker",
 59                max_chunk_size=max_chunk_size,
 60                combine_headers=combine_headers,
 61                min_header_level=min_header_level
 62            )
 63    
 64    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 65        """
 66        Split markdown text into chunks respecting header boundaries.
 67        
 68        Args:
 69            text: The markdown text to chunk
 70            metadata: Optional metadata to attach to each chunk
 71            
 72        Returns:
 73            List of Chunk objects
 74            
 75        Raises:
 76            ValueError: If text is invalid
 77        """
 78        self.validate_text(text)
 79        
 80        if metadata is None:
 81            metadata = {}
 82        
 83        if self.logger:
 84            self.logger.debug(f"Chunking markdown text of length {len(text)} characters")
 85        
 86        # Find all headers and their positions
 87        sections = self._parse_sections(text)
 88        
 89        if self.logger:
 90            self.logger.debug(f"Found {len(sections)} markdown sections")
 91        
 92        # Create chunks from sections
 93        chunks = []
 94        current_chunk_parts = []
 95        current_size = 0
 96        chunk_index = 0
 97        current_headers = []
 98        
 99        for level, header_text, content, start, end in sections:
100            section_text = content
101            section_size = len(section_text)
102            
103            # Check if we should start a new chunk
104            if (current_size + section_size > self.max_chunk_size 
105                and current_chunk_parts 
106                and level <= self.min_header_level):
107                
108                # Create chunk from accumulated sections
109                chunk_text = "\n\n".join(current_chunk_parts)
110                
111                chunk = Chunk(
112                    text=chunk_text,
113                    metadata=metadata.copy(),
114                    start_pos=start - current_size,
115                    end_pos=start,
116                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
117                )
118                chunk.metadata["headers"] = current_headers.copy()
119                chunk.metadata["chunking_strategy"] = "markdown"
120                
121                chunks.append(chunk)
122                chunk_index += 1
123                
124                # Reset for new chunk
125                current_chunk_parts = []
126                current_size = 0
127                current_headers = []
128            
129            # Add section to current chunk
130            current_chunk_parts.append(section_text)
131            current_size += section_size + 2  # +2 for \n\n
132            
133            # Track header hierarchy
134            if header_text:
135                current_headers.append({
136                    "level": level,
137                    "text": header_text
138                })
139        
140        # Create final chunk
141        if current_chunk_parts:
142            chunk_text = "\n\n".join(current_chunk_parts)
143            
144            chunk = Chunk(
145                text=chunk_text,
146                metadata=metadata.copy(),
147                start_pos=len(text) - current_size,
148                end_pos=len(text),
149                chunk_id=self._generate_chunk_id(chunk_index, metadata)
150            )
151            chunk.metadata["headers"] = current_headers
152            chunk.metadata["chunking_strategy"] = "markdown"
153            
154            chunks.append(chunk)
155        
156        if self.logger:
157            self.logger.info(
158                f"Created {len(chunks)} markdown chunks",
159                total_sections=len(sections),
160                avg_sections_per_chunk=len(sections) / len(chunks) if chunks else 0
161            )
162        
163        return chunks
164    
165    def _parse_sections(self, text: str) -> List[Tuple[int, str, str, int, int]]:
166        """
167        Parse markdown text into sections based on headers.
168        
169        Args:
170            text: Markdown text to parse
171            
172        Returns:
173            List of tuples (header_level, header_text, content, start_pos, end_pos)
174        """
175        sections = []
176        lines = text.split('\n')
177        current_content = []
178        current_header_level = 0
179        current_header_text = ""
180        section_start = 0
181        char_position = 0
182        
183        for i, line in enumerate(lines):
184            # Check if line is a header
185            header_match = self.header_pattern.match(line)
186            
187            if header_match:
188                # Save previous section if it exists
189                if current_content or current_header_text:
190                    content = '\n'.join(current_content)
191                    sections.append((
192                        current_header_level,
193                        current_header_text,
194                        content,
195                        section_start,
196                        char_position
197                    ))
198                
199                # Start new section
200                current_header_level = len(header_match.group(1))
201                current_header_text = header_match.group(2).strip()
202                current_content = [line]  # Include header in content
203                section_start = char_position
204            else:
205                current_content.append(line)
206            
207            char_position += len(line) + 1  # +1 for newline
208        
209        # Add final section
210        if current_content:
211            content = '\n'.join(current_content)
212            sections.append((
213                current_header_level,
214                current_header_text,
215                content,
216                section_start,
217                char_position
218            ))
219        
220        # If no sections found, treat entire text as one section
221        if not sections:
222            sections = [(0, "", text, 0, len(text))]
223        
224        return sections

Chunks markdown text while respecting document structure.

This chunker identifies markdown headers (# ## ###, etc.) and uses them as natural boundaries for chunking, preserving the document hierarchy.

Uses regex-based parsing which works well for standard markdown. For complex markdown with extensions, consider pre-processing with mistune or markdown-it-py.

MarkdownChunker( max_chunk_size: int = 1500, combine_headers: bool = True, min_header_level: int = 1, logger=None)
26    def __init__(
27        self,
28        max_chunk_size: int = 1500,
29        combine_headers: bool = True,
30        min_header_level: int = 1,
31        logger=None
32    ):
33        """
34        Initialize the markdown chunker.
35        
36        Args:
37            max_chunk_size: Maximum characters per chunk (default: 1500)
38            combine_headers: Whether to combine small sections under headers (default: True)
39            min_header_level: Minimum header level to split at (1-6, default: 1)
40            logger: Optional BasicLogger instance
41        """
42        super().__init__(logger)
43        
44        if max_chunk_size <= 0:
45            raise ValueError("max_chunk_size must be greater than 0")
46        if not (1 <= min_header_level <= 6):
47            raise ValueError("min_header_level must be between 1 and 6")
48        
49        self.max_chunk_size = max_chunk_size
50        self.combine_headers = combine_headers
51        self.min_header_level = min_header_level
52        
53        # Regex pattern for markdown headers (both ATX and Setext styles)
54        self.header_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)
55        
56        if self.logger:
57            self.logger.info(
58                "Initialized MarkdownChunker",
59                max_chunk_size=max_chunk_size,
60                combine_headers=combine_headers,
61                min_header_level=min_header_level
62            )

Initialize the markdown chunker.

Args: max_chunk_size: Maximum characters per chunk (default: 1500) combine_headers: Whether to combine small sections under headers (default: True) min_header_level: Minimum header level to split at (1-6, default: 1) logger: Optional BasicLogger instance

max_chunk_size
combine_headers
min_header_level
header_pattern
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 64    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 65        """
 66        Split markdown text into chunks respecting header boundaries.
 67        
 68        Args:
 69            text: The markdown text to chunk
 70            metadata: Optional metadata to attach to each chunk
 71            
 72        Returns:
 73            List of Chunk objects
 74            
 75        Raises:
 76            ValueError: If text is invalid
 77        """
 78        self.validate_text(text)
 79        
 80        if metadata is None:
 81            metadata = {}
 82        
 83        if self.logger:
 84            self.logger.debug(f"Chunking markdown text of length {len(text)} characters")
 85        
 86        # Find all headers and their positions
 87        sections = self._parse_sections(text)
 88        
 89        if self.logger:
 90            self.logger.debug(f"Found {len(sections)} markdown sections")
 91        
 92        # Create chunks from sections
 93        chunks = []
 94        current_chunk_parts = []
 95        current_size = 0
 96        chunk_index = 0
 97        current_headers = []
 98        
 99        for level, header_text, content, start, end in sections:
100            section_text = content
101            section_size = len(section_text)
102            
103            # Check if we should start a new chunk
104            if (current_size + section_size > self.max_chunk_size 
105                and current_chunk_parts 
106                and level <= self.min_header_level):
107                
108                # Create chunk from accumulated sections
109                chunk_text = "\n\n".join(current_chunk_parts)
110                
111                chunk = Chunk(
112                    text=chunk_text,
113                    metadata=metadata.copy(),
114                    start_pos=start - current_size,
115                    end_pos=start,
116                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
117                )
118                chunk.metadata["headers"] = current_headers.copy()
119                chunk.metadata["chunking_strategy"] = "markdown"
120                
121                chunks.append(chunk)
122                chunk_index += 1
123                
124                # Reset for new chunk
125                current_chunk_parts = []
126                current_size = 0
127                current_headers = []
128            
129            # Add section to current chunk
130            current_chunk_parts.append(section_text)
131            current_size += section_size + 2  # +2 for \n\n
132            
133            # Track header hierarchy
134            if header_text:
135                current_headers.append({
136                    "level": level,
137                    "text": header_text
138                })
139        
140        # Create final chunk
141        if current_chunk_parts:
142            chunk_text = "\n\n".join(current_chunk_parts)
143            
144            chunk = Chunk(
145                text=chunk_text,
146                metadata=metadata.copy(),
147                start_pos=len(text) - current_size,
148                end_pos=len(text),
149                chunk_id=self._generate_chunk_id(chunk_index, metadata)
150            )
151            chunk.metadata["headers"] = current_headers
152            chunk.metadata["chunking_strategy"] = "markdown"
153            
154            chunks.append(chunk)
155        
156        if self.logger:
157            self.logger.info(
158                f"Created {len(chunks)} markdown chunks",
159                total_sections=len(sections),
160                avg_sections_per_chunk=len(sections) / len(chunks) if chunks else 0
161            )
162        
163        return chunks

Split markdown text into chunks respecting header boundaries.

Args: text: The markdown text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

class CodeChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 15class CodeChunker(BaseChunker):
 16    """
 17    Chunks code while respecting function and class boundaries.
 18    
 19    This chunker identifies code structures (functions, classes, methods)
 20    and uses them as natural boundaries for chunking, preserving code context.
 21    Supports Python, JavaScript, TypeScript, Java, C#, and similar languages.
 22    """
 23    
 24    def __init__(
 25        self,
 26        max_chunk_size: int = 2000,
 27        language: str = "python",
 28        include_imports: bool = True,
 29        logger=None
 30    ):
 31        """
 32        Initialize the code chunker.
 33        
 34        Args:
 35            max_chunk_size: Maximum characters per chunk (default: 2000)
 36            language: Programming language ("python", "javascript", "java", etc.)
 37            include_imports: Whether to include imports/using statements in chunks
 38            logger: Optional BasicLogger instance
 39        """
 40        super().__init__(logger)
 41        
 42        if max_chunk_size <= 0:
 43            raise ValueError("max_chunk_size must be greater than 0")
 44        
 45        self.max_chunk_size = max_chunk_size
 46        self.language = language.lower()
 47        self.include_imports = include_imports
 48        
 49        # Define patterns for different languages
 50        self._setup_patterns()
 51        
 52        if self.logger:
 53            self.logger.info(
 54                "Initialized CodeChunker",
 55                max_chunk_size=max_chunk_size,
 56                language=language
 57            )
 58    
 59    def _setup_patterns(self):
 60        """Setup regex patterns based on language."""
 61        if self.language == "python":
 62            self.function_pattern = re.compile(
 63                r'^(async\s+)?def\s+\w+\s*\([^)]*\)\s*(->\s*[^:]+)?:',
 64                re.MULTILINE
 65            )
 66            self.class_pattern = re.compile(
 67                r'^class\s+\w+(\([^)]*\))?:\s*$',
 68                re.MULTILINE
 69            )
 70            self.import_pattern = re.compile(
 71                r'^(import\s+\S+|from\s+\S+\s+import\s+.+)$',
 72                re.MULTILINE
 73            )
 74        elif self.language in ["javascript", "typescript", "java", "csharp", "c++"]:
 75            self.function_pattern = re.compile(
 76                r'(public|private|protected|static|async)?\s*(function|void|int|string|bool|var|let|const)?\s+\w+\s*\([^)]*\)\s*\{',
 77                re.MULTILINE
 78            )
 79            self.class_pattern = re.compile(
 80                r'(export\s+)?(public\s+)?class\s+\w+(\s+extends\s+\w+)?(\s+implements\s+[\w,\s]+)?\s*\{',
 81                re.MULTILINE
 82            )
 83            self.import_pattern = re.compile(
 84                r'^(import\s+.*from\s+["\'].*["\'];?|using\s+\S+;|#include\s+<.*>)$',
 85                re.MULTILINE
 86            )
 87        else:
 88            # Generic patterns for unknown languages
 89            self.function_pattern = re.compile(r'^\s*(def|function|func|fun)\s+\w+', re.MULTILINE)
 90            self.class_pattern = re.compile(r'^\s*class\s+\w+', re.MULTILINE)
 91            self.import_pattern = re.compile(r'^(import|using|include)\s+', re.MULTILINE)
 92    
 93    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 94        """
 95        Split code into chunks respecting function and class boundaries.
 96        
 97        Args:
 98            text: The code text to chunk
 99            metadata: Optional metadata to attach to each chunk
100            
101        Returns:
102            List of Chunk objects
103            
104        Raises:
105            ValueError: If text is invalid
106        """
107        self.validate_text(text)
108        
109        if metadata is None:
110            metadata = {}
111        
112        if self.logger:
113            self.logger.debug(f"Chunking code of length {len(text)} characters")
114        
115        # Extract imports if needed
116        imports_text = ""
117        if self.include_imports:
118            imports_text = self._extract_imports(text)
119        
120        # Find all code blocks (functions and classes)
121        code_blocks = self._parse_code_blocks(text)
122        
123        if self.logger:
124            self.logger.debug(f"Found {len(code_blocks)} code blocks")
125        
126        # Create chunks from code blocks
127        chunks = []
128        current_chunk_parts = []
129        current_size = len(imports_text)
130        chunk_index = 0
131        
132        for block_type, block_name, block_content, start, end in code_blocks:
133            block_size = len(block_content)
134            
135            # Check if we should start a new chunk
136            if current_size + block_size > self.max_chunk_size and current_chunk_parts:
137                # Create chunk from accumulated blocks
138                chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)
139                
140                chunk = Chunk(
141                    text=chunk_text.strip(),
142                    metadata=metadata.copy(),
143                    start_pos=start - current_size,
144                    end_pos=start,
145                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
146                )
147                chunk.metadata["code_blocks"] = len(current_chunk_parts)
148                chunk.metadata["chunking_strategy"] = "code"
149                chunk.metadata["language"] = self.language
150                
151                chunks.append(chunk)
152                chunk_index += 1
153                
154                # Reset for new chunk
155                current_chunk_parts = []
156                current_size = len(imports_text)
157            
158            # Add block to current chunk
159            current_chunk_parts.append(block_content)
160            current_size += block_size + 2  # +2 for \n\n
161        
162        # Create final chunk
163        if current_chunk_parts:
164            chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)
165            
166            chunk = Chunk(
167                text=chunk_text.strip(),
168                metadata=metadata.copy(),
169                start_pos=len(text) - current_size,
170                end_pos=len(text),
171                chunk_id=self._generate_chunk_id(chunk_index, metadata)
172            )
173            chunk.metadata["code_blocks"] = len(current_chunk_parts)
174            chunk.metadata["chunking_strategy"] = "code"
175            chunk.metadata["language"] = self.language
176            
177            chunks.append(chunk)
178        
179        if self.logger:
180            self.logger.info(
181                f"Created {len(chunks)} code chunks",
182                total_blocks=len(code_blocks),
183                avg_blocks_per_chunk=len(code_blocks) / len(chunks) if chunks else 0
184            )
185        
186        return chunks
187    
188    def _extract_imports(self, text: str) -> str:
189        """Extract import/using statements from the code."""
190        imports = []
191        for match in self.import_pattern.finditer(text):
192            imports.append(match.group().strip())
193       
194        return "\n".join(imports) if imports else ""
195    
196    def _parse_code_blocks(self, text: str) -> List[Tuple[str, str, str, int, int]]:
197        """
198        Parse code into blocks (functions, classes, etc.).
199        
200        Args:
201            text: Code text to parse
202            
203        Returns:
204            List of tuples (block_type, block_name, content, start_pos, end_pos)
205        """
206        blocks = []
207        lines = text.split('\n')
208        
209        # Find all function and class definitions
210        all_matches = []
211        
212        for match in self.function_pattern.finditer(text):
213            all_matches.append(('function', match.start(), match.group()))
214        
215        for match in self.class_pattern.finditer(text):
216            all_matches.append(('class', match.start(), match.group()))
217        
218        # Sort by position
219        all_matches.sort(key=lambda x: x[1])
220        
221        # Extract code blocks with their content
222        for i, (block_type, start, match_text) in enumerate(all_matches):
223            # Extract block name
224            block_name = self._extract_name(match_text)
225            
226            # Find end of block (next block start or end of file)
227            if i + 1 < len(all_matches):
228                end = all_matches[i + 1][1]
229            else:
230                end = len(text)
231            
232            # Extract block content
233            block_content = text[start:end].strip()
234            
235            blocks.append((block_type, block_name, block_content, start, end))
236        
237        # If no blocks found, treat entire text as one block
238        if not blocks:
239            blocks = [('code', 'main', text, 0, len(text))]
240        
241        return blocks
242    
243    def _extract_name(self, definition: str) -> str:
244        """Extract function or class name from definition."""
245        # Try to find the name after 'def', 'function', 'class', etc.
246        name_match = re.search(r'\b(def|function|class|func|fun)\s+(\w+)', definition)
247        if name_match:
248            return name_match.group(2)
249        
250        # Fallback: try to find any word after space
251        words = definition.split()
252        for word in words:
253            if re.match(r'^\w+$', word) and word not in ['def', 'function', 'class', 'public', 'private', 'static', 'async']:
254                return word
255        
256        return 'unknown'

Chunks code while respecting function and class boundaries.

This chunker identifies code structures (functions, classes, methods) and uses them as natural boundaries for chunking, preserving code context. Supports Python, JavaScript, TypeScript, Java, C#, and similar languages.

CodeChunker( max_chunk_size: int = 2000, language: str = 'python', include_imports: bool = True, logger=None)
24    def __init__(
25        self,
26        max_chunk_size: int = 2000,
27        language: str = "python",
28        include_imports: bool = True,
29        logger=None
30    ):
31        """
32        Initialize the code chunker.
33        
34        Args:
35            max_chunk_size: Maximum characters per chunk (default: 2000)
36            language: Programming language ("python", "javascript", "java", etc.)
37            include_imports: Whether to include imports/using statements in chunks
38            logger: Optional BasicLogger instance
39        """
40        super().__init__(logger)
41        
42        if max_chunk_size <= 0:
43            raise ValueError("max_chunk_size must be greater than 0")
44        
45        self.max_chunk_size = max_chunk_size
46        self.language = language.lower()
47        self.include_imports = include_imports
48        
49        # Define patterns for different languages
50        self._setup_patterns()
51        
52        if self.logger:
53            self.logger.info(
54                "Initialized CodeChunker",
55                max_chunk_size=max_chunk_size,
56                language=language
57            )

Initialize the code chunker.

Args: max_chunk_size: Maximum characters per chunk (default: 2000) language: Programming language ("python", "javascript", "java", etc.) include_imports: Whether to include imports/using statements in chunks logger: Optional BasicLogger instance

max_chunk_size
language
include_imports
def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 93    def chunk(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 94        """
 95        Split code into chunks respecting function and class boundaries.
 96        
 97        Args:
 98            text: The code text to chunk
 99            metadata: Optional metadata to attach to each chunk
100            
101        Returns:
102            List of Chunk objects
103            
104        Raises:
105            ValueError: If text is invalid
106        """
107        self.validate_text(text)
108        
109        if metadata is None:
110            metadata = {}
111        
112        if self.logger:
113            self.logger.debug(f"Chunking code of length {len(text)} characters")
114        
115        # Extract imports if needed
116        imports_text = ""
117        if self.include_imports:
118            imports_text = self._extract_imports(text)
119        
120        # Find all code blocks (functions and classes)
121        code_blocks = self._parse_code_blocks(text)
122        
123        if self.logger:
124            self.logger.debug(f"Found {len(code_blocks)} code blocks")
125        
126        # Create chunks from code blocks
127        chunks = []
128        current_chunk_parts = []
129        current_size = len(imports_text)
130        chunk_index = 0
131        
132        for block_type, block_name, block_content, start, end in code_blocks:
133            block_size = len(block_content)
134            
135            # Check if we should start a new chunk
136            if current_size + block_size > self.max_chunk_size and current_chunk_parts:
137                # Create chunk from accumulated blocks
138                chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)
139                
140                chunk = Chunk(
141                    text=chunk_text.strip(),
142                    metadata=metadata.copy(),
143                    start_pos=start - current_size,
144                    end_pos=start,
145                    chunk_id=self._generate_chunk_id(chunk_index, metadata)
146                )
147                chunk.metadata["code_blocks"] = len(current_chunk_parts)
148                chunk.metadata["chunking_strategy"] = "code"
149                chunk.metadata["language"] = self.language
150                
151                chunks.append(chunk)
152                chunk_index += 1
153                
154                # Reset for new chunk
155                current_chunk_parts = []
156                current_size = len(imports_text)
157            
158            # Add block to current chunk
159            current_chunk_parts.append(block_content)
160            current_size += block_size + 2  # +2 for \n\n
161        
162        # Create final chunk
163        if current_chunk_parts:
164            chunk_text = imports_text + "\n\n" + "\n\n".join(current_chunk_parts)
165            
166            chunk = Chunk(
167                text=chunk_text.strip(),
168                metadata=metadata.copy(),
169                start_pos=len(text) - current_size,
170                end_pos=len(text),
171                chunk_id=self._generate_chunk_id(chunk_index, metadata)
172            )
173            chunk.metadata["code_blocks"] = len(current_chunk_parts)
174            chunk.metadata["chunking_strategy"] = "code"
175            chunk.metadata["language"] = self.language
176            
177            chunks.append(chunk)
178        
179        if self.logger:
180            self.logger.info(
181                f"Created {len(chunks)} code chunks",
182                total_blocks=len(code_blocks),
183                avg_blocks_per_chunk=len(code_blocks) / len(chunks) if chunks else 0
184            )
185        
186        return chunks

Split code into chunks respecting function and class boundaries.

Args: text: The code text to chunk metadata: Optional metadata to attach to each chunk

Returns: List of Chunk objects

Raises: ValueError: If text is invalid

class WikiPageChunker(gmf_forge_ai_data.chunkers.BaseChunker):
 21class WikiPageChunker(BaseChunker):
 22    """
 23    Chunks wiki pages by splitting on heading boundaries.
 24
 25    Supports three content formats:
 26    - ``chunk_markdown(text)`` — Azure DevOps Wiki pages (Markdown ``#`` headings)
 27    - ``chunk_html(html)``     — Confluence storage-format HTML (``<h1>``–``<h6>``)
 28    - ``chunk(text)``          — plain text with heuristic heading detection
 29
 30    The heading text is preserved as the first line of each chunk and also
 31    stored in ``chunk.metadata["section_heading"]`` and
 32    ``chunk.metadata["heading_level"]`` for downstream use.
 33
 34    Example (Azure DevOps Wiki):
 35        ```python
 36        from gmf_forge_ai_data.chunkers import WikiPageChunker
 37
 38        chunker = WikiPageChunker(max_chunk_size=1500, min_chunk_size=200)
 39
 40        # content is the Markdown string from AzureDevOpsWikiConnector
 41        chunks = chunker.chunk_markdown(content, metadata={"page_path": "/Overview"})
 42        ```
 43
 44    Example (Confluence):
 45        ```python
 46        # raw_html is Confluence storage-format body
 47        chunks = chunker.chunk_html(raw_html, metadata={"page_id": "12345"})
 48        ```
 49    """
 50
 51    # Matches <h1> … <h6> tags in Confluence storage-format HTML
 52    _HTML_HEADING = re.compile(
 53        r"<h([1-6])[^>]*>(.*?)</h\1>",
 54        re.IGNORECASE | re.DOTALL,
 55    )
 56    # Matches any remaining HTML tag so we can strip them
 57    _HTML_TAG = re.compile(r"<[^>]+>")
 58    # Decode the most common HTML entities
 59    _ENTITIES: Dict[str, str] = {
 60        "&amp;": "&", "&lt;": "<", "&gt;": ">",
 61        "&quot;": '"', "&#39;": "'", "&nbsp;": " ",
 62        "&apos;": "'",
 63    }
 64
 65    def __init__(
 66        self,
 67        max_chunk_size: int = 1500,
 68        min_chunk_size: int = 200,
 69        logger=None,
 70    ):
 71        """
 72        Args:
 73            max_chunk_size: Maximum characters per chunk (default: 1500).
 74                            When a section exceeds this, it is split further
 75                            at paragraph boundaries.
 76            min_chunk_size: Sections smaller than this are merged with the
 77                            next section before splitting (default: 200).
 78            logger:         Optional BasicLogger instance.
 79        """
 80        super().__init__(logger)
 81
 82        if max_chunk_size <= 0:
 83            raise ValueError("max_chunk_size must be greater than 0")
 84        if min_chunk_size < 0:
 85            raise ValueError("min_chunk_size must be >= 0")
 86
 87        self.max_chunk_size = max_chunk_size
 88        self.min_chunk_size = min_chunk_size
 89
 90    # Matches ATX Markdown headings: # … through ###### …
 91    _MD_HEADING = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
 92
 93    # ── Public interface ─────────────────────────────────────────────────────
 94
 95    def chunk_markdown(
 96        self,
 97        text: str,
 98        metadata: Optional[Dict[str, Any]] = None,
 99    ) -> List[Chunk]:
100        """
101        Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).
102
103        Splits at ATX heading boundaries (``#`` through ``######``). This is
104        the preferred entry point for content from AzureDevOpsWikiConnector.
105
106        Args:
107            text:     Markdown content of the wiki page.
108            metadata: Optional metadata to propagate to every chunk.
109
110        Returns:
111            List of Chunk objects with section heading metadata attached.
112        """
113        self.validate_text(text)
114        sections = self._parse_markdown_sections(text)
115        return self._sections_to_chunks(sections, metadata or {})
116
117    def chunk_html(
118        self,
119        html: str,
120        metadata: Optional[Dict[str, Any]] = None,
121    ) -> List[Chunk]:
122        """
123        Chunk raw Confluence storage-format HTML directly.
124
125        This is the preferred entry point when working with content returned
126        by ConfluenceConnector before HTML stripping has been applied.
127        Headings are extracted from the HTML tags before stripping, which
128        gives more reliable section boundaries than heuristic detection.
129
130        Args:
131            html:     Confluence storage-format HTML body.
132            metadata: Optional metadata to propagate to every chunk.
133
134        Returns:
135            List of Chunk objects with section heading metadata attached.
136        """
137        sections = self._parse_html_sections(html)
138        return self._sections_to_chunks(sections, metadata or {})
139
140    def chunk(
141        self,
142        text: str,
143        metadata: Optional[Dict[str, Any]] = None,
144    ) -> List[Chunk]:
145        """
146        Chunk plain text that has already been HTML-stripped.
147
148        Falls back to heuristic heading detection (short lines followed by
149        a blank line). Prefer ``chunk_html()`` when the original HTML is
150        still available.
151
152        Args:
153            text:     Plain-text content of a wiki page.
154            metadata: Optional metadata to propagate to every chunk.
155
156        Returns:
157            List of Chunk objects.
158        """
159        self.validate_text(text)
160        sections = self._parse_text_sections(text)
161        return self._sections_to_chunks(sections, metadata or {})
162
163    # ── Section parsing ──────────────────────────────────────────────────────
164
165    def _parse_html_sections(
166        self, html: str
167    ) -> List[Tuple[int, str, str]]:
168        """
169        Split HTML into (level, heading_text, body_text) tuples.
170
171        Splits at each <h1>–<h6> tag boundary. Content before the first
172        heading is emitted as a level-0 section with an empty heading.
173        """
174        sections: List[Tuple[int, str, str]] = []
175        last_end = 0
176        current_level = 0
177        current_heading = ""
178        current_body_parts: List[str] = []
179
180        for match in self._HTML_HEADING.finditer(html):
181            # Flush whatever came before this heading
182            body_html = html[last_end : match.start()]
183            current_body_parts.append(body_html)
184            body_text = self._clean_html("".join(current_body_parts))
185            if body_text.strip() or current_heading:
186                sections.append((current_level, current_heading, body_text))
187
188            current_level = int(match.group(1))
189            current_heading = self._clean_html(match.group(2))
190            current_body_parts = []
191            last_end = match.end()
192
193        # Flush trailing content after the last heading
194        body_html = html[last_end:]
195        current_body_parts.append(body_html)
196        body_text = self._clean_html("".join(current_body_parts))
197        if body_text.strip() or current_heading:
198            sections.append((current_level, current_heading, body_text))
199
200        return sections
201
202    def _parse_markdown_sections(
203        self, text: str
204    ) -> List[Tuple[int, str, str]]:
205        """
206        Split Markdown text into (level, heading_text, body_text) tuples.
207
208        Splits at ATX heading boundaries (``# …`` through ``###### …``).
209        Content before the first heading is emitted as a level-0 section.
210        Heading markers are stripped from the heading text stored in metadata.
211        """
212        sections: List[Tuple[int, str, str]] = []
213        last_end = 0
214        current_level = 0
215        current_heading = ""
216        current_body_parts: List[str] = []
217
218        for match in self._MD_HEADING.finditer(text):
219            body = text[last_end : match.start()]
220            current_body_parts.append(body)
221            body_text = "".join(current_body_parts).strip()
222            if body_text or current_heading:
223                sections.append((current_level, current_heading, body_text))
224
225            current_level = len(match.group(1))
226            current_heading = match.group(2).strip()
227            current_body_parts = []
228            last_end = match.end()
229
230        # Flush trailing content after the last heading
231        trailing = text[last_end:].strip()
232        if trailing or current_heading:
233            sections.append((current_level, current_heading, trailing))
234
235        return sections
236
237    def _parse_text_sections(
238        self, text: str
239    ) -> List[Tuple[int, str, str]]:
240        """
241        Heuristic heading detection for already-stripped plain text.
242
243        A line is treated as a heading if it:
244        - Is non-empty and ≤ 80 characters
245        - Does not end with sentence-ending punctuation (. ! ? , ; :)
246        - Is followed by a blank line or is the first line of the text
247
248        All detected headings are emitted at level 1 (no depth information
249        is available in plain text).
250        """
251        sections: List[Tuple[int, str, str]] = []
252        lines = text.splitlines()
253        current_heading = ""
254        current_level = 0
255        current_body: List[str] = []
256
257        def _flush():
258            body = "\n".join(current_body).strip()
259            if body or current_heading:
260                sections.append((current_level, current_heading, body))
261
262        for i, line in enumerate(lines):
263            stripped = line.strip()
264            next_line = lines[i + 1].strip() if i + 1 < len(lines) else ""
265
266            is_heading = (
267                stripped
268                and len(stripped) <= 80
269                and not stripped[-1] in ".!?,;:"
270                and (not next_line or i == 0)
271            )
272
273            if is_heading:
274                _flush()
275                current_heading = stripped
276                current_level = 1
277                current_body = []
278            else:
279                current_body.append(line)
280
281        _flush()
282        return sections
283
284    # ── Chunk assembly ───────────────────────────────────────────────────────
285
286    def _sections_to_chunks(
287        self,
288        sections: List[Tuple[int, str, str]],
289        metadata: Dict[str, Any],
290    ) -> List[Chunk]:
291        """
292        Merge small sections, split large ones, and produce final Chunk list.
293        """
294        if not sections:
295            return []
296
297        # Merge sections smaller than min_chunk_size into the next one
298        merged = self._merge_small_sections(sections)
299
300        chunks: List[Chunk] = []
301        chunk_index = 0
302
303        for level, heading, body in merged:
304            section_text = f"{heading}\n{body}".strip() if heading else body.strip()
305            if not section_text:
306                continue
307
308            # Split large sections at paragraph boundaries
309            parts = self._split_on_paragraphs(section_text)
310
311            for part in parts:
312                if not part.strip():
313                    continue
314                chunk = Chunk(
315                    text=part.strip(),
316                    metadata={
317                        **metadata,
318                        "section_heading": heading,
319                        "heading_level": level,
320                        "chunking_strategy": "wiki",
321                    },
322                    start_pos=0,
323                    end_pos=len(part),
324                    chunk_id=self._generate_chunk_id(chunk_index, metadata),
325                )
326                chunks.append(chunk)
327                chunk_index += 1
328
329        if self.logger:
330            self.logger.info(
331                "WikiPageChunker: created chunks",
332                chunk_count=len(chunks),
333                section_count=len(sections),
334            )
335
336        return chunks
337
338    def _merge_small_sections(
339        self, sections: List[Tuple[int, str, str]]
340    ) -> List[Tuple[int, str, str]]:
341        """Merge sections whose body is shorter than min_chunk_size into the next."""
342        if not sections:
343            return sections
344
345        merged: List[Tuple[int, str, str]] = []
346        pending_level, pending_heading, pending_body = sections[0]
347
348        for level, heading, body in sections[1:]:
349            if len(pending_body.strip()) < self.min_chunk_size:
350                # Append this section to the pending one
351                separator = "\n\n" if pending_body.strip() else ""
352                if heading:
353                    pending_body = (
354                        pending_body + separator + heading + "\n" + body
355                    )
356                else:
357                    pending_body = pending_body + separator + body
358            else:
359                merged.append((pending_level, pending_heading, pending_body))
360                pending_level, pending_heading, pending_body = level, heading, body
361
362        merged.append((pending_level, pending_heading, pending_body))
363        return merged
364
365    def _split_on_paragraphs(self, text: str) -> List[str]:
366        """
367        Split text that exceeds max_chunk_size at paragraph (double newline) boundaries.
368        Paragraphs that individually exceed max_chunk_size are further split at
369        single-newline boundaries, then at fixed character boundaries as a last
370        resort (handles code blocks and tables with no blank lines).
371        Returns the original text as a single-element list if it fits.
372        """
373        if len(text) <= self.max_chunk_size:
374            return [text]
375
376        paragraphs = re.split(r"\n\n+", text)
377        parts: List[str] = []
378        current_parts: List[str] = []
379        current_size = 0
380
381        for para in paragraphs:
382            para_size = len(para)
383            if current_size + para_size > self.max_chunk_size and current_parts:
384                parts.append("\n\n".join(current_parts))
385                current_parts = []
386                current_size = 0
387            # A single paragraph larger than the limit must be split further
388            if para_size > self.max_chunk_size:
389                if current_parts:
390                    parts.append("\n\n".join(current_parts))
391                    current_parts = []
392                    current_size = 0
393                parts.extend(self._split_long_paragraph(para))
394            else:
395                current_parts.append(para)
396                current_size += para_size + 2  # +2 for \n\n
397
398        if current_parts:
399            parts.append("\n\n".join(current_parts))
400
401        return parts
402
403    def _split_long_paragraph(self, text: str) -> List[str]:
404        """
405        Split a paragraph that exceeds max_chunk_size.
406
407        Tries single-newline boundaries first (handles tables, code blocks, etc.).
408        Falls back to fixed character boundaries for lines that are still too long.
409        """
410        if len(text) <= self.max_chunk_size:
411            return [text]
412
413        lines = text.split("\n")
414        parts: List[str] = []
415        current_lines: List[str] = []
416        current_size = 0
417
418        for line in lines:
419            line_size = len(line)
420            if current_size + line_size > self.max_chunk_size and current_lines:
421                parts.append("\n".join(current_lines))
422                current_lines = []
423                current_size = 0
424            # A single line still over the limit: split at character boundaries
425            if line_size > self.max_chunk_size:
426                if current_lines:
427                    parts.append("\n".join(current_lines))
428                    current_lines = []
429                    current_size = 0
430                for i in range(0, line_size, self.max_chunk_size):
431                    parts.append(line[i : i + self.max_chunk_size])
432            else:
433                current_lines.append(line)
434                current_size += line_size + 1  # +1 for \n
435
436        if current_lines:
437            parts.append("\n".join(current_lines))
438
439        return parts
440
441    # ── HTML utility ─────────────────────────────────────────────────────────
442
443    def _clean_html(self, html: str) -> str:
444        """Strip HTML tags and decode common entities to produce plain text."""
445        text = self._HTML_TAG.sub(" ", html)
446        for entity, char in self._ENTITIES.items():
447            text = text.replace(entity, char)
448        # Collapse runs of whitespace but preserve newlines
449        text = re.sub(r"[ \t]+", " ", text)
450        text = re.sub(r"\n{3,}", "\n\n", text)
451        return text

Chunks wiki pages by splitting on heading boundaries.

Supports three content formats:

  • chunk_markdown(text) — Azure DevOps Wiki pages (Markdown # headings)
  • chunk_html(html) — Confluence storage-format HTML (<h1><h6>)
  • chunk(text) — plain text with heuristic heading detection

The heading text is preserved as the first line of each chunk and also stored in chunk.metadata["section_heading"] and chunk.metadata["heading_level"] for downstream use.

Example (Azure DevOps Wiki):

from gmf_forge_ai_data.chunkers import WikiPageChunker

chunker = WikiPageChunker(max_chunk_size=1500, min_chunk_size=200)

# content is the Markdown string from AzureDevOpsWikiConnector
chunks = chunker.chunk_markdown(content, metadata={"page_path": "/Overview"})

Example (Confluence):

# raw_html is Confluence storage-format body
chunks = chunker.chunk_html(raw_html, metadata={"page_id": "12345"})
WikiPageChunker(max_chunk_size: int = 1500, min_chunk_size: int = 200, logger=None)
65    def __init__(
66        self,
67        max_chunk_size: int = 1500,
68        min_chunk_size: int = 200,
69        logger=None,
70    ):
71        """
72        Args:
73            max_chunk_size: Maximum characters per chunk (default: 1500).
74                            When a section exceeds this, it is split further
75                            at paragraph boundaries.
76            min_chunk_size: Sections smaller than this are merged with the
77                            next section before splitting (default: 200).
78            logger:         Optional BasicLogger instance.
79        """
80        super().__init__(logger)
81
82        if max_chunk_size <= 0:
83            raise ValueError("max_chunk_size must be greater than 0")
84        if min_chunk_size < 0:
85            raise ValueError("min_chunk_size must be >= 0")
86
87        self.max_chunk_size = max_chunk_size
88        self.min_chunk_size = min_chunk_size

Args: max_chunk_size: Maximum characters per chunk (default: 1500). When a section exceeds this, it is split further at paragraph boundaries. min_chunk_size: Sections smaller than this are merged with the next section before splitting (default: 200). logger: Optional BasicLogger instance.

max_chunk_size
min_chunk_size
def chunk_markdown( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
 95    def chunk_markdown(
 96        self,
 97        text: str,
 98        metadata: Optional[Dict[str, Any]] = None,
 99    ) -> List[Chunk]:
100        """
101        Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).
102
103        Splits at ATX heading boundaries (``#`` through ``######``). This is
104        the preferred entry point for content from AzureDevOpsWikiConnector.
105
106        Args:
107            text:     Markdown content of the wiki page.
108            metadata: Optional metadata to propagate to every chunk.
109
110        Returns:
111            List of Chunk objects with section heading metadata attached.
112        """
113        self.validate_text(text)
114        sections = self._parse_markdown_sections(text)
115        return self._sections_to_chunks(sections, metadata or {})

Chunk Markdown wiki content (e.g. Azure DevOps Wiki pages).

Splits at ATX heading boundaries (# through ######). This is the preferred entry point for content from AzureDevOpsWikiConnector.

Args: text: Markdown content of the wiki page. metadata: Optional metadata to propagate to every chunk.

Returns: List of Chunk objects with section heading metadata attached.

def chunk_html( self, html: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
117    def chunk_html(
118        self,
119        html: str,
120        metadata: Optional[Dict[str, Any]] = None,
121    ) -> List[Chunk]:
122        """
123        Chunk raw Confluence storage-format HTML directly.
124
125        This is the preferred entry point when working with content returned
126        by ConfluenceConnector before HTML stripping has been applied.
127        Headings are extracted from the HTML tags before stripping, which
128        gives more reliable section boundaries than heuristic detection.
129
130        Args:
131            html:     Confluence storage-format HTML body.
132            metadata: Optional metadata to propagate to every chunk.
133
134        Returns:
135            List of Chunk objects with section heading metadata attached.
136        """
137        sections = self._parse_html_sections(html)
138        return self._sections_to_chunks(sections, metadata or {})

Chunk raw Confluence storage-format HTML directly.

This is the preferred entry point when working with content returned by ConfluenceConnector before HTML stripping has been applied. Headings are extracted from the HTML tags before stripping, which gives more reliable section boundaries than heuristic detection.

Args: html: Confluence storage-format HTML body. metadata: Optional metadata to propagate to every chunk.

Returns: List of Chunk objects with section heading metadata attached.

def chunk( self, text: str, metadata: Optional[Dict[str, Any]] = None) -> List[Chunk]:
140    def chunk(
141        self,
142        text: str,
143        metadata: Optional[Dict[str, Any]] = None,
144    ) -> List[Chunk]:
145        """
146        Chunk plain text that has already been HTML-stripped.
147
148        Falls back to heuristic heading detection (short lines followed by
149        a blank line). Prefer ``chunk_html()`` when the original HTML is
150        still available.
151
152        Args:
153            text:     Plain-text content of a wiki page.
154            metadata: Optional metadata to propagate to every chunk.
155
156        Returns:
157            List of Chunk objects.
158        """
159        self.validate_text(text)
160        sections = self._parse_text_sections(text)
161        return self._sections_to_chunks(sections, metadata or {})

Chunk plain text that has already been HTML-stripped.

Falls back to heuristic heading detection (short lines followed by a blank line). Prefer chunk_html() when the original HTML is still available.

Args: text: Plain-text content of a wiki page. metadata: Optional metadata to propagate to every chunk.

Returns: List of Chunk objects.