gmf_forge_ai_data.query

Query processing — pre-retrieval query optimization strategies.

Modules:
    query_decomposer: Break complex multi-part queries into focused sub-queries.
    query_router: Route queries to the most appropriate retriever or index.
    query_expander: Generate semantically equivalent query variations for better recall.
    query_rewriter: Clean and clarify queries before retrieval.
    hyde_generator: Hypothetical Document Embeddings for improved vector search.

"""
Query processing — pre-retrieval query optimization strategies.

Modules:
    query_decomposer: Break complex multi-part queries into focused sub-queries.
    query_router:     Route queries to the most appropriate retriever or index.
    query_expander:   Generate semantically equivalent query variations for better recall.
    query_rewriter:   Clean and clarify queries before retrieval.
    hyde_generator:   Hypothetical Document Embeddings for improved vector search.
"""

from .query_decomposer import QueryDecomposer, DecomposedQuery
from .query_router import QueryRouter, RouteDecision
from .query_expander import QueryExpander, ExpandedQuery
from .query_rewriter import QueryRewriter, RewrittenQuery
from .hyde_generator import HyDEGenerator, HypotheticalDocument

__all__ = [
    "QueryDecomposer",
    "DecomposedQuery",
    "QueryRouter",
    "RouteDecision",
    "QueryExpander",
    "ExpandedQuery",
    "QueryRewriter",
    "RewrittenQuery",
    "HyDEGenerator",
    "HypotheticalDocument",
]

class QueryDecomposer:
    """
    Decomposes complex multi-part queries into focused sub-queries using LLM.

    Multi-part queries ("What are the antitrust laws and what cases were filed in 2024?")
    are split into individual queries for better retrieval precision per component.
    The sub-queries can then be run in parallel with a retriever and results merged,
    similar to query expansion but targeting distinct question atoms rather than synonyms.

    Example:
        ```python
        from gmf_forge_ai_data.query import QueryDecomposer
        from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway

        gateway = UnifiedLLMGateway(default_provider=azure_provider)
        decomposer = QueryDecomposer(gateway)

        result = await decomposer.decompose(
            "What are the antitrust laws and what cases were filed in 2024?"
        )
        # result.sub_queries = [
        #   "What are the antitrust laws?",
        #   "What cases were filed in 2024?",
        # ]
        ```
    """

    _DECOMPOSE_PROMPT = (
        "You are a query decomposition assistant for a retrieval system.\n\n"
        "Break the following complex query into {max_sub_queries} or fewer "
        "focused sub-queries.\n"
        "Each sub-query must be self-contained and independently answerable.\n"
        "Return ONLY a numbered list, one sub-query per line. "
        "Do not add explanations.\n\n"
        "Query: {query}\n\n"
        "Sub-queries:"
    )

    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
        """
        Args:
            llm_gateway: LLM gateway used for intelligent decomposition.
            temperature: Sampling temperature passed to the LLM (default 0.0 for
                         deterministic decomposition). Raise slightly (e.g. 0.2)
                         to get more varied sub-query boundaries.
        """
        self.llm_gateway = llm_gateway
        self.temperature = temperature

    async def decompose(
        self,
        query: str,
        max_sub_queries: int = 3,
    ) -> DecomposedQuery:
        """
        Decompose a complex query into focused sub-queries using LLM.

        Args:
            query:           The complex query to break apart.
            max_sub_queries: Maximum number of sub-queries to produce.

        Returns:
            DecomposedQuery containing the original and list of sub-queries.
        """
        prompt = self._DECOMPOSE_PROMPT.format(
            query=query,
            max_sub_queries=max_sub_queries,
        )

        response = await self.llm_gateway.complete(
            prompt=prompt,
            temperature=self.temperature,
            max_tokens=300,
        )

        sub_queries = self._parse_numbered_list(response.content)

        if not sub_queries:
            return DecomposedQuery(
                original=query,
                sub_queries=[query],
                reasoning=response.content,
            )

        return DecomposedQuery(
            original=query,
            sub_queries=sub_queries[:max_sub_queries],
            reasoning=response.content,
        )

    @staticmethod
    def _parse_numbered_list(text: str) -> List[str]:
        """Parse '1. item\\n2. item', '1) item', '- item', '* item' from LLM output."""
        lines = text.strip().split("\n")
        results: List[str] = []
        for line in lines:
            match = re.match(r"^\s*(?:\d+[.)]\s*|[-*]\s*)(.+)", line)
            if match:
                results.append(match.group(1).strip())
        return results

Decomposes complex multi-part queries into focused sub-queries using an LLM.

Multi-part queries ("What are the antitrust laws and what cases were filed in 2024?") are split into individual queries for better retrieval precision per component. The sub-queries can then be run in parallel with a retriever and results merged, similar to query expansion but targeting distinct question atoms rather than synonyms.
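
The "run in parallel and merge" step described above is left to the caller. A minimal sketch of one way to do it, assuming an async retriever that returns document IDs; `retrieve_all` and `fake_retrieve` are hypothetical names used only for illustration and are not part of this package:

```python
import asyncio
from typing import Awaitable, Callable, List


async def retrieve_all(
    sub_queries: List[str],
    retrieve: Callable[[str], Awaitable[List[str]]],
) -> List[str]:
    """Run one retrieval per sub-query concurrently and merge the hits."""
    # gather() returns results in the same order as the sub-queries.
    per_query_hits = await asyncio.gather(*(retrieve(q) for q in sub_queries))
    merged: List[str] = []
    seen = set()
    for hits in per_query_hits:
        for doc_id in hits:
            if doc_id not in seen:  # keep only the first occurrence
                seen.add(doc_id)
                merged.append(doc_id)
    return merged


async def fake_retrieve(query: str) -> List[str]:
    """Hypothetical retriever stub that returns canned document IDs."""
    canned = {
        "What are the antitrust laws?": ["doc-law-1", "doc-law-2"],
        "What cases were filed in 2024?": ["doc-case-7", "doc-law-2"],
    }
    return canned.get(query, [])


merged_ids = asyncio.run(
    retrieve_all(
        ["What are the antitrust laws?", "What cases were filed in 2024?"],
        fake_retrieve,
    )
)
print(merged_ids)  # ['doc-law-1', 'doc-law-2', 'doc-case-7']
```

Simple de-duplication keeps the sketch short; a rank-aware merge may be preferable when each sub-query returns scored results.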

Example:

from gmf_forge_ai_data.query import QueryDecomposer
from gmf_forge_ai_shared_core.llm_gateway import UnifiedLLMGateway

gateway = UnifiedLLMGateway(default_provider=azure_provider)
decomposer = QueryDecomposer(gateway)

result = await decomposer.decompose(
    "What are the antitrust laws and what cases were filed in 2024?"
)
# result.sub_queries = [
#   "What are the antitrust laws?",
#   "What cases were filed in 2024?",
# ]
QueryDecomposer( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.0)

Args:
    llm_gateway: LLM gateway used for intelligent decomposition.
    temperature: Sampling temperature passed to the LLM (default 0.0 for deterministic decomposition). Raise slightly (e.g. 0.2) to get more varied sub-query boundaries.

llm_gateway
temperature
async def decompose( self, query: str, max_sub_queries: int = 3) -> DecomposedQuery:

Decompose a complex query into focused sub-queries using an LLM.

Args:
    query: The complex query to break apart.
    max_sub_queries: Maximum number of sub-queries to produce.

Returns:
    DecomposedQuery containing the original query and the list of sub-queries. If no list items can be parsed from the LLM response, sub_queries contains only the original query.
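
The parsing and fallback behaviour can be checked without calling an LLM. The sketch below copies the regular expression from `_parse_numbered_list`; the `parse_or_fallback` helper is a hypothetical stand-in for the tail of `decompose` and is not part of the package:

```python
import re
from typing import List

# Same pattern as QueryDecomposer._parse_numbered_list:
# accepts "1. item", "1) item", "- item" and "* item".
_ITEM = re.compile(r"^\s*(?:\d+[.)]\s*|[-*]\s*)(.+)")


def parse_numbered_list(text: str) -> List[str]:
    """Return the item text of every line that looks like a list entry."""
    results: List[str] = []
    for line in text.strip().split("\n"):
        match = _ITEM.match(line)
        if match:
            results.append(match.group(1).strip())
    return results


def parse_or_fallback(query: str, llm_text: str, max_sub_queries: int = 3) -> List[str]:
    """Hypothetical helper: truncate parsed items, or fall back to the query."""
    sub_queries = parse_numbered_list(llm_text)
    return sub_queries[:max_sub_queries] if sub_queries else [query]


listed = parse_or_fallback(
    "What are the antitrust laws and what cases were filed in 2024?",
    "1. What are the antitrust laws?\n2) What cases were filed in 2024?",
)
unparsed = parse_or_fallback("original query", "I could not split this query.")
print(listed)
print(unparsed)  # ['original query']
```

Lines without a list marker are silently skipped, so a free-text reply from the model yields the single-query fallback.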

@dataclass
class DecomposedQuery:
    """
    Result of query decomposition.

    Attributes:
        original:    The original complex query string.
        sub_queries: List of focused sub-queries derived from the original.
        reasoning:   Raw LLM response explaining the decomposition.
    """
    original: str
    sub_queries: List[str]
    reasoning: Optional[str] = None

Result of query decomposition.

Attributes:
    original: The original complex query string.
    sub_queries: List of focused sub-queries derived from the original.
    reasoning: Raw LLM response text from which the sub-queries were parsed.

DecomposedQuery( original: str, sub_queries: List[str], reasoning: Optional[str] = None)
original: str
sub_queries: List[str]
reasoning: Optional[str] = None
class QueryRouter:
    """
    Routes queries to the appropriate retriever or index using LLM.

    Each route has a name and a plain-English description of its content.
    The LLM selects the best-matching route for each incoming query.

    Typical use in a multi-index RAG system: create one route per Azure AI
    Search index and let the router automatically direct queries without
    searching all indexes every time.

    Example:
        ```python
        from gmf_forge_ai_data.query import QueryRouter

        routes = {
            "legal_documents":   "Legal cases, court decisions, jurisdiction, antitrust, patent",
            "products":          "Products, prices, inventory, electronics, furniture, camera",
            "financial_reports": "Earnings, revenue, fiscal year, company financials, SEC filings",
            "ai_ml_knowledge":   "Machine learning, AI, neural networks, deep learning, NLP",
        }

        router = QueryRouter(routes=routes, llm_gateway=gateway)
        decision = await router.route("What antitrust cases were filed in 2024?")
        # decision.target = "legal_documents"
        # decision.confidence = 0.9
        ```
    """

    _ROUTE_PROMPT = (
        "You are a query routing assistant for a multi-domain retrieval system.\n\n"
        "Available indexes and what they contain:\n"
        "{routes_description}\n\n"
        "Given the user query below, output ONLY the name of the single best index "
        "to search. Do not add any explanation or punctuation.\n\n"
        "Query: {query}\n\n"
        "Best index:"
    )

    def __init__(
        self,
        routes: Dict[str, str],
        llm_gateway: UnifiedLLMGateway,
        temperature: float = 0.0,
    ):
        """
        Args:
            routes:      Dict mapping route name → plain-English description of content.
            llm_gateway: LLM gateway for intelligent routing.
            temperature: Sampling temperature passed to the LLM (default 0.0 for
                         deterministic routing). Keep low — routing should be consistent.
        """
        self.routes = routes
        self.llm_gateway = llm_gateway
        self.temperature = temperature

    async def route(self, query: str) -> RouteDecision:
        """
        Route a query to the best-matching index using LLM.

        Args:
            query: The user query to route.

        Returns:
            RouteDecision with the chosen target and confidence score.

        Raises:
            ValueError: If the LLM returns an unknown route name.
        """
        routes_description = "\n".join(
            f"- {name}: {desc}" for name, desc in self.routes.items()
        )
        prompt = self._ROUTE_PROMPT.format(
            routes_description=routes_description,
            query=query,
        )

        response = await self.llm_gateway.complete(
            prompt=prompt,
            temperature=self.temperature,
            max_tokens=50,
        )

        target = response.content.strip().strip('"').strip("'")

        if target not in self.routes:
            raise ValueError(
                f"LLM returned unknown route '{target}'. "
                f"Valid routes: {list(self.routes.keys())}"
            )

        alternatives = [(name, 0.0) for name in self.routes if name != target]

        return RouteDecision(
            query=query,
            target=target,
            confidence=0.9,
            reasoning=response.content,
            alternatives=alternatives,
        )

Routes queries to the appropriate retriever or index using an LLM.

Each route has a name and a plain-English description of its content. The LLM selects the best-matching route for each incoming query.

Typical use in a multi-index RAG system: create one route per Azure AI Search index and let the router automatically direct queries without searching all indexes every time.

Example:

from gmf_forge_ai_data.query import QueryRouter

routes = {
    "legal_documents":   "Legal cases, court decisions, jurisdiction, antitrust, patent",
    "products":          "Products, prices, inventory, electronics, furniture, camera",
    "financial_reports": "Earnings, revenue, fiscal year, company financials, SEC filings",
    "ai_ml_knowledge":   "Machine learning, AI, neural networks, deep learning, NLP",
}

router = QueryRouter(routes=routes, llm_gateway=gateway)
decision = await router.route("What antitrust cases were filed in 2024?")
# decision.target = "legal_documents"
# decision.confidence = 0.9
QueryRouter( routes: Dict[str, str], llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.0)

Args:
    routes: Dict mapping route name → plain-English description of content.
    llm_gateway: LLM gateway for intelligent routing.
    temperature: Sampling temperature passed to the LLM (default 0.0 for deterministic routing). Keep low; routing should be consistent.

routes
llm_gateway
temperature
async def route(self, query: str) -> RouteDecision:

Route a query to the best-matching index using an LLM.

Args: query: The user query to route.

Returns: RouteDecision with the chosen target and confidence score.

Raises: ValueError: If the LLM returns an unknown route name.
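
Because `route` only accepts a reply that exactly matches a key in `routes`, callers may want to catch the `ValueError` and fall back to a default index, or normalize the raw reply (available as `RouteDecision.reasoning` on success) before comparing. The sketch below shows one possible normalization; `match_route` is a hypothetical helper, not part of the package, and it assumes route names differ by more than letter case:

```python
from typing import Dict, Optional


def match_route(raw: str, routes: Dict[str, str]) -> Optional[str]:
    """Map raw LLM text to a known route name, or None if nothing matches."""
    # Drop surrounding whitespace, quotes, backticks and a trailing period.
    cleaned = raw.strip().strip("\"'`.").lower()
    by_lower = {name.lower(): name for name in routes}
    return by_lower.get(cleaned)


routes = {
    "legal_documents": "Legal cases, court decisions",
    "products": "Products, prices, inventory",
}

exact = match_route("legal_documents", routes)
noisy = match_route(' "Legal_Documents". ', routes)
unknown = match_route("weather", routes)
print(exact, noisy, unknown)  # legal_documents legal_documents None
```

A `None` result corresponds to the case where `route` would raise; what to do then (default index, search all indexes, retry) is an application decision.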

@dataclass
class RouteDecision:
    """
    Result of query routing.

    Attributes:
        query:        The original query string.
        target:       Name of the chosen route (retriever or index).
        confidence:   Confidence score in [0, 1] for the chosen route.
        reasoning:    Raw LLM output.
        alternatives: Other routes with placeholder confidence scores.
    """
    query: str
    target: str
    confidence: float
    reasoning: Optional[str] = None
    alternatives: List[Tuple[str, float]] = field(default_factory=list)

Result of query routing.

Attributes:
    query: The original query string.
    target: Name of the chosen route (retriever or index).
    confidence: Confidence score in [0, 1] for the chosen route. QueryRouter.route currently sets this to a fixed 0.9 rather than a model-derived value.
    reasoning: Raw LLM output.
    alternatives: Other routes with placeholder confidence scores (QueryRouter.route sets each to 0.0).

RouteDecision( query: str, target: str, confidence: float, reasoning: Optional[str] = None, alternatives: List[Tuple[str, float]] = <factory>)
query: str
target: str
confidence: float
reasoning: Optional[str] = None
alternatives: List[Tuple[str, float]]
class QueryExpander:
    """
    Generates query variations to improve retrieval recall using LLM.

    Uses an LLM to produce semantically equivalent re-phrasings of the original
    query. Expanded queries are intended to run in parallel with the original query
    via separate retriever calls, then merged with Reciprocal Rank Fusion (RRF)
    using EnsembleRetriever for best results.

    Example:
        ```python
        from gmf_forge_ai_data.query import QueryExpander

        expander = QueryExpander(llm_gateway)
        result = await expander.expand("antitrust violations", num_expansions=3)
        # result.expansions = [
        #   "competition law breaches",
        #   "monopoly infringement cases",
        #   "anti-competitive conduct",
        # ]
        ```
    """

    _EXPAND_PROMPT = (
        "You are a search query expansion assistant.\n\n"
        "Generate {num_expansions} alternative phrasings for the search query below.\n"
        "Use synonyms, related terms, and different wording that conveys the same intent.\n"
        "Return ONLY a numbered list, one variation per line. "
        "Do NOT repeat the original query.\n\n"
        "Original query: {query}\n\n"
        "Alternative phrasings:"
    )

    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.3):
        """
        Args:
            llm_gateway: LLM gateway for generating query variations.
            temperature: Sampling temperature passed to the LLM (default 0.3 for
                         creative variation). Raise toward 0.7 for more diverse
                         phrasings; lower toward 0.0 for tighter paraphrases.
        """
        self.llm_gateway = llm_gateway
        self.temperature = temperature

    async def expand(
        self,
        query: str,
        num_expansions: int = 3,
    ) -> ExpandedQuery:
        """
        Expand a query into multiple variations using LLM.

        Args:
            query:           The original query to expand.
            num_expansions:  Number of alternative phrasings to generate.

        Returns:
            ExpandedQuery with original and list of variation strings.
        """
        prompt = self._EXPAND_PROMPT.format(
            query=query,
            num_expansions=num_expansions,
        )

        response = await self.llm_gateway.complete(
            prompt=prompt,
            temperature=self.temperature,
            max_tokens=300,
        )

        expansions = self._parse_numbered_list(response.content)

        return ExpandedQuery(
            original=query,
            expansions=expansions[:num_expansions],
        )

    @staticmethod
    def _parse_numbered_list(text: str) -> List[str]:
        """Parse '1. item\\n2. item', '1) item', '- item', '* item' from LLM output."""
        lines = text.strip().split("\n")
        results: List[str] = []
        for line in lines:
            match = re.match(r"^\s*(?:\d+[.)]\s*|[-*]\s*)(.+)", line)
            if match:
                results.append(match.group(1).strip())
        return results

Generates query variations to improve retrieval recall using an LLM.

Uses an LLM to produce semantically equivalent re-phrasings of the original query. Expanded queries are intended to run in parallel with the original query via separate retriever calls, then merged with Reciprocal Rank Fusion (RRF) using EnsembleRetriever for best results.
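
For readers unfamiliar with Reciprocal Rank Fusion, the sketch below shows the standard formula: each document scores the sum of 1 / (k + rank) over every ranking in which it appears. It is an illustration only and makes no claim about how EnsembleRetriever is implemented; the function name is hypothetical and k = 60 is simply the commonly used constant.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse several ranked lists of document IDs into one ranked list."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks are 1-based, so the top hit contributes 1 / (k + 1).
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest score first; ties are broken by document ID for determinism.
    return sorted(scores, key=lambda doc_id: (-scores[doc_id], doc_id))


rankings = [
    ["a", "b", "c"],  # results for the original query
    ["b", "a", "d"],  # results for expansion 1
    ["b", "c", "a"],  # results for expansion 2
]
fused = reciprocal_rank_fusion(rankings)
print(fused)  # ['b', 'a', 'c', 'd']
```

Document "b" wins because it is ranked first by two of the three queries, even though the original query ranked "a" first.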

Example:

from gmf_forge_ai_data.query import QueryExpander

expander = QueryExpander(llm_gateway)
result = await expander.expand("antitrust violations", num_expansions=3)
# result.expansions = [
#   "competition law breaches",
#   "monopoly infringement cases",
#   "anti-competitive conduct",
# ]
QueryExpander( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.3)

Args:
    llm_gateway: LLM gateway for generating query variations.
    temperature: Sampling temperature passed to the LLM (default 0.3 for creative variation). Raise toward 0.7 for more diverse phrasings; lower toward 0.0 for tighter paraphrases.

llm_gateway
temperature
async def expand( self, query: str, num_expansions: int = 3) -> ExpandedQuery:

Expand a query into multiple variations using an LLM.

Args:
    query: The original query to expand.
    num_expansions: Number of alternative phrasings to generate.

Returns: ExpandedQuery with original and list of variation strings.

@dataclass
class ExpandedQuery:
    """
    Result of query expansion.

    Attributes:
        original:   The original query string (not included in expansions list).
        expansions: Alternative phrasings — run alongside the original query.
    """
    original: str
    expansions: List[str]

Result of query expansion.

Attributes:
    original: The original query string (not included in the expansions list).
    expansions: Alternative phrasings to run alongside the original query.

ExpandedQuery(original: str, expansions: List[str])
original: str
expansions: List[str]
class QueryRewriter:
    """
    Improves query quality before retrieval using LLM.

    Handles:
    - Grammar and spelling fixes
    - Replacement of vague terms with specific, domain-appropriate ones
    - Removal of conversational filler ("tell me about", "can you find")
    - Clarification of ambiguous intent using optional domain context

    Example:
        ```python
        from gmf_forge_ai_data.query import QueryRewriter

        rewriter = QueryRewriter(llm_gateway)

        result = await rewriter.rewrite(
            "tell me the stuff about that apple patent thing",
            context="legal documents database"
        )
        # result.rewritten = "Apple Inc. patent infringement case details"
        # result.changes   = ["LLM rewrote: '...' → '...'"]
        ```
    """

    _REWRITE_PROMPT = (
        "You are a search query optimization assistant for a document retrieval system.\n\n"
        "Rewrite the following query to make it more precise and effective for retrieval:\n"
        "- Fix grammar and spelling errors\n"
        "- Replace vague or colloquial terms with specific, domain-appropriate ones\n"
        "- Remove conversational filler (e.g., 'tell me about', 'can you find')\n"
        "- Preserve the original semantic intent\n"
        "- Return ONLY the rewritten query — no explanation, no extra text\n\n"
        "{context_line}"
        "Query: {query}\n\n"
        "Rewritten query:"
    )

    def __init__(self, llm_gateway: UnifiedLLMGateway, temperature: float = 0.0):
        """
        Args:
            llm_gateway: LLM gateway for intelligent query rewriting.
            temperature: Sampling temperature passed to the LLM (default 0.0 for
                         deterministic rewrites). Keep low — rewriting should
                         produce consistent, reproducible output.
        """
        self.llm_gateway = llm_gateway
        self.temperature = temperature

    async def rewrite(
        self,
        query: str,
        context: Optional[str] = None,
    ) -> RewrittenQuery:
        """
        Rewrite a query for better retrieval using LLM.

        Args:
            query:   The original query to improve.
            context: Optional domain hint passed to the LLM
                     (e.g., "legal documents", "financial filings database").

        Returns:
            RewrittenQuery with improved text and list of changes made.
        """
        context_line = f"Domain context: {context}\n\n" if context else ""
        prompt = self._REWRITE_PROMPT.format(
            query=query,
            context_line=context_line,
        )

        response = await self.llm_gateway.complete(
            prompt=prompt,
            temperature=self.temperature,
            max_tokens=150,
        )

        rewritten = response.content.strip().strip('"').strip("'")

        if not rewritten or rewritten.lower() == query.lower():
            return RewrittenQuery(
                original=query,
                rewritten=query,
                changes=["No rewrite needed"],
            )

        return RewrittenQuery(
            original=query,
            rewritten=rewritten,
            changes=[f"LLM rewrote: '{query}' → '{rewritten}'"],
        )

Improves query quality before retrieval using an LLM.

Handles:

  • Grammar and spelling fixes
  • Replacement of vague terms with specific, domain-appropriate ones
  • Removal of conversational filler ("tell me about", "can you find")
  • Clarification of ambiguous intent using optional domain context

Example:

from gmf_forge_ai_data.query import QueryRewriter

rewriter = QueryRewriter(llm_gateway)

result = await rewriter.rewrite(
    "tell me the stuff about that apple patent thing",
    context="legal documents database"
)
# result.rewritten = "Apple Inc. patent infringement case details"
# result.changes   = ["LLM rewrote: '...' → '...'"]
QueryRewriter( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, temperature: float = 0.0)

Args:
    llm_gateway: LLM gateway for intelligent query rewriting.
    temperature: Sampling temperature passed to the LLM (default 0.0 for deterministic rewrites). Keep low; rewriting should produce consistent, reproducible output.

llm_gateway
temperature
async def rewrite( self, query: str, context: Optional[str] = None) -> RewrittenQuery:

Rewrite a query for better retrieval using an LLM.

Args:
    query: The original query to improve.
    context: Optional domain hint passed to the LLM (e.g., "legal documents", "financial filings database").

Returns: RewrittenQuery with improved text and list of changes made.
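
The post-processing in rewrite() can be exercised without a live LLM. The following is a minimal sketch, assuming a gateway whose complete() returns an object with a .content attribute, as the source listing implies. StubGateway, StubResponse, and the module-level rewrite() function are hypothetical stand-ins for illustration and are not part of the package; RewrittenQuery is a local copy of the documented dataclass.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List


@dataclass
class RewrittenQuery:
    # Local copy of the documented dataclass, so the sketch is self-contained.
    original: str
    rewritten: str
    changes: List[str] = field(default_factory=list)


@dataclass
class StubResponse:
    # Hypothetical response shape: only `.content` is read by the rewrite logic.
    content: str


class StubGateway:
    """Hypothetical stand-in for the LLM gateway; always returns a canned reply."""

    def __init__(self, reply: str):
        self.reply = reply

    async def complete(self, prompt: str, temperature: float, max_tokens: int) -> StubResponse:
        return StubResponse(content=self.reply)


async def rewrite(gateway: StubGateway, query: str) -> RewrittenQuery:
    # Mirrors the documented post-processing: strip whitespace and surrounding
    # quotes, then fall back to the original query if nothing changed.
    response = await gateway.complete(prompt=query, temperature=0.0, max_tokens=150)
    rewritten = response.content.strip().strip('"').strip("'")
    if not rewritten or rewritten.lower() == query.lower():
        return RewrittenQuery(original=query, rewritten=query, changes=["No rewrite needed"])
    return RewrittenQuery(
        original=query,
        rewritten=rewritten,
        changes=[f"LLM rewrote: '{query}' → '{rewritten}'"],
    )


# A quoted reply is unwrapped; a reply that differs only in case or padding is ignored.
changed = asyncio.run(
    rewrite(StubGateway('"antitrust penalties in the United States"'), "antitrust penalties US")
)
unchanged = asyncio.run(
    rewrite(StubGateway("  Antitrust Penalties US  "), "antitrust penalties US")
)
```

Note that the comparison is case-insensitive, so a reply that only changes capitalization is treated as "No rewrite needed" and the original casing is returned.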

@dataclass
class RewrittenQuery:
@dataclass
class RewrittenQuery:
    """
    Result of query rewriting.

    Attributes:
        original:  The original query string before rewriting.
        rewritten: The improved query string after rewriting.
        changes:   Human-readable list of transformations applied.
    """
    original: str
    rewritten: str
    changes: List[str] = field(default_factory=list)

Result of query rewriting.

Attributes:
    original: The original query string before rewriting.
    rewritten: The improved query string after rewriting.
    changes: Human-readable list of transformations applied.

RewrittenQuery(original: str, rewritten: str, changes: List[str] = <factory>)
original: str
rewritten: str
changes: List[str]
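
The `<factory>` default shown in the signature comes from field(default_factory=list), which gives each instance its own changes list. A small sketch, using a local copy of the dataclass so it runs on its own:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RewrittenQuery:
    # Local copy of the documented dataclass.
    original: str
    rewritten: str
    changes: List[str] = field(default_factory=list)


a = RewrittenQuery(original="q1", rewritten="q1")
b = RewrittenQuery(original="q2", rewritten="q2")

# Appending to one instance's list does not affect the other,
# because default_factory builds a fresh list per instance.
a.changes.append("lowercased")
```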
class HyDEGenerator:
class HyDEGenerator:
    """
    Hypothetical Document Embeddings (HyDE) generator.

    Why this works:
    ---------------
    Short query strings ("antitrust cases 2024") and full answer passages often
    sit in different regions of an embedding space. A hypothetical passage that
    answers the query tends to land closer to real answer documents, so cosine
    similarity between the HyDE embedding and indexed document embeddings is
    typically higher than query-vs-document similarity.

    Usage pattern:
    --------------
    1. Call generate_and_embed(query) → HypotheticalDocument (with embedding set).
    2. Feed the embedding into VectorRetriever via RetrievalQuery(embedding=...).
    3. Compare results against standard VectorRetriever on the same query.

    Example:
        ```python
        from gmf_forge_ai_data.query import HyDEGenerator
        from gmf_forge_ai_data.retrieval import VectorRetriever, RetrievalQuery

        hyde = HyDEGenerator(llm_gateway=gateway, embedder=embedder)

        # Generate hypothetical doc and embed it
        hypo = await hyde.generate_and_embed(
            "What are the penalties for antitrust violations?",
            domain="legal documents"
        )

        # Use HyDE embedding for retrieval
        # (vector_retriever is assumed to be an already-configured VectorRetriever)
        query = RetrievalQuery(embedding=hypo.embedding, top_k=5)
        results = vector_retriever.retrieve(query)
        ```
    """

    _HYDE_PROMPT = (
        "Write a concise, authoritative passage that directly answers the question below.\n"
        "Write it as if it were an excerpt from a reference document or knowledge base.\n"
        "{domain_line}"
        "Keep the passage under 150 words. "
        "Do not include meta-commentary or mention that this is hypothetical.\n\n"
        "Question: {query}\n\n"
        "Passage:"
    )

    def __init__(
        self,
        llm_gateway: UnifiedLLMGateway,
        embedder: Optional[EmbeddingProvider] = None,
    ):
        """
        Initialize the HyDE generator.

        Args:
            llm_gateway: LLM gateway used to generate the hypothetical document.
            embedder:    Embedding provider used to vectorize the hypothetical doc.
                         Required only for generate_and_embed(); optional for generate().
        """
        self.llm_gateway = llm_gateway
        self.embedder = embedder

    async def generate(
        self,
        query: str,
        domain: Optional[str] = None,
    ) -> HypotheticalDocument:
        """
        Generate a hypothetical document that would answer the query.

        The returned HypotheticalDocument has embedding=None. Call
        generate_and_embed() to also produce a vector in one step.

        Args:
            query:  The retrieval query to generate a passage for.
            domain: Optional domain hint to guide the LLM style
                    (e.g., "legal documents", "financial reports", "AI/ML knowledge base").

        Returns:
            HypotheticalDocument with hypothetical_doc text, embedding=None.
        """
        domain_line = f"Domain: {domain}\n" if domain else ""
        prompt = self._HYDE_PROMPT.format(query=query, domain_line=domain_line)

        response = await self.llm_gateway.complete(
            prompt=prompt,
            temperature=0.5,
            max_tokens=200,
        )

        return HypotheticalDocument(
            query=query,
            hypothetical_doc=response.content.strip(),
            domain=domain,
        )

    async def generate_and_embed(
        self,
        query: str,
        domain: Optional[str] = None,
    ) -> HypotheticalDocument:
        """
        Generate a hypothetical document and embed it in a single step.

        Calls generate() then uses the configured embedder to vectorize the
        resulting passage. The embedding can be passed directly to VectorRetriever
        via RetrievalQuery(embedding=result.embedding, ...).

        Args:
            query:  The retrieval query.
            domain: Optional domain hint for generation style.

        Returns:
            HypotheticalDocument with both hypothetical_doc and embedding populated.

        Raises:
            ValueError: If no embedder was provided at construction time.
        """
        if not self.embedder:
            raise ValueError(
                "An EmbeddingProvider is required for generate_and_embed(). "
                "Pass embedder= to HyDEGenerator.__init__()."
            )

        result = await self.generate(query, domain)
        result.embedding = self.embedder.embed_text(result.hypothetical_doc)
        return result

Hypothetical Document Embeddings (HyDE) generator.

Why this works:

Short query strings ("antitrust cases 2024") and full answer passages often sit in different regions of an embedding space. A hypothetical passage that answers the query tends to land closer to real answer documents, so cosine similarity between the HyDE embedding and indexed document embeddings is typically higher than query-vs-document similarity.

Usage pattern:

  1. Call generate_and_embed(query) → HypotheticalDocument (with embedding set).
  2. Feed the embedding into VectorRetriever via RetrievalQuery(embedding=...).
  3. Compare results against standard VectorRetriever on the same query.

Example:

from gmf_forge_ai_data.query import HyDEGenerator
from gmf_forge_ai_data.retrieval import VectorRetriever, RetrievalQuery

hyde = HyDEGenerator(llm_gateway=gateway, embedder=embedder)

# Generate hypothetical doc and embed it
hypo = await hyde.generate_and_embed(
    "What are the penalties for antitrust violations?",
    domain="legal documents"
)

# Use HyDE embedding for retrieval
# (vector_retriever is assumed to be an already-configured VectorRetriever)
query = RetrievalQuery(embedding=hypo.embedding, top_k=5)
results = vector_retriever.retrieve(query)
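
The intuition behind "Why this works" can be illustrated with a toy model. The sketch below is not how a real EmbeddingProvider behaves: it assumes a bag-of-words "embedding" (word counts) purely for illustration, and the hypothetical passage is hand-written rather than LLM-generated. The names toy_embed and cosine are illustrative helpers, not part of the package.

```python
import math
from collections import Counter
from typing import Dict


def toy_embed(text: str) -> Dict[str, int]:
    # Toy stand-in for an embedder: lowercase word counts with
    # surrounding punctuation stripped.
    words = [w.strip(".,?!").lower() for w in text.split()]
    return Counter(w for w in words if w)


def cosine(a: Dict[str, int], b: Dict[str, int]) -> float:
    # Cosine similarity between two sparse count vectors.
    dot = sum(a[k] * b[k] for k in a if k in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


indexed_doc = "Violations of antitrust law can lead to fines and imprisonment for individuals."
query = "antitrust penalties"
# Hand-written stand-in for what generate() might return.
hypothetical = "Antitrust violations can lead to fines and imprisonment."

query_score = cosine(toy_embed(query), toy_embed(indexed_doc))
hyde_score = cosine(toy_embed(hypothetical), toy_embed(indexed_doc))
```

In this toy setup the passage-shaped text overlaps far more with the indexed document than the two-word query does, so hyde_score exceeds query_score. Real dense embeddings behave differently in detail, which is why step 3 of the usage pattern recommends comparing against a plain query baseline.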
HyDEGenerator( llm_gateway: gmf_forge_ai_shared_core.llm_gateway.UnifiedLLMGateway, embedder: Optional[gmf_forge_ai_data.EmbeddingProvider] = None)

Initialize the HyDE generator.

Args:
    llm_gateway: LLM gateway used to generate the hypothetical document.
    embedder: Embedding provider used to vectorize the hypothetical doc. Required only for generate_and_embed(); optional for generate().

llm_gateway
embedder
async def generate( self, query: str, domain: Optional[str] = None) -> HypotheticalDocument:

Generate a hypothetical document that would answer the query.

The returned HypotheticalDocument has embedding=None. Call generate_and_embed() to also produce a vector in one step.

Args:
    query: The retrieval query to generate a passage for.
    domain: Optional domain hint to guide the LLM style (e.g., "legal documents", "financial reports", "AI/ML knowledge base").

Returns: HypotheticalDocument with hypothetical_doc text, embedding=None.
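
To see what the LLM actually receives, the prompt assembly in generate() can be reproduced in isolation. This sketch copies the _HYDE_PROMPT template from the class source; build_prompt is a hypothetical helper name used only here.

```python
from typing import Optional

# Template copied from HyDEGenerator._HYDE_PROMPT.
HYDE_PROMPT = (
    "Write a concise, authoritative passage that directly answers the question below.\n"
    "Write it as if it were an excerpt from a reference document or knowledge base.\n"
    "{domain_line}"
    "Keep the passage under 150 words. "
    "Do not include meta-commentary or mention that this is hypothetical.\n\n"
    "Question: {query}\n\n"
    "Passage:"
)


def build_prompt(query: str, domain: Optional[str] = None) -> str:
    # The domain line is included only when a domain hint is given;
    # otherwise the placeholder collapses to an empty string.
    domain_line = f"Domain: {domain}\n" if domain else ""
    return HYDE_PROMPT.format(query=query, domain_line=domain_line)


with_domain = build_prompt(
    "What are the penalties for antitrust violations?",
    domain="legal documents",
)
without_domain = build_prompt("What are the penalties for antitrust violations?")
```

Because str.format only interprets placeholders in the template, braces that appear inside the query text are inserted verbatim and do not raise formatting errors.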

async def generate_and_embed( self, query: str, domain: Optional[str] = None) -> HypotheticalDocument:

Generate a hypothetical document and embed it in a single step.

Calls generate() then uses the configured embedder to vectorize the resulting passage. The embedding can be passed directly to VectorRetriever via RetrievalQuery(embedding=result.embedding, ...).

Args:
    query: The retrieval query.
    domain: Optional domain hint for generation style.

Returns: HypotheticalDocument with both hypothetical_doc and embedding populated.

Raises: ValueError: If no embedder was provided at construction time.
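
The guard-then-embed control flow can be tried out with stand-ins. The sketch below is an illustration under stated assumptions, not the package's implementation: MiniHyDE and StubEmbedder are hypothetical classes, the LLM call is replaced by canned text, and embed_text is assumed to be synchronous because the source calls it without await.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class HypotheticalDocument:
    # Local copy of the documented dataclass.
    query: str
    hypothetical_doc: str
    embedding: Optional[List[float]] = None
    domain: Optional[str] = None


class StubEmbedder:
    """Hypothetical embedder: maps text to a 2-dimensional vector."""

    def embed_text(self, text: str) -> List[float]:
        return [float(len(text)), float(len(text.split()))]


class MiniHyDE:
    """Illustrative stand-in mirroring the guard-then-embed control flow."""

    def __init__(self, embedder: Optional[StubEmbedder] = None):
        self.embedder = embedder

    async def generate(self, query: str, domain: Optional[str] = None) -> HypotheticalDocument:
        # A real generator would call the LLM here; this returns canned text.
        return HypotheticalDocument(query=query, hypothetical_doc="Canned passage.", domain=domain)

    async def generate_and_embed(self, query: str, domain: Optional[str] = None) -> HypotheticalDocument:
        # Fail fast before spending an LLM call if no embedder is configured.
        if not self.embedder:
            raise ValueError("An EmbeddingProvider is required for generate_and_embed().")
        result = await self.generate(query, domain)
        result.embedding = self.embedder.embed_text(result.hypothetical_doc)
        return result


embedded = asyncio.run(MiniHyDE(StubEmbedder()).generate_and_embed("q", domain="legal documents"))

# Without an embedder the call raises before any text is generated.
try:
    asyncio.run(MiniHyDE().generate_and_embed("q"))
    guard_error = None
except ValueError as exc:
    guard_error = str(exc)
```

Callers that only need the passage text can use generate() instead, which does not require an embedder and leaves embedding as None.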

@dataclass
class HypotheticalDocument:
@dataclass
class HypotheticalDocument:
    """
    Result of HyDE generation.

    Attributes:
        query:            The original retrieval query.
        hypothetical_doc: LLM-generated passage that would answer the query.
        embedding:        Vector embedding of hypothetical_doc (None until embedded).
        domain:           Optional domain hint that was passed during generation.
    """
    query: str
    hypothetical_doc: str
    embedding: Optional[List[float]] = None
    domain: Optional[str] = None

Result of HyDE generation.

Attributes:
    query: The original retrieval query.
    hypothetical_doc: LLM-generated passage that would answer the query.
    embedding: Vector embedding of hypothetical_doc (None until embedded).
    domain: Optional domain hint that was passed during generation.

HypotheticalDocument( query: str, hypothetical_doc: str, embedding: Optional[List[float]] = None, domain: Optional[str] = None)
query: str
hypothetical_doc: str
embedding: Optional[List[float]] = None
domain: Optional[str] = None