gmf_forge_ai_data.context

Context processing — post-retrieval, pre-generation context management.

Modules:

  • context_reranker: Reorder retrieved docs by LLM-assessed relevance.
  • context_compressor: Extract only query-relevant sentences from each chunk.
  • relevance_filter: Drop results below a minimum similarity score.
  • context_deduplicator: Remove near-duplicate passages via n-gram Jaccard.
  • context_window_manager: Fit documents into a fixed token budget.

 1"""
 2Context processing — post-retrieval, pre-generation context management.
 3
 4Modules:
 5    context_reranker:      Reorder retrieved docs by LLM-assessed relevance.
 6    context_compressor:    Extract only query-relevant sentences from each chunk.
 7    relevance_filter:      Drop results below a minimum similarity score.
 8    context_deduplicator:  Remove near-duplicate passages via n-gram Jaccard.
 9    context_window_manager: Fit documents into a fixed token budget.
10"""
11
12from .context_reranker import ContextReranker
13from .context_compressor import ContextCompressor
14from .relevance_filter import RelevanceFilter
15from .context_deduplicator import ContextDeduplicator
16from .context_window_manager import ContextWindowManager, WindowedContext
17
18__all__ = [
19    "ContextReranker",
20    "ContextCompressor",
21    "RelevanceFilter",
22    "ContextDeduplicator",
23    "ContextWindowManager",
24    "WindowedContext",
25]
class ContextReranker:
 22class ContextReranker:
 23    """
 24    Reranks retrieved documents by LLM relevance scoring.
 25
 26    Sends the query and all retrieved document passages to the LLM and asks it
 27    to order them from most to least relevant. Returns a new list of
 28    SearchResult objects in the LLM-determined order, with scores reassigned
 29    to reflect the new ranking.
 30
 31    The reranker is most valuable when:
 32    - Vector similarity scores are clustered closely (hard to choose top-k)
 33    - The query requires reasoning rather than lexical overlap
 34    - Final context window is small and every slot matters
 35
 36    Example:
 37        ```python
 38        from gmf_forge_ai_data.context import ContextReranker
 39
 40        reranker = ContextReranker(llm_gateway)
 41        reranked = await reranker.rerank(
 42            query="What penalties apply for antitrust violations?",
 43            results=retrieved_results,
 44            top_k=3,
 45        )
 46        # reranked[0] is now the most relevant doc by LLM judgement
 47        ```
 48    """
 49
 50    _RERANK_PROMPT = (
 51        "You are a document relevance ranking assistant for a RAG pipeline.\n\n"
 52        "Query: {query}\n\n"
 53        "Below are {count} retrieved documents.\n"
 54        "Return ONLY a comma-separated list of document indices ordered from MOST "
 55        "to LEAST relevant to the query. Include every index exactly once.\n"
 56        "Example for 4 documents: 2, 0, 3, 1\n\n"
 57        "Documents:\n{documents}\n\n"
 58        "Ranked indices (most to least relevant):"
 59    )
 60
 61    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
 62        """
 63        Args:
 64            llm_gateway: LLM gateway for relevance assessment.
 65            temperature: Sampling temperature (default 0.0 for deterministic
 66                         ranking). Keep low — ranking should be consistent.
 67        """
 68        self.llm_gateway = llm_gateway
 69        self.temperature = temperature
 70
 71    async def rerank(
 72        self,
 73        query: str,
 74        results: List[SearchResult],
 75        top_k: Optional[int] = None,
 76    ) -> List[SearchResult]:
 77        """
 78        Rerank retrieved documents by LLM relevance to the query.
 79
 80        If the LLM returns an unparseable response, the original order is
 81        preserved so the pipeline does not break.
 82
 83        Args:
 84            query:  The original user query context for relevance judgement.
 85            results: List of SearchResult objects from retrieval.
 86            top_k:  If provided, return only the top-k results after reranking.
 87
 88        Returns:
 89            SearchResult list in new LLM-determined relevance order, with
 90            scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank.
 91        """
 92        if not results:
 93            return results
 94
 95        documents_text = "\n\n".join(
 96            f"[{i}] {r.document.content}"
 97            for i, r in enumerate(results)
 98        )
 99
100        prompt = self._RERANK_PROMPT.format(
101            query=query,
102            count=len(results),
103            documents=documents_text,
104        )
105
106        response = await self.llm_gateway.complete(
107            prompt=prompt,
108            temperature=self.temperature,
109            max_tokens=100,
110        )
111
112        ranked_indices = self._parse_ranked_indices(response.content, len(results))
113
114        n = len(ranked_indices)
115        reranked: List[SearchResult] = []
116        for new_rank, idx in enumerate(ranked_indices):
117            original = results[idx]
118            score = round(1.0 - (new_rank / n), 4) if n > 1 else 1.0
119            reranked.append(SearchResult(
120                document=original.document,
121                score=score,
122                rank=new_rank,
123            ))
124
125        return reranked[:top_k] if top_k is not None else reranked
126
127    @staticmethod
128    def _parse_ranked_indices(text: str, count: int) -> List[int]:
129        """
130        Parse a comma-separated index list from LLM output.
131
132        Falls back to the original order [0, 1, 2, ...] if the response
133        cannot be parsed or contains out-of-range indices.
134        """
135        try:
136            parts = [p.strip() for p in text.strip().split(",")]
137            indices = [int(p) for p in parts if p.isdigit()]
138            # Validate: all indices in range, no duplicates
139            if sorted(indices) == list(range(count)):
140                return indices
141            # Partial list: include missing indices at the end
142            seen = set(indices)
143            for i in range(count):
144                if i not in seen:
145                    indices.append(i)
146            return indices
147        except (ValueError, AttributeError):
148            return list(range(count))

Reranks retrieved documents by LLM relevance scoring.

Sends the query and all retrieved document passages to the LLM and asks it to order them from most to least relevant. Returns a new list of SearchResult objects in the LLM-determined order, with scores reassigned to reflect the new ranking.

The reranker is most valuable when:

  • Vector similarity scores are clustered closely (hard to choose top-k)
  • The query requires reasoning rather than lexical overlap
  • Final context window is small and every slot matters

Example:

from gmf_forge_ai_data.context import ContextReranker

reranker = ContextReranker(llm_gateway)
reranked = await reranker.rerank(
    query="What penalties apply for antitrust violations?",
    results=retrieved_results,
    top_k=3,
)
# reranked[0] is now the most relevant doc by LLM judgement
ContextReranker( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.0)
61    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
62        """
63        Args:
64            llm_gateway: LLM gateway for relevance assessment.
65            temperature: Sampling temperature (default 0.0 for deterministic
66                         ranking). Keep low — ranking should be consistent.
67        """
68        self.llm_gateway = llm_gateway
69        self.temperature = temperature

Args: llm_gateway: LLM gateway for relevance assessment. temperature: Sampling temperature (default 0.0 for deterministic ranking). Keep low — ranking should be consistent.

llm_gateway
temperature
async def rerank( self, query: str, results: List[gmf_forge_ai_data.SearchResult], top_k: Optional[int] = None) -> List[gmf_forge_ai_data.SearchResult]:
 71    async def rerank(
 72        self,
 73        query: str,
 74        results: List[SearchResult],
 75        top_k: Optional[int] = None,
 76    ) -> List[SearchResult]:
 77        """
 78        Rerank retrieved documents by LLM relevance to the query.
 79
 80        If the LLM returns an unparseable response, the original order is
 81        preserved so the pipeline does not break.
 82
 83        Args:
 84            query:  The original user query context for relevance judgement.
 85            results: List of SearchResult objects from retrieval.
 86            top_k:  If provided, return only the top-k results after reranking.
 87
 88        Returns:
 89            SearchResult list in new LLM-determined relevance order, with
 90            scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank.
 91        """
 92        if not results:
 93            return results
 94
 95        documents_text = "\n\n".join(
 96            f"[{i}] {r.document.content}"
 97            for i, r in enumerate(results)
 98        )
 99
100        prompt = self._RERANK_PROMPT.format(
101            query=query,
102            count=len(results),
103            documents=documents_text,
104        )
105
106        response = await self.llm_gateway.complete(
107            prompt=prompt,
108            temperature=self.temperature,
109            max_tokens=100,
110        )
111
112        ranked_indices = self._parse_ranked_indices(response.content, len(results))
113
114        n = len(ranked_indices)
115        reranked: List[SearchResult] = []
116        for new_rank, idx in enumerate(ranked_indices):
117            original = results[idx]
118            score = round(1.0 - (new_rank / n), 4) if n > 1 else 1.0
119            reranked.append(SearchResult(
120                document=original.document,
121                score=score,
122                rank=new_rank,
123            ))
124
125        return reranked[:top_k] if top_k is not None else reranked

Rerank retrieved documents by LLM relevance to the query.

If the LLM returns an unparseable response, the original order is preserved so the pipeline does not break.

Args: query: The original user query context for relevance judgement. results: List of SearchResult objects from retrieval. top_k: If provided, return only the top-k results after reranking.

Returns: SearchResult list in new LLM-determined relevance order, with scores reassigned as 1.0, (n-1)/n, (n-2)/n, ... reflecting rank.

class ContextCompressor:
 24class ContextCompressor:
 25    """
 26    Compresses retrieved passages to only the query-relevant sentences.
 27
 28    Calls the LLM once per retrieved document, asking it to extract only the
 29    sentences that directly help answer the query. The original SearchResult
 30    is preserved — only the `content` field is replaced with the compressed
 31    version. Score and rank are unchanged.
 32
 33    When to use:
 34    - Chunks are long (>500 tokens) and only partially relevant
 35    - Context window budget is tight
 36    - You want to reduce LLM distraction from off-topic content in chunks
 37
 38    Example:
 39        ```python
 40        from gmf_forge_ai_data.context import ContextCompressor
 41
 42        compressor = ContextCompressor(llm_gateway)
 43        compressed = await compressor.compress(
 44            query="What is self-attention in transformers?",
 45            results=reranked_results,
 46        )
 47        # Each result.document.content now contains only the relevant sentences
 48        ```
 49    """
 50
 51    _COMPRESS_PROMPT = (
 52        "You are a document compression assistant for a RAG pipeline.\n\n"
 53        "Extract ONLY the sentences from the passage below that are directly "
 54        "relevant to answering the query. Drop redundant, tangential, or clearly "
 55        "off-topic sentences.\n"
 56        "Return ONLY the extracted text — no explanation, no bullet points, no markdown.\n"
 57        "If the entire passage is already focused and relevant, return it unchanged.\n\n"
 58        "STRICT RULES:\n"
 59        "- Copy sentences EXACTLY as they appear in the passage.\n"
 60        "- Do NOT paraphrase, summarise, reword, or generate any new text.\n"
 61        "- Only output sentences that exist verbatim in the passage.\n\n"
 62        "Query: {query}\n\n"
 63        "Passage:\n{content}\n\n"
 64        "Relevant sentences from the passage (verbatim):"
 65    )
 66
 67    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
 68        """
 69        Args:
 70            llm_gateway: LLM gateway for compression.
 71            temperature: Sampling temperature (default 0.0 for deterministic
 72                         extraction). Keep low to avoid hallucination.
 73        """
 74        self.llm_gateway = llm_gateway
 75        self.temperature = temperature
 76
 77    async def compress(
 78        self,
 79        query: str,
 80        results: List[SearchResult],
 81        min_length: int = 20,
 82    ) -> List[SearchResult]:
 83        """
 84        Compress each retrieved passage to query-relevant content only.
 85
 86        Calls the LLM once per result. If the LLM returns content shorter
 87        than `min_length` characters (likely an error), the original passage
 88        is kept unchanged.
 89
 90        Args:
 91            query:      The user query to guide extraction.
 92            results:    List of SearchResult objects to compress.
 93            min_length: Minimum character length for a valid compressed result.
 94                        If compression produces less, the original is kept.
 95
 96        Returns:
 97            New list of SearchResult objects with compressed document content.
 98        """
 99        compressed: List[SearchResult] = []
100        for result in results:
101            compressed_content = await self._compress_one(
102                query, result.document.content, min_length
103            )
104            new_doc = copy.copy(result.document)
105            new_doc.content = compressed_content
106            compressed.append(SearchResult(
107                document=new_doc,
108                score=result.score,
109                rank=result.rank,
110            ))
111        return compressed
112
113    async def _compress_one(
114        self, query: str, content: str, min_length: int
115    ) -> str:
116        prompt = self._COMPRESS_PROMPT.format(query=query, content=content)
117        response = await self.llm_gateway.complete(
118            prompt=prompt,
119            temperature=self.temperature,
120            max_tokens=500,
121        )
122        extracted = response.content.strip().strip('"').strip("'")
123        return extracted if len(extracted) >= min_length else content

Compresses retrieved passages to only the query-relevant sentences.

Calls the LLM once per retrieved document, asking it to extract only the sentences that directly help answer the query. The input SearchResult objects are not modified: each returned result wraps a shallow copy of the original document whose content field is replaced with the compressed version. Score and rank are carried over unchanged.

When to use:

  • Chunks are long (>500 tokens) and only partially relevant
  • Context window budget is tight
  • You want to reduce LLM distraction from off-topic content in chunks

Example:

from gmf_forge_ai_data.context import ContextCompressor

compressor = ContextCompressor(llm_gateway)
compressed = await compressor.compress(
    query="What is self-attention in transformers?",
    results=reranked_results,
)
# Each result.document.content now contains only the relevant sentences
ContextCompressor( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.0)
67    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
68        """
69        Args:
70            llm_gateway: LLM gateway for compression.
71            temperature: Sampling temperature (default 0.0 for deterministic
72                         extraction). Keep low to avoid hallucination.
73        """
74        self.llm_gateway = llm_gateway
75        self.temperature = temperature

Args: llm_gateway: LLM gateway for compression. temperature: Sampling temperature (default 0.0 for deterministic extraction). Keep low to avoid hallucination.

llm_gateway
temperature
async def compress( self, query: str, results: List[gmf_forge_ai_data.SearchResult], min_length: int = 20) -> List[gmf_forge_ai_data.SearchResult]:
 77    async def compress(
 78        self,
 79        query: str,
 80        results: List[SearchResult],
 81        min_length: int = 20,
 82    ) -> List[SearchResult]:
 83        """
 84        Compress each retrieved passage to query-relevant content only.
 85
 86        Calls the LLM once per result. If the LLM returns content shorter
 87        than `min_length` characters (likely an error), the original passage
 88        is kept unchanged.
 89
 90        Args:
 91            query:      The user query to guide extraction.
 92            results:    List of SearchResult objects to compress.
 93            min_length: Minimum character length for a valid compressed result.
 94                        If compression produces less, the original is kept.
 95
 96        Returns:
 97            New list of SearchResult objects with compressed document content.
 98        """
 99        compressed: List[SearchResult] = []
100        for result in results:
101            compressed_content = await self._compress_one(
102                query, result.document.content, min_length
103            )
104            new_doc = copy.copy(result.document)
105            new_doc.content = compressed_content
106            compressed.append(SearchResult(
107                document=new_doc,
108                score=result.score,
109                rank=result.rank,
110            ))
111        return compressed

Compress each retrieved passage to query-relevant content only.

Calls the LLM once per result. If the LLM returns content shorter than min_length characters (likely an error), the original passage is kept unchanged.

Args: query: The user query to guide extraction. results: List of SearchResult objects to compress. min_length: Minimum character length for a valid compressed result. If compression produces less, the original is kept.

Returns: New list of SearchResult objects with compressed document content.

class RelevanceFilter:
class RelevanceFilter:
    """
    Drops retrieved documents whose similarity score is below a floor.

    Retrieval hands back ranked results without removing weak matches. This
    filter applies a fixed minimum score so that loosely related documents
    are excluded before they reach the LLM.

    The threshold depends on the embedding model and index:
    - text-embedding-ada-002 / cosine: 0.75–0.85 is typical
    - Lower threshold (0.6): broad recall, more noise
    - Higher threshold (0.9): tight precision, fewer results

    Example:
        ```python
        from gmf_forge_ai_data.context import RelevanceFilter

        filter_ = RelevanceFilter(min_score=0.80)
        kept = filter_.filter(retrieved_results)
        # Only results with score >= 0.80 are returned
        ```
    """

    def __init__(self, min_score: float = 0.75):
        """
        Args:
            min_score: Lowest score, within [0, 1], that a result may have
                       and still be kept.

        Raises:
            ValueError: If `min_score` is not inside [0, 1].
        """
        inside_unit_interval = 0.0 <= min_score <= 1.0
        if not inside_unit_interval:
            raise ValueError(f"min_score must be in [0, 1], got {min_score}")
        self.min_score = min_score

    def filter(self, results: List[SearchResult]) -> List[SearchResult]:
        """
        Keep only the results that reach the minimum score.

        The surviving objects are returned as-is, so their `rank` values are
        the ones assigned by retrieval and may no longer be contiguous.

        Args:
            results: Retrieved SearchResult list, typically sorted by score.

        Returns:
            Subset of results where score >= min_score, in original order.
        """
        survivors: List[SearchResult] = []
        for candidate in results:
            if candidate.score >= self.min_score:
                survivors.append(candidate)
        return survivors

Filters retrieved documents by minimum similarity score.

Vector stores return results ordered by score but never automatically remove low-confidence matches. The filter enforces a quality floor so that weakly related documents are excluded before they reach the LLM.

The threshold depends on the embedding model and index:

  • text-embedding-ada-002 / cosine: 0.75–0.85 is typical
  • Lower threshold (0.6): broad recall, more noise
  • Higher threshold (0.9): tight precision, fewer results

Example:

from gmf_forge_ai_data.context import RelevanceFilter

filter_ = RelevanceFilter(min_score=0.80)
kept = filter_.filter(retrieved_results)
# Only results with score >= 0.80 are returned
RelevanceFilter(min_score: float = 0.75)
43    def __init__(self, min_score: float = 0.75):
44        """
45        Args:
46            min_score: Minimum similarity score in [0, 1] to retain a result.
47                       Results with score < min_score are discarded.
48        """
49        if not 0.0 <= min_score <= 1.0:
50            raise ValueError(f"min_score must be in [0, 1], got {min_score}")
51        self.min_score = min_score

Args: min_score: Minimum similarity score in [0, 1] to retain a result. Results with score < min_score are discarded.

min_score
def filter( self, results: List[gmf_forge_ai_data.SearchResult]) -> List[gmf_forge_ai_data.SearchResult]:
53    def filter(self, results: List[SearchResult]) -> List[SearchResult]:
54        """
55        Remove results below the minimum score threshold.
56
57        Rank values are NOT reassigned — they preserve the original retrieval
58        rank for traceability. Use a downstream component or sort afterward
59        if contiguous ranks are required.
60
61        Args:
62            results: Retrieved SearchResult list, typically sorted by score.
63
64        Returns:
65            Subset of results where score >= min_score, in original order.
66        """
67        return [r for r in results if r.score >= self.min_score]

Remove results below the minimum score threshold.

Rank values are NOT reassigned — they preserve the original retrieval rank for traceability. Use a downstream component or sort afterward if contiguous ranks are required.

Args: results: Retrieved SearchResult list, typically sorted by score.

Returns: Subset of results where score >= min_score, in original order.

class ContextDeduplicator:
 20class ContextDeduplicator:
 21    """
 22    Removes near-duplicate passages from a retrieved result list.
 23
 24    Two documents are considered near-duplicates if their character trigram
 25    Jaccard similarity exceeds `similarity_threshold`. The higher-ranked
 26    (lower `rank` value / earlier in list) document is always kept.
 27
 28    Near-duplicates arise from:
 29    - Overlapping chunk windows (sliding-window chunking creates 30–50% overlap)
 30    - The same passage indexed in multiple documents
 31    - Repeated LLM-generated content in a knowledge base
 32
 33    Example:
 34        ```python
 35        from gmf_forge_ai_data.context import ContextDeduplicator
 36
 37        deduper = ContextDeduplicator(similarity_threshold=0.85)
 38        unique = deduper.deduplicate(retrieved_results)
 39        # Near-identical passages are removed, keeping the higher-ranked one
 40        ```
 41    """
 42
 43    def __init__(self, similarity_threshold: float = 0.85, ngram_size: int = 3):
 44        """
 45        Args:
 46            similarity_threshold: Jaccard similarity above which two documents
 47                                  are treated as duplicates. Range [0, 1].
 48                                  0.85 catches heavy overlaps; lower values
 49                                  (0.6–0.7) catch paraphrase-level duplicates.
 50            ngram_size:           Character n-gram size for fingerprinting.
 51                                  3 (trigrams) is a good default.
 52        """
 53        if not 0.0 <= similarity_threshold <= 1.0:
 54            raise ValueError(
 55                f"similarity_threshold must be in [0, 1], got {similarity_threshold}"
 56            )
 57        self.similarity_threshold = similarity_threshold
 58        self.ngram_size = ngram_size
 59
 60    def deduplicate(self, results: List[SearchResult]) -> List[SearchResult]:
 61        """
 62        Remove near-duplicate passages, keeping the highest-ranked copy.
 63
 64        Processes results in order (index 0 = highest rank). For each result,
 65        checks if it is a near-duplicate of any already-kept result. If so,
 66        it is discarded; otherwise it is added to the output.
 67
 68        Args:
 69            results: List of SearchResult objects, ordered by relevance rank.
 70
 71        Returns:
 72            Deduplicated list in the same relative order.
 73        """
 74        kept: List[SearchResult] = []
 75        kept_fingerprints: List[Set[str]] = []
 76
 77        for result in results:
 78            fp = self._fingerprint(result.document.content)
 79            if not any(
 80                self._jaccard(fp, existing) >= self.similarity_threshold
 81                for existing in kept_fingerprints
 82            ):
 83                kept.append(result)
 84                kept_fingerprints.append(fp)
 85
 86        return kept
 87
 88    def _fingerprint(self, text: str) -> Set[str]:
 89        """Build a character n-gram set for the text."""
 90        n = self.ngram_size
 91        text = text.lower()
 92        if len(text) < n:
 93            return {text}
 94        return {text[i:i + n] for i in range(len(text) - n + 1)}
 95
 96    @staticmethod
 97    def _jaccard(a: Set[str], b: Set[str]) -> float:
 98        if not a or not b:
 99            return 0.0
100        return len(a & b) / len(a | b)

Removes near-duplicate passages from a retrieved result list.

Two documents are considered near-duplicates if the Jaccard similarity of their character n-gram sets (trigrams by default) is greater than or equal to similarity_threshold. The document that appears earlier in the list (i.e. the higher-ranked one, with the lower rank value) is always kept.

Near-duplicates arise from:

  • Overlapping chunk windows (sliding-window chunking creates 30–50% overlap)
  • The same passage indexed in multiple documents
  • Repeated LLM-generated content in a knowledge base

Example:

from gmf_forge_ai_data.context import ContextDeduplicator

deduper = ContextDeduplicator(similarity_threshold=0.85)
unique = deduper.deduplicate(retrieved_results)
# Near-identical passages are removed, keeping the higher-ranked one
ContextDeduplicator(similarity_threshold: float = 0.85, ngram_size: int = 3)
43    def __init__(self, similarity_threshold: float = 0.85, ngram_size: int = 3):
44        """
45        Args:
46            similarity_threshold: Jaccard similarity above which two documents
47                                  are treated as duplicates. Range [0, 1].
48                                  0.85 catches heavy overlaps; lower values
49                                  (0.6–0.7) catch paraphrase-level duplicates.
50            ngram_size:           Character n-gram size for fingerprinting.
51                                  3 (trigrams) is a good default.
52        """
53        if not 0.0 <= similarity_threshold <= 1.0:
54            raise ValueError(
55                f"similarity_threshold must be in [0, 1], got {similarity_threshold}"
56            )
57        self.similarity_threshold = similarity_threshold
58        self.ngram_size = ngram_size

Args: similarity_threshold: Jaccard similarity above which two documents are treated as duplicates. Range [0, 1]. 0.85 catches heavy overlaps; lower values (0.6–0.7) catch paraphrase-level duplicates. ngram_size: Character n-gram size for fingerprinting. 3 (trigrams) is a good default.

similarity_threshold
ngram_size
def deduplicate( self, results: List[gmf_forge_ai_data.SearchResult]) -> List[gmf_forge_ai_data.SearchResult]:
60    def deduplicate(self, results: List[SearchResult]) -> List[SearchResult]:
61        """
62        Remove near-duplicate passages, keeping the highest-ranked copy.
63
64        Processes results in order (index 0 = highest rank). For each result,
65        checks if it is a near-duplicate of any already-kept result. If so,
66        it is discarded; otherwise it is added to the output.
67
68        Args:
69            results: List of SearchResult objects, ordered by relevance rank.
70
71        Returns:
72            Deduplicated list in the same relative order.
73        """
74        kept: List[SearchResult] = []
75        kept_fingerprints: List[Set[str]] = []
76
77        for result in results:
78            fp = self._fingerprint(result.document.content)
79            if not any(
80                self._jaccard(fp, existing) >= self.similarity_threshold
81                for existing in kept_fingerprints
82            ):
83                kept.append(result)
84                kept_fingerprints.append(fp)
85
86        return kept

Remove near-duplicate passages, keeping the highest-ranked copy.

Processes results in order (index 0 = highest rank). For each result, checks if it is a near-duplicate of any already-kept result. If so, it is discarded; otherwise it is added to the output.

Args: results: List of SearchResult objects, ordered by relevance rank.

Returns: Deduplicated list in the same relative order.

class ContextWindowManager:
 42class ContextWindowManager:
 43    """
 44    Fits retrieved documents into a maximum token budget.
 45
 46    Works in two passes:
 47    1. Greedily adds full documents until the budget is exhausted.
 48    2. If `allow_truncation=True`, the last document that didn't fit is
 49       partially included using as many characters as the remaining budget
 50       allows.
 51
 52    Token estimation uses 4 characters ≈ 1 token (standard rule-of-thumb
 53    for English text with GPT tokenizers).
 54
 55    Example:
 56        ```python
 57        from gmf_forge_ai_data.context import ContextWindowManager
 58
 59        manager = ContextWindowManager(max_tokens=2000)
 60        window = manager.fit(reranked_and_compressed_results)
 61
 62        print(f"Using {window.total_tokens} / {window.budget} tokens")
 63        print(f"Docs included: {len(window.results)}, truncated: {window.truncated}")
 64
 65        # Build the prompt context string
 66        context_str = "\\n\\n".join(r.document.content for r in window.results)
 67        ```
 68    """
 69
 70    _CHARS_PER_TOKEN: float = 4.0
 71
 72    def __init__(
 73        self,
 74        max_tokens: int = 3000,
 75        allow_truncation: bool = True,
 76    ):
 77        """
 78        Args:
 79            max_tokens:        Maximum number of tokens for all document content
 80                               combined. Does not include the prompt template
 81                               overhead — subtract your template size from the
 82                               model's context limit before setting this.
 83            allow_truncation:  If True (default), the last document that would
 84                               overflow is truncated to fit remaining space.
 85                               If False, it is dropped entirely.
 86        """
 87        if max_tokens <= 0:
 88            raise ValueError(f"max_tokens must be positive, got {max_tokens}")
 89        self.max_tokens = max_tokens
 90        self.allow_truncation = allow_truncation
 91
 92    def fit(self, results: List[SearchResult]) -> WindowedContext:
 93        """
 94        Select and optionally truncate documents to fit within the token budget.
 95
 96        Args:
 97            results: List of SearchResult objects, ordered by priority
 98                     (highest priority first — e.g. after reranking).
 99
100        Returns:
101            WindowedContext with the selected results and budget metadata.
102        """
103        kept: List[SearchResult] = []
104        tokens_used = 0
105        truncated = False
106        dropped = 0
107
108        for result in results:
109            doc_tokens = self._estimate_tokens(result.document.content)
110            remaining = self.max_tokens - tokens_used
111
112            if doc_tokens <= remaining:
113                kept.append(result)
114                tokens_used += doc_tokens
115            elif self.allow_truncation and remaining > 0 and not truncated:
116                # Partially include this document to fill remaining budget
117                max_chars = int(remaining * self._CHARS_PER_TOKEN)
118                truncated_content = result.document.content[:max_chars]
119                new_doc = copy.copy(result.document)
120                new_doc.content = truncated_content
121                kept.append(SearchResult(
122                    document=new_doc,
123                    score=result.score,
124                    rank=result.rank,
125                ))
126                tokens_used += self._estimate_tokens(truncated_content)
127                truncated = True
128            else:
129                dropped += 1
130
131        return WindowedContext(
132            results=kept,
133            total_tokens=tokens_used,
134            budget=self.max_tokens,
135            truncated=truncated,
136            dropped=dropped,
137        )
138
139    def _estimate_tokens(self, text: str) -> int:
140        """Estimate token count using 4 characters-per-token heuristic."""
141        return max(1, round(len(text) / self._CHARS_PER_TOKEN))

Fits retrieved documents into a maximum token budget.

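Works in a single pass over the results, in priority order:

  1. Greedily adds every full document that fits in the remaining budget.
  2. If allow_truncation=True, the first document that does not fit (while some budget remains) is partially included using as many characters as the remaining budget allows. At most one document is truncated.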

Token estimation uses 4 characters ≈ 1 token (standard rule-of-thumb for English text with GPT tokenizers).

Example:

from gmf_forge_ai_data.context import ContextWindowManager

manager = ContextWindowManager(max_tokens=2000)
window = manager.fit(reranked_and_compressed_results)

print(f"Using {window.total_tokens} / {window.budget} tokens")
print(f"Docs included: {len(window.results)}, truncated: {window.truncated}")

# Build the prompt context string
context_str = "\n\n".join(r.document.content for r in window.results)
ContextWindowManager(max_tokens: int = 3000, allow_truncation: bool = True)
72    def __init__(
73        self,
74        max_tokens: int = 3000,
75        allow_truncation: bool = True,
76    ):
77        """
78        Args:
79            max_tokens:        Maximum number of tokens for all document content
80                               combined. Does not include the prompt template
81                               overhead — subtract your template size from the
82                               model's context limit before setting this.
83            allow_truncation:  If True (default), the last document that would
84                               overflow is truncated to fit remaining space.
85                               If False, it is dropped entirely.
86        """
87        if max_tokens <= 0:
88            raise ValueError(f"max_tokens must be positive, got {max_tokens}")
89        self.max_tokens = max_tokens
90        self.allow_truncation = allow_truncation

Args: max_tokens: Maximum number of tokens for all document content combined. Does not include the prompt template overhead — subtract your template size from the model's context limit before setting this. allow_truncation: If True (default), the last document that would overflow is truncated to fit remaining space. If False, it is dropped entirely.

max_tokens
allow_truncation
def fit( self, results: List[gmf_forge_ai_data.SearchResult]) -> WindowedContext:
 92    def fit(self, results: List[SearchResult]) -> WindowedContext:
 93        """
 94        Select and optionally truncate documents to fit within the token budget.
 95
 96        Args:
 97            results: List of SearchResult objects, ordered by priority
 98                     (highest priority first — e.g. after reranking).
 99
100        Returns:
101            WindowedContext with the selected results and budget metadata.
102        """
103        kept: List[SearchResult] = []
104        tokens_used = 0
105        truncated = False
106        dropped = 0
107
108        for result in results:
109            doc_tokens = self._estimate_tokens(result.document.content)
110            remaining = self.max_tokens - tokens_used
111
112            if doc_tokens <= remaining:
113                kept.append(result)
114                tokens_used += doc_tokens
115            elif self.allow_truncation and remaining > 0 and not truncated:
116                # Partially include this document to fill remaining budget
117                max_chars = int(remaining * self._CHARS_PER_TOKEN)
118                truncated_content = result.document.content[:max_chars]
119                new_doc = copy.copy(result.document)
120                new_doc.content = truncated_content
121                kept.append(SearchResult(
122                    document=new_doc,
123                    score=result.score,
124                    rank=result.rank,
125                ))
126                tokens_used += self._estimate_tokens(truncated_content)
127                truncated = True
128            else:
129                dropped += 1
130
131        return WindowedContext(
132            results=kept,
133            total_tokens=tokens_used,
134            budget=self.max_tokens,
135            truncated=truncated,
136            dropped=dropped,
137        )

Select and optionally truncate documents to fit within the token budget.

Args: results: List of SearchResult objects, ordered by priority (highest priority first — e.g. after reranking).

Returns: WindowedContext with the selected results and budget metadata.

@dataclass
class WindowedContext:
@dataclass
class WindowedContext:
    """
    Result bundle returned by ContextWindowManager.fit().

    Attributes:
        results:      The documents selected for the context window, in
                      the order they were kept.
        total_tokens: Estimated number of tokens across all kept content.
        budget:       The maximum token budget the manager was configured
                      with.
        truncated:    True when one document's content was cut down to
                      fit the remaining budget.
        dropped:      Count of documents that were left out entirely.
    """
    # Required fields: always supplied by fit().
    results: List[SearchResult]
    total_tokens: int
    budget: int
    # Optional metadata; the defaults describe a window with nothing
    # truncated and nothing dropped.
    truncated: bool = False
    dropped: int = 0

Output of ContextWindowManager.fit().

Attributes: results: Documents that fit within the token budget, in order. total_tokens: Estimated token count of all included content. budget: The configured maximum token budget. truncated: True if the last document's content was truncated to fit. dropped: Number of documents that did not fit at all.

WindowedContext( results: List[gmf_forge_ai_data.SearchResult], total_tokens: int, budget: int, truncated: bool = False, dropped: int = 0)
total_tokens: int
budget: int
truncated: bool = False
dropped: int = 0