Skip to content

Ask

Bases: Retrieval

Source code in src/engramic/application/retrieve/ask.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
class Ask(Retrieval):
    class Ask(Retrieval):
        """
        Ask is a specific type of retrieval focused on traditional Q&A. It is a single instance of an ask.

        This class handles the end-to-end workflow of transforming a raw prompt into
        contextual embeddings, querying the vector database, and returning retrieved engram ids.
        It also supports various conversation analysis and prompt index generation.

        Attributes:
            id (str): A unique identifier for this retrieval session.
            prompt (Prompt): The original prompt provided by the user.
            plugin_manager (PluginManager): Plugin manager used to access LLM, vector DB, and embedding components.
            metrics_tracker (MetricsTracker): Tracks operational metrics for observability.
            db_plugin (dict): Plugin used to interact with the document database.
            service (RetrieveService): Reference to the parent service coordinating this request.
            library (str | None): Optional name of the target library to search within.
            conversation_direction (dict[str, str]): Stores current user intent and working memory.
            prompt_analysis (PromptAnalysis | None): Stores structured analysis of the prompt after processing.

        Methods:
            get_sources():
                Initiates the async pipeline for directional memory retrieval.

            _fetch_history():
                Asynchronously retrieves prior user history from the document DB.

            _retrieve_gen_conversation_direction():
                Uses LLM plugin to extract user intent and conversational memory.

            _embed_gen_direction():
                Converts extracted intent into embeddings. General direction determines intent and manages short term memory.

            _vector_fetch_direction_meta():
                Queries the metadata collection in the vector DB using intent embeddings.

            _fetch_direction_meta():
                Loads Meta objects from the metadata store based on query results.

            _analyze_prompt():
                Uses LLM plugin to analyze the user prompt in the context of metadata.

            _generate_indices():
                Generates semantic indices from the prompt and metadata for retrieval.

            _generate_indicies_embeddings():
                Converts generated index phrases into embeddings.

            _query_index_db():
                Searches the main vector DB with embeddings to identify related engrams.

            on_fetch_history_complete(fut):
                Callback when prompt history fetch is complete; begins direction generation.

            on_direction_ret_complete(fut):
                Callback when conversation direction is generated; begins embedding.

            on_embed_direction_complete(fut):
                Callback when direction embedding is ready; begins vector DB search for metadata.

            on_vector_fetch_direction_meta_complete(fut):
                Callback when metadata vector search is complete; begins metadata fetch.

            on_fetch_direction_meta_complete(fut):
                Callback when metadata objects are fetched; begins analysis and index generation.

            on_analyze_complete(fut):
                Callback when prompt analysis and index generation are complete; begins embedding generation.

            on_indices_embeddings_generated(fut):
                Callback when index embeddings are ready; triggers main index DB query.

            on_query_index_db(fut):
                Final callback when engram retrieval is complete; assembles and emits the result.
        """

    def __init__(
        self,
        ask_id: str,
        prompt: Prompt,
        plugin_manager: PluginManager,
        metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric],
        db_plugin: dict[str, Any],
        service: RetrieveService,
        library: str | None = None,
    ) -> None:
        """Store the request context and look up the plugins used by the pipeline.

        Args:
            ask_id: Identifier stored as ``self.id`` and attached to emitted messages.
            prompt: Prompt object; its ``prompt_str`` is read by later steps.
            plugin_manager: Queried through ``get_plugin`` for the LLM, vector DB
                and embedding plugins. The manager itself is not retained.
            metrics_tracker: Tracker that the pipeline steps increment.
            db_plugin: Document-database plugin, stored as ``prompt_db_document_plugin``.
            service: Owning service used to schedule tasks and send messages.
            library: Optional library name, stored on the instance.
        """
        self.id = ask_id
        self.service = service
        self.metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric] = (
            metrics_tracker
        )
        self.library = library
        self.prompt = prompt
        # Start with an empty mapping so the attribute always exists; it is replaced
        # once the conversation-direction step has parsed the LLM response.
        self.conversation_direction: dict[str, str] = {}
        self.prompt_analysis: PromptAnalysis | None = None
        self.retrieve_gen_conversation_direction_plugin = plugin_manager.get_plugin(
            'llm', 'retrieve_gen_conversation_direction'
        )
        self.prompt_analysis_plugin = plugin_manager.get_plugin('llm', 'retrieve_prompt_analysis')
        self.prompt_retrieve_indices_plugin = plugin_manager.get_plugin('llm', 'retrieve_gen_index')
        self.prompt_vector_db_plugin = plugin_manager.get_plugin('vector_db', 'db')
        self.prompt_db_document_plugin = db_plugin
        self.embeddings_gen_embed = plugin_manager.get_plugin('embedding', 'gen_embed')

    def get_sources(self) -> None:
        """Start the retrieval pipeline by scheduling the history fetch.

        The ``_fetch_history`` coroutine is handed to ``self.service.run_task`` and
        ``on_fetch_history_complete`` is registered as its done-callback.
        """
        self.service.run_task(self._fetch_history()).add_done_callback(self.on_fetch_history_complete)

    """
    ### CONVERSATION DIRECTION

    Fetches related domain knowledge based on the prompt intent.
    """

    async def _fetch_history(self) -> list[dict[str, Any]]:
        """Fetch history rows from the document database in a worker thread.

        Returns:
            The first element of the value returned by the plugin's ``fetch`` call.
        """
        plugin = self.prompt_db_document_plugin
        # Work on a copy: the plugin dict is handed in from outside this object, so
        # writing the per-call 'history' option into it would leak into other users.
        fetch_args = dict(plugin['args'])
        fetch_args['history'] = 1

        ret_val = await asyncio.to_thread(
            plugin['func'].fetch, table=DB.DBTables.HISTORY, ids=[], args=fetch_args
        )
        history_rows: list[dict[str, Any]] = ret_val[0]
        return history_rows

    def on_fetch_history_complete(self, fut: Future[Any]) -> None:
        """Done-callback for the history fetch; schedules conversation-direction generation.

        Args:
            fut: Completed future whose result is the fetched history list.
        """
        history: list[dict[str, Any]] = fut.result()
        direction_coro = self._retrieve_gen_conversation_direction(history)
        self.service.run_task(direction_coro).add_done_callback(self.on_direction_ret_complete)

    async def _retrieve_gen_conversation_direction(self, response_array: list[dict[str, Any]]) -> dict[str, str]:
        """Ask the LLM plugin for the user's current intent and working memory.

        The prompt string and the supplied history are wrapped in a
        ``PromptGenConversation`` and submitted in a worker thread with a schema
        requesting ``current_user_intent`` and ``working_memory``. The JSON-decoded
        response is stored on ``self.conversation_direction`` and returned.

        Args:
            response_array: History entries passed to the prompt as ``history_array``.

        Returns:
            The JSON-decoded ``llm_response`` of the first plugin result.
        """
        if __debug__:
            self.service.send_message_async(self.service.Topic.DEBUG_ASK_CREATED, {'ask_id': self.id})

        llm_plugin = self.retrieve_gen_conversation_direction_plugin
        # add prompt engineering here and submit as the full prompt.
        direction_prompt = PromptGenConversation(
            prompt_str=self.prompt.prompt_str,
            input_data={'history_array': response_array},
        )

        submit_result = await asyncio.to_thread(
            llm_plugin['func'].submit,
            prompt=direction_prompt,
            structured_schema={'current_user_intent': str, 'working_memory': str},
            args=self.service.host.mock_update_args(llm_plugin),
        )

        raw_response = submit_result[0]['llm_response']
        direction: dict[str, str] = json.loads(raw_response)
        self.conversation_direction = direction

        if __debug__:
            debug_payload = {
                'ask_id': self.id,
                'prompt': direction_prompt.render_prompt(),
                'working_memory': raw_response,
            }
            self.service.send_message_async(self.service.Topic.DEBUG_CONVERSATION_DIRECTION, debug_payload)

        self.service.host.update_mock_data(llm_plugin, submit_result)

        metric = engramic.application.retrieve.retrieve_service.RetrieveMetric.CONVERSATION_DIRECTION_CALCULATED
        self.metrics_tracker.increment(metric)

        return direction

    def on_direction_ret_complete(self, fut: Future[Any]) -> None:
        """Done-callback for direction generation; schedules embedding of the intent.

        Args:
            fut: Completed future whose result is the conversation-direction mapping.
        """
        direction = fut.result()

        # The whole direction mapping is logged, not only the intent entry.
        logging.debug('current_user_intent: %s', direction)
        user_intent = direction['current_user_intent']

        embed_future = self.service.run_task(self._embed_gen_direction(user_intent))
        embed_future.add_done_callback(self.on_embed_direction_complete)

    async def _embed_gen_direction(self, main_prompt: str) -> list[float]:
        """Embed a single string with the embedding plugin in a worker thread.

        Args:
            main_prompt: Text to embed; sent to the plugin as a one-element list.

        Returns:
            The first entry of ``embeddings_list`` from the first plugin result.
        """
        embedder = self.embeddings_gen_embed
        call_args = self.service.host.mock_update_args(embedder)

        embed_result = await asyncio.to_thread(embedder['func'].gen_embed, strings=[main_prompt], args=call_args)

        self.service.host.update_mock_data(embedder, embed_result)

        vector: list[float] = embed_result[0]['embeddings_list'][0]
        return vector

    def on_embed_direction_complete(self, fut: Future[Any]) -> None:
        """Done-callback for the direction embedding; schedules the meta vector query.

        Args:
            fut: Completed future whose result is the direction embedding.
        """
        direction_embedding = fut.result()
        query_coro = self._vector_fetch_direction_meta(direction_embedding)
        self.service.run_task(query_coro).add_done_callback(self.on_vector_fetch_direction_meta_complete)

    async def _vector_fetch_direction_meta(self, embedding: list[float]) -> list[str]:
        """Query the ``'meta'`` vector collection with the direction embedding.

        Args:
            embedding: Embedding passed to the vector DB plugin's ``query``.

        Returns:
            The ``query_set`` entry of the first plugin result.
        """
        vector_db = self.prompt_vector_db_plugin
        # These options are written into the plugin's own args dict, so they stay
        # set for any later use of the same plugin object.
        # NOTE(review): confirm that carrying them over to other queries is intended.
        query_options = vector_db['args']
        query_options['threshold'] = 0.6
        query_options['n_results'] = 5

        query_result = await asyncio.to_thread(
            vector_db['func'].query,
            collection_name='meta',
            embeddings=embedding,
            args=self.service.host.mock_update_args(vector_db),
        )

        self.service.host.update_mock_data(vector_db, query_result)

        meta_ids: list[str] = query_result[0]['query_set']
        return meta_ids

    def on_vector_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Done-callback for the meta vector query; schedules loading of the Meta objects.

        Args:
            fut: Completed future whose result is the list of matched meta ids.
        """
        matched_ids = fut.result()
        load_coro = self._fetch_direction_meta(matched_ids)
        self.service.run_task(load_coro).add_done_callback(self.on_fetch_direction_meta_complete)

    async def _fetch_direction_meta(self, meta_id: list[str]) -> list[Meta]:
        """Load the Meta objects for the given ids from the service's meta repository.

        In debug builds, the full-summary text of each meta (or ``''`` when it has
        no full summary) is also published on the ``DEBUG_ASK_META`` topic.

        Args:
            meta_id: Ids passed to ``meta_repository.load_batch``.

        Returns:
            Whatever ``load_batch`` returned for those ids.
        """
        loaded = self.service.meta_repository.load_batch(meta_id)

        if __debug__:
            summaries = []
            for meta in loaded:
                if meta.summary_full is not None:
                    summaries.append(meta.summary_full.text)
                else:
                    summaries.append('')

            debug_payload = {'ask_id': self.id, 'ask_meta': summaries}
            self.service.send_message_async(self.service.Topic.DEBUG_ASK_META, debug_payload)

        return loaded

    def on_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Done-callback for the meta load; schedules prompt analysis and index generation together.

        Args:
            fut: Completed future whose result is the list of loaded Meta objects.
        """
        metas = fut.result()
        # Both coroutines are submitted through a single run_tasks call.
        pending = [self._analyze_prompt(metas), self._generate_indices(metas)]
        self.service.run_tasks(pending).add_done_callback(self.on_analyze_complete)

    """
    ### Prompt Analysis

    Analyzes the prompt and generates lookups that will aid in vector searching of related content
    """

    async def _analyze_prompt(self, meta_list: list[Meta]) -> dict[str, str]:
        """Submit the prompt, with the meta list as context, to the prompt-analysis LLM plugin.

        Args:
            meta_list: Meta objects supplied to the prompt as ``meta_list``.

        Returns:
            The first element of the plugin's result.

        Raises:
            TypeError: If that first element is not a ``dict``.
        """
        llm_plugin = self.prompt_analysis_plugin
        # add prompt engineering here and submit as the full prompt.
        analysis_prompt = PromptAnalyzePrompt(
            prompt_str=self.prompt.prompt_str,
            input_data={'meta_list': meta_list},
        )
        submit_result = await asyncio.to_thread(
            llm_plugin['func'].submit,
            prompt=analysis_prompt,
            structured_schema={'response_length': str},
            args=self.service.host.mock_update_args(llm_plugin),
        )

        self.service.host.update_mock_data(llm_plugin, submit_result)

        metric = engramic.application.retrieve.retrieve_service.RetrieveMetric.PROMPTS_ANALYZED
        self.metrics_tracker.increment(metric)

        first = submit_result[0]
        if isinstance(first, dict):
            return first

        error = f'Expected dict[str, str], got {type(first)}'
        raise TypeError(error)

    def on_analyze_complete(self, fut: Future[Any]) -> None:
        """Done-callback for analysis and index generation; schedules embedding of the indices.

        The result is read as a mapping keyed by ``'_analyze_prompt'`` and
        ``'_generate_indices'``. The ``llm_response`` of each first entry is
        JSON-decoded and combined into ``self.prompt_analysis``.

        Args:
            fut: Completed future for the combined tasks; ``result()`` re-raises if they failed.
        """
        combined = fut.result()  # This will raise an exception if the coroutine fails

        analysis_json = json.loads(combined['_analyze_prompt'][0]['llm_response'])
        indices_json = json.loads(combined['_generate_indices'][0]['llm_response'])
        prompt_analysis = PromptAnalysis(analysis_json, indices_json)
        self.prompt_analysis = prompt_analysis

        embeddings_coro = self._generate_indicies_embeddings(prompt_analysis.indices['indices'])
        self.service.run_task(embeddings_coro).add_done_callback(self.on_indices_embeddings_generated)

    async def _generate_indices(self, meta_list: list[Meta]) -> dict[str, str]:
        """Ask the index-generation LLM plugin for lookup indices for the prompt.

        The number of generated indices is added to the
        ``DYNAMIC_INDICES_GENERATED`` metric.

        Args:
            meta_list: Meta objects supplied to the prompt as ``meta_list``.

        Returns:
            The first element of the plugin's result; its ``llm_response`` holds
            the JSON text that contains the ``indices`` list.

        Raises:
            TypeError: If the first element of the plugin's result is not a ``dict``.
        """
        plugin = self.prompt_retrieve_indices_plugin
        # add prompt engineering here and submit as the full prompt.
        prompt = PromptGenIndices(prompt_str=self.prompt.prompt_str, input_data={'meta_list': meta_list})
        structured_output = {'indices': list[str]}
        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_output,
            args=self.service.host.mock_update_args(plugin),
        )

        result = ret[0]
        # Validate before the first key lookup; checking afterwards can never
        # produce this message because the lookup itself would already have failed.
        if not isinstance(result, dict):
            error = f'Expected dict[str, str], got {type(result)}'
            raise TypeError(error)

        response = result['llm_response']

        if __debug__:
            prompt_render = prompt.render_prompt()
            self.service.send_message_async(
                Service.Topic.DEBUG_ASK_INDICES,
                {'ask_id': self.id, 'prompt': prompt_render, 'indices': response},
            )

        self.service.host.update_mock_data(plugin, ret)
        count = len(json.loads(response)['indices'])
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.DYNAMIC_INDICES_GENERATED, count
        )

        return result

    def on_indices_embeddings_generated(self, fut: Future[Any]) -> None:
        """Done-callback for index embedding; schedules the main index query.

        Args:
            fut: Completed future whose result is the list of index embeddings.
        """
        index_embeddings = fut.result()

        query_coro = self._query_index_db(index_embeddings)
        self.service.run_task(query_coro).add_done_callback(self.on_query_index_db)

    async def _generate_indicies_embeddings(self, indices: list[str]) -> list[list[float]]:
        """Embed the generated index strings with the embedding plugin in a worker thread.

        Args:
            indices: Strings passed to the plugin's ``gen_embed``.

        Returns:
            The ``embeddings_list`` entry of the first plugin result.
        """
        embedder = self.embeddings_gen_embed
        call_args = self.service.host.mock_update_args(embedder)

        embed_result = await asyncio.to_thread(embedder['func'].gen_embed, strings=indices, args=call_args)

        self.service.host.update_mock_data(embedder, embed_result)
        vectors: list[list[float]] = embed_result[0]['embeddings_list']
        return vectors

    """
    ### Fetch Engram IDs

    Use the indices to fetch related Engram IDs
    """

    async def _query_index_db(self, embeddings: list[list[float]]) -> set[str]:
        """Query the ``'main'`` vector collection and return the matched ids as a set.

        Args:
            embeddings: Index embeddings passed to the vector DB plugin's ``query``.

        Returns:
            The ids from the ``query_set`` entry of the first plugin result.
        """
        vector_db = self.prompt_vector_db_plugin

        query_result = await asyncio.to_thread(
            vector_db['func'].query,
            collection_name='main',
            embeddings=embeddings,
            args=self.service.host.mock_update_args(vector_db),
        )

        self.service.host.update_mock_data(vector_db, query_result)
        matched: set[str] = set(query_result[0]['query_set'])

        # NOTE(review): the metric is incremented by the number of ids returned,
        # not the number of queries issued; confirm this is the intended count.
        metric = engramic.application.retrieve.retrieve_service.RetrieveMetric.VECTOR_DB_QUERIES
        self.metrics_tracker.increment(metric, len(matched))

        return matched

    def on_query_index_db(self, fut: Future[Any]) -> None:
        """Final done-callback: package the retrieved ids and publish ``RETRIEVE_COMPLETE``.

        The message carries the prompt analysis, the prompt string and the
        ``RetrieveResult``, with both dataclasses converted through ``asdict``.

        Args:
            fut: Completed future whose result is the collection of engram ids.

        Raises:
            RuntimeError: If ``self.prompt_analysis`` is still ``None``.
        """
        engram_ids = fut.result()
        logging.debug('Query Result: %s', engram_ids)

        result = RetrieveResult(
            self.id,
            engram_id_array=list(engram_ids),
            conversation_direction=self.conversation_direction,
        )

        analysis = self.prompt_analysis
        if analysis is None:
            error = 'Prompt analysis None in on_query_index_db'
            raise RuntimeError(error)

        payload = {
            'analysis': asdict(analysis),
            'prompt_str': self.prompt.prompt_str,
            'retrieve_response': asdict(result),
        }

        if __debug__:
            self.service.host.update_mock_data_output(self.service, payload)

        self.service.send_message_async(Service.Topic.RETRIEVE_COMPLETE, payload)

Ask

Bases: Retrieval

Ask is a specific type of retrieval focused on traditional Q&A. It is a single instance of an ask.

This class handles the end-to-end workflow of transforming a raw prompt into contextual embeddings, querying the vector database, and returning retrieved engram ids. It also supports various conversation analysis and prompt index generation.

Attributes:

Name Type Description
id str

A unique identifier for this retrieval session.

prompt Prompt

The original prompt provided by the user.

plugin_manager PluginManager

Plugin manager used to access LLM, vector DB, and embedding components.

metrics_tracker MetricsTracker

Tracks operational metrics for observability.

db_plugin dict

Plugin used to interact with the document database.

service RetrieveService

Reference to the parent service coordinating this request.

library str | None

Optional name of the target library to search within.

conversation_direction dict[str, str]

Stores current user intent and working memory.

prompt_analysis PromptAnalysis | None

Stores structured analysis of the prompt after processing.

Methods:

Name Description
get_sources

Initiates the async pipeline for directional memory retrieval.

_fetch_history

Asynchronously retrieves prior user history from the document DB.

_retrieve_gen_conversation_direction

Uses LLM plugin to extract user intent and conversational memory.

_embed_gen_direction

Converts extracted intent into embeddings. General direction determines intent and manages short term memory.

_vector_fetch_direction_meta

Queries the metadata collection in the vector DB using intent embeddings.

_fetch_direction_meta

Loads Meta objects from the metadata store based on query results.

_analyze_prompt

Uses LLM plugin to analyze the user prompt in the context of metadata.

_generate_indices

Generates semantic indices from the prompt and metadata for retrieval.

_generate_indicies_embeddings

Converts generated index phrases into embeddings.

_query_index_db

Searches the main vector DB with embeddings to identify related engrams.

on_fetch_history_complete

Callback when prompt history fetch is complete; begins direction generation.

on_direction_ret_complete

Callback when conversation direction is generated; begins embedding.

on_embed_direction_complete

Callback when direction embedding is ready; begins vector DB search for metadata.

on_vector_fetch_direction_meta_complete

Callback when metadata vector search is complete; begins metadata fetch.

on_fetch_direction_meta_complete

Callback when metadata objects are fetched; begins analysis and index generation.

on_analyze_complete

Callback when prompt analysis and index generation are complete; begins embedding generation.

on_indices_embeddings_generated

Callback when index embeddings are ready; triggers main index DB query.

on_query_index_db

Final callback when engram retrieval is complete; assembles and emits the result.

Source code in src/engramic/application/retrieve/ask.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class Ask(Retrieval):
    """
    Ask is a specific type of retrieval focused on traditional Q&A. It is a single instance of an ask.

    This class handles the end-to-end workflow of transforming a raw prompt into
    contextual embeddings, querying the vector database, and returning retrieved engram ids.
    It also supports various conversation analysis and prompt index generation.

    Attributes:
        id (str): A unique identifier for this retrieval session.
        prompt (Prompt): The original prompt provided by the user.
        plugin_manager (PluginManager): Plugin manager used to access LLM, vector DB, and embedding components.
        metrics_tracker (MetricsTracker): Tracks operational metrics for observability.
        db_plugin (dict): Plugin used to interact with the document database.
        service (RetrieveService): Reference to the parent service coordinating this request.
        library (str | None): Optional name of the target library to search within.
        conversation_direction (dict[str, str]): Stores current user intent and working memory.
        prompt_analysis (PromptAnalysis | None): Stores structured analysis of the prompt after processing.

    Methods:
        get_sources():
            Initiates the async pipeline for directional memory retrieval.

        _fetch_history():
            Asynchronously retrieves prior user history from the document DB.

        _retrieve_gen_conversation_direction():
            Uses LLM plugin to extract user intent and conversational memory.

        _embed_gen_direction():
            Converts extracted intent into embeddings. General direction determines intent and manages short term memory.

        _vector_fetch_direction_meta():
            Queries the metadata collection in the vector DB using intent embeddings.

        _fetch_direction_meta():
            Loads Meta objects from the metadata store based on query results.

        _analyze_prompt():
            Uses LLM plugin to analyze the user prompt in the context of metadata.

        _generate_indices():
            Generates semantic indices from the prompt and metadata for retrieval.

        _generate_indicies_embeddings():
            Converts generated index phrases into embeddings.

        _query_index_db():
            Searches the main vector DB with embeddings to identify related engrams.

        on_fetch_history_complete(fut):
            Callback when prompt history fetch is complete; begins direction generation.

        on_direction_ret_complete(fut):
            Callback when conversation direction is generated; begins embedding.

        on_embed_direction_complete(fut):
            Callback when direction embedding is ready; begins vector DB search for metadata.

        on_vector_fetch_direction_meta_complete(fut):
            Callback when metadata vector search is complete; begins metadata fetch.

        on_fetch_direction_meta_complete(fut):
            Callback when metadata objects are fetched; begins analysis and index generation.

        on_analyze_complete(fut):
            Callback when prompt analysis and index generation are complete; begins embedding generation.

        on_indices_embeddings_generated(fut):
            Callback when index embeddings are ready; triggers main index DB query.

        on_query_index_db(fut):
            Final callback when engram retrieval is complete; assembles and emits the result.
    """