Bases: Service
A service responsible for persisting various data artifacts generated during the Engramic runtime process.
The StorageService listens to multiple system topics and asynchronously saves different types of data,
including observations, engrams, metadata, and history responses. It utilizes plugin-based repositories
for storage and maintains metrics on saved entities for monitoring and reporting.
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| plugin_manager | PluginManager | Manages system plugins including the database plugin. |
| db_document_plugin | | The database document plugin used by repositories. |
| history_repository | HistoryRepository | Handles persistence of history data. |
| observation_repository | ObservationRepository | Handles persistence of observations. |
| engram_repository | EngramRepository | Handles persistence of engrams. |
| meta_repository | MetaRepository | Handles persistence of metadata. |
| metrics_tracker | MetricsTracker | Tracks and reports storage metrics for each saved entity type. |
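The `metrics_tracker` attribute counts saved entities per type and is read through a get-and-reset call. As a rough illustration of that idea only, here is a minimal sketch. `SimpleMetricsTracker` is a hypothetical stand-in, not Engramic's `MetricsTracker`; the enum member names come from the source, but their string values and the packet's dictionary shape are assumptions.

```python
from collections import Counter
from enum import Enum
from typing import Generic, TypeVar


class StorageMetric(Enum):
    # Member names appear in the source; the string values are placeholders.
    OBSERVATION_SAVED = 'observation_saved'
    HISTORY_SAVED = 'history_saved'
    ENGRAM_SAVED = 'engram_saved'
    META_SAVED = 'meta_saved'


M = TypeVar('M', bound=Enum)


class SimpleMetricsTracker(Generic[M]):
    """Hypothetical stand-in for MetricsTracker; not Engramic's implementation."""

    def __init__(self) -> None:
        self._counts: Counter[M] = Counter()

    def increment(self, metric: M, amount: int = 1) -> None:
        self._counts[metric] += amount

    def get_and_reset_packet(self) -> dict[str, int]:
        # Snapshot the counters, then clear them so each packet covers one interval.
        packet = {metric.name: count for metric, count in self._counts.items()}
        self._counts.clear()
        return packet


tracker: SimpleMetricsTracker[StorageMetric] = SimpleMetricsTracker()
tracker.increment(StorageMetric.ENGRAM_SAVED)
tracker.increment(StorageMetric.ENGRAM_SAVED)
tracker.increment(StorageMetric.META_SAVED)

first_packet = tracker.get_and_reset_packet()   # counts since creation
second_packet = tracker.get_and_reset_packet()  # empty: counters were reset
print(first_packet)
print(second_packet)
```

Resetting on read means each status report carries only the activity since the previous report, so a consumer can sum packets without double counting.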
Methods:

| Name | Description |
| --- | --- |
| start | Subscribes to message topics and prepares the service for operation. |
| init_async | Initializes database connections asynchronously. |
| on_engram_complete | Callback to handle completed engrams and trigger storage. |
| on_observation_complete | Callback to handle completed observations and trigger storage. |
| on_prompt_complete | Callback to handle completed prompt responses and trigger history storage. |
| on_meta_complete | Callback to handle completed meta information and trigger storage. |
| save_observation | Asynchronously saves an observation and updates metrics. |
| save_history | Asynchronously saves a history response and updates metrics. |
| save_engram | Asynchronously saves an engram and updates metrics. |
| save_meta | Asynchronously saves meta information and updates metrics. |
| on_acknowledge | Collects metrics and sends a status update on acknowledgment. |
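The methods above pair each `on_*_complete` callback with a `save_*` coroutine: `start` registers the callbacks, and each callback schedules a save task. The wiring might look roughly like the sketch below. `MiniService`, its `publish` and `drain` helpers, and the `Topic` enum are hypothetical simplifications for illustration, not Engramic's `Service` base class or message bus.

```python
import asyncio
from enum import Enum, auto
from typing import Any, Callable, Coroutine


class Topic(Enum):
    # Name mirrors a topic subscribed to in start(); the enum itself is a stand-in.
    ENGRAM_COMPLETE = auto()


class MiniService:
    """Hypothetical, simplified service; not Engramic's Service base class."""

    def __init__(self) -> None:
        self._handlers: dict[Topic, list[Callable[[Any], None]]] = {}
        self._tasks: list[asyncio.Task[None]] = []
        self.saved: list[str] = []

    def subscribe(self, topic: Topic, handler: Callable[[Any], None]) -> None:
        self._handlers.setdefault(topic, []).append(handler)

    def publish(self, topic: Topic, payload: Any) -> None:
        # Deliver the payload to every handler registered for the topic.
        for handler in self._handlers.get(topic, []):
            handler(payload)

    def run_task(self, coro: Coroutine[Any, Any, None]) -> None:
        # Schedule the coroutine on the running loop and keep a reference to it.
        self._tasks.append(asyncio.get_running_loop().create_task(coro))

    def start(self) -> None:
        self.subscribe(Topic.ENGRAM_COMPLETE, self.on_engram_complete)

    def on_engram_complete(self, engram_dict: dict[str, Any]) -> None:
        # One task per engram in the batch, as in the documented callback.
        for engram in engram_dict['engram_array']:
            self.run_task(self.save_engram(engram))

    async def save_engram(self, engram: str) -> None:
        await asyncio.sleep(0)  # placeholder for real storage I/O
        self.saved.append(engram)

    async def drain(self) -> None:
        # Wait for every scheduled save to finish.
        await asyncio.gather(*self._tasks)


async def main() -> list[str]:
    service = MiniService()
    service.start()
    service.publish(Topic.ENGRAM_COMPLETE, {'engram_array': ['a', 'b', 'c']})
    await service.drain()
    return service.saved


saved = asyncio.run(main())
print(saved)
```

Keeping the callbacks synchronous and pushing the work into scheduled tasks lets message delivery return quickly while the saves complete in the background.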
Source code in src/engramic/application/storage/storage_service.py
class StorageService(Service):
    """
    A service responsible for persisting various data artifacts generated during the Engramic runtime process.

    The StorageService listens to multiple system topics and asynchronously saves different types of data,
    including observations, engrams, metadata, and history responses. It utilizes plugin-based repositories
    for storage and maintains metrics on saved entities for monitoring and reporting.

    Attributes:
        plugin_manager (PluginManager): Manages system plugins including the database plugin.
        db_document_plugin: The database document plugin used by repositories.
        history_repository (HistoryRepository): Handles persistence of history data.
        observation_repository (ObservationRepository): Handles persistence of observations.
        engram_repository (EngramRepository): Handles persistence of engrams.
        meta_repository (MetaRepository): Handles persistence of metadata.
        metrics_tracker (MetricsTracker): Tracks and reports storage metrics for each saved entity type.

    Methods:
        start(): Subscribes to message topics and prepares the service for operation.
        init_async(): Initializes database connections asynchronously.
        on_engram_complete(engram_dict): Callback to handle completed engrams and trigger storage.
        on_observation_complete(response): Callback to handle completed observations and trigger storage.
        on_prompt_complete(response_dict): Callback to handle completed prompt responses and trigger history storage.
        on_meta_complete(meta_dict): Callback to handle completed meta information and trigger storage.
        save_observation(response): Asynchronously saves an observation and updates metrics.
        save_history(response): Asynchronously saves a history response and updates metrics.
        save_engram(engram): Asynchronously saves an engram and updates metrics.
        save_meta(meta): Asynchronously saves meta information and updates metrics.
        on_acknowledge(message_in): Collects metrics and sends a status update on acknowledgment.
    """

    def __init__(self, host: Host) -> None:
        super().__init__(host)
        self.plugin_manager: PluginManager = host.plugin_manager
        # All repositories share the same document database plugin.
        self.db_document_plugin = self.plugin_manager.get_plugin('db', 'document')
        self.history_repository: HistoryRepository = HistoryRepository(self.db_document_plugin)
        self.observation_repository: ObservationRepository = ObservationRepository(self.db_document_plugin)
        self.engram_repository: EngramRepository = EngramRepository(self.db_document_plugin)
        self.meta_repository: MetaRepository = MetaRepository(self.db_document_plugin)
        self.metrics_tracker: MetricsTracker[StorageMetric] = MetricsTracker[StorageMetric]()

    def start(self) -> None:
        # Register one callback per topic this service reacts to.
        self.subscribe(Service.Topic.ACKNOWLEDGE, self.on_acknowledge)
        self.subscribe(Service.Topic.MAIN_PROMPT_COMPLETE, self.on_prompt_complete)
        self.subscribe(Service.Topic.OBSERVATION_COMPLETE, self.on_observation_complete)
        self.subscribe(Service.Topic.ENGRAM_COMPLETE, self.on_engram_complete)
        self.subscribe(Service.Topic.META_COMPLETE, self.on_meta_complete)

    def init_async(self) -> None:
        # Connect the database plugin, then continue with the base class setup.
        self.db_document_plugin['func'].connect(args=None)
        return super().init_async()

    def on_engram_complete(self, engram_dict: dict[str, Any]) -> None:
        # Load the batch from its dict form, then schedule one save task per engram.
        engram_batch = self.engram_repository.load_batch_dict(engram_dict['engram_array'])
        for engram in engram_batch:
            self.run_task(self.save_engram(engram))

    def on_observation_complete(self, response: Observation) -> None:
        self.run_task(self.save_observation(response))

    def on_prompt_complete(self, response_dict: dict[Any, Any]) -> None:
        # Rebuild the Response object from the message payload before saving.
        response = Response(**response_dict)
        self.run_task(self.save_history(response))

    def on_meta_complete(self, meta_dict: dict[str, str]) -> None:
        meta: Meta = self.meta_repository.load(meta_dict)
        self.run_task(self.save_meta(meta))

    async def save_observation(self, response: Observation) -> None:
        # Unlike the other save_* methods, this call is not offloaded to a worker thread.
        self.observation_repository.save(response)
        self.metrics_tracker.increment(StorageMetric.OBSERVATION_SAVED)
        logging.debug('Storage service saving observation.')

    async def save_history(self, response: Response) -> None:
        # Run the repository call in a worker thread via asyncio.to_thread.
        await asyncio.to_thread(self.history_repository.save_history, response)
        self.metrics_tracker.increment(StorageMetric.HISTORY_SAVED)
        logging.debug('Storage service saving history.')

    async def save_engram(self, engram: Engram) -> None:
        await asyncio.to_thread(self.engram_repository.save_engram, engram)
        self.metrics_tracker.increment(StorageMetric.ENGRAM_SAVED)
        logging.debug('Storage service saving engram.')

    async def save_meta(self, meta: Meta) -> None:
        logging.debug('Storage service saving meta.')
        await asyncio.to_thread(self.meta_repository.save, meta)
        self.metrics_tracker.increment(StorageMetric.META_SAVED)

    def on_acknowledge(self, message_in: str) -> None:
        # The acknowledge payload is unused; discard it explicitly.
        del message_in
        # Fetch the accumulated metrics, reset the tracker, and publish them as a status message.
        metrics_packet: MetricPacket = self.metrics_tracker.get_and_reset_packet()
        self.send_message_async(
            Service.Topic.STATUS,
            {'id': self.id, 'name': self.__class__.__name__, 'timestamp': time.time(), 'metrics': metrics_packet},
        )
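In the source above, `save_history`, `save_engram`, and `save_meta` run their repository calls through `asyncio.to_thread`, while `save_observation` calls `save` directly; the source does not say whether that difference is intentional. The sketch below only illustrates what the offloading pattern buys. `SlowRepository` and `save_offloaded` are hypothetical names, and the 50 ms sleep is an assumed stand-in for a blocking database write.

```python
import asyncio
import contextlib
import time


class SlowRepository:
    """Hypothetical repository whose save() blocks, standing in for a database write."""

    def __init__(self) -> None:
        self.items: list[str] = []

    def save(self, item: str) -> None:
        time.sleep(0.05)  # simulate blocking I/O
        self.items.append(item)


async def save_offloaded(repo: SlowRepository, item: str) -> None:
    # Run the blocking call in a worker thread so the event loop stays free.
    await asyncio.to_thread(repo.save, item)


async def main() -> tuple[int, int]:
    repo = SlowRepository()
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets a chance to run other work.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.005)

    beat = asyncio.create_task(heartbeat())
    await asyncio.gather(*(save_offloaded(repo, f'item-{i}') for i in range(3)))
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return len(repo.items), ticks


saved_count, heartbeat_ticks = asyncio.run(main())
print(saved_count, heartbeat_ticks)
```

The heartbeat keeps ticking while the saves are in flight, which shows the loop is not blocked. If `repo.save` were called directly inside the coroutine, each save would hold the loop for its full duration.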