Skip to content

Message Service

Bases: BaseMessageService

A system-level message handling service that provides runtime profiling capabilities and reports metrics.

The MessageService listens for system-level control messages, such as those initiating or ending profiling sessions, and acknowledgment messages for metrics reporting. It extends BaseMessageService to provide core message routing functionality, with additional support for CPU profiling via cProfile.

Attributes:

Name Type Description
profiler Profile | None

An optional CPU profiler instance used to monitor performance during execution. Initialized and controlled via START_PROFILER and END_PROFILER messages.

Methods:

Name Description
init_async

Initializes the service asynchronously, resetting any existing profiler instance.

start

Subscribes to system topics to enable profiler control and metric acknowledgment.

stop

Stops the message service.

start_profiler

Starts the CPU profiler if not already running.

end_profiler

Stops the CPU profiler and writes the profiling data to a file.

on_acknowledge

Sends a status update containing tracked metrics upon acknowledgment.

Source code in src/engramic/application/message/message_service.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class MessageService(BaseMessageService):
    """
    A system-level message handling service that provides runtime profiling capabilities and reports metrics.

    The MessageService listens for system-level control messages, such as those initiating or ending
    profiling sessions, and acknowledgment messages for metrics reporting. It extends BaseMessageService
    to provide core message routing functionality, with additional support for CPU profiling via `cProfile`.

    Attributes:
        profiler (cProfile.Profile | None): An optional CPU profiler instance used to monitor performance
            during execution. Initialized and controlled via START_PROFILER and END_PROFILER messages.

    Methods:
        init_async(): Initializes the service asynchronously, resetting any existing profiler instance.
        start(): Subscribes to system topics to enable profiler control and metric acknowledgment.
        stop(): Stops the message service.
        start_profiler(data): Starts the CPU profiler if not already running.
        end_profiler(data): Stops the CPU profiler and writes the profiling data to a file.
        on_acknowledge(message_in): Sends a status update containing tracked metrics upon acknowledgment.
    """

    def __init__(self, host: Host) -> None:
        super().__init__(host)
        self.profiler: cProfile.Profile | None = None

    def init_async(self) -> None:
        super().init_async()
        self.profiler = None

    def start(self) -> None:
        self.subscribe(Service.Topic.ACKNOWLEDGE, self.on_acknowledge)
        self.subscribe(Service.Topic.START_PROFILER, self.start_profiler)
        self.subscribe(Service.Topic.END_PROFILER, self.end_profiler)

    def stop(self) -> None:
        super().stop()

    def start_profiler(self, data: dict[Any, Any]) -> None:
        if data is not None:
            del data
        logging.info('Start Profiler')
        self.profiler = cProfile.Profile()
        if self.profiler:
            self.profiler.enable()

    def end_profiler(self, data: dict[Any, Any]) -> None:
        if data is not None:
            del data
        logging.info('Stop Profiler')
        if self.profiler:
            self.profiler.disable()
            self.profiler.dump_stats('profile_output.prof')

    def on_acknowledge(self, message_in: str) -> None:
        del message_in

        metrics_packet: MetricPacket = self.metrics_tracker.get_and_reset_packet()

        self.send_message_async(
            Service.Topic.STATUS,
            {'id': self.id, 'name': self.__class__.__name__, 'timestamp': time.time(), 'metrics': metrics_packet},
        )