gmf_forge_ai_data.context
Context processing — post-retrieval, pre-generation context management.
Modules: context_reranker: Reorder retrieved docs by LLM-assessed relevance. context_compressor: Extract only query-relevant sentences from each chunk. relevance_filter: Drop results below a minimum similarity score. context_deduplicator: Remove near-duplicate passages via n-gram Jaccard. context_window_manager: Fit documents into a fixed token budget.
1""" 2Context processing — post-retrieval, pre-generation context management. 3 4Modules: 5 context_reranker: Reorder retrieved docs by LLM-assessed relevance. 6 context_compressor: Extract only query-relevant sentences from each chunk. 7 relevance_filter: Drop results below a minimum similarity score. 8 context_deduplicator: Remove near-duplicate passages via n-gram Jaccard. 9 context_window_manager: Fit documents into a fixed token budget. 10""" 11 12from .context_reranker import ContextReranker 13from .context_compressor import ContextCompressor 14from .relevance_filter import RelevanceFilter 15from .context_deduplicator import ContextDeduplicator 16from .context_window_manager import ContextWindowManager, WindowedContext 17 18__all__ = [ 19 "ContextReranker", 20 "ContextCompressor", 21 "RelevanceFilter", 22 "ContextDeduplicator", 23 "ContextWindowManager", 24 "WindowedContext", 25]
22class ContextReranker: 23 """ 24 Reranks retrieved documents by LLM relevance scoring. 25 26 Sends the query and all retrieved document passages to the LLM and asks it 27 to order them from most to least relevant. Returns a new list of 28 SearchResult objects in the LLM-determined order, with scores reassigned 29 to reflect the new ranking. 30 31 The reranker is most valuable when: 32 - Vector similarity scores are clustered closely (hard to choose top-k) 33 - The query requires reasoning rather than lexical overlap 34 - Final context window is small and every slot matters 35 36 Example: 37 ```python 38 from gmf_forge_ai_data.context import ContextReranker 39 40 reranker = ContextReranker(llm_gateway) 41 reranked = await reranker.rerank( 42 query="What penalties apply for antitrust violations?", 43 results=retrieved_results, 44 top_k=3, 45 ) 46 # reranked[0] is now the most relevant doc by LLM judgement 47 ``` 48 """ 49 50 _RERANK_PROMPT = ( 51 "You are a document relevance ranking assistant for a RAG pipeline.\n\n" 52 "Query: {query}\n\n" 53 "Below are {count} retrieved documents.\n" 54 "Return ONLY a comma-separated list of document indices ordered from MOST " 55 "to LEAST relevant to the query. Include every index exactly once.\n" 56 "Example for 4 documents: 2, 0, 3, 1\n\n" 57 "Documents:\n{documents}\n\n" 58 "Ranked indices (most to least relevant):" 59 ) 60 61 def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0): 62 """ 63 Args: 64 llm_gateway: LLM gateway for relevance assessment. 65 temperature: Sampling temperature (default 0.0 for deterministic 66 ranking). Keep low — ranking should be consistent. 67 """ 68 self.llm_gateway = llm_gateway 69 self.temperature = temperature 70 71 async def rerank( 72 self, 73 query: str, 74 results: List[SearchResult], 75 top_k: Optional[int] = None, 76 ) -> List[SearchResult]: 77 """ 78 Rerank retrieved documents by LLM relevance to the query. 79 80 If the LLM returns an unparseable response, the original order is 81 preserved so the pipeline does not break. 82 83 Args: 84 query: The original user query context for relevance judgement. 85 results: List of SearchResult objects from retrieval. 86 top_k: If provided, return only the top-k results after reranking. 87 88 Returns: 89 SearchResult list in new LLM-determined relevance order, with 90 scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank. 91 """ 92 if not results: 93 return results 94 95 documents_text = "\n\n".join( 96 f"[{i}] {r.document.content}" 97 for i, r in enumerate(results) 98 ) 99 100 prompt = self._RERANK_PROMPT.format( 101 query=query, 102 count=len(results), 103 documents=documents_text, 104 ) 105 106 response = await self.llm_gateway.complete( 107 prompt=prompt, 108 temperature=self.temperature, 109 max_tokens=100, 110 ) 111 112 ranked_indices = self._parse_ranked_indices(response.content, len(results)) 113 114 n = len(ranked_indices) 115 reranked: List[SearchResult] = [] 116 for new_rank, idx in enumerate(ranked_indices): 117 original = results[idx] 118 score = round(1.0 - (new_rank / n), 4) if n > 1 else 1.0 119 reranked.append(SearchResult( 120 document=original.document, 121 score=score, 122 rank=new_rank, 123 )) 124 125 return reranked[:top_k] if top_k is not None else reranked 126 127 @staticmethod 128 def _parse_ranked_indices(text: str, count: int) -> List[int]: 129 """ 130 Parse a comma-separated index list from LLM output. 131 132 Falls back to the original order [0, 1, 2, ...] if the response 133 cannot be parsed or contains out-of-range indices. 134 """ 135 try: 136 parts = [p.strip() for p in text.strip().split(",")] 137 indices = [int(p) for p in parts if p.isdigit()] 138 # Validate: all indices in range, no duplicates 139 if sorted(indices) == list(range(count)): 140 return indices 141 # Partial list: include missing indices at the end 142 seen = set(indices) 143 for i in range(count): 144 if i not in seen: 145 indices.append(i) 146 return indices 147 except (ValueError, AttributeError): 148 return list(range(count))
Reranks retrieved documents by LLM relevance scoring.
Sends the query and all retrieved document passages to the LLM and asks it to order them from most to least relevant. Returns a new list of SearchResult objects in the LLM-determined order, with scores reassigned to reflect the new ranking.
The reranker is most valuable when:
- Vector similarity scores are clustered closely (hard to choose top-k)
- The query requires reasoning rather than lexical overlap
- Final context window is small and every slot matters
Example:
from gmf_forge_ai_data.context import ContextReranker
reranker = ContextReranker(llm_gateway)
reranked = await reranker.rerank(
query="What penalties apply for antitrust violations?",
results=retrieved_results,
top_k=3,
)
# reranked[0] is now the most relevant doc by LLM judgement
61 def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0): 62 """ 63 Args: 64 llm_gateway: LLM gateway for relevance assessment. 65 temperature: Sampling temperature (default 0.0 for deterministic 66 ranking). Keep low — ranking should be consistent. 67 """ 68 self.llm_gateway = llm_gateway 69 self.temperature = temperature
Args: llm_gateway: LLM gateway for relevance assessment. temperature: Sampling temperature (default 0.0 for deterministic ranking). Keep low — ranking should be consistent.
71 async def rerank( 72 self, 73 query: str, 74 results: List[SearchResult], 75 top_k: Optional[int] = None, 76 ) -> List[SearchResult]: 77 """ 78 Rerank retrieved documents by LLM relevance to the query. 79 80 If the LLM returns an unparseable response, the original order is 81 preserved so the pipeline does not break. 82 83 Args: 84 query: The original user query context for relevance judgement. 85 results: List of SearchResult objects from retrieval. 86 top_k: If provided, return only the top-k results after reranking. 87 88 Returns: 89 SearchResult list in new LLM-determined relevance order, with 90 scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank. 91 """ 92 if not results: 93 return results 94 95 documents_text = "\n\n".join( 96 f"[{i}] {r.document.content}" 97 for i, r in enumerate(results) 98 ) 99 100 prompt = self._RERANK_PROMPT.format( 101 query=query, 102 count=len(results), 103 documents=documents_text, 104 ) 105 106 response = await self.llm_gateway.complete( 107 prompt=prompt, 108 temperature=self.temperature, 109 max_tokens=100, 110 ) 111 112 ranked_indices = self._parse_ranked_indices(response.content, len(results)) 113 114 n = len(ranked_indices) 115 reranked: List[SearchResult] = [] 116 for new_rank, idx in enumerate(ranked_indices): 117 original = results[idx] 118 score = round(1.0 - (new_rank / n), 4) if n > 1 else 1.0 119 reranked.append(SearchResult( 120 document=original.document, 121 score=score, 122 rank=new_rank, 123 )) 124 125 return reranked[:top_k] if top_k is not None else reranked
Rerank retrieved documents by LLM relevance to the query.
If the LLM returns an unparseable response, the original order is preserved so the pipeline does not break.
Args: query: The original user query context for relevance judgement. results: List of SearchResult objects from retrieval. top_k: If provided, return only the top-k results after reranking.
Returns: SearchResult list in new LLM-determined relevance order, with scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank.
24class ContextCompressor: 25 """ 26 Compresses retrieved passages to only the query-relevant sentences. 27 28 Calls the LLM once per retrieved document, asking it to extract only the 29 sentences that directly help answer the query. The original SearchResult 30 is preserved — only the `content` field is replaced with the compressed 31 version. Score and rank are unchanged. 32 33 When to use: 34 - Chunks are long (>500 tokens) and only partially relevant 35 - Context window budget is tight 36 - You want to reduce LLM distraction from off-topic content in chunks 37 38 Example: 39 ```python 40 from gmf_forge_ai_data.context import ContextCompressor 41 42 compressor = ContextCompressor(llm_gateway) 43 compressed = await compressor.compress( 44 query="What is self-attention in transformers?", 45 results=reranked_results, 46 ) 47 # Each result.document.content now contains only the relevant sentences 48 ``` 49 """ 50 51 _COMPRESS_PROMPT = ( 52 "You are a document compression assistant for a RAG pipeline.\n\n" 53 "Extract ONLY the sentences from the passage below that are directly " 54 "relevant to answering the query. Drop redundant, tangential, or clearly " 55 "off-topic sentences.\n" 56 "Return ONLY the extracted text — no explanation, no bullet points, no markdown.\n" 57 "If the entire passage is already focused and relevant, return it unchanged.\n\n" 58 "STRICT RULES:\n" 59 "- Copy sentences EXACTLY as they appear in the passage.\n" 60 "- Do NOT paraphrase, summarise, reword, or generate any new text.\n" 61 "- Only output sentences that exist verbatim in the passage.\n\n" 62 "Query: {query}\n\n" 63 "Passage:\n{content}\n\n" 64 "Relevant sentences from the passage (verbatim):" 65 ) 66 67 def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0): 68 """ 69 Args: 70 llm_gateway: LLM gateway for compression. 71 temperature: Sampling temperature (default 0.0 for deterministic 72 extraction). Keep low to avoid hallucination. 73 """ 74 self.llm_gateway = llm_gateway 75 self.temperature = temperature 76 77 async def compress( 78 self, 79 query: str, 80 results: List[SearchResult], 81 min_length: int = 20, 82 ) -> List[SearchResult]: 83 """ 84 Compress each retrieved passage to query-relevant content only. 85 86 Calls the LLM once per result. If the LLM returns content shorter 87 than `min_length` characters (likely an error), the original passage 88 is kept unchanged. 89 90 Args: 91 query: The user query to guide extraction. 92 results: List of SearchResult objects to compress. 93 min_length: Minimum character length for a valid compressed result. 94 If compression produces less, the original is kept. 95 96 Returns: 97 New list of SearchResult objects with compressed document content. 98 """ 99 compressed: List[SearchResult] = [] 100 for result in results: 101 compressed_content = await self._compress_one( 102 query, result.document.content, min_length 103 ) 104 new_doc = copy.copy(result.document) 105 new_doc.content = compressed_content 106 compressed.append(SearchResult( 107 document=new_doc, 108 score=result.score, 109 rank=result.rank, 110 )) 111 return compressed 112 113 async def _compress_one( 114 self, query: str, content: str, min_length: int 115 ) -> str: 116 prompt = self._COMPRESS_PROMPT.format(query=query, content=content) 117 response = await self.llm_gateway.complete( 118 prompt=prompt, 119 temperature=self.temperature, 120 max_tokens=500, 121 ) 122 extracted = response.content.strip().strip('"').strip("'") 123 return extracted if len(extracted) >= min_length else content
Compresses retrieved passages to only the query-relevant sentences.
Calls the LLM once per retrieved document, asking it to extract only the
sentences that directly help answer the query. The original SearchResult
is preserved — only the content field is replaced with the compressed
version. Score and rank are unchanged.
When to use:
- Chunks are long (>500 tokens) and only partially relevant
- Context window budget is tight
- You want to reduce LLM distraction from off-topic content in chunks
Example:
from gmf_forge_ai_data.context import ContextCompressor
compressor = ContextCompressor(llm_gateway)
compressed = await compressor.compress(
query="What is self-attention in transformers?",
results=reranked_results,
)
# Each result.document.content now contains only the relevant sentences
67 def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0): 68 """ 69 Args: 70 llm_gateway: LLM gateway for compression. 71 temperature: Sampling temperature (default 0.0 for deterministic 72 extraction). Keep low to avoid hallucination. 73 """ 74 self.llm_gateway = llm_gateway 75 self.temperature = temperature
Args: llm_gateway: LLM gateway for compression. temperature: Sampling temperature (default 0.0 for deterministic extraction). Keep low to avoid hallucination.
77 async def compress( 78 self, 79 query: str, 80 results: List[SearchResult], 81 min_length: int = 20, 82 ) -> List[SearchResult]: 83 """ 84 Compress each retrieved passage to query-relevant content only. 85 86 Calls the LLM once per result. If the LLM returns content shorter 87 than `min_length` characters (likely an error), the original passage 88 is kept unchanged. 89 90 Args: 91 query: The user query to guide extraction. 92 results: List of SearchResult objects to compress. 93 min_length: Minimum character length for a valid compressed result. 94 If compression produces less, the original is kept. 95 96 Returns: 97 New list of SearchResult objects with compressed document content. 98 """ 99 compressed: List[SearchResult] = [] 100 for result in results: 101 compressed_content = await self._compress_one( 102 query, result.document.content, min_length 103 ) 104 new_doc = copy.copy(result.document) 105 new_doc.content = compressed_content 106 compressed.append(SearchResult( 107 document=new_doc, 108 score=result.score, 109 rank=result.rank, 110 )) 111 return compressed
Compress each retrieved passage to query-relevant content only.
Calls the LLM once per result. If the LLM returns content shorter
than min_length characters (likely an error), the original passage
is kept unchanged.
Args: query: The user query to guide extraction. results: List of SearchResult objects to compress. min_length: Minimum character length for a valid compressed result. If compression produces less, the original is kept.
Returns: New list of SearchResult objects with compressed document content.
20class RelevanceFilter: 21 """ 22 Filters retrieved documents by minimum similarity score. 23 24 Vector stores return results ordered by score but never automatically 25 remove low-confidence matches. The filter enforces a quality floor so 26 that weakly related documents are excluded before they reach the LLM. 27 28 The threshold depends on the embedding model and index: 29 - text-embedding-ada-002 / cosine: 0.75–0.85 is typical 30 - Lower threshold (0.6): broad recall, more noise 31 - Higher threshold (0.9): tight precision, fewer results 32 33 Example: 34 ```python 35 from gmf_forge_ai_data.context import RelevanceFilter 36 37 filter_ = RelevanceFilter(min_score=0.80) 38 kept = filter_.filter(retrieved_results) 39 # Only results with score >= 0.80 are returned 40 ``` 41 """ 42 43 def __init__(self, min_score: float = 0.75): 44 """ 45 Args: 46 min_score: Minimum similarity score in [0, 1] to retain a result. 47 Results with score < min_score are discarded. 48 """ 49 if not 0.0 <= min_score <= 1.0: 50 raise ValueError(f"min_score must be in [0, 1], got {min_score}") 51 self.min_score = min_score 52 53 def filter(self, results: List[SearchResult]) -> List[SearchResult]: 54 """ 55 Remove results below the minimum score threshold. 56 57 Rank values are NOT reassigned — they preserve the original retrieval 58 rank for traceability. Use a downstream component or sort afterward 59 if contiguous ranks are required. 60 61 Args: 62 results: Retrieved SearchResult list, typically sorted by score. 63 64 Returns: 65 Subset of results where score >= min_score, in original order. 66 """ 67 return [r for r in results if r.score >= self.min_score]
Filters retrieved documents by minimum similarity score.
Vector stores return results ordered by score but never automatically remove low-confidence matches. The filter enforces a quality floor so that weakly related documents are excluded before they reach the LLM.
The threshold depends on the embedding model and index:
- text-embedding-ada-002 / cosine: 0.75–0.85 is typical
- Lower threshold (0.6): broad recall, more noise
- Higher threshold (0.9): tight precision, fewer results
Example:
from gmf_forge_ai_data.context import RelevanceFilter
filter_ = RelevanceFilter(min_score=0.80)
kept = filter_.filter(retrieved_results)
# Only results with score >= 0.80 are returned
43 def __init__(self, min_score: float = 0.75): 44 """ 45 Args: 46 min_score: Minimum similarity score in [0, 1] to retain a result. 47 Results with score < min_score are discarded. 48 """ 49 if not 0.0 <= min_score <= 1.0: 50 raise ValueError(f"min_score must be in [0, 1], got {min_score}") 51 self.min_score = min_score
Args: min_score: Minimum similarity score in [0, 1] to retain a result. Results with score < min_score are discarded.
53 def filter(self, results: List[SearchResult]) -> List[SearchResult]: 54 """ 55 Remove results below the minimum score threshold. 56 57 Rank values are NOT reassigned — they preserve the original retrieval 58 rank for traceability. Use a downstream component or sort afterward 59 if contiguous ranks are required. 60 61 Args: 62 results: Retrieved SearchResult list, typically sorted by score. 63 64 Returns: 65 Subset of results where score >= min_score, in original order. 66 """ 67 return [r for r in results if r.score >= self.min_score]
Remove results below the minimum score threshold.
Rank values are NOT reassigned — they preserve the original retrieval rank for traceability. Use a downstream component or sort afterward if contiguous ranks are required.
Args: results: Retrieved SearchResult list, typically sorted by score.
Returns: Subset of results where score >= min_score, in original order.
20class ContextDeduplicator: 21 """ 22 Removes near-duplicate passages from a retrieved result list. 23 24 Two documents are considered near-duplicates if their character trigram 25 Jaccard similarity exceeds `similarity_threshold`. The higher-ranked 26 (lower `rank` value / earlier in list) document is always kept. 27 28 Near-duplicates arise from: 29 - Overlapping chunk windows (sliding-window chunking creates 30–50% overlap) 30 - The same passage indexed in multiple documents 31 - Repeated LLM-generated content in a knowledge base 32 33 Example: 34 ```python 35 from gmf_forge_ai_data.context import ContextDeduplicator 36 37 deduper = ContextDeduplicator(similarity_threshold=0.85) 38 unique = deduper.deduplicate(retrieved_results) 39 # Near-identical passages are removed, keeping the higher-ranked one 40 ``` 41 """ 42 43 def __init__(self, similarity_threshold: float = 0.85, ngram_size: int = 3): 44 """ 45 Args: 46 similarity_threshold: Jaccard similarity above which two documents 47 are treated as duplicates. Range [0, 1]. 48 0.85 catches heavy overlaps; lower values 49 (0.6–0.7) catch paraphrase-level duplicates. 50 ngram_size: Character n-gram size for fingerprinting. 51 3 (trigrams) is a good default. 52 """ 53 if not 0.0 <= similarity_threshold <= 1.0: 54 raise ValueError( 55 f"similarity_threshold must be in [0, 1], got {similarity_threshold}" 56 ) 57 self.similarity_threshold = similarity_threshold 58 self.ngram_size = ngram_size 59 60 def deduplicate(self, results: List[SearchResult]) -> List[SearchResult]: 61 """ 62 Remove near-duplicate passages, keeping the highest-ranked copy. 63 64 Processes results in order (index 0 = highest rank). For each result, 65 checks if it is a near-duplicate of any already-kept result. If so, 66 it is discarded; otherwise it is added to the output. 67 68 Args: 69 results: List of SearchResult objects, ordered by relevance rank. 70 71 Returns: 72 Deduplicated list in the same relative order. 73 """ 74 kept: List[SearchResult] = [] 75 kept_fingerprints: List[Set[str]] = [] 76 77 for result in results: 78 fp = self._fingerprint(result.document.content) 79 if not any( 80 self._jaccard(fp, existing) >= self.similarity_threshold 81 for existing in kept_fingerprints 82 ): 83 kept.append(result) 84 kept_fingerprints.append(fp) 85 86 return kept 87 88 def _fingerprint(self, text: str) -> Set[str]: 89 """Build a character n-gram set for the text.""" 90 n = self.ngram_size 91 text = text.lower() 92 if len(text) < n: 93 return {text} 94 return {text[i:i + n] for i in range(len(text) - n + 1)} 95 96 @staticmethod 97 def _jaccard(a: Set[str], b: Set[str]) -> float: 98 if not a or not b: 99 return 0.0 100 return len(a & b) / len(a | b)
Removes near-duplicate passages from a retrieved result list.
Two documents are considered near-duplicates if their character trigram
Jaccard similarity exceeds similarity_threshold. The higher-ranked
(lower rank value / earlier in list) document is always kept.
Near-duplicates arise from:
- Overlapping chunk windows (sliding-window chunking creates 30–50% overlap)
- The same passage indexed in multiple documents
- Repeated LLM-generated content in a knowledge base
Example:
from gmf_forge_ai_data.context import ContextDeduplicator
deduper = ContextDeduplicator(similarity_threshold=0.85)
unique = deduper.deduplicate(retrieved_results)
# Near-identical passages are removed, keeping the higher-ranked one
43 def __init__(self, similarity_threshold: float = 0.85, ngram_size: int = 3): 44 """ 45 Args: 46 similarity_threshold: Jaccard similarity above which two documents 47 are treated as duplicates. Range [0, 1]. 48 0.85 catches heavy overlaps; lower values 49 (0.6–0.7) catch paraphrase-level duplicates. 50 ngram_size: Character n-gram size for fingerprinting. 51 3 (trigrams) is a good default. 52 """ 53 if not 0.0 <= similarity_threshold <= 1.0: 54 raise ValueError( 55 f"similarity_threshold must be in [0, 1], got {similarity_threshold}" 56 ) 57 self.similarity_threshold = similarity_threshold 58 self.ngram_size = ngram_size
Args: similarity_threshold: Jaccard similarity above which two documents are treated as duplicates. Range [0, 1]. 0.85 catches heavy overlaps; lower values (0.6–0.7) catch paraphrase-level duplicates. ngram_size: Character n-gram size for fingerprinting. 3 (trigrams) is a good default.
60 def deduplicate(self, results: List[SearchResult]) -> List[SearchResult]: 61 """ 62 Remove near-duplicate passages, keeping the highest-ranked copy. 63 64 Processes results in order (index 0 = highest rank). For each result, 65 checks if it is a near-duplicate of any already-kept result. If so, 66 it is discarded; otherwise it is added to the output. 67 68 Args: 69 results: List of SearchResult objects, ordered by relevance rank. 70 71 Returns: 72 Deduplicated list in the same relative order. 73 """ 74 kept: List[SearchResult] = [] 75 kept_fingerprints: List[Set[str]] = [] 76 77 for result in results: 78 fp = self._fingerprint(result.document.content) 79 if not any( 80 self._jaccard(fp, existing) >= self.similarity_threshold 81 for existing in kept_fingerprints 82 ): 83 kept.append(result) 84 kept_fingerprints.append(fp) 85 86 return kept
Remove near-duplicate passages, keeping the highest-ranked copy.
Processes results in order (index 0 = highest rank). For each result, checks if it is a near-duplicate of any already-kept result. If so, it is discarded; otherwise it is added to the output.
Args: results: List of SearchResult objects, ordered by relevance rank.
Returns: Deduplicated list in the same relative order.
42class ContextWindowManager: 43 """ 44 Fits retrieved documents into a maximum token budget. 45 46 Works in two passes: 47 1. Greedily adds full documents until the budget is exhausted. 48 2. If `allow_truncation=True`, the last document that didn't fit is 49 partially included using as many characters as the remaining budget 50 allows. 51 52 Token estimation uses 4 characters ≈ 1 token (standard rule-of-thumb 53 for English text with GPT tokenizers). 54 55 Example: 56 ```python 57 from gmf_forge_ai_data.context import ContextWindowManager 58 59 manager = ContextWindowManager(max_tokens=2000) 60 window = manager.fit(reranked_and_compressed_results) 61 62 print(f"Using {window.total_tokens} / {window.budget} tokens") 63 print(f"Docs included: {len(window.results)}, truncated: {window.truncated}") 64 65 # Build the prompt context string 66 context_str = "\\n\\n".join(r.document.content for r in window.results) 67 ``` 68 """ 69 70 _CHARS_PER_TOKEN: float = 4.0 71 72 def __init__( 73 self, 74 max_tokens: int = 3000, 75 allow_truncation: bool = True, 76 ): 77 """ 78 Args: 79 max_tokens: Maximum number of tokens for all document content 80 combined. Does not include the prompt template 81 overhead — subtract your template size from the 82 model's context limit before setting this. 83 allow_truncation: If True (default), the last document that would 84 overflow is truncated to fit remaining space. 85 If False, it is dropped entirely. 86 """ 87 if max_tokens <= 0: 88 raise ValueError(f"max_tokens must be positive, got {max_tokens}") 89 self.max_tokens = max_tokens 90 self.allow_truncation = allow_truncation 91 92 def fit(self, results: List[SearchResult]) -> WindowedContext: 93 """ 94 Select and optionally truncate documents to fit within the token budget. 95 96 Args: 97 results: List of SearchResult objects, ordered by priority 98 (highest priority first — e.g. after reranking). 99 100 Returns: 101 WindowedContext with the selected results and budget metadata. 102 """ 103 kept: List[SearchResult] = [] 104 tokens_used = 0 105 truncated = False 106 dropped = 0 107 108 for result in results: 109 doc_tokens = self._estimate_tokens(result.document.content) 110 remaining = self.max_tokens - tokens_used 111 112 if doc_tokens <= remaining: 113 kept.append(result) 114 tokens_used += doc_tokens 115 elif self.allow_truncation and remaining > 0 and not truncated: 116 # Partially include this document to fill remaining budget 117 max_chars = int(remaining * self._CHARS_PER_TOKEN) 118 truncated_content = result.document.content[:max_chars] 119 new_doc = copy.copy(result.document) 120 new_doc.content = truncated_content 121 kept.append(SearchResult( 122 document=new_doc, 123 score=result.score, 124 rank=result.rank, 125 )) 126 tokens_used += self._estimate_tokens(truncated_content) 127 truncated = True 128 else: 129 dropped += 1 130 131 return WindowedContext( 132 results=kept, 133 total_tokens=tokens_used, 134 budget=self.max_tokens, 135 truncated=truncated, 136 dropped=dropped, 137 ) 138 139 def _estimate_tokens(self, text: str) -> int: 140 """Estimate token count using 4 characters-per-token heuristic.""" 141 return max(1, round(len(text) / self._CHARS_PER_TOKEN))
Fits retrieved documents into a maximum token budget.
Works in two passes:
- Greedily adds full documents until the budget is exhausted.
- If
allow_truncation=True, the last document that didn't fit is partially included using as many characters as the remaining budget allows.
Token estimation uses 4 characters ≈ 1 token (standard rule-of-thumb for English text with GPT tokenizers).
Example:
from gmf_forge_ai_data.context import ContextWindowManager
manager = ContextWindowManager(max_tokens=2000)
window = manager.fit(reranked_and_compressed_results)
print(f"Using {window.total_tokens} / {window.budget} tokens")
print(f"Docs included: {len(window.results)}, truncated: {window.truncated}")
# Build the prompt context string
context_str = "\n\n".join(r.document.content for r in window.results)
72 def __init__( 73 self, 74 max_tokens: int = 3000, 75 allow_truncation: bool = True, 76 ): 77 """ 78 Args: 79 max_tokens: Maximum number of tokens for all document content 80 combined. Does not include the prompt template 81 overhead — subtract your template size from the 82 model's context limit before setting this. 83 allow_truncation: If True (default), the last document that would 84 overflow is truncated to fit remaining space. 85 If False, it is dropped entirely. 86 """ 87 if max_tokens <= 0: 88 raise ValueError(f"max_tokens must be positive, got {max_tokens}") 89 self.max_tokens = max_tokens 90 self.allow_truncation = allow_truncation
Args: max_tokens: Maximum number of tokens for all document content combined. Does not include the prompt template overhead — subtract your template size from the model's context limit before setting this. allow_truncation: If True (default), the last document that would overflow is truncated to fit remaining space. If False, it is dropped entirely.
92 def fit(self, results: List[SearchResult]) -> WindowedContext: 93 """ 94 Select and optionally truncate documents to fit within the token budget. 95 96 Args: 97 results: List of SearchResult objects, ordered by priority 98 (highest priority first — e.g. after reranking). 99 100 Returns: 101 WindowedContext with the selected results and budget metadata. 102 """ 103 kept: List[SearchResult] = [] 104 tokens_used = 0 105 truncated = False 106 dropped = 0 107 108 for result in results: 109 doc_tokens = self._estimate_tokens(result.document.content) 110 remaining = self.max_tokens - tokens_used 111 112 if doc_tokens <= remaining: 113 kept.append(result) 114 tokens_used += doc_tokens 115 elif self.allow_truncation and remaining > 0 and not truncated: 116 # Partially include this document to fill remaining budget 117 max_chars = int(remaining * self._CHARS_PER_TOKEN) 118 truncated_content = result.document.content[:max_chars] 119 new_doc = copy.copy(result.document) 120 new_doc.content = truncated_content 121 kept.append(SearchResult( 122 document=new_doc, 123 score=result.score, 124 rank=result.rank, 125 )) 126 tokens_used += self._estimate_tokens(truncated_content) 127 truncated = True 128 else: 129 dropped += 1 130 131 return WindowedContext( 132 results=kept, 133 total_tokens=tokens_used, 134 budget=self.max_tokens, 135 truncated=truncated, 136 dropped=dropped, 137 )
Select and optionally truncate documents to fit within the token budget.
Args: results: List of SearchResult objects, ordered by priority (highest priority first — e.g. after reranking).
Returns: WindowedContext with the selected results and budget metadata.
23@dataclass 24class WindowedContext: 25 """ 26 Output of ContextWindowManager.fit(). 27 28 Attributes: 29 results: Documents that fit within the token budget, in order. 30 total_tokens: Estimated token count of all included content. 31 budget: The configured maximum token budget. 32 truncated: True if the last document's content was truncated to fit. 33 dropped: Number of documents that did not fit at all. 34 """ 35 results: List[SearchResult] 36 total_tokens: int 37 budget: int 38 truncated: bool = False 39 dropped: int = 0
Output of ContextWindowManager.fit().
Attributes: results: Documents that fit within the token budget, in order. total_tokens: Estimated token count of all included content. budget: The configured maximum token budget. truncated: True if the last document's content was truncated to fit. dropped: Number of documents that did not fit at all.