Skip to content

Streaming

zenml.streaming

Producer API for live event streaming inside pipelines.

Attributes

__all__ = ['flush', 'publish'] module-attribute

Functions

flush(timeout: float = 2.0) -> bool

Block until all queued events have been sent or the `timeout` elapses.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for the queue to drain.

2.0

Returns:

Type Description
bool

True if drained (or publisher never started). False on timeout.

Source code in src/zenml/streaming/publishing.py
315
316
317
318
319
320
321
322
323
324
325
326
def flush(timeout: float = 2.0) -> bool:
    """Wait for queued events to be sent, giving up after `timeout` seconds.

    Args:
        timeout: Upper bound, in seconds, on how long to wait for the
            queue to drain.

    Returns:
        True when the queue drained, or when no publisher exists yet.
        False when the timeout elapsed first.
    """
    if _StreamPublisher._exists():
        return _StreamPublisher().flush(timeout=timeout)
    # No publisher exists, so there is nothing to wait for.
    return True

publish(payload: Dict[str, Any], *, kind: str = 'event', correlation_id: Optional[str] = None, index: Optional[int] = None) -> None

Publish an event for the active pipeline run.

Parameters:

Name Type Description Default
payload Dict[str, Any]

JSON-encodable event payload.

required
kind str

Event kind tag.

'event'
correlation_id Optional[str]

Optional tag grouping events from one logical sub-flow.

None
index Optional[int]

Optional in-order index within a correlation group.

None

Raises:

Type Description
ValueError

If `kind` collides with a reserved control name, or `payload` is not JSON-encodable or exceeds the `STREAM_EVENT_PAYLOAD_BYTES_MAX` cap.

Source code in src/zenml/streaming/publishing.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def publish(
    payload: Dict[str, Any],
    *,
    kind: str = "event",
    correlation_id: Optional[str] = None,
    index: Optional[int] = None,
) -> None:
    """Send an event tied to the currently active pipeline run.

    If no publish context can be resolved (the call happens outside a
    pipeline run), the event is dropped and a warning is logged instead.

    Args:
        payload: Event payload; must be JSON-encodable.
        kind: Tag identifying the kind of event.
        correlation_id: Optional tag that groups events belonging to one
            logical sub-flow.
        index: Optional in-order position within a correlation group.

    Raises:
        ValueError: If `kind` collides with a reserved control name, or
            `payload` is not JSON-encodable or exceeds the
            `STREAM_EVENT_PAYLOAD_BYTES_MAX` cap.
    """
    # Reserved kinds are rejected up front, before any context lookup.
    if kind in _RESERVED_KINDS:
        raise ValueError(
            f"`{kind}` is reserved for server-emitted control frames "
            f"({sorted(_RESERVED_KINDS)})"
        )

    run_context = _resolve_publish_context()
    if run_context is not None:
        # Build the event first so the publisher is only touched once the
        # event has been constructed successfully.
        stream_event = StreamEvent(
            pipeline_run_id=run_context.pipeline_run_id,
            step_run_id=run_context.step_run_id,
            step_name=run_context.step_name,
            kind=kind,
            correlation_id=correlation_id,
            index=index,
            payload=payload,
        )
        _StreamPublisher().publish(stream_event)
        return

    logger.warning(
        "`publish(...)` called outside a pipeline run, ignoring event: %s",
        payload,
    )

Modules

publishing

Producer implementation for live event streaming inside pipelines.

Classes
Functions
flush(timeout: float = 2.0) -> bool

Block until all queued events have been sent or the `timeout` elapses.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for the queue to drain.

2.0

Returns:

Type Description
bool

True if drained (or publisher never started). False on timeout.

Source code in src/zenml/streaming/publishing.py
315
316
317
318
319
320
321
322
323
324
325
326
def flush(timeout: float = 2.0) -> bool:
    """Wait for queued events to be sent, giving up after `timeout` seconds.

    Args:
        timeout: Upper bound, in seconds, on how long to wait for the
            queue to drain.

    Returns:
        True when the queue drained, or when no publisher exists yet.
        False when the timeout elapsed first.
    """
    if _StreamPublisher._exists():
        return _StreamPublisher().flush(timeout=timeout)
    # No publisher exists, so there is nothing to wait for.
    return True
publish(payload: Dict[str, Any], *, kind: str = 'event', correlation_id: Optional[str] = None, index: Optional[int] = None) -> None

Publish an event for the active pipeline run.

Parameters:

Name Type Description Default
payload Dict[str, Any]

JSON-encodable event payload.

required
kind str

Event kind tag.

'event'
correlation_id Optional[str]

Optional tag grouping events from one logical sub-flow.

None
index Optional[int]

Optional in-order index within a correlation group.

None

Raises:

Type Description
ValueError

If `kind` collides with a reserved control name, or `payload` is not JSON-encodable or exceeds the `STREAM_EVENT_PAYLOAD_BYTES_MAX` cap.

Source code in src/zenml/streaming/publishing.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
def publish(
    payload: Dict[str, Any],
    *,
    kind: str = "event",
    correlation_id: Optional[str] = None,
    index: Optional[int] = None,
) -> None:
    """Send an event tied to the currently active pipeline run.

    If no publish context can be resolved (the call happens outside a
    pipeline run), the event is dropped and a warning is logged instead.

    Args:
        payload: Event payload; must be JSON-encodable.
        kind: Tag identifying the kind of event.
        correlation_id: Optional tag that groups events belonging to one
            logical sub-flow.
        index: Optional in-order position within a correlation group.

    Raises:
        ValueError: If `kind` collides with a reserved control name, or
            `payload` is not JSON-encodable or exceeds the
            `STREAM_EVENT_PAYLOAD_BYTES_MAX` cap.
    """
    # Reserved kinds are rejected up front, before any context lookup.
    if kind in _RESERVED_KINDS:
        raise ValueError(
            f"`{kind}` is reserved for server-emitted control frames "
            f"({sorted(_RESERVED_KINDS)})"
        )

    run_context = _resolve_publish_context()
    if run_context is not None:
        # Build the event first so the publisher is only touched once the
        # event has been constructed successfully.
        stream_event = StreamEvent(
            pipeline_run_id=run_context.pipeline_run_id,
            step_run_id=run_context.step_run_id,
            step_name=run_context.step_name,
            kind=kind,
            correlation_id=correlation_id,
            index=index,
            payload=payload,
        )
        _StreamPublisher().publish(stream_event)
        return

    logger.warning(
        "`publish(...)` called outside a pipeline run, ignoring event: %s",
        payload,
    )