Skip to content

Message Service

Bases: BaseMessageService

A system-level message handling service that provides runtime CPU profiling and metrics reporting.

MessageService extends BaseMessageService to handle system-level control messages, enabling dynamic profiling using Python's built-in cProfile module and responding to acknowledgment messages for metrics tracking. It operates by subscribing to control topics and exposing runtime observability features.

Attributes:

Name Type Description
profiler Profile | None

An optional CPU profiler used to capture runtime performance data. The profiler is started and stopped in response to specific control messages and saves output to a file named 'profile_output.prof'.

Methods:

Name Description
init_async

Resets the profiler during asynchronous initialization.

start

Subscribes to relevant system topics for profiler control and metric acknowledgment.

stop

Gracefully shuts down the message service.

start_profiler

dict[Any, Any]) -> None: Initializes and starts the CPU profiler.

end_profiler

dict[Any, Any]) -> None: Stops the profiler and dumps results to a profile file.

on_acknowledge

str) -> None: Sends a metric snapshot and service status in response to ACKNOWLEDGE messages.

Source code in src/engramic/application/message/message_service.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class MessageService(BaseMessageService):
    """
    A system-level message handling service that provides runtime CPU profiling and metrics reporting.

    MessageService extends BaseMessageService to handle system-level control messages, enabling
    dynamic profiling using Python's built-in `cProfile` module and responding to acknowledgment
    messages for metrics tracking. It operates by subscribing to control topics and exposing
    runtime observability features.

    Attributes:
        profiler (cProfile.Profile | None): An optional CPU profiler used to capture runtime performance data.
            The profiler is started and stopped in response to specific control messages and saves output
            to a file named 'profile_output.prof'.

    Methods:
        init_async() -> None:
            Resets the profiler during asynchronous initialization.
        start() -> None:
            Subscribes to relevant system topics for profiler control and metric acknowledgment.
        stop() -> None:
            Gracefully shuts down the message service.
        start_profiler(data: dict[Any, Any]) -> None:
            Initializes and starts the CPU profiler.
        end_profiler(data: dict[Any, Any]) -> None:
            Stops the profiler and dumps results to a profile file.
        on_acknowledge(message_in: str) -> None:
            Sends a metric snapshot and service status in response to ACKNOWLEDGE messages.
    """

    def __init__(self, host: Host) -> None:
        super().__init__(host)
        self.profiler: cProfile.Profile | None = None

    def init_async(self) -> None:
        super().init_async()
        self.profiler = None

    def start(self) -> None:
        self.subscribe(Service.Topic.ACKNOWLEDGE, self.on_acknowledge)
        self.subscribe(Service.Topic.START_PROFILER, self.start_profiler)
        self.subscribe(Service.Topic.END_PROFILER, self.end_profiler)
        super().start()

    async def stop(self) -> None:
        await super().stop()

    def start_profiler(self, data: dict[Any, Any]) -> None:
        if data is not None:
            del data
        logging.info('Start Profiler')
        self.profiler = cProfile.Profile()
        if self.profiler:
            self.profiler.enable()

    def end_profiler(self, data: dict[Any, Any]) -> None:
        if data is not None:
            del data
        logging.info('Stop Profiler')
        if self.profiler:
            self.profiler.disable()
            self.profiler.dump_stats('profile_output.prof')

    def on_acknowledge(self, message_in: str) -> None:
        del message_in

        metrics_packet: MetricPacket = self.metrics_tracker.get_and_reset_packet()

        self.send_message_async(
            Service.Topic.STATUS,
            {'id': self.id, 'name': self.__class__.__name__, 'timestamp': time.time(), 'metrics': metrics_packet},
        )