
Retrieve Service

Bases: Service

Service responsible for handling semantic prompt retrieval. It communicates with vector and document databases via plugins and tracks operational metrics for observability.
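The metrics tracking mentioned above follows an increment-then-snapshot pattern. Handlers call `increment()`, and `on_acknowledge` collects the counts with `get_and_reset_packet()` and publishes them on the STATUS topic. The sketch below is a minimal stand-in, not engramic's `MetricsTracker`. The class name `SimpleMetricsTracker`, the enum string values, and the plain-dict packet format are assumptions. Only the method names and the status keys come from the source listing.

```python
import time
from collections import Counter
from enum import Enum
from typing import Any, Generic, TypeVar


class RetrieveMetric(Enum):
    # Hypothetical subset of metric names; the real RetrieveMetric values may differ.
    PROMPTS_SUBMITTED = 'prompts_submitted'
    EMBEDDINGS_ADDED_TO_VECTOR = 'embeddings_added_to_vector'
    META_ADDED_TO_VECTOR = 'meta_added_to_vector'


M = TypeVar('M', bound=Enum)


class SimpleMetricsTracker(Generic[M]):
    """Hypothetical stand-in for MetricsTracker: counts events per metric."""

    def __init__(self) -> None:
        self._counts: Counter[M] = Counter()

    def increment(self, metric: M, amount: int = 1) -> None:
        self._counts[metric] += amount

    def get_and_reset_packet(self) -> dict[str, int]:
        # Snapshot the counts keyed by metric value, then start a fresh window.
        packet = {metric.value: count for metric, count in self._counts.items()}
        self._counts.clear()
        return packet


def build_status(service_id: str, name: str, tracker: SimpleMetricsTracker[RetrieveMetric]) -> dict[str, Any]:
    # Same keys that RetrieveService.on_acknowledge sends on the STATUS topic.
    return {'id': service_id, 'name': name, 'timestamp': time.time(), 'metrics': tracker.get_and_reset_packet()}


tracker = SimpleMetricsTracker[RetrieveMetric]()
tracker.increment(RetrieveMetric.PROMPTS_SUBMITTED)
tracker.increment(RetrieveMetric.PROMPTS_SUBMITTED)
status = build_status('svc-1', 'RetrieveService', tracker)
print(status['metrics'])  # {'prompts_submitted': 2}
print(tracker.get_and_reset_packet())  # {}
```

Resetting on every read means each STATUS message reports only the activity since the previous acknowledgement, not a running total.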

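Attributes:

| Name | Type | Description |
|---|---|---|
| plugin_manager | PluginManager | Plugin manager used to look up the document database and vector database plugins. |
| vector_db_plugin | dict | Plugin for interacting with the vector database; the implementation is stored under 'func' and its configuration under 'args'. |
| db_plugin | dict | Plugin for document database operations. |
| metrics_tracker | MetricsTracker | Tracks retrieval-related metrics, such as prompts submitted and entries added to the vector database. |
| meta_repository | MetaRepository | Handles persistence and loading of Meta objects. |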
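Both plugin attributes are plain dicts. The service calls the implementation stored under `'func'` and passes the configuration stored under `'args'` back on every call. The sketch below illustrates that calling convention with a toy in-memory backend. `InMemoryVectorDB` is hypothetical, and the dict shape is inferred from how `RetrieveService` indexes its plugins, not taken from `PluginManager`'s documentation.

```python
from typing import Any


class InMemoryVectorDB:
    """Hypothetical vector DB implementation; a real plugin would embed and index the text."""

    def __init__(self) -> None:
        # collection name -> list of (obj_id, item) pairs
        self.collections: dict[str, list[tuple[str, Any]]] = {}

    def insert(self, collection_name: str, index_list: list[Any], obj_id: str, args: dict[str, Any]) -> None:
        # `args` carries plugin configuration; this toy version ignores it.
        bucket = self.collections.setdefault(collection_name, [])
        for item in index_list:
            bucket.append((obj_id, item))


# Assumed plugin shape, inferred from how RetrieveService uses it:
# the implementation object under 'func', its configuration under 'args'.
vector_db_plugin: dict[str, Any] = {'func': InMemoryVectorDB(), 'args': {}}

# Same keyword-argument call RetrieveService makes for the 'meta' collection.
vector_db_plugin['func'].insert(
    collection_name='meta',
    index_list=['Summary of a source document.'],
    obj_id='meta-1',
    args=vector_db_plugin['args'],
)
print(vector_db_plugin['func'].collections)
# {'meta': [('meta-1', 'Summary of a source document.')]}
```

The service only touches `plugin['func']` and `plugin['args']`, so any object exposing a compatible `insert()` could presumably be substituted. Routing lookups through `PluginManager` would then let the backend change without edits to the service.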

Methods:

| Name | Description |
|---|---|
| init_async | Connects the document database plugin, then runs the base Service initialization. |
| start | Subscribes to the ACKNOWLEDGE, SUBMIT_PROMPT, INDEX_COMPLETE, and META_COMPLETE topics. |
| stop | Stops the service by delegating to Service.stop(). |
| submit | Increments the PROMPTS_SUBMITTED metric, creates an Ask retrieval for the prompt, and calls its get_sources(). |
| on_submit_prompt | Handles a prompt string received over messaging, wraps it in a Prompt object, and passes it to submit. |
| on_index_complete | Converts a completed index message into Index objects and schedules their insertion into the vector DB. |
| on_meta_complete | Loads completed metadata into a Meta object and schedules insertion of its full summary into the vector DB. |
| insert_meta_vector | Asynchronously inserts a Meta object's full summary into the vector DB's 'meta' collection. |
| _insert_engram_vector | Asynchronously inserts an engram's semantic indices into the vector DB's 'main' collection. |
| on_acknowledge | Publishes a STATUS message containing the current metrics packet and resets the metrics tracker. |
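The service is event-driven. `start()` registers one callback per topic, and the message layer invokes the matching callback when a message arrives. The following is a minimal in-process sketch of that topic-to-callback wiring. `MiniBus`, `ToyRetrieveService`, and the `Topic` enum values are hypothetical. The real transport comes from the `Service` base class and is not shown in this page.

```python
from collections import defaultdict
from enum import Enum
from typing import Any, Callable

Handler = Callable[[Any], None]


class Topic(Enum):
    # Names mirror the topics RetrieveService.start() subscribes to; values are assumed.
    ACKNOWLEDGE = 'acknowledge'
    SUBMIT_PROMPT = 'submit_prompt'
    INDEX_COMPLETE = 'index_complete'
    META_COMPLETE = 'meta_complete'


class MiniBus:
    """Hypothetical in-process message bus: each topic maps to a list of handlers."""

    def __init__(self) -> None:
        self._handlers: defaultdict[Topic, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: Topic, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: Topic, payload: Any) -> None:
        # Deliver synchronously to every handler registered for the topic.
        for handler in self._handlers[topic]:
            handler(payload)


class ToyRetrieveService:
    """Records which callback fired; the real handlers do retrieval and vector inserts."""

    def __init__(self, bus: MiniBus) -> None:
        self.bus = bus
        self.calls: list[tuple[str, Any]] = []

    def start(self) -> None:
        # Same topic-to-callback wiring as RetrieveService.start().
        self.bus.subscribe(Topic.ACKNOWLEDGE, self.on_acknowledge)
        self.bus.subscribe(Topic.SUBMIT_PROMPT, self.on_submit_prompt)
        self.bus.subscribe(Topic.INDEX_COMPLETE, self.on_index_complete)
        self.bus.subscribe(Topic.META_COMPLETE, self.on_meta_complete)

    def on_acknowledge(self, message: Any) -> None:
        self.calls.append(('on_acknowledge', message))

    def on_submit_prompt(self, data: Any) -> None:
        self.calls.append(('on_submit_prompt', data))

    def on_index_complete(self, index_message: Any) -> None:
        self.calls.append(('on_index_complete', index_message))

    def on_meta_complete(self, meta_dict: Any) -> None:
        self.calls.append(('on_meta_complete', meta_dict))


bus = MiniBus()
service = ToyRetrieveService(bus)
service.start()
bus.publish(Topic.SUBMIT_PROMPT, 'What is an engram?')
bus.publish(Topic.ACKNOWLEDGE, '')
print(service.calls)
# [('on_submit_prompt', 'What is an engram?'), ('on_acknowledge', '')]
```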

Source code in src/engramic/application/retrieve/retrieve_service.py
class RetrieveService(Service):
    """
    Service responsible for handling semantic prompt retrieval. It communicates with vector and document databases via plugins and tracks operational metrics for observability.

    Attributes:
        plugin_manager (PluginManager): Plugin manager to access database and vector services.
        vector_db_plugin (dict): Plugin for interacting with the vector database.
        db_plugin (dict): Plugin for document database operations.
        metrics_tracker (MetricsTracker): Tracks various retrieval-related metrics.
        meta_repository (MetaRepository): Handles persistence and loading of Meta objects.

    Methods:
        init_async(): Initializes the database connection.
        start(): Subscribes to service topics for acknowledgement, prompt submission, indexing, and metadata completion.
        stop(): Stops the service and cleans up subscriptions.

        submit(prompt): Initiates the retrieval process for a given prompt and logs metrics.
        on_submit_prompt(data): Handles incoming prompt strings from messaging, wraps them in a Prompt object, and triggers retrieval.

        on_index_complete(index_message): Callback that handles completed indices and submits them to the vector DB.
        on_meta_complete(meta_dict): Callback for completed metadata, converts and submits to vector DB.

        insert_meta_vector(meta): Asynchronously inserts metadata summaries into the vector DB.
        _insert_engram_vector(index_list, engram_id): Asynchronously inserts semantic indices into the vector DB.

        on_acknowledge(message_in): Handles status reporting and resets the current metric tracker.
    """

    def __init__(self, host: Host) -> None:
        super().__init__(host)
        self.plugin_manager: PluginManager = host.plugin_manager
        self.vector_db_plugin = host.plugin_manager.get_plugin('vector_db', 'db')
        self.db_plugin = host.plugin_manager.get_plugin('db', 'document')
        self.metrics_tracker: MetricsTracker[RetrieveMetric] = MetricsTracker[RetrieveMetric]()
        self.meta_repository: MetaRepository = MetaRepository(self.db_plugin)

    def init_async(self) -> None:
        self.db_plugin['func'].connect(args=None)
        return super().init_async()

    def start(self) -> None:
        self.subscribe(Service.Topic.ACKNOWLEDGE, self.on_acknowledge)
        self.subscribe(Service.Topic.SUBMIT_PROMPT, self.on_submit_prompt)
        self.subscribe(Service.Topic.INDEX_COMPLETE, self.on_index_complete)
        self.subscribe(Service.Topic.META_COMPLETE, self.on_meta_complete)

    def stop(self) -> None:
        super().stop()

    # when called from monitor service
    def on_submit_prompt(self, data: str) -> None:
        self.submit(Prompt(data))

    # when used from main
    def submit(self, prompt: Prompt) -> None:
        if __debug__:
            self.host.update_mock_data_input(self, asdict(prompt))

        self.metrics_tracker.increment(RetrieveMetric.PROMPTS_SUBMITTED)
        retrieval = Ask(str(uuid.uuid4()), prompt, self.plugin_manager, self.metrics_tracker, self.db_plugin, self)
        retrieval.get_sources()

    def on_index_complete(self, index_message: dict[str, Any]) -> None:
        raw_index: list[dict[str, Any]] = index_message['index']
        engram_id: str = index_message['engram_id']
        index_list: list[Index] = [Index(**item) for item in raw_index]
        self.run_task(self._insert_engram_vector(index_list, engram_id))

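    async def _insert_engram_vector(self, index_list: list[Index], engram_id: str) -> None:
        plugin = self.vector_db_plugin
        # Run the synchronous plugin insert in a worker thread so it does not
        # block the event loop, matching insert_meta_vector below.
        await asyncio.to_thread(
            plugin['func'].insert,
            collection_name='main',
            index_list=index_list,
            obj_id=engram_id,
            args=plugin['args'],
        )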

        self.metrics_tracker.increment(RetrieveMetric.EMBEDDINGS_ADDED_TO_VECTOR)

    def on_meta_complete(self, meta_dict: dict[str, Any]) -> None:
        meta = self.meta_repository.load(meta_dict)
        self.run_task(self.insert_meta_vector(meta))
        self.metrics_tracker.increment(RetrieveMetric.META_ADDED_TO_VECTOR)

    async def insert_meta_vector(self, meta: Meta) -> None:
        plugin = self.vector_db_plugin
        await asyncio.to_thread(
            self.vector_db_plugin['func'].insert,
            collection_name='meta',
            index_list=[meta.summary_full],
            obj_id=meta.id,
            args=plugin['args'],
        )

    def on_acknowledge(self, message_in: str) -> None:
        del message_in

        metrics_packet: MetricPacket = self.metrics_tracker.get_and_reset_packet()

        self.send_message_async(
            Service.Topic.STATUS,
            {'id': self.id, 'name': self.__class__.__name__, 'timestamp': time.time(), 'metrics': metrics_packet},
        )
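`on_index_complete` and `on_meta_complete` are synchronous callbacks. Each one hands a coroutine to `run_task` and returns immediately. `insert_meta_vector` then wraps the blocking plugin `insert` in `asyncio.to_thread` so the event loop stays responsive. The sketch below reproduces that pattern under stated assumptions. `ToyService`, `BlockingVectorDB`, and this `run_task` are hypothetical. The sketch assumes that `Service.run_task` schedules the coroutine on the running event loop; its real behaviour is not shown on this page.

```python
import asyncio
import time
from collections.abc import Coroutine
from typing import Any


class BlockingVectorDB:
    """Hypothetical plugin whose insert() blocks, as a network-bound client call would."""

    def __init__(self) -> None:
        self.rows: list[tuple[str, str, Any]] = []

    def insert(self, collection_name: str, index_list: list[Any], obj_id: str, args: dict[str, Any]) -> None:
        time.sleep(0.05)  # simulate I/O latency
        for item in index_list:
            self.rows.append((collection_name, obj_id, item))


class ToyService:
    def __init__(self) -> None:
        self.plugin: dict[str, Any] = {'func': BlockingVectorDB(), 'args': {}}
        self.tasks: set[asyncio.Task[None]] = set()

    def run_task(self, coro: Coroutine[Any, Any, None]) -> None:
        # Hypothetical stand-in for Service.run_task: schedule on the running loop
        # and hold a reference so the task is not garbage-collected early.
        task = asyncio.get_running_loop().create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)

    def on_meta_complete(self, meta: dict[str, str]) -> None:
        # Synchronous callback: returns at once; the insert runs in the background.
        self.run_task(self.insert_meta_vector(meta))

    async def insert_meta_vector(self, meta: dict[str, str]) -> None:
        # Offload the blocking plugin call to a worker thread.
        await asyncio.to_thread(
            self.plugin['func'].insert,
            collection_name='meta',
            index_list=[meta['summary_full']],
            obj_id=meta['id'],
            args=self.plugin['args'],
        )


async def main() -> list[tuple[str, str, Any]]:
    service = ToyService()
    service.on_meta_complete({'id': 'm1', 'summary_full': 'First summary.'})
    service.on_meta_complete({'id': 'm2', 'summary_full': 'Second summary.'})
    # Both inserts sleep concurrently in worker threads; wait for them to finish.
    await asyncio.gather(*service.tasks)
    return sorted(service.plugin['func'].rows)


print(asyncio.run(main()))
# [('meta', 'm1', 'First summary.'), ('meta', 'm2', 'Second summary.')]
```

The listing increments the two insert metrics at different points. `on_meta_complete` increments META_ADDED_TO_VECTOR right after scheduling, so that metric counts inserts that were started, not inserts that finished. `_insert_engram_vector` increments EMBEDDINGS_ADDED_TO_VECTOR only after its `insert` call returns.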