Skip to content

Ask

Bases: Retrieval

Handles Q&A retrieval by transforming prompts into contextual embeddings and returning relevant engram IDs.

This class implements the retrieval workflow from raw prompts to vector database queries, supporting conversation analysis and dynamic index generation.

Attributes:

Name Type Description
id str

Unique identifier for this retrieval session.

prompt Prompt

The original prompt provided by the user.

plugin_manager PluginManager

Access to LLM, vector DB, and embedding components.

metrics_tracker MetricsTracker

Tracks operational metrics for observability.

db_plugin dict

Plugin for document database interactions.

service RetrieveService

Parent service coordinating this request.

library str | None

Optional target library to search within.

conversation_direction dict[str, str]

Stores user intent and working memory.

prompt_analysis PromptAnalysis | None

Structured analysis of the prompt.

Methods:

Name Description
get_sources

Initiates the async pipeline for directional memory retrieval.

_fetch_history

Retrieves prior user history from the document DB.

_retrieve_gen_conversation_direction

Extracts user intent and conversational memory.

_embed_gen_direction

Converts extracted intent into embeddings.

_vector_fetch_direction_meta

Queries metadata collection using intent embeddings.

_fetch_direction_meta

Loads Meta objects from metadata store.

_analyze_prompt

Analyzes user prompt in context of metadata.

_generate_indices

Generates semantic indices for retrieval.

_generate_indicies_embeddings

Converts index phrases into embeddings.

_query_index_db

Searches vector DB to identify related engrams.

Source code in src/engramic/application/retrieve/ask.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
class Ask(Retrieval):
    """
    Handles Q&A retrieval by transforming prompts into contextual embeddings and returning relevant engram IDs.

    This class implements the retrieval workflow from raw prompts to vector database queries,
    supporting conversation analysis and dynamic index generation.

    Attributes:
        id (str): Unique identifier for this retrieval session.
        prompt (Prompt): The original prompt provided by the user.
        plugin_manager (PluginManager): Access to LLM, vector DB, and embedding components.
        metrics_tracker (MetricsTracker): Tracks operational metrics for observability.
        db_plugin (dict): Plugin for document database interactions.
        service (RetrieveService): Parent service coordinating this request.
        library (str | None): Optional target library to search within.
        conversation_direction (dict[str, str]): Stores user intent and working memory.
        prompt_analysis (PromptAnalysis | None): Structured analysis of the prompt.

    Methods:
        get_sources() -> None:
            Initiates the async pipeline for directional memory retrieval.
        _fetch_history() -> list[dict[str, Any]]:
            Retrieves prior user history from the document DB.
        _retrieve_gen_conversation_direction(response_array) -> dict[str, str]:
            Extracts user intent and conversational memory.
        _embed_gen_direction(main_prompt) -> list[float]:
            Converts extracted intent into embeddings.
        _vector_fetch_direction_meta(embedding) -> list[str]:
            Queries metadata collection using intent embeddings.
        _fetch_direction_meta(meta_id) -> list[Meta]:
            Loads Meta objects from metadata store.
        _analyze_prompt(meta_list) -> dict[str, str]:
            Analyzes user prompt in context of metadata.
        _generate_indices(meta_list) -> dict[str, str]:
            Generates semantic indices for retrieval.
        _generate_indicies_embeddings(indices) -> list[list[float]]:
            Converts index phrases into embeddings.
        _query_index_db(embeddings) -> set[str]:
            Searches vector DB to identify related engrams.
    """

    def __init__(
        self,
        ask_id: str,
        prompt: Prompt,
        plugin_manager: PluginManager,
        metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric],
        db_plugin: dict[str, Any],
        service: RetrieveService,
        library: str | None = None,
    ) -> None:
        self.id = ask_id
        self.service = service
        self.metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric] = (
            metrics_tracker
        )
        self.library = library
        self.prompt = prompt
        self.conversation_direction: dict[str, str]
        self.prompt_analysis: PromptAnalysis | None = None
        self.retrieve_gen_conversation_direction_plugin = plugin_manager.get_plugin(
            'llm', 'retrieve_gen_conversation_direction'
        )
        self.prompt_analysis_plugin = plugin_manager.get_plugin('llm', 'retrieve_prompt_analysis')
        self.prompt_retrieve_indices_plugin = plugin_manager.get_plugin('llm', 'retrieve_gen_index')
        self.prompt_vector_db_plugin = plugin_manager.get_plugin('vector_db', 'db')
        self.prompt_db_document_plugin = db_plugin
        self.embeddings_gen_embed = plugin_manager.get_plugin('embedding', 'gen_embed')

    def get_sources(self) -> None:
        direction_step = self.service.run_task(self._fetch_history())
        direction_step.add_done_callback(self.on_fetch_history_complete)

    """
    ### CONVERSATION DIRECTION

    Fetches related domain knowledge based on the prompt intent.
    """

    async def _fetch_history(self) -> list[dict[str, Any]]:
        plugin = self.prompt_db_document_plugin
        args = plugin['args']
        args['history_limit'] = 10
        args['repo_ids_filters'] = self.prompt.repo_ids_filters

        ret_val = await asyncio.to_thread(plugin['func'].fetch, table=DB.DBTables.HISTORY, ids=[], args=args)
        history_dict: list[dict[str, Any]] = ret_val[0]
        return history_dict

    def on_fetch_history_complete(self, fut: Future[Any]) -> None:
        response_array: dict[str, Any] = fut.result()
        retrieve_gen_conversation_direction_step = self.service.run_task(
            self._retrieve_gen_conversation_direction(response_array)
        )
        retrieve_gen_conversation_direction_step.add_done_callback(self.on_direction_ret_complete)

    async def _retrieve_gen_conversation_direction(self, response_array: dict[str, Any]) -> dict[str, str]:
        if __debug__:
            self.service.send_message_async(self.service.Topic.DEBUG_ASK_CREATED, {'ask_id': self.id})

        input_data = response_array
        plugin = self.retrieve_gen_conversation_direction_plugin

        if len(self.service.repo_folders.items()) > 0:
            input_data.update({'all_repos': self.service.repo_folders})

        # add prompt engineering here and submit as the full prompt.
        prompt_gen = PromptGenConversation(
            prompt_str=self.prompt.prompt_str, input_data=input_data, repo_ids_filters=self.prompt.repo_ids_filters
        )

        structured_schema = {
            'current_user_intent': str,
            'working_memory_step_1': str,
            'working_memory_step_2': str,
            'working_memory_step_3': str,
            'working_memory_step_4': str,
        }

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt_gen,
            structured_schema=structured_schema,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        json_parsed: dict[str, str] = json.loads(ret[0]['llm_response'])

        self.conversation_direction = {}
        self.conversation_direction['current_user_intent'] = json_parsed['current_user_intent']

        self.conversation_direction['working_memory'] = json_parsed['working_memory_step_4']

        if __debug__:
            self.service.send_message_async(
                self.service.Topic.DEBUG_CONVERSATION_DIRECTION,
                {'ask_id': self.id, 'prompt': prompt_gen.render_prompt(), 'working_memory': ret[0]['llm_response']},
            )

        self.service.host.update_mock_data(plugin, ret)

        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.CONVERSATION_DIRECTION_CALCULATED
        )

        return json_parsed

    def on_direction_ret_complete(self, fut: Future[Any]) -> None:
        direction_ret = fut.result()

        logging.debug('current_user_intent: %s', direction_ret)
        intent_and_direction = direction_ret['current_user_intent']

        embed_step = self.service.run_task(self._embed_gen_direction(intent_and_direction))
        embed_step.add_done_callback(self.on_embed_direction_complete)

    async def _embed_gen_direction(self, main_prompt: str) -> list[float]:
        plugin = self.embeddings_gen_embed

        ret = await asyncio.to_thread(
            plugin['func'].gen_embed, strings=[main_prompt], args=self.service.host.mock_update_args(plugin)
        )

        self.service.host.update_mock_data(plugin, ret)

        float_array: list[float] = ret[0]['embeddings_list'][0]
        return float_array

    def on_embed_direction_complete(self, fut: Future[Any]) -> None:
        embedding = fut.result()
        fetch_direction_step = self.service.run_task(self._vector_fetch_direction_meta(embedding))
        fetch_direction_step.add_done_callback(self.on_vector_fetch_direction_meta_complete)

    async def _vector_fetch_direction_meta(self, embedding: list[float]) -> list[str]:
        plugin = self.prompt_vector_db_plugin

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='meta',
            embeddings=embedding,
            filters=self.prompt.repo_ids_filters,
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)

        list_str: list[str] = ret[0]['query_set']
        # logging.warning(list_str)
        return list_str

    def on_vector_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        meta_ids = fut.result()
        meta_fetch_step = self.service.run_task(self._fetch_direction_meta(meta_ids))
        meta_fetch_step.add_done_callback(self.on_fetch_direction_meta_complete)

    async def _fetch_direction_meta(self, meta_id: list[str]) -> list[Meta]:
        meta_list = self.service.meta_repository.load_batch(meta_id)

        if __debug__:
            dict_meta = [meta.summary_full.text if meta.summary_full is not None else '' for meta in meta_list]

            self.service.send_message_async(
                self.service.Topic.DEBUG_ASK_META, {'ask_id': self.id, 'ask_meta': dict_meta}
            )

        return meta_list

    def on_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        meta_list = fut.result()
        analyze_step = self.service.run_tasks([self._analyze_prompt(meta_list), self._generate_indices(meta_list)])
        analyze_step.add_done_callback(self.on_analyze_complete)

    """
    ### Prompt Analysis

    Analyzies the prompt and generates lookups that will aid in vector searching of related content
    """

    async def _analyze_prompt(self, meta_list: list[Meta]) -> dict[str, str]:
        plugin = self.prompt_analysis_plugin
        # add prompt engineering here and submit as the full prompt.
        prompt = PromptAnalyzePrompt(
            prompt_str=self.prompt.prompt_str,
            input_data={'meta_list': meta_list, 'working_memory': self.conversation_direction['working_memory']},
        )
        structured_response = {'response_length': str, 'user_prompt_type': str, 'thinking_steps': str}
        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_response,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        self.service.host.update_mock_data(plugin, ret)

        self.metrics_tracker.increment(engramic.application.retrieve.retrieve_service.RetrieveMetric.PROMPTS_ANALYZED)

        if not isinstance(ret[0], dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        return ret[0]

    def on_analyze_complete(self, fut: Future[Any]) -> None:
        analysis = fut.result()  # This will raise an exception if the coroutine fails

        self.prompt_analysis = PromptAnalysis(
            json.loads(analysis['_analyze_prompt'][0]['llm_response']),
            json.loads(analysis['_generate_indices'][0]['llm_response']),
        )

        genrate_indices_future = self.service.run_task(
            self._generate_indicies_embeddings(self.prompt_analysis.indices['indices'])
        )
        genrate_indices_future.add_done_callback(self.on_indices_embeddings_generated)

    async def _generate_indices(self, meta_list: list[Meta]) -> dict[str, str]:
        plugin = self.prompt_retrieve_indices_plugin
        # add prompt engineering here and submit as the full prompt.
        input_data: dict[str, Any] = {'meta_list': meta_list}
        if len(self.service.repo_folders.items()) > 0:
            input_data.update({'all_repos': self.service.repo_folders})

        prompt = PromptGenIndices(
            prompt_str=self.prompt.prompt_str, input_data=input_data, repo_ids_filters=self.prompt.repo_ids_filters
        )
        structured_output = {'indices': list[str]}
        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_output,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        if __debug__:
            prompt_render = prompt.render_prompt()
            self.service.send_message_async(
                Service.Topic.DEBUG_ASK_INDICES,
                {'ask_id': self.id, 'prompt': prompt_render, 'indices': ret[0]['llm_response']},
            )

        self.service.host.update_mock_data(plugin, ret)
        response = ret[0]['llm_response']
        response_json = json.loads(response)
        count = len(response_json['indices'])
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.DYNAMIC_INDICES_GENERATED, count
        )

        if not isinstance(ret[0], dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        return ret[0]

    def on_indices_embeddings_generated(self, fut: Future[Any]) -> None:
        embeddings = fut.result()

        query_index_db_future = self.service.run_task(self._query_index_db(embeddings))
        query_index_db_future.add_done_callback(self.on_query_index_db)

    async def _generate_indicies_embeddings(self, indices: list[str]) -> list[list[float]]:
        plugin = self.embeddings_gen_embed

        ret = await asyncio.to_thread(
            plugin['func'].gen_embed, strings=indices, args=self.service.host.mock_update_args(plugin)
        )

        self.service.host.update_mock_data(plugin, ret)
        embeddings_list: list[list[float]] = ret[0]['embeddings_list']
        return embeddings_list

    """
    ### Fetch Engram IDs

    Use the indices to fetch related Engram IDs
    """

    async def _query_index_db(self, embeddings: list[list[float]]) -> set[str]:
        plugin = self.prompt_vector_db_plugin

        ids = set()

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='main',
            embeddings=embeddings,
            filters=self.prompt.repo_ids_filters,
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)
        ids.update(ret[0]['query_set'])

        num_queries = len(ids)
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.VECTOR_DB_QUERIES, num_queries
        )

        return ids

    def on_query_index_db(self, fut: Future[Any]) -> None:
        ret = fut.result()
        logging.debug('Query Result: %s', ret)

        if self.prompt_analysis is None:
            error = 'on_query_index_db failed: prompt_analysis is None and likely failed during an earlier process.'
            raise RuntimeError

        retrieve_result = RetrieveResult(
            self.id,
            self.prompt.prompt_id,
            engram_id_array=list(ret),
            conversation_direction=self.conversation_direction,
            analysis=asdict(self.prompt_analysis)['prompt_analysis'],
        )

        if self.prompt_analysis is None:
            error = 'Prompt analysis None in on_query_index_db'
            raise RuntimeError(error)

        retrieve_response = {
            'analysis': asdict(self.prompt_analysis),
            'prompt': asdict(self.prompt),
            'retrieve_response': asdict(retrieve_result),
        }

        if __debug__:
            self.service.host.update_mock_data_output(self.service, retrieve_response)

        self.service.send_message_async(Service.Topic.RETRIEVE_COMPLETE, retrieve_response)