Skip to content

Pub Sub

zenml.pub_sub

Modules

base

Base abstractions for the pub/sub layer.

Classes
ConsumerBase(config: ConsumerRuntimeConfig, executor: Executor, event_handler: CriticalEventHandler | None = None)

Bases: ABC

Async core pipeline shared by polling and push transports.

The transport decides how raw messages arrive. The base class provides: - decode/preprocess - execute with retries - ack with retries (+ critical event on exhaustion) - invalid/failing handlers

Consumer base constructor.

Parameters:

Name Type Description Default
config ConsumerRuntimeConfig

The consumer config object.

required
executor Executor

An executor instance.

required
event_handler CriticalEventHandler | None

A CriticalEventHandler instance (optional).

None
Source code in src/zenml/pub_sub/base.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
def __init__(
    self,
    config: ConsumerRuntimeConfig,
    executor: Executor,
    event_handler: CriticalEventHandler | None = None,
) -> None:
    """Consumer base constructor.

    Args:
        config: The consumer config object.
        executor: An executor instance.
        event_handler: A CriticalEventHandler instance (optional).
    """
    self._cfg = config
    self._executor = executor
    self._event_handler = event_handler
Attributes
config: ConsumerRuntimeConfig property

Implements the 'config' property.

Returns:

Type Description
ConsumerRuntimeConfig

The consumer config object.

Functions
acknowledge_message(raw_message: Any) -> None abstractmethod async

Acks/Deletes message from the queue.

Parameters:

Name Type Description Default
raw_message Any

The raw broker-specific message.

required
Source code in src/zenml/pub_sub/base.py
361
362
363
364
365
366
367
@abstractmethod
async def acknowledge_message(self, raw_message: Any) -> None:
    """Acks/Deletes message from the queue.

    Args:
        raw_message: The raw broker-specific message.
    """
handle_critical_event(event: CriticalEvent) -> None async

Critical event handler.

A critical event indicates that the application behavior has degraded. For instance, the inability to serialize the message to a broker-compatible format is extremely severe as it most probably repeats across messages meaning no messages are published for execution at all.

A complete implementation of a critical event handling process would be the following:

  1. Log in detailed fashion the event so that it is traceable in the logs.
  2. Publish the event to an event aggregator service so that it is tracked long-term and alerts can be generated based on its frequency and severity.
  3. Hold the message in a DLQ to enable re-execution when a bug fix is introduced in the code.

Parameters:

Name Type Description Default
event CriticalEvent

A critical event instance.

required
Source code in src/zenml/pub_sub/base.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
async def handle_critical_event(self, event: CriticalEvent) -> None:
    """Critical event handler.

    A critical event indicates that the application behavior has degraded.
    For instance, the inability to serialize the message to a broker-compatible format
    is extremely severe as it most probably repeats across messages meaning no messages
    are published for execution at all.

    A complete implementation of a critical event handling process would be the following:

    1. Log in detailed fashion the event so that it is traceable in the logs.
    2. Publish the event to an event aggregator service so that it is tracked long-term and
    alerts can be generated based on its frequency and severity.
    3. Hold the message in a DLQ to enable re-execution when a bug fix is introduced in the code.

    Args:
        event: A critical event instance.
    """
    logger.exception(
        "Critical event %s caused by the following exception:",
        event.value,
        exc_info=event.exception,
    )
    if self._event_handler:
        asyncio.create_task(self._event_handler.handle(event))
handle_invalid_message(raw_message: Any) -> None abstractmethod async

Handles message of invalid format (not suitable for execution).

Messages that are invalid should still be acked to avoid looping execution and system poisoning.

Parameters:

Name Type Description Default
raw_message Any

The raw broker-specific message.

required
Source code in src/zenml/pub_sub/base.py
369
370
371
372
373
374
375
376
377
378
379
@abstractmethod
async def handle_invalid_message(self, raw_message: Any) -> None:
    """Handles message of invalid format (not suitable for execution).

    Messages that are invalid should still be acked to avoid
    looping execution and system poisoning.

    Args:
        raw_message: The raw broker-specific message.

    """
handle_raw_message(raw_message: Any) -> None async

Handles one message at a time.

Parameters:

Name Type Description Default
raw_message Any

A raw broker-specific message.

required
Source code in src/zenml/pub_sub/base.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
async def handle_raw_message(self, raw_message: Any) -> None:
    """Handles one message at a time.

    Args:
        raw_message: A raw broker-specific message.
    """
    try:
        msg = await self.preprocess_message(raw_message)
    except MessageDecodeError:
        await self._ack_with_retries(raw_message=raw_message)
        await self.handle_invalid_message(raw_message)
        return

    if self.config.late_ack:
        await self._process_with_retries(msg)
        await self._ack_with_retries(raw_message=msg.raw_message)
    else:
        await self._ack_with_retries(raw_message=msg.raw_message)
        await self._process_with_retries(msg)
preprocess_message(raw_message: Any) -> MessageEnvelope abstractmethod async

Decode/Validate broker message and wrap to MessageEnvelope.

Parameters:

Name Type Description Default
raw_message Any

Broker-specific message.

required

Returns:

Type Description
MessageEnvelope

A MessageEnvelope object.

Source code in src/zenml/pub_sub/base.py
342
343
344
345
346
347
348
349
350
351
@abstractmethod
async def preprocess_message(self, raw_message: Any) -> MessageEnvelope:
    """Decode/Validate broker message and wrap to MessageEnvelope.

    Args:
        raw_message: Broker-specific message.

    Returns:
        A MessageEnvelope object.
    """
process_message(message: MessageEnvelope) -> None async

Execution of message payload.

Parameters:

Name Type Description Default
message MessageEnvelope

A MessageEnvelope object.

required
Source code in src/zenml/pub_sub/base.py
353
354
355
356
357
358
359
async def process_message(self, message: MessageEnvelope) -> None:
    """Execution of message payload.

    Args:
        message: A MessageEnvelope object.
    """
    await self._executor.execute(payload=message.payload)
run() -> None abstractmethod async

Starts the consumer task.

Source code in src/zenml/pub_sub/base.py
470
471
472
473
@abstractmethod
async def run(self) -> None:
    """Starts the consumer task."""
    pass
ConsumerRuntimeConfig

Bases: ConfigBase

Config shared by polling and push consumers.

Functions
prefixes() -> list[str] staticmethod

Implementation of the prefixes config method.

Returns:

Type Description
list[str]

A list of prefixes to use for env var name resolution.

Source code in src/zenml/pub_sub/base.py
232
233
234
235
236
237
238
239
@staticmethod
def prefixes() -> list[str]:
    """Implementation of the prefixes config method.

    Returns:
        A list of prefixes to use for env var name resolution.
    """
    return [ENV_ZENML_CONSUMER_PREFIX]
CriticalEventHandler

Bases: ABC

Abstraction for handling critical events.

Functions
handle(event: CriticalEvent) -> None abstractmethod async

Handles a critical event.

Handling of a critical event would be include the following operations: - Tracking of the event instance and their evolution - Alerting mechanisms based on event monitoring alarms - Mitigation actions

Parameters:

Name Type Description Default
event CriticalEvent

A CriticalEvent instance.

required
Source code in src/zenml/pub_sub/base.py
79
80
81
82
83
84
85
86
87
88
89
90
91
@abstractmethod
async def handle(self, event: CriticalEvent) -> None:
    """Handles a critical event.

    Handling of a critical event would be include the following operations:
    - Tracking of the event instance and their evolution
    - Alerting mechanisms based on event monitoring alarms
    - Mitigation actions

    Args:
        event: A CriticalEvent instance.
    """
    pass
Executor

Bases: ABC

Abstract executor interface.

Functions
execute(payload: MessagePayload) -> None abstractmethod async

Executor main method.

Responsible with executing the payload and adding implementing additional layers of error handling and retries:

  • Should decide whether to propagate errors to the queue layer.
  • Should decide whether retries are applicable.

Parameters:

Name Type Description Default
payload MessagePayload

A standard message payload.

required
Source code in src/zenml/pub_sub/base.py
272
273
274
275
276
277
278
279
280
281
282
283
284
@abstractmethod
async def execute(self, payload: MessagePayload) -> None:
    """Executor main method.

    Responsible with executing the payload and adding implementing
    additional layers of error handling and retries:

    - Should decide whether to propagate errors to the queue layer.
    - Should decide whether retries are applicable.

    Args:
        payload: A standard message payload.
    """
max_message_capacity() -> int | None abstractmethod

Helper utility for variable batch size consumption.

Helpful when execution needs to be throttled. The executor responds with the number of messages it can pickup for execution at the moment.

Returns:

Type Description
int | None

Number of messages the executor can handle (None if no limit).

Source code in src/zenml/pub_sub/base.py
291
292
293
294
295
296
297
298
299
300
301
@abstractmethod
def max_message_capacity(self) -> int | None:
    """Helper utility for variable batch size consumption.

    Helpful when execution needs to be throttled. The executor
    responds with the number of messages it can pickup for execution
    at the moment.

    Returns:
        Number of messages the executor can handle (None if no limit).
    """
shutdown() -> None abstractmethod async

Graceful executor shutdown (completes work, then exits).

Source code in src/zenml/pub_sub/base.py
286
287
288
289
@abstractmethod
async def shutdown(self) -> None:
    """Graceful executor shutdown (completes work, then exits)."""
    pass
MessageDecodeError

Bases: Exception

Raised when a raw broker message cannot be decoded/preprocessed.

MessageEncodeError

Bases: Exception

Raised when a payload cannot be encoded into a broker message.

MessageExecutionError

Bases: Exception

Raised when message processing fails after retries.

MessageSubmissionError

Bases: Exception

Raised when sending a message fails.

PollingConfig

Bases: ConsumerRuntimeConfig

Base config class for polling consumers.

Functions
prefixes() -> list[str] staticmethod

Implementation of the prefixes config method.

Returns:

Type Description
list[str]

A list of prefixes to use for env var name resolution.

Source code in src/zenml/pub_sub/base.py
479
480
481
482
483
484
485
486
487
488
@staticmethod
def prefixes() -> list[str]:
    """Implementation of the prefixes config method.

    Returns:
        A list of prefixes to use for env var name resolution.
    """
    return ConsumerRuntimeConfig.prefixes() + [
        ENV_ZENML_CONSUMER_POLLING_PREFIX
    ]
PollingConsumer(config: PollingConfig, executor: Executor, event_handler: CriticalEventHandler | None = None)

Bases: ConsumerBase, ABC

Polling transport: the consumer fetches batches periodically.

Polling consumer constructor.

Parameters:

Name Type Description Default
config PollingConfig

Polling config instance.

required
executor Executor

Executor instance.

required
event_handler CriticalEventHandler | None

A CriticalEventHandler instance (optional).

None
Source code in src/zenml/pub_sub/base.py
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
def __init__(
    self,
    config: PollingConfig,
    executor: Executor,
    event_handler: CriticalEventHandler | None = None,
) -> None:
    """Polling consumer constructor.

    Args:
        config: Polling config instance.
        executor: Executor instance.
        event_handler: A CriticalEventHandler instance (optional).
    """
    super().__init__(
        config, executor=executor, event_handler=event_handler
    )
    self._stopped: bool = False

    # Jitter is computed once at construction (fixed for this consumer instance).
    self._polling_interval = config.interval + random.uniform(
        -config.jitter, config.jitter
    )
Attributes
polling_interval: float property

Implement the 'polling_interval' property.

Returns:

Type Description
float

The polling interval value.

Functions
get_messages_batch_size() -> int abstractmethod

Getter for the batch size.

Enables implementations that adapt batch size per iteration based on configuration, throttling or other factors.

Returns:

Type Description
int

The size of the next batch of messages.

Source code in src/zenml/pub_sub/base.py
554
555
556
557
558
559
560
561
562
563
@abstractmethod
def get_messages_batch_size(self) -> int:
    """Getter for the batch size.

    Enables implementations that adapt batch size per iteration
    based on configuration, throttling or other factors.

    Returns:
        The size of the next batch of messages.
    """
poll_once() -> None async

Receive and handle one batch. Receive failures emit read_failed.

Source code in src/zenml/pub_sub/base.py
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
async def poll_once(self) -> None:
    """Receive and handle one batch. Receive failures emit read_failed."""
    try:
        batch_size = self.get_messages_batch_size()
        raw_messages = await self.receive_messages(batch_size=batch_size)
    except Exception as exc:
        await self.handle_critical_event(
            CriticalEvent(
                value=CriticalEventType.READ_FAILED,
                description=f"Receive failed: {exc!r}",
                created_at=datetime.now(tz=timezone.utc),
                exception=exc,
            )
        )
        return

    for raw in raw_messages:
        await self.handle_raw_message(raw)
receive_messages(batch_size: int | None = None) -> list[Any] abstractmethod async

Requests to get a batch of messages from the queue.

Parameters:

Name Type Description Default
batch_size int | None

Number of messages to get.

None

Returns:

Type Description
list[Any]

A list of messages (broker-specific payload).

Source code in src/zenml/pub_sub/base.py
541
542
543
544
545
546
547
548
549
550
551
552
@abstractmethod
async def receive_messages(
    self, batch_size: int | None = None
) -> list[Any]:
    """Requests to get a batch of messages from the queue.

    Args:
        batch_size: Number of messages to get.

    Returns:
        A list of messages (broker-specific payload).
    """
run() -> None async

Run polling loop until stop() is called.

Source code in src/zenml/pub_sub/base.py
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
async def run(self) -> None:
    """Run polling loop until stop() is called."""
    try:
        while not self._stopped:
            try:
                start = asyncio.get_running_loop().time()
                await self.poll_once()

                elapsed = asyncio.get_running_loop().time() - start

                if elapsed > self._polling_interval:
                    logger.debug("Elapsed time for polling: %s", elapsed)
                remaining = self.polling_interval - elapsed
                if remaining > 0:
                    await asyncio.sleep(remaining)
            except Exception as exc:
                logger.exception(
                    "Unhandled exception during polling: %s", exc_info=exc
                )
    finally:
        self.stop()
        await self._executor.shutdown()
stop() -> None

Signal the run loop to stop.

Source code in src/zenml/pub_sub/base.py
528
529
530
def stop(self) -> None:
    """Signal the run loop to stop."""
    self._stopped = True
ProducerBase(config: ProducerConfig, event_handler: CriticalEventHandler | None = None)

Bases: ABC

Async producer base.

Backends implement

1) build_message(payload) -> backend-specific message object (Any) 2) send_message(message) -> submit to broker

Base provides
  • publish(payload): build + send (+ optional retries) and emits CriticalEvent on failure.

Producer constructor.

Parameters:

Name Type Description Default
config ProducerConfig

The producer config object.

required
event_handler CriticalEventHandler | None

A CriticalEventHandler instance (optional).

None
Source code in src/zenml/pub_sub/base.py
105
106
107
108
109
110
111
112
113
114
115
116
117
def __init__(
    self,
    config: ProducerConfig,
    event_handler: CriticalEventHandler | None = None,
) -> None:
    """Producer constructor.

    Args:
        config: The producer config object.
        event_handler: A CriticalEventHandler instance (optional).
    """
    self._cfg = config
    self._event_handler = event_handler
Attributes
config: ProducerConfig property

Implements the 'config' property.

Returns:

Type Description
ProducerConfig

The producer config object.

Functions
build_message(payload: MessagePayload) -> Any abstractmethod async

Convert the message to a broker-specific payload.

Parameters:

Name Type Description Default
payload MessagePayload

A MessagePayload object.

required

Returns:

Type Description
Any

A broker-specific message.

Source code in src/zenml/pub_sub/base.py
128
129
130
131
132
133
134
135
136
137
@abstractmethod
async def build_message(self, payload: MessagePayload) -> Any:
    """Convert the message to a broker-specific payload.

    Args:
        payload: A MessagePayload object.

    Returns:
        A broker-specific message.
    """
handle_critical_event(event: CriticalEvent) -> None async

Critical event handler.

A critical event indicates that the application behavior has degraded. For instance, the inability to serialize the message to a broker-compatible format is extremely severe as it most probably repeats across messages meaning no messages are published for execution at all.

A complete implementation of a critical event handling process would be the following:

  1. Log in detailed fashion the event so that it is traceable in the logs.
  2. Publish the event to an event aggregator service so that it is tracked long-term and alerts can be generated based on its frequency and severity.
  3. Hold the message in a DLQ to enable re-execution when a bug fix is introduced in the code.

Parameters:

Name Type Description Default
event CriticalEvent

A critical event instance.

required
Source code in src/zenml/pub_sub/base.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def handle_critical_event(self, event: CriticalEvent) -> None:
    """Critical event handler.

    A critical event indicates that the application behavior has degraded.
    For instance, the inability to serialize the message to a broker-compatible format
    is extremely severe as it most probably repeats across messages meaning no messages
    are published for execution at all.

    A complete implementation of a critical event handling process would be the following:

    1. Log in detailed fashion the event so that it is traceable in the logs.
    2. Publish the event to an event aggregator service so that it is tracked long-term and
    alerts can be generated based on its frequency and severity.
    3. Hold the message in a DLQ to enable re-execution when a bug fix is introduced in the code.

    Args:
        event: A critical event instance.
    """
    logger.exception(
        "Critical event %s caused by the following exception:",
        event.value,
        exc_info=event.exception if event.exception else None,
    )
    if self._event_handler:
        asyncio.create_task(self._event_handler.handle(event))
publish(payload: MessagePayload) -> str async

Build and send message, emitting CriticalEvent on failure.

Parameters:

Name Type Description Default
payload MessagePayload

Application payload.

required

Returns:

Type Description
str

Message ID generated by the broker.

Raises:

Type Description
MessageEncodeError

If encoding/building fails.

MessageSubmissionError

If sending fails after retries.

Source code in src/zenml/pub_sub/base.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
async def publish(self, payload: MessagePayload) -> str:
    """Build and send message, emitting CriticalEvent on failure.

    Args:
        payload: Application payload.

    Returns:
        Message ID generated by the broker.

    Raises:
        MessageEncodeError: If encoding/building fails.
        MessageSubmissionError: If sending fails after retries.
    """
    try:
        message = await self.build_message(payload)
    except Exception as exc:
        await self.handle_critical_event(
            CriticalEvent(
                value=CriticalEventType.ENCODE_FAILED,
                description=f"Encode failed for payload {payload.id}: {exc!r}",
                created_at=datetime.now(tz=timezone.utc),
                exception=exc,
            )
        )
        raise MessageEncodeError(
            f"Encode failed for payload {payload.id}"
        ) from exc

    attempts = self.config.send_retries + 1
    last_exc = None

    for attempt in range(attempts):
        try:
            return await self.send_message(message)
        except Exception as exc:
            last_exc = exc
            logger.exception(
                "Attempt %s to send message %s failed",
                attempt,
                payload.id,
            )

    await self.handle_critical_event(
        CriticalEvent(
            value=CriticalEventType.SEND_FAILED,
            description=f"Submission failed for payload {payload.id}",
            created_at=datetime.now(tz=timezone.utc),
            exception=last_exc,
        )
    )
    raise MessageSubmissionError(f"Send failed for payload {payload.id}")
send_message(message: Any) -> str abstractmethod async

Sends a message to the broker.

Parameters:

Name Type Description Default
message Any

A broker-specific message.

required

Returns:

Type Description
str

The message ID generated by the broker.

Source code in src/zenml/pub_sub/base.py
139
140
141
142
143
144
145
146
147
148
@abstractmethod
async def send_message(self, message: Any) -> str:
    """Sends a message to the broker.

    Args:
        message: A broker-specific message.

    Returns:
        The message ID generated by the broker.
    """
ProducerConfig

Bases: ConfigBase

Config object grouping producer configuration params.

Functions
prefixes() -> list[str] staticmethod

Implementation of the prefixes config method.

Returns:

Type Description
list[str]

A list of prefixes to use for env var name resolution.

Source code in src/zenml/pub_sub/base.py
60
61
62
63
64
65
66
67
@staticmethod
def prefixes() -> list[str]:
    """Implementation of the prefixes config method.

    Returns:
        A list of prefixes to use for env var name resolution.
    """
    return [ENV_ZENML_PRODUCER_PREFIX]

models

Models for the pub/sub layer.

Classes
CriticalEvent

Bases: BaseModel

Class capturing a critical event basic properties.

CriticalEventType

Bases: StrEnum

Critical event types enum.

MessageEnvelope

Bases: BaseModel

Envelope class for consumer operations (raw payload & de-serialized payload).

Attributes
id: str property

Implements the 'id' property.

Returns:

Type Description
str

The ID of the message payload.

MessagePayload

Bases: BaseModel

Class capturing a message payload basic properties.

SnapshotExecutionPayload

Bases: BaseModel

Payload class for snapshot async execution.