30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
class Ask(Retrieval):
    """Run one question-and-answer retrieval as a chain of asynchronous stages.

    Every stage is a coroutine scheduled through ``service.run_task`` (or
    ``service.run_tasks``). The future returned for a stage gets a done-callback
    that reads the stage's result and schedules the next stage:

    1. ``_fetch_history`` -> ``on_fetch_history_complete``
    2. ``_retrieve_gen_conversation_direction`` -> ``on_direction_ret_complete``
    3. ``_embed_gen_direction`` -> ``on_embed_direction_complete``
    4. ``_vector_fetch_direction_meta`` -> ``on_vector_fetch_direction_meta_complete``
    5. ``_fetch_direction_meta`` -> ``on_fetch_direction_meta_complete``
    6. ``_analyze_prompt`` and ``_generate_indices`` (scheduled together)
       -> ``on_analyze_complete``
    7. ``_generate_indicies_embeddings`` -> ``on_indices_embeddings_generated``
    8. ``_query_index_db`` -> ``on_query_index_db``, which sends
       ``RETRIEVE_COMPLETE``.

    Attributes:
        id: Identifier of this ask, included in debug messages and the result.
        prompt: The prompt being answered.
        service: Service used to schedule tasks and send messages.
        metrics_tracker: Counter sink for ``RetrieveMetric`` values.
        library: Optional library passed by the caller; stored but not read by
            any method of this class.
        widget_cmd: Always ``None`` here; the methods read
            ``prompt.widget_cmd`` instead.
        conversation_direction: Holds ``current_user_intent`` and
            ``working_memory`` once stage 2 has run.
        prompt_analysis: Set by ``on_analyze_complete``; ``None`` before that.
        new_conversation: Set by ``on_fetch_history_complete``.
        type_filters: Set by ``_vector_fetch_direction_meta`` and reused by
            ``_query_index_db``.
    """

    def __init__(
        self,
        ask_id: str,
        prompt: Prompt,
        plugin_manager: PluginManager,
        metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric],
        db_plugin: dict[str, Any],
        service: RetrieveService,
        library: str | None = None,
    ) -> None:
        """Store collaborators and look up the plugins each stage uses.

        Args:
            ask_id: Identifier for this ask.
            prompt: The prompt to retrieve sources for.
            plugin_manager: Source of the llm, vector_db and embedding plugins.
            metrics_tracker: Tracker incremented as stages complete.
            db_plugin: Document-database plugin dict; its ``'func'`` and
                ``'args'`` entries are used by ``_fetch_history``.
            service: Owning service used for scheduling and messaging.
            library: Optional library name; only stored.
        """
        self.id = ask_id
        self.service = service
        self.metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric] = (
            metrics_tracker
        )
        self.library = library
        self.prompt = prompt
        # NOTE(review): never populated here; methods use self.prompt.widget_cmd.
        self.widget_cmd = None

        # Pipeline state. Given defaults so the attributes always exist, even
        # if a later stage is reached without the stage that normally sets them.
        self.conversation_direction: dict[str, Any] = {}
        self.prompt_analysis: PromptAnalysis | None = None
        self.new_conversation: bool = True
        self.type_filters: list[str] = []

        self.retrieve_gen_conversation_direction_plugin = plugin_manager.get_plugin(
            'llm', 'retrieve_gen_conversation_direction'
        )
        self.prompt_analysis_plugin = plugin_manager.get_plugin('llm', 'retrieve_prompt_analysis')
        self.prompt_retrieve_indices_plugin = plugin_manager.get_plugin('llm', 'retrieve_gen_index')
        self.prompt_vector_db_plugin = plugin_manager.get_plugin('vector_db', 'db')
        self.prompt_db_document_plugin = db_plugin
        self.embeddings_gen_embed = plugin_manager.get_plugin('embedding', 'gen_embed')

    def get_sources(self) -> None:
        """Start the pipeline by scheduling the history fetch."""
        history_future = self.service.run_task(self._fetch_history())
        history_future.add_done_callback(self.on_fetch_history_complete)

    def _all_repos(self) -> Any:
        """Return the service's repo folders, or ``None`` when there are none."""
        repo_folders = self.service.repo_folders
        return repo_folders if repo_folders else None

    # ### CONVERSATION DIRECTION
    # Fetches related domain knowledge based on the prompt intent.

    async def _fetch_history(self) -> dict[str, Any]:
        """Fetch the most recent history entry for the prompt's conversation.

        Returns:
            The first element of the plugin's fetch result; the done-callback
            reads its ``'history'`` key.
        """
        plugin = self.prompt_db_document_plugin
        # NOTE(review): this writes into the plugin's own args dict, so the
        # values persist on the plugin after this call - confirm that is intended.
        args = plugin['args']
        args['history_limit'] = 1
        args['repo_ids_filters'] = self.prompt.repo_ids_filters
        args['conversation_id'] = self.prompt.conversation_id

        ret_val = await asyncio.to_thread(plugin['func'].fetch, table=DB.DBTables.HISTORY, ids=[], args=args)
        history: dict[str, Any] = ret_val[0]
        return history

    def on_fetch_history_complete(self, fut: Future[Any]) -> None:
        """Record whether this is a new conversation and schedule direction analysis."""
        response_array: dict[str, Any] = fut.result()
        history = response_array['history']

        # New when there is no history at all, or when the latest entry
        # belongs to a different conversation than the current prompt.
        self.new_conversation = not history or (
            history[0]['prompt']['conversation_id'] != self.prompt.conversation_id
        )

        direction_future = self.service.run_task(self._retrieve_gen_conversation_direction(response_array))
        direction_future.add_done_callback(self.on_direction_ret_complete)

    async def _retrieve_gen_conversation_direction(self, response_array: dict[str, Any]) -> None:
        """Ask the LLM for the user's intent and working memory.

        Stores ``current_user_intent`` and ``working_memory`` (taken from
        ``working_memory_step_4`` of the response) in
        ``self.conversation_direction``.

        Args:
            response_array: History fetch result used as prompt input data.

        Raises:
            json.JSONDecodeError: If the LLM response is not valid JSON.
            KeyError: If an expected key is missing from the response.
        """
        if __debug__:
            self.service.send_message_async(self.service.Topic.DEBUG_ASK_CREATED, {'ask_id': self.id})

        plugin = self.retrieve_gen_conversation_direction_plugin

        # Shallow copy so the caller's dict is not modified by the keys added below.
        input_data: dict[str, Any] = dict(response_array)
        input_data['all_repos'] = self._all_repos()

        self.conversation_direction = {}

        if self.prompt.widget_cmd:
            input_data['current_engramic_widget'] = self.prompt.widget_cmd

        # add prompt engineering here and submit as the full prompt.
        prompt_gen = PromptGenConversation(
            prompt_str=self.prompt.prompt_str, input_data=input_data, repo_ids_filters=self.prompt.repo_ids_filters
        )

        structured_schema = {
            'current_user_intent': str,
            'working_memory_step_1': str,
            'working_memory_step_2': str,
            'working_memory_step_3': str,
            'working_memory_step_4': str,
        }

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt_gen,
            structured_schema=structured_schema,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        llm_response = ret[0]['llm_response']
        json_parsed: dict[str, str] = json.loads(llm_response)

        self.conversation_direction['current_user_intent'] = json_parsed['current_user_intent']
        # Only the final working-memory step is kept.
        self.conversation_direction['working_memory'] = json_parsed['working_memory_step_4']

        if __debug__:
            self.service.send_message_async(
                self.service.Topic.DEBUG_CONVERSATION_DIRECTION,
                {'ask_id': self.id, 'prompt': prompt_gen.render_prompt(), 'working_memory': llm_response},
            )

        self.service.host.update_mock_data(plugin, ret)

        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.CONVERSATION_DIRECTION_CALCULATED
        )

    def on_direction_ret_complete(self, fut: Future[Any]) -> None:
        """Schedule embedding of the extracted user intent."""
        # The stage returns nothing; result() is called so that an exception
        # raised inside the coroutine surfaces here.
        fut.result()

        embed_future = self.service.run_task(self._embed_gen_direction())
        embed_future.add_done_callback(self.on_embed_direction_complete)

    async def _embed_gen_direction(self) -> list[float]:
        """Embed ``current_user_intent`` and return the single resulting vector."""
        plugin = self.embeddings_gen_embed

        ret = await asyncio.to_thread(
            plugin['func'].gen_embed,
            strings=[self.conversation_direction['current_user_intent']],
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)

        # One string was submitted, so the first embedding is the only one used.
        float_array: list[float] = ret[0]['embeddings_list'][0]
        return float_array

    def on_embed_direction_complete(self, fut: Future[Any]) -> None:
        """Schedule the meta-collection query with the intent embedding."""
        intent_embedding = fut.result()
        meta_query_future = self.service.run_task(self._vector_fetch_direction_meta(intent_embedding))
        meta_query_future.add_done_callback(self.on_vector_fetch_direction_meta_complete)

    async def _vector_fetch_direction_meta(self, intent_embedding: list[float]) -> list[str]:
        """Query the ``'meta'`` collection with the intent embedding.

        Also sets ``self.type_filters``, which ``_query_index_db`` reuses.

        Args:
            intent_embedding: Embedding produced by ``_embed_gen_direction``.

        Returns:
            The ``'query_set'`` entry of the plugin's first result.
        """
        plugin = self.prompt_vector_db_plugin
        # NOTE(review): these are written into the shared plugin args and so
        # remain in place for the later 'main' query - confirm that is intended.
        plugin['args'].update(
            {
                'threshold': 0.6,  # meta needs a broader threshold.
                'n_results': 2,  # num results per vector
            }
        )

        self.type_filters = ['native', 'episodic']
        if self.prompt.widget_cmd:
            self.type_filters.append('procedural')

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='meta',
            embeddings=intent_embedding,
            repo_filters=self.prompt.repo_ids_filters,
            type_filters=self.type_filters,
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)

        list_str: list[str] = ret[0]['query_set']
        return list_str

    def on_vector_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Schedule loading of the Meta objects for the returned ids."""
        meta_ids = fut.result()
        meta_fetch_future = self.service.run_task(self._fetch_direction_meta(meta_ids))
        meta_fetch_future.add_done_callback(self.on_fetch_direction_meta_complete)

    async def _fetch_direction_meta(self, meta_id: list[str]) -> list[Meta]:
        """Load the Meta objects for ``meta_id`` from the service's meta repository."""
        meta_list = self.service.meta_repository.load_batch(meta_id)

        if __debug__:
            # Metas without a full summary are reported as an empty string.
            summaries = [meta.summary_full.text if meta.summary_full is not None else '' for meta in meta_list]
            self.service.send_message_async(
                self.service.Topic.DEBUG_ASK_META, {'ask_id': self.id, 'ask_meta': summaries}
            )

        return meta_list

    def on_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Schedule prompt analysis and index generation together."""
        meta_list = fut.result()
        analyze_future = self.service.run_tasks([self._analyze_prompt(), self._generate_indices(meta_list)])
        analyze_future.add_done_callback(self.on_analyze_complete)

    # ### Prompt Analysis
    # Analyzes the prompt and generates lookups that will aid in vector
    # searching of related content.

    async def _analyze_prompt(self) -> dict[str, Any]:
        """Ask the LLM to classify the prompt.

        Returns:
            The parsed JSON response; the requested schema has
            ``response_length``, ``user_prompt_type``, ``thinking_steps`` and
            ``remember_request``.

        Raises:
            TypeError: If the plugin's first result is not a dict.
            json.JSONDecodeError: If the LLM response is not valid JSON.
        """
        plugin = self.prompt_analysis_plugin

        # add prompt engineering here and submit as the full prompt.
        prompt = PromptAnalyzePrompt(
            prompt_str=self.prompt.prompt_str,
            input_data={
                'working_memory': self.conversation_direction['working_memory'],
                'current_user_intent': self.conversation_direction['current_user_intent'],
            },
        )

        structured_response = {
            'response_length': str,
            'user_prompt_type': str,
            'thinking_steps': str,
            'remember_request': bool,
        }

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_response,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        self.service.host.update_mock_data(plugin, ret)

        self.metrics_tracker.increment(engramic.application.retrieve.retrieve_service.RetrieveMetric.PROMPTS_ANALYZED)

        if not isinstance(ret[0], dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        json_ret: dict[str, Any] = json.loads(ret[0]['llm_response'])
        return json_ret

    async def _generate_indices(self, meta_list: list[Meta]) -> dict[str, Any]:
        """Ask the LLM for index phrases to search with.

        Args:
            meta_list: Meta objects passed to the prompt as ``meta_list``.

        Returns:
            The parsed JSON response, which carries an ``'indices'`` list.

        Raises:
            TypeError: If the plugin's first result is not a dict.
            json.JSONDecodeError: If the LLM response is not valid JSON.
        """
        plugin = self.prompt_retrieve_indices_plugin

        # add prompt engineering here and submit as the full prompt.
        input_data: dict[str, Any] = {
            'meta_list': meta_list,
            'current_user_intent': self.conversation_direction['current_user_intent'],
            'all_repos': self._all_repos(),
        }

        prompt = PromptGenIndices(
            prompt_str=self.prompt.prompt_str, input_data=input_data, repo_ids_filters=self.prompt.repo_ids_filters
        )

        structured_output = {'indices': list[str]}

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_output,
            args=self.service.host.mock_update_args(plugin),
            images=None,
        )

        # Validate the shape before the first subscript so a malformed result
        # produces this explicit error rather than an incidental one.
        if not isinstance(ret[0], dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        response = ret[0]['llm_response']

        if __debug__:
            self.service.send_message_async(
                Service.Topic.DEBUG_ASK_INDICES,
                {'ask_id': self.id, 'prompt': prompt.render_prompt(), 'indices': response},
            )

        self.service.host.update_mock_data(plugin, ret)

        try:
            response_json: dict[str, Any] = json.loads(response)
        except json.JSONDecodeError:
            logging.exception('Failed to parse JSON in _generate_indices: Response: %s', response)
            raise

        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.DYNAMIC_INDICES_GENERATED,
            len(response_json['indices']),
        )

        return response_json

    def on_analyze_complete(self, fut: Future[Any]) -> None:
        """Build ``self.prompt_analysis`` and schedule embedding of the indices.

        Raises:
            KeyError, IndexError: If either stage result is missing from the
                combined result (logged, then re-raised).
        """
        analysis = fut.result()  # This will raise an exception if the coroutine fails

        try:
            # Results are keyed by coroutine name; the first element of each
            # entry is that stage's return value.
            analysis_json = analysis['_analyze_prompt'][0]
            indices_json = analysis['_generate_indices'][0]
        except (KeyError, IndexError):
            logging.exception('Failed to read stage results in on_analyze_complete')
            raise

        if self.prompt.widget_cmd:
            indices_json['indices'].append('widget ' + self.prompt.widget_cmd)

        self.prompt_analysis = PromptAnalysis(
            analysis_json,
            indices_json,
        )

        embeddings_future = self.service.run_task(
            self._generate_indicies_embeddings(self.prompt_analysis.indices['indices'])
        )
        embeddings_future.add_done_callback(self.on_indices_embeddings_generated)

    def on_indices_embeddings_generated(self, fut: Future[Any]) -> None:
        """Schedule the main vector-database query with the index embeddings."""
        embeddings = fut.result()
        query_future = self.service.run_task(self._query_index_db(embeddings))
        query_future.add_done_callback(self.on_query_index_db)

    async def _generate_indicies_embeddings(self, indices: list[str]) -> list[list[float]]:
        """Embed the index phrases; returns ``[]`` without calling the plugin when there are none."""
        if not indices:
            return []

        plugin = self.embeddings_gen_embed

        ret = await asyncio.to_thread(
            plugin['func'].gen_embed, strings=indices, args=self.service.host.mock_update_args(plugin)
        )

        self.service.host.update_mock_data(plugin, ret)
        embeddings_list: list[list[float]] = ret[0]['embeddings_list']
        return embeddings_list

    # ### Fetch Engram IDs
    # Use the indices to fetch related Engram IDs

    async def _query_index_db(self, embeddings: list[list[float]]) -> set[str]:
        """Query the ``'main'`` collection and return the distinct ids found.

        Args:
            embeddings: Index embeddings; an empty list short-circuits to an
                empty set without querying.
        """
        if not embeddings:
            return set()

        plugin = self.prompt_vector_db_plugin

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='main',
            embeddings=embeddings,
            repo_filters=self.prompt.repo_ids_filters,
            type_filters=self.type_filters,
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)
        ids: set[str] = set(ret[0]['query_set'])

        # NOTE(review): the metric is named VECTOR_DB_QUERIES but is incremented
        # by the number of distinct ids returned - confirm which is intended.
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.VECTOR_DB_QUERIES, len(ids)
        )

        return ids

    def on_query_index_db(self, fut: Future[Any]) -> None:
        """Assemble the retrieval result and send ``RETRIEVE_COMPLETE``.

        Raises:
            RuntimeError: If ``self.prompt_analysis`` was never set.
        """
        ret = fut.result()
        logging.debug('Query Result: %s', ret)

        if self.prompt_analysis is None:
            error = 'on_query_index_db failed: prompt_analysis is None and likely failed during an earlier process.'
            raise RuntimeError(error)

        retrieve_result = RetrieveResult(
            self.id,
            self.prompt.prompt_id,
            engram_id_array=list(ret),
            conversation_direction=self.conversation_direction,
            analysis=asdict(self.prompt_analysis)['prompt_analysis'],
        )

        # Set before the prompt is serialised below so the response carries the flag.
        if self.prompt_analysis.prompt_analysis['remember_request']:
            self.prompt.training_mode = True

        retrieve_response = {
            'analysis': asdict(self.prompt_analysis),
            'prompt': asdict(self.prompt),
            'retrieve_response': asdict(retrieve_result),
        }

        if __debug__:
            self.service.host.update_mock_data_output(self.service, retrieve_response)

        self.service.send_message_async(Service.Topic.RETRIEVE_COMPLETE, retrieve_response)
|