30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400 | class Ask(Retrieval):
class Ask(Retrieval):
    """
    Ask is a specific type of retrieval focused on traditional Q&A. It is a single instance of an ask.

    This class handles the end-to-end workflow of transforming a raw prompt into
    contextual embeddings, querying the vector database, and returning retrieved engram ids.
    It also supports various conversation analysis and prompt index generation.

    The pipeline is a chain of coroutines. Each one is scheduled through
    ``service.run_task`` (or ``service.run_tasks``) and the matching ``on_*``
    done-callback reads the future's result and schedules the next step.

    Attributes:
        id (str): A unique identifier for this retrieval session.
        prompt (Prompt): The original prompt provided by the user.
        metrics_tracker (MetricsTracker): Tracks operational metrics for observability.
        service (RetrieveService): Reference to the parent service coordinating this request.
        library (str | None): Optional name of the target library to search within.
        conversation_direction (dict[str, str]): Stores current user intent and working memory.
            Empty until the conversation-direction step has completed.
        prompt_analysis (PromptAnalysis | None): Stores structured analysis of the prompt after processing.
        retrieve_gen_conversation_direction_plugin: LLM plugin used to derive the conversation direction.
        prompt_analysis_plugin: LLM plugin used to analyze the prompt.
        prompt_retrieve_indices_plugin: LLM plugin used to generate lookup indices.
        prompt_vector_db_plugin: Vector DB plugin queried for meta ids and engram ids.
        prompt_db_document_plugin (dict): Plugin used to interact with the document database.
        embeddings_gen_embed: Embedding plugin used for the direction and the indices.

    Methods:
        get_sources():
            Initiates the async pipeline for directional memory retrieval.
        _fetch_history():
            Asynchronously retrieves prior user history from the document DB.
        _retrieve_gen_conversation_direction():
            Uses LLM plugin to extract user intent and conversational memory.
        _embed_gen_direction():
            Converts extracted intent into embeddings.
        _vector_fetch_direction_meta():
            Queries the metadata collection in the vector DB using intent embeddings.
        _fetch_direction_meta():
            Loads Meta objects from the metadata store based on query results.
        _analyze_prompt():
            Uses LLM plugin to analyze the user prompt in the context of metadata.
        _generate_indices():
            Generates semantic indices from the prompt and metadata for retrieval.
        _generate_indicies_embeddings():
            Converts generated index phrases into embeddings.
        _query_index_db():
            Searches the main vector DB with embeddings to identify related engrams.
        on_fetch_history_complete(fut):
            Callback when prompt history fetch is complete; begins direction generation.
        on_direction_ret_complete(fut):
            Callback when conversation direction is generated; begins embedding.
        on_embed_direction_complete(fut):
            Callback when direction embedding is ready; begins vector DB search for metadata.
        on_vector_fetch_direction_meta_complete(fut):
            Callback when metadata vector search is complete; begins metadata fetch.
        on_fetch_direction_meta_complete(fut):
            Callback when metadata objects are fetched; begins analysis and index generation.
        on_analyze_complete(fut):
            Callback when prompt analysis and index generation are complete; begins embedding generation.
        on_indices_embeddings_generated(fut):
            Callback when index embeddings are ready; triggers main index DB query.
        on_query_index_db(fut):
            Final callback when engram retrieval is complete; assembles and emits the result.
    """

    def __init__(
        self,
        ask_id: str,
        prompt: Prompt,
        plugin_manager: PluginManager,
        metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric],
        db_plugin: dict[str, Any],
        service: RetrieveService,
        library: str | None = None,
    ) -> None:
        """Store the collaborators and resolve every plugin the pipeline uses.

        Args:
            ask_id: Identifier stored as ``self.id`` and attached to emitted messages.
            prompt: The user prompt; its ``prompt_str`` feeds the LLM prompts.
            plugin_manager: Used only here, to look up the LLM, vector DB and
                embedding plugins.
            metrics_tracker: Counter sink for the retrieve metrics.
            db_plugin: Document DB plugin; read as ``db_plugin['func']`` and
                ``db_plugin['args']`` when fetching history.
            service: Parent service that schedules tasks and sends messages.
            library: Optional library name; stored but not read elsewhere in
                this class.
        """
        self.id = ask_id
        self.service = service
        self.metrics_tracker: MetricsTracker[engramic.application.retrieve.retrieve_service.RetrieveMetric] = (
            metrics_tracker
        )
        self.library = library
        self.prompt = prompt
        # Start empty so the attribute exists before the conversation-direction
        # step assigns the parsed LLM response to it.
        self.conversation_direction: dict[str, str] = {}
        self.prompt_analysis: PromptAnalysis | None = None

        self.retrieve_gen_conversation_direction_plugin = plugin_manager.get_plugin(
            'llm', 'retrieve_gen_conversation_direction'
        )
        self.prompt_analysis_plugin = plugin_manager.get_plugin('llm', 'retrieve_prompt_analysis')
        self.prompt_retrieve_indices_plugin = plugin_manager.get_plugin('llm', 'retrieve_gen_index')
        self.prompt_vector_db_plugin = plugin_manager.get_plugin('vector_db', 'db')
        self.prompt_db_document_plugin = db_plugin
        self.embeddings_gen_embed = plugin_manager.get_plugin('embedding', 'gen_embed')

    def get_sources(self) -> None:
        """Start the pipeline by scheduling the history fetch."""
        history_future = self.service.run_task(self._fetch_history())
        history_future.add_done_callback(self.on_fetch_history_complete)

    # ### CONVERSATION DIRECTION
    # Fetches related domain knowledge based on the prompt intent.

    async def _fetch_history(self) -> list[dict[str, Any]]:
        """Fetch prior history rows from the document DB in a worker thread.

        Returns:
            The first element of the plugin's ``fetch`` result.
        """
        plugin = self.prompt_db_document_plugin
        # Build a per-call copy so the 'history' flag does not leak into the
        # plugin's shared argument dict.
        # NOTE(review): 'history': 1 presumably limits how much history is
        # returned; confirm against the DB plugin.
        args = {**plugin['args'], 'history': 1}

        ret_val = await asyncio.to_thread(plugin['func'].fetch, table=DB.DBTables.HISTORY, ids=[], args=args)
        history_dict: list[dict[str, Any]] = ret_val[0]
        return history_dict

    def on_fetch_history_complete(self, fut: Future[Any]) -> None:
        """Pass the fetched history to the conversation-direction step.

        ``fut.result()`` re-raises any exception from the history fetch.
        """
        response_array: list[dict[str, Any]] = fut.result()
        direction_future = self.service.run_task(self._retrieve_gen_conversation_direction(response_array))
        direction_future.add_done_callback(self.on_direction_ret_complete)

    async def _retrieve_gen_conversation_direction(self, response_array: list[dict[str, Any]]) -> dict[str, str]:
        """Ask the LLM plugin for the current user intent and working memory.

        Args:
            response_array: History rows handed to the prompt as ``history_array``.

        Returns:
            The JSON-decoded ``llm_response``; also stored on
            ``self.conversation_direction``.
        """
        if __debug__:
            self.service.send_message_async(self.service.Topic.DEBUG_ASK_CREATED, {'ask_id': self.id})

        input_data = {'history_array': response_array}
        plugin = self.retrieve_gen_conversation_direction_plugin

        # add prompt engineering here and submit as the full prompt.
        prompt_gen = PromptGenConversation(prompt_str=self.prompt.prompt_str, input_data=input_data)
        structured_schema = {'current_user_intent': str, 'working_memory': str}

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt_gen,
            structured_schema=structured_schema,
            args=self.service.host.mock_update_args(plugin),
        )

        llm_response = ret[0]['llm_response']
        json_parsed: dict[str, str] = json.loads(llm_response)
        self.conversation_direction = json_parsed

        if __debug__:
            self.service.send_message_async(
                self.service.Topic.DEBUG_CONVERSATION_DIRECTION,
                {'ask_id': self.id, 'prompt': prompt_gen.render_prompt(), 'working_memory': llm_response},
            )

        self.service.host.update_mock_data(plugin, ret)
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.CONVERSATION_DIRECTION_CALCULATED
        )
        return json_parsed

    def on_direction_ret_complete(self, fut: Future[Any]) -> None:
        """Schedule embedding of the ``current_user_intent`` string."""
        direction_ret = fut.result()
        # The whole direction dict is logged, not only the intent field.
        logging.debug('current_user_intent: %s', direction_ret)

        intent_and_direction = direction_ret['current_user_intent']
        embed_future = self.service.run_task(self._embed_gen_direction(intent_and_direction))
        embed_future.add_done_callback(self.on_embed_direction_complete)

    async def _embed_gen_direction(self, main_prompt: str) -> list[float]:
        """Embed one string with the embedding plugin.

        Args:
            main_prompt: Text to embed, sent as a one-element list.

        Returns:
            The first entry of the plugin's ``embeddings_list``.
        """
        plugin = self.embeddings_gen_embed
        ret = await asyncio.to_thread(
            plugin['func'].gen_embed, strings=[main_prompt], args=self.service.host.mock_update_args(plugin)
        )
        self.service.host.update_mock_data(plugin, ret)

        float_array: list[float] = ret[0]['embeddings_list'][0]
        return float_array

    def on_embed_direction_complete(self, fut: Future[Any]) -> None:
        """Schedule the meta-collection vector query with the new embedding."""
        embedding = fut.result()
        fetch_future = self.service.run_task(self._vector_fetch_direction_meta(embedding))
        fetch_future.add_done_callback(self.on_vector_fetch_direction_meta_complete)

    async def _vector_fetch_direction_meta(self, embedding: list[float]) -> list[str]:
        """Query the ``meta`` collection of the vector DB.

        Args:
            embedding: Embedding of the current user intent.

        Returns:
            The ``query_set`` of the first result entry.
        """
        plugin = self.prompt_vector_db_plugin
        # NOTE(review): this updates the plugin's argument dict in place, and
        # _query_index_db uses the same plugin object, so threshold/n_results
        # may carry over to the 'main' query. Confirm whether that is intended
        # before changing it.
        plugin['args'].update({'threshold': 0.6, 'n_results': 5})

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='meta',
            embeddings=embedding,
            args=self.service.host.mock_update_args(plugin),
        )
        self.service.host.update_mock_data(plugin, ret)

        list_str: list[str] = ret[0]['query_set']
        return list_str

    def on_vector_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Schedule loading of the Meta objects for the returned ids."""
        meta_ids = fut.result()
        meta_future = self.service.run_task(self._fetch_direction_meta(meta_ids))
        meta_future.add_done_callback(self.on_fetch_direction_meta_complete)

    async def _fetch_direction_meta(self, meta_id: list[str]) -> list[Meta]:
        """Load Meta objects for the given ids from the service's meta repository.

        Args:
            meta_id: Ids returned by the meta vector query.

        Returns:
            The loaded Meta objects.
        """
        # NOTE(review): load_batch is called directly rather than through
        # asyncio.to_thread like the plugin calls; confirm it does not block.
        meta_list = self.service.meta_repository.load_batch(meta_id)

        if __debug__:
            # Metas without a full summary are reported as an empty string.
            dict_meta = [meta.summary_full.text if meta.summary_full is not None else '' for meta in meta_list]
            self.service.send_message_async(
                self.service.Topic.DEBUG_ASK_META, {'ask_id': self.id, 'ask_meta': dict_meta}
            )

        return meta_list

    def on_fetch_direction_meta_complete(self, fut: Future[Any]) -> None:
        """Schedule prompt analysis and index generation together."""
        meta_list = fut.result()
        analyze_future = self.service.run_tasks([self._analyze_prompt(meta_list), self._generate_indices(meta_list)])
        analyze_future.add_done_callback(self.on_analyze_complete)

    # ### Prompt Analysis
    # Analyzes the prompt and generates lookups that will aid in vector searching of related content

    async def _analyze_prompt(self, meta_list: list[Meta]) -> dict[str, str]:
        """Submit the prompt-analysis prompt to the LLM plugin.

        Args:
            meta_list: Meta objects handed to the prompt as ``meta_list``.

        Returns:
            The first entry of the plugin response.

        Raises:
            TypeError: If that entry is not a dict.
        """
        plugin = self.prompt_analysis_plugin

        # add prompt engineering here and submit as the full prompt.
        prompt = PromptAnalyzePrompt(prompt_str=self.prompt.prompt_str, input_data={'meta_list': meta_list})
        structured_response = {'response_length': str}

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_response,
            args=self.service.host.mock_update_args(plugin),
        )

        self.service.host.update_mock_data(plugin, ret)
        self.metrics_tracker.increment(engramic.application.retrieve.retrieve_service.RetrieveMetric.PROMPTS_ANALYZED)

        if not isinstance(ret[0], dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        return ret[0]

    def on_analyze_complete(self, fut: Future[Any]) -> None:
        """Build ``prompt_analysis`` and schedule embedding of the indices.

        The result is read as a mapping keyed by coroutine name
        (``'_analyze_prompt'`` and ``'_generate_indices'``); the first entry of
        each is JSON-decoded from its ``llm_response``.
        """
        analysis = fut.result()  # This will raise an exception if the coroutine fails

        self.prompt_analysis = PromptAnalysis(
            json.loads(analysis['_analyze_prompt'][0]['llm_response']),
            json.loads(analysis['_generate_indices'][0]['llm_response']),
        )

        generate_indices_future = self.service.run_task(
            self._generate_indicies_embeddings(self.prompt_analysis.indices['indices'])
        )
        generate_indices_future.add_done_callback(self.on_indices_embeddings_generated)

    async def _generate_indices(self, meta_list: list[Meta]) -> dict[str, str]:
        """Submit the index-generation prompt to the LLM plugin.

        Args:
            meta_list: Meta objects handed to the prompt as ``meta_list``.

        Returns:
            The first entry of the plugin response.

        Raises:
            TypeError: If that entry is not a dict.
        """
        plugin = self.prompt_retrieve_indices_plugin

        # add prompt engineering here and submit as the full prompt.
        prompt = PromptGenIndices(prompt_str=self.prompt.prompt_str, input_data={'meta_list': meta_list})
        structured_output = {'indices': list[str]}

        ret = await asyncio.to_thread(
            plugin['func'].submit,
            prompt=prompt,
            structured_schema=structured_output,
            args=self.service.host.mock_update_args(plugin),
        )

        # Validate before the first key lookup so a malformed response fails
        # with this message rather than an opaque indexing error.
        result = ret[0]
        if not isinstance(result, dict):
            error = f'Expected dict[str, str], got {type(ret[0])}'
            raise TypeError(error)

        llm_response = result['llm_response']

        if __debug__:
            prompt_render = prompt.render_prompt()
            self.service.send_message_async(
                Service.Topic.DEBUG_ASK_INDICES,
                {'ask_id': self.id, 'prompt': prompt_render, 'indices': llm_response},
            )

        self.service.host.update_mock_data(plugin, ret)

        response_json = json.loads(llm_response)
        count = len(response_json['indices'])
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.DYNAMIC_INDICES_GENERATED, count
        )

        return result

    def on_indices_embeddings_generated(self, fut: Future[Any]) -> None:
        """Schedule the main index query with the generated embeddings."""
        embeddings = fut.result()
        query_future = self.service.run_task(self._query_index_db(embeddings))
        query_future.add_done_callback(self.on_query_index_db)

    async def _generate_indicies_embeddings(self, indices: list[str]) -> list[list[float]]:
        """Embed the generated index phrases in one plugin call.

        Args:
            indices: Index phrases to embed.

        Returns:
            The plugin's ``embeddings_list`` from the first result entry.
        """
        plugin = self.embeddings_gen_embed
        ret = await asyncio.to_thread(
            plugin['func'].gen_embed, strings=indices, args=self.service.host.mock_update_args(plugin)
        )
        self.service.host.update_mock_data(plugin, ret)

        embeddings_list: list[list[float]] = ret[0]['embeddings_list']
        return embeddings_list

    # ### Fetch Engram IDs
    # Use the indices to fetch related Engram IDs

    async def _query_index_db(self, embeddings: list[list[float]]) -> set[str]:
        """Query the ``main`` collection of the vector DB.

        Args:
            embeddings: Embeddings of the generated index phrases.

        Returns:
            The ids from the first result entry's ``query_set``, de-duplicated.
        """
        plugin = self.prompt_vector_db_plugin

        ret = await asyncio.to_thread(
            plugin['func'].query,
            collection_name='main',
            embeddings=embeddings,
            args=self.service.host.mock_update_args(plugin),
        )
        self.service.host.update_mock_data(plugin, ret)

        ids: set[str] = set(ret[0]['query_set'])

        # NOTE(review): the metric is named VECTOR_DB_QUERIES but is incremented
        # by the number of distinct ids returned; confirm that is the intent.
        num_queries = len(ids)
        self.metrics_tracker.increment(
            engramic.application.retrieve.retrieve_service.RetrieveMetric.VECTOR_DB_QUERIES, num_queries
        )

        return ids

    def on_query_index_db(self, fut: Future[Any]) -> None:
        """Assemble the retrieve response and send RETRIEVE_COMPLETE.

        Raises:
            RuntimeError: If ``prompt_analysis`` was never set.
        """
        engram_ids = fut.result()
        logging.debug('Query Result: %s', engram_ids)

        if self.prompt_analysis is None:
            error = 'Prompt analysis None in on_query_index_db'
            raise RuntimeError(error)

        retrieve_result = RetrieveResult(
            self.id, engram_id_array=list(engram_ids), conversation_direction=self.conversation_direction
        )

        retrieve_response = {
            'analysis': asdict(self.prompt_analysis),
            'prompt_str': self.prompt.prompt_str,
            'retrieve_response': asdict(retrieve_result),
        }

        if __debug__:
            self.service.host.update_mock_data_output(self.service, retrieve_response)

        self.service.send_message_async(Service.Topic.RETRIEVE_COMPLETE, retrieve_response)
|