Repo Service

Bases: Service

Service for managing repositories and their document contents.

Handles repository discovery, file indexing, and document submission for processing. Maintains in-memory indices of repositories and their files. Supports loading of .engram files and processing of PDF documents.

Attributes:

plugin_manager (PluginManager): Manager for system plugins.
db_document_plugin (Any): Plugin for document database operations.
document_repository (DocumentRepository): Repository for document storage and retrieval.
engram_repository (EngramRepository): Repository for engram storage and retrieval.
observation_repository (ObservationRepository): Repository for observation storage and retrieval.
repos (dict[str, Repo]): Mapping of repository IDs to Repo objects.
file_index (dict[str, Any]): Index of all files by document ID.
file_repos (dict[str, Any]): Mapping of repository IDs to lists of document IDs.
submitted_documents (set[str]): Set of document IDs that have been submitted for processing.

Methods:

start: Starts the service and subscribes to relevant topics.
init_async: Initializes asynchronous components of the service.
submit_ids: Submits documents for processing by their IDs.
scan_folders: Discovers repositories and indexes their files.
update_repo_files: Updates the list of files for a repository.
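
The attributes above describe three in-memory indices that reference each other by ID. The following is a minimal sketch of how they relate, not the service's own code: `Repo` and `Doc` here are hypothetical stand-ins reduced to a few fields, and the IDs and file names are made up for illustration.

```python
from dataclasses import dataclass


@dataclass
class Repo:
    # Hypothetical stand-in for the real Repo type (fields mirror the constructor call in the source).
    name: str
    repo_id: str
    is_default: bool = False


@dataclass
class Doc:
    # Hypothetical stand-in for Document, reduced to what the indices need.
    id: str
    repo_id: str
    file_name: str


repos: dict[str, Repo] = {}            # repository ID -> Repo
file_index: dict[str, Doc] = {}        # document ID -> document
file_repos: dict[str, list[str]] = {}  # repository ID -> document IDs
submitted_documents: set[str] = set()  # document IDs already submitted

# Register one repository and one document (illustrative values).
repos["repo-1"] = Repo(name="papers", repo_id="repo-1")
doc = Doc(id="doc-a", repo_id="repo-1", file_name="intro.pdf")
file_index[doc.id] = doc
file_repos["repo-1"] = [doc.id]

# Resolve a repository's files by going through both indices.
names = [file_index[doc_id].file_name for doc_id in file_repos["repo-1"]]

# Submission is tracked by document ID only.
submitted_documents.add(doc.id)
```

Keeping `file_repos` as lists of IDs rather than documents means a document lives in exactly one place (`file_index`), so a progress update only has to touch one object.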

Source code in src/engramic/application/repo/repo_service.py
class RepoService(Service):
    """
    Service for managing repositories and their document contents.

    Handles repository discovery, file indexing, and document submission for processing.
    Maintains in-memory indices of repositories and their files. Supports loading of .engram
    files and processing of PDF documents.

    Attributes:
        plugin_manager (PluginManager): Manager for system plugins.
        db_document_plugin (Any): Plugin for document database operations.
        document_repository (DocumentRepository): Repository for document storage and retrieval.
        engram_repository (EngramRepository): Repository for engram storage and retrieval.
        observation_repository (ObservationRepository): Repository for observation storage and retrieval.
        repos (dict[str, Repo]): Mapping of repository IDs to Repo objects.
        file_index (dict[str, Any]): Index of all files by document ID.
        file_repos (dict[str, Any]): Mapping of repository IDs to lists of document IDs.
        submitted_documents (set[str]): Set of document IDs that have been submitted for processing.

    Methods:
        start() -> None:
            Starts the service and subscribes to relevant topics.
        init_async() -> None:
            Initializes asynchronous components of the service.
        submit_ids(id_array, overwrite) -> None:
            Submits documents for processing by their IDs.
        scan_folders(repo_id) -> None:
            Discovers repositories and indexes their files.
        update_repo_files(repo_id, update_ids) -> None:
            Updates the list of files for a repository.
    """

    def __init__(self, host: Host) -> None:
        """
        Initializes the repository service.

        Args:
            host (Host): The host system that this service is attached to.
        """
        super().__init__(host)
        self.plugin_manager: PluginManager = host.plugin_manager
        self.db_document_plugin = self.plugin_manager.get_plugin('db', 'document')
        self.document_repository: DocumentRepository = DocumentRepository(self.db_document_plugin)
        self.engram_repository: EngramRepository = EngramRepository(self.db_document_plugin)
        self.observation_repository: ObservationRepository = ObservationRepository(self.db_document_plugin)
        self.repos: dict[str, Repo] = {}  # memory copy of all folders
        self.file_index: dict[str, Any] = {}  # memory copy of all files
        self.file_repos: dict[str, Any] = {}  # memory copy of all files in repos
        self.submitted_documents: set[str] = set()

    def start(self) -> None:
        """
        Starts the repository service and subscribes to relevant topics.
        """
        self.subscribe(Service.Topic.REPO_SUBMIT_IDS, self._on_submit_ids)
        self.subscribe(Service.Topic.REPO_UPDATE_REPOS, self.scan_folders)
        self.subscribe(Service.Topic.PROGRESS_UPDATED, self._on_progress_updated)
        super().start()
        self.scan_folders()

    def init_async(self) -> None:
        """
        Initializes asynchronous components of the service.
        """
        return super().init_async()

    def _on_submit_ids(self, msg: str) -> None:
        """
        Handles the REPO_SUBMIT_IDS message.

        Args:
            msg (str): JSON message containing document IDs to submit.
        """
        json_msg = json.loads(msg)
        id_array = json_msg['submit_ids']
        # 'overwrite' is optional in the message and defaults to False.
        overwrite = json_msg.get('overwrite', False)
        self.submit_ids(id_array, overwrite=overwrite)

    def submit_ids(self, id_array: list[str], *, overwrite: bool = False) -> None:
        """
        Submits documents for processing by their IDs.

        Args:
            id_array (list[str]): List of document IDs to submit.
            overwrite (bool): Whether to overwrite existing documents. Defaults to False.
        """
        for sub_id in id_array:
            document = self.file_index[sub_id]
            self.send_message_async(
                Service.Topic.SUBMIT_DOCUMENT, {'document': asdict(document), 'overwrite': overwrite}
            )
            self.submitted_documents.add(document.id)

    def _load_repository(self, folder_path: Path) -> tuple[str, bool]:
        """
        Loads the repository ID from a .repo file.

        Args:
            folder_path (Path): Path to the repository folder.

        Returns:
            tuple[str, bool]: The repository ID and whether the repository is marked as default.

        Raises:
            RuntimeError: If the .repo file is missing or invalid.
            TypeError: If the repository ID is not a string.
        """
        repo_file = folder_path / '.repo'
        if not repo_file.is_file():
            error = f"Repository config file '.repo' not found in folder '{folder_path}'."
            raise RuntimeError(error)
        with repo_file.open('rb') as f:
            data = tomli.load(f)
        try:
            repository_id = data['repository']['id']
            is_default = False
            if 'is_default' in data['repository']:
                is_default = data['repository']['is_default']

        except KeyError as err:
            error = f"Missing 'repository.id' entry in .repo file at '{repo_file}'."
            raise RuntimeError(error) from err
        if not isinstance(repository_id, str):
            error = "'repository.id' must be a string in '%s'."
            raise TypeError(error % repo_file)
        return repository_id, is_default

    def _discover_repos(self, repo_root: Path) -> None:
        """
        Discovers repositories in the specified root directory.

        Args:
            repo_root (Path): Root directory containing repositories.

        Raises:
            ValueError: If a repository is named 'null'.
        """
        for name in os.listdir(repo_root):
            folder_path = repo_root / name
            if folder_path.is_dir():
                if name == 'null':
                    error = "Folder name 'null' is reserved and cannot be used as a repository name."
                    logging.error(error)
                    raise ValueError(error)
                try:
                    repo_id, is_default = self._load_repository(folder_path)
                    self.repos[repo_id] = Repo(name=name, repo_id=repo_id, is_default=is_default)
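                except (OSError, RuntimeError, TypeError, ValueError) as e:
                    # _load_repository raises RuntimeError or TypeError for a missing or invalid
                    # .repo file; skip that folder instead of aborting the scan.
                    info = "Skipping '%s': %s"
                    logging.info(info, name, e)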

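    def _on_progress_updated(self, msg: dict[str, Any]) -> None:
        """
        Handles the PROGRESS_UPDATED message.

        Records document or lesson progress on the indexed file and republishes it on REPO_FILES.

        Args:
            msg (dict[str, Any]): Progress message with 'progress_type', 'percent_complete',
                and 'id' (document) or 'target_id' (lesson).
        """
        progress_type = msg['progress_type']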
        doc_id = None
        if progress_type == 'document':
            doc_id = msg['id']
        if progress_type == 'lesson':
            doc_id = msg['target_id']

        if doc_id is not None and doc_id in self.file_index:  # might be a different progress update
            file = self.file_index[doc_id]

            if progress_type == 'document':
                file.percent_complete_document = msg['percent_complete']
            elif progress_type == 'lesson':
                file.percent_complete_lesson = msg['percent_complete']

            folder = self.repos[file.repo_id].name

            self.send_message_async(
                Service.Topic.REPO_FILES,
                {'repo': folder, 'repo_id': file.repo_id, 'files': [asdict(file)]},
            )

    async def update_repo_files(self, repo_id: str, update_ids: list[str] | None = None) -> None:
        """
        Updates the list of files for a repository.

        Args:
            repo_id (str): ID of the repository to update.
            update_ids (list[str] | None): List of document IDs to update. If None, updates all files.
        """
        folder = self.repos[repo_id].name

        update_list = self.file_repos[repo_id] if update_ids is None else update_ids

        document_dicts = [asdict(self.file_index[document_id]) for document_id in update_list]

        self.send_message_async(
            Service.Topic.REPO_FILES,
            {'repo': folder, 'repo_id': repo_id, 'files': document_dicts},
        )

    def scan_folders(self, repo_id: dict[str, str] | None = None) -> None:
        """
        Scans repository folders and indexes their files.

        Discovers repositories, indexes their files, and sends messages with the repository information.

        Args:
            repo_id (dict[str,str] | None): Optional dictionary containing repository ID with key "repo_id". If None, scans all repositories.

        Raises:
            RuntimeError: If the REPO_ROOT environment variable is not set.
            ValueError: If the specified repo_id is not found.
        """
        repo_root = self._get_repo_root()
        repos_to_scan = self._determine_repos_to_scan(repo_id, repo_root)

        self._send_repo_folders_message()
        self._scan_and_index_repos(repos_to_scan, repo_root)

    def _get_repo_root(self) -> Path:
        """Get and validate the repository root path."""
        repo_root = os.getenv('REPO_ROOT')
        if repo_root is None:
            error = "Environment variable 'REPO_ROOT' is not set."
            raise RuntimeError(error)
        return Path(repo_root).expanduser()

    def _determine_repos_to_scan(self, repo_id: dict[str, str] | None, repo_root: Path) -> dict[str, Repo]:
        """Determine which repositories need to be scanned."""
        target_repo_id = repo_id.get('repo_id') if repo_id is not None else None

        if target_repo_id is not None:
            return self._get_specific_repo(target_repo_id, repo_root)
        self._discover_repos(repo_root)
        return self.repos

    def _get_specific_repo(self, target_repo_id: str, repo_root: Path) -> dict[str, Repo]:
        """Get a specific repository, discovering it if necessary."""
        if target_repo_id not in self.repos:
            self._discover_repos(repo_root)
            if target_repo_id not in self.repos:
                error = f"Repository with ID '{target_repo_id}' not found."
                raise ValueError(error)
        return {target_repo_id: self.repos[target_repo_id]}

    def _send_repo_folders_message(self) -> None:
        """Send async message with repository folders."""

        async def send_message() -> None:
            # Convert Repo objects to dictionaries for serialization
            repos_dict = {repo_id: asdict(repo) for repo_id, repo in self.repos.items()}
            self.send_message_async(Service.Topic.REPO_FOLDERS, {'repo_folders': repos_dict})

        self.run_task(send_message())

    def _scan_and_index_repos(self, repos_to_scan: dict[str, Repo], repo_root: Path) -> None:
        """Scan and index files in the specified repositories."""
        for current_repo_id, repo in repos_to_scan.items():
            document_ids = self._index_repo_files(current_repo_id, repo.name, repo_root)
            self.file_repos[current_repo_id] = document_ids
            future = self.run_task(self.update_repo_files(current_repo_id))
            future.add_done_callback(self._on_update_repo_files_complete)

    def _index_repo_files(self, repo_id: str, folder: str, repo_root: Path) -> list[str]:
        """Index all files in a repository folder."""
        document_ids = []
        for root, dirs, files in os.walk(repo_root / folder):
            del dirs
            for file in files:
                if file.startswith('.'):
                    continue  # Skip hidden files

                doc = self._handle_file_by_type(repo_id, folder, root, file, repo_root)
                if doc is not None:
                    document_ids.append(doc.id)
                    self.file_index[doc.id] = doc

        return document_ids

    def _handle_file_by_type(self, repo_id: str, folder: str, root: str, file: str, repo_root: Path) -> Document | None:
        """Handle different file types and return a Document if applicable."""
        file_path = Path(root) / file
        relative_path = file_path.relative_to(repo_root / folder)
        relative_dir = str(relative_path.parent) if relative_path.parent != Path('.') else ''

        # Handle different file types
        file_extension = Path(file).suffix.lower()

        if file_extension == '.pdf':
            return self._create_document_from_pdf(repo_id, folder, relative_dir, file)
        if file_extension == '.engram':
            self._load_engram_file(file_path)
            return None
        # Skip other file types
        return None

    def _create_document_from_pdf(self, repo_id: str, folder: str, relative_dir: str, file_name: str) -> Document:
        """Create a Document object for PDF files."""
        doc = Document(
            root_directory=Document.Root.DATA.value,
            file_path=folder + relative_dir,
            file_name=file_name,
            repo_id=repo_id,
            tracking_id=str(uuid.uuid4()),
        )

        # Check to see if the document has been loaded before.
        fetched_doc: dict[str, Any] = self.document_repository.load(doc.id)

        # If it has been loaded, add that one to the file_index.
        if len(fetched_doc['document']) != 0:
            doc = Document(**fetched_doc['document'][0])

        return doc

    def _load_engram_file(self, file_path: Path) -> None:
        """
        Loads an .engram TOML file and publishes it as an observation if it is not already stored.

        Args:
            file_path (Path): Full path to the .engram file.
        """
        try:
            # Load the TOML content
            with file_path.open('rb') as f:
                engram_data = tomli.load(f)

            engram_id = engram_data['engram'][0]['id']
            engram = self.engram_repository.fetch_engram(engram_id)

            if engram is None:
                logging.info('Loaded .engram file: %s', file_path)
                logging.debug('Engram data: %s', engram_data)
                engram_data.update({'parent_id': None})
                engram_data.update({'tracking_id': ''})

                engram_data['engram'][0]['context'] = json.loads(engram_data['engram'][0]['context'])

                observation = self.observation_repository.load_toml_dict(engram_data)

                async def send_message() -> Observation:
                    self.send_message_async(
                        Service.Topic.OBSERVATION_CREATED, {'id': observation.id, 'parent_id': None}
                    )

                    return observation

                task = self.run_task(send_message())
                task.add_done_callback(self._on_observation_created_complete)

                # TODO: Process the loaded TOML data according to .engram file schema

        except (FileNotFoundError, PermissionError):
            logging.warning("Could not read .engram file '%s'", file_path)
        except tomli.TOMLDecodeError:
            logging.exception("Invalid TOML format in .engram file '%s'", file_path)
        except Exception:
            logging.exception("Unexpected error loading .engram file '%s'", file_path)

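    def _on_observation_created_complete(self, ret: Future[Any]) -> None:
        """
        Callback when the OBSERVATION_CREATED task completes; publishes the full observation.

        Args:
            ret (Future[Any]): Future whose result is the loaded Observation.
        """
        observation = ret.result()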
        self.send_message_async(Service.Topic.OBSERVATION_COMPLETE, asdict(observation))

    def _on_update_repo_files_complete(self, ret: Future[Any]) -> None:
        """
        Callback when the update_repo_files task completes.

        Args:
            ret (Future[Any]): Future object representing the completed task.
        """
        ret.result()

__init__(host)

Initializes the repository service.

Parameters:

host (Host): The host system that this service is attached to. Required.
Source code in src/engramic/application/repo/repo_service.py
def __init__(self, host: Host) -> None:
    """
    Initializes the repository service.

    Args:
        host (Host): The host system that this service is attached to.
    """
    super().__init__(host)
    self.plugin_manager: PluginManager = host.plugin_manager
    self.db_document_plugin = self.plugin_manager.get_plugin('db', 'document')
    self.document_repository: DocumentRepository = DocumentRepository(self.db_document_plugin)
    self.engram_repository: EngramRepository = EngramRepository(self.db_document_plugin)
    self.observation_repository: ObservationRepository = ObservationRepository(self.db_document_plugin)
    self.repos: dict[str, Repo] = {}  # memory copy of all folders
    self.file_index: dict[str, Any] = {}  # memory copy of all files
    self.file_repos: dict[str, Any] = {}  # memory copy of all files in repos
    self.submitted_documents: set[str] = set()

init_async()

Initializes asynchronous components of the service.

Source code in src/engramic/application/repo/repo_service.py
def init_async(self) -> None:
    """
    Initializes asynchronous components of the service.
    """
    return super().init_async()

scan_folders(repo_id=None)

Scans repository folders and indexes their files.

Discovers repositories, indexes their files, and sends messages with the repository information.

Parameters:

repo_id (dict[str, str] | None): Optional dictionary containing the repository ID under the key "repo_id". If None, scans all repositories. Default: None.

Raises:

RuntimeError: If the REPO_ROOT environment variable is not set.
ValueError: If the specified repo_id is not found.

Source code in src/engramic/application/repo/repo_service.py
def scan_folders(self, repo_id: dict[str, str] | None = None) -> None:
    """
    Scans repository folders and indexes their files.

    Discovers repositories, indexes their files, and sends messages with the repository information.

    Args:
        repo_id (dict[str,str] | None): Optional dictionary containing repository ID with key "repo_id". If None, scans all repositories.

    Raises:
        RuntimeError: If the REPO_ROOT environment variable is not set.
        ValueError: If the specified repo_id is not found.
    """
    repo_root = self._get_repo_root()
    repos_to_scan = self._determine_repos_to_scan(repo_id, repo_root)

    self._send_repo_folders_message()
    self._scan_and_index_repos(repos_to_scan, repo_root)
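
Note that `repo_id` is a dictionary payload, not a bare string. The sketch below illustrates, outside the service, how the root path and the scan targets could be resolved from that payload. `get_repo_root` and `select_repos` are hypothetical helper names, the environment is passed in as a plain mapping for testability, and repositories are reduced to an ID-to-folder-name mapping.

```python
from pathlib import Path
from typing import Optional


def get_repo_root(env: dict[str, str]) -> Path:
    """Return the repository root from an environment mapping, or raise if it is unset."""
    root = env.get("REPO_ROOT")
    if root is None:
        raise RuntimeError("Environment variable 'REPO_ROOT' is not set.")
    return Path(root).expanduser()


def select_repos(known: dict[str, str], payload: Optional[dict[str, str]]) -> dict[str, str]:
    """Pick the repositories to scan: all of them, or only the one named in the payload."""
    target = payload.get("repo_id") if payload is not None else None
    if target is None:
        return dict(known)
    if target not in known:
        raise ValueError(f"Repository with ID '{target}' not found.")
    return {target: known[target]}


# Illustrative values only.
known = {"repo-1": "papers", "repo-2": "manuals"}
all_repos = select_repos(known, None)
one_repo = select_repos(known, {"repo_id": "repo-2"})
root = get_repo_root({"REPO_ROOT": "/data/repos"})
```

In real use the mapping passed to `get_repo_root` would be `os.environ`. A payload without a `"repo_id"` key behaves like `None` and selects every repository.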

start()

Starts the repository service and subscribes to relevant topics.

Source code in src/engramic/application/repo/repo_service.py
def start(self) -> None:
    """
    Starts the repository service and subscribes to relevant topics.
    """
    self.subscribe(Service.Topic.REPO_SUBMIT_IDS, self._on_submit_ids)
    self.subscribe(Service.Topic.REPO_UPDATE_REPOS, self.scan_folders)
    self.subscribe(Service.Topic.PROGRESS_UPDATED, self._on_progress_updated)
    super().start()
    self.scan_folders()

submit_ids(id_array, *, overwrite=False)

Submits documents for processing by their IDs.

Parameters:

id_array (list[str]): List of document IDs to submit. Required.
overwrite (bool): Whether to overwrite existing documents. Default: False.
Source code in src/engramic/application/repo/repo_service.py
def submit_ids(self, id_array: list[str], *, overwrite: bool = False) -> None:
    """
    Submits documents for processing by their IDs.

    Args:
        id_array (list[str]): List of document IDs to submit.
        overwrite (bool): Whether to overwrite existing documents. Defaults to False.
    """
    for sub_id in id_array:
        document = self.file_index[sub_id]
        self.send_message_async(
            Service.Topic.SUBMIT_DOCUMENT, {'document': asdict(document), 'overwrite': overwrite}
        )
        self.submitted_documents.add(document.id)
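
As written, `submit_ids` looks each ID up with `self.file_index[sub_id]`, so an ID that has not been indexed raises `KeyError`. The sketch below shows the JSON shape the REPO_SUBMIT_IDS handler in the class listing expects (`submit_ids` plus an optional `overwrite`) and one possible way a caller might separate known from unknown IDs beforehand. The helper names `parse_submit_message` and `split_known` are hypothetical and not part of the service.

```python
import json


def parse_submit_message(msg: str) -> tuple[list[str], bool]:
    """Decode a submit message into (document IDs, overwrite flag)."""
    payload = json.loads(msg)
    return payload["submit_ids"], payload.get("overwrite", False)


def split_known(ids: list[str], index: dict[str, object]) -> tuple[list[str], list[str]]:
    """Separate IDs present in the index from IDs that would raise KeyError."""
    known = [doc_id for doc_id in ids if doc_id in index]
    unknown = [doc_id for doc_id in ids if doc_id not in index]
    return known, unknown


# Illustrative message: one indexed ID and one that is not in the index.
msg = json.dumps({"submit_ids": ["doc-a", "doc-x"], "overwrite": True})
ids, overwrite = parse_submit_message(msg)
known_ids, unknown_ids = split_known(ids, {"doc-a": object()})
```

Whether unknown IDs should be skipped, logged, or rejected is a design decision the source does not state; the sketch only makes the failure mode visible.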

update_repo_files(repo_id, update_ids=None) async

Updates the list of files for a repository.

Parameters:

repo_id (str): ID of the repository to update. Required.
update_ids (list[str] | None): List of document IDs to update. If None, updates all files. Default: None.
Source code in src/engramic/application/repo/repo_service.py
async def update_repo_files(self, repo_id: str, update_ids: list[str] | None = None) -> None:
    """
    Updates the list of files for a repository.

    Args:
        repo_id (str): ID of the repository to update.
        update_ids (list[str] | None): List of document IDs to update. If None, updates all files.
    """
    folder = self.repos[repo_id].name

    update_list = self.file_repos[repo_id] if update_ids is None else update_ids

    document_dicts = [asdict(self.file_index[document_id]) for document_id in update_list]

    self.send_message_async(
        Service.Topic.REPO_FILES,
        {'repo': folder, 'repo_id': repo_id, 'files': document_dicts},
    )
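
The REPO_FILES payload built above has three keys: `repo` (folder name), `repo_id`, and `files` (one dictionary per document). A minimal standalone sketch of that construction follows. `Doc` is a hypothetical stand-in for `Document` with a reduced field set, and `build_repo_files_payload` is an illustrative helper, not a method of the service.

```python
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class Doc:
    # Hypothetical stand-in for Document; the real class has more fields.
    id: str
    file_name: str
    percent_complete_document: float = 0.0


def build_repo_files_payload(
    folder: str,
    repo_id: str,
    file_index: dict[str, Doc],
    file_repos: dict[str, list[str]],
    update_ids: Optional[list[str]] = None,
) -> dict[str, Any]:
    """Build the message body: all files of the repo, or only the requested IDs."""
    ids = file_repos[repo_id] if update_ids is None else update_ids
    return {"repo": folder, "repo_id": repo_id, "files": [asdict(file_index[i]) for i in ids]}


# Illustrative index with two documents in one repository.
file_index = {"doc-a": Doc("doc-a", "intro.pdf"), "doc-b": Doc("doc-b", "notes.pdf", 0.5)}
file_repos = {"repo-1": ["doc-a", "doc-b"]}

full = build_repo_files_payload("papers", "repo-1", file_index, file_repos)
partial = build_repo_files_payload("papers", "repo-1", file_index, file_repos, ["doc-b"])
```

Passing `update_ids` yields a payload with only the changed files, which is the same shape the progress handler in the class listing sends for a single document.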