Skip to content

Dispatcher

zenml.dispatcher

Attributes

__all__ = ['EventDispatcher', 'EventHandler'] module-attribute

Classes

EventDispatcher()

Event Dispatcher class.

Event Dispatcher constructor.

Source code in src/zenml/dispatcher/dispatcher.py
29
30
31
32
def __init__(self) -> None:
    """Set up an empty handler registry together with its guarding lock."""
    # Lock held by `register_event_handler` around every append.
    self._handlers_lock = threading.Lock()
    # Registered handlers, kept in registration order.
    self._event_handlers: list[EventHandler] = []
Functions
handle_run_status_update(run: PipelineRunResponse) -> None

Handle a status update on a PipelineRun object.

Note: Status updates are a run-specific concept. This method is non-generalizable across types by design. To support richer events like creation or deletion of a resource we should extend the interface signature with generic methods.

Parameters:

Name Type Description Default
run PipelineRunResponse

A PipelineRunResponse object (with a status change).

required
Source code in src/zenml/dispatcher/dispatcher.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def handle_run_status_update(
    self,
    run: PipelineRunResponse,
) -> None:
    """Forward a PipelineRun status update to every registered handler.

    Note: Status updates are a run-specific concept. This
    method is non-generalizable across types by design. To support richer events
    like `creation` or `deletion` of a resource we should extend the interface
    signature with generic methods.

    The handler list is copied while holding the registration lock and the
    handlers are invoked after the lock has been released. Handlers that are
    registered while a dispatch is in progress therefore only receive
    subsequent updates.

    A handler that raises does not prevent the remaining handlers from being
    called: the failure is logged and dispatch continues.

    Args:
        run: A PipelineRunResponse object (with a status change).
    """
    # Snapshot under the same lock that guards registration so the list is
    # never iterated while it is being appended to. The handlers themselves
    # run outside the lock so they cannot block registration.
    with self._handlers_lock:
        event_handlers = list(self._event_handlers)

    for event_handler in event_handlers:
        handler_name = event_handler.__class__.__name__
        try:
            logger.debug(
                "Event handler: %s picking up %s status change to %s",
                handler_name,
                run.id,
                run.status,
            )
            event_handler.handle_run_status_update(run)
        except Exception:
            # Best-effort delivery: one failing handler must not stop the
            # others. `logger.exception` already attaches the active
            # exception's traceback.
            logger.exception(
                "%s failed to handle update",
                handler_name,
            )
register_event_handler(event_handler: EventHandler) -> None

Register an event handler.

Parameters:

Name Type Description Default
event_handler EventHandler

An event handler to register.

required
Source code in src/zenml/dispatcher/dispatcher.py
34
35
36
37
38
39
40
41
def register_event_handler(self, event_handler: EventHandler) -> None:
    """Add a handler to the list of handlers known to this dispatcher.

    The append happens while holding the handlers lock.

    Args:
        event_handler: An event handler to register.
    """
    guard = self._handlers_lock
    with guard:
        registry = self._event_handlers
        registry.append(event_handler)

EventHandler

Bases: ABC

Abstraction for EventHandler classes.

Functions
create() -> EventHandler abstractmethod async staticmethod

Factory method.

Returns:

Type Description
EventHandler

An EventHandler instance.

Source code in src/zenml/dispatcher/handler.py
41
42
43
44
45
46
47
48
49
@staticmethod
@abstractmethod
async def create() -> "EventHandler":
    """Asynchronously build an event handler.

    This is an abstract factory hook: the base declaration has no
    behavior of its own.

    Returns:
        An EventHandler instance.
    """
    ...
handle_run_status_update(run: PipelineRunResponse) -> None abstractmethod

Handle a status update on a PipelineRun object.

Note: Status updates are a run-specific concept. This method is non-generalizable across types by design. To support richer events like creation or deletion of a resource we should extend the interface signature with generic methods.

Parameters:

Name Type Description Default
run PipelineRunResponse

A PipelineRunResponse object (with a status change).

required
Source code in src/zenml/dispatcher/handler.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@abstractmethod
def handle_run_status_update(self, run: PipelineRunResponse) -> None:
    """React to a status change of a PipelineRun object.

    Note: Status updates only exist for runs, so this hook is deliberately
    run-specific and is not meant to generalize across resource types.
    Supporting richer events such as `creation` or `deletion` of a resource
    would require extending the interface with generic methods.

    Args:
        run: A PipelineRunResponse object (with a status change).
    """
    ...

Modules

dispatcher

Event dispatcher base functionality.

Classes
EventDispatcher()

Event Dispatcher class.

Event Dispatcher constructor.

Source code in src/zenml/dispatcher/dispatcher.py
29
30
31
32
def __init__(self) -> None:
    """Set up an empty handler registry together with its guarding lock."""
    # Lock held by `register_event_handler` around every append.
    self._handlers_lock = threading.Lock()
    # Registered handlers, kept in registration order.
    self._event_handlers: list[EventHandler] = []
Functions
handle_run_status_update(run: PipelineRunResponse) -> None

Handle a status update on a PipelineRun object.

Note: Status updates are a run-specific concept. This method is non-generalizable across types by design. To support richer events like creation or deletion of a resource we should extend the interface signature with generic methods.

Parameters:

Name Type Description Default
run PipelineRunResponse

A PipelineRunResponse object (with a status change).

required
Source code in src/zenml/dispatcher/dispatcher.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def handle_run_status_update(
    self,
    run: PipelineRunResponse,
) -> None:
    """Forward a PipelineRun status update to every registered handler.

    Note: Status updates are a run-specific concept. This
    method is non-generalizable across types by design. To support richer events
    like `creation` or `deletion` of a resource we should extend the interface
    signature with generic methods.

    The handler list is copied while holding the registration lock and the
    handlers are invoked after the lock has been released. Handlers that are
    registered while a dispatch is in progress therefore only receive
    subsequent updates.

    A handler that raises does not prevent the remaining handlers from being
    called: the failure is logged and dispatch continues.

    Args:
        run: A PipelineRunResponse object (with a status change).
    """
    # Snapshot under the same lock that guards registration so the list is
    # never iterated while it is being appended to. The handlers themselves
    # run outside the lock so they cannot block registration.
    with self._handlers_lock:
        event_handlers = list(self._event_handlers)

    for event_handler in event_handlers:
        handler_name = event_handler.__class__.__name__
        try:
            logger.debug(
                "Event handler: %s picking up %s status change to %s",
                handler_name,
                run.id,
                run.status,
            )
            event_handler.handle_run_status_update(run)
        except Exception:
            # Best-effort delivery: one failing handler must not stop the
            # others. `logger.exception` already attaches the active
            # exception's traceback.
            logger.exception(
                "%s failed to handle update",
                handler_name,
            )
register_event_handler(event_handler: EventHandler) -> None

Register an event handler.

Parameters:

Name Type Description Default
event_handler EventHandler

An event handler to register.

required
Source code in src/zenml/dispatcher/dispatcher.py
34
35
36
37
38
39
40
41
def register_event_handler(self, event_handler: EventHandler) -> None:
    """Add a handler to the list of handlers known to this dispatcher.

    The append happens while holding the handlers lock.

    Args:
        event_handler: An event handler to register.
    """
    guard = self._handlers_lock
    with guard:
        registry = self._event_handlers
        registry.append(event_handler)

handler

Event Handler base functionality.

Classes
EventHandler

Bases: ABC

Abstraction for EventHandler classes.

Functions
create() -> EventHandler abstractmethod async staticmethod

Factory method.

Returns:

Type Description
EventHandler

An EventHandler instance.

Source code in src/zenml/dispatcher/handler.py
41
42
43
44
45
46
47
48
49
@staticmethod
@abstractmethod
async def create() -> "EventHandler":
    """Asynchronously build an event handler.

    This is an abstract factory hook: the base declaration has no
    behavior of its own.

    Returns:
        An EventHandler instance.
    """
    ...
handle_run_status_update(run: PipelineRunResponse) -> None abstractmethod

Handle a status update on a PipelineRun object.

Note: Status updates are a run-specific concept. This method is non-generalizable across types by design. To support richer events like creation or deletion of a resource we should extend the interface signature with generic methods.

Parameters:

Name Type Description Default
run PipelineRunResponse

A PipelineRunResponse object (with a status change).

required
Source code in src/zenml/dispatcher/handler.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@abstractmethod
def handle_run_status_update(self, run: PipelineRunResponse) -> None:
    """React to a status change of a PipelineRun object.

    Note: Status updates only exist for runs, so this hook is deliberately
    run-specific and is not meant to generalize across resource types.
    Supporting richer events such as `creation` or `deletion` of a resource
    would require extending the interface with generic methods.

    Args:
        run: A PipelineRunResponse object (with a status change).
    """
    ...