Skip to content

Dispatcher

zenml.dispatcher

Event dispatcher for pipeline run lifecycle events.

Attributes

__all__ = ['EventDispatcher', 'EventHandler'] module-attribute

Classes

EventDispatcher()

Process-wide broadcaster of pipeline run lifecycle events.

Initialize the dispatcher with an empty handler list.

Source code in src/zenml/dispatcher/dispatcher.py
30
31
32
33
def __init__(self) -> None:
    """Set up the dispatcher with no handlers registered."""
    # Lock held by the other methods whenever they touch the handler list.
    self._handlers_lock = threading.Lock()
    self._event_handlers: List[EventHandler] = []
Functions
handle_run_status_update(run: PipelineRunResponse) -> None

Handle a status update on a pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run whose status has changed.

required
Source code in src/zenml/dispatcher/dispatcher.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def handle_run_status_update(
    self,
    run: PipelineRunResponse,
) -> None:
    """Fan a run status change out to every registered handler.

    The handler list is copied while holding the lock and the handlers are
    invoked after the lock has been released. An exception raised while
    notifying one handler is logged and does not stop the remaining
    handlers from being notified.

    Args:
        run: The pipeline run whose status has changed.
    """
    # Snapshot under the lock; the callbacks themselves run outside it.
    with self._handlers_lock:
        snapshot = self._event_handlers[:]

    for handler in snapshot:
        try:
            logger.debug(
                "Event handler: %s picking up %s status change to %s",
                handler.__class__.__name__,
                run.id,
                run.status,
            )
            handler.handle_run_status_update(run)
        except Exception as error:
            # Log and carry on so the remaining handlers still get the event.
            logger.exception(
                "%s failed to handle update",
                handler.__class__.__name__,
                exc_info=error,
            )
has_handlers() -> bool

Return True if any handlers are registered.

Returns:

Type Description
bool

Whether the dispatcher would fan out to at least one handler.

Source code in src/zenml/dispatcher/dispatcher.py
58
59
60
61
62
63
64
65
def has_handlers(self) -> bool:
    """Report whether at least one handler is currently registered.

    Returns:
        True if dispatching an event would reach one or more handlers,
        False if the handler list is empty.
    """
    # Read the list under the lock, return once the lock is released.
    with self._handlers_lock:
        any_registered = bool(self._event_handlers)
    return any_registered
register_event_handler(event_handler: EventHandler) -> None

Register an event handler.

Parameters:

Name Type Description Default
event_handler EventHandler

An event handler to register.

required
Source code in src/zenml/dispatcher/dispatcher.py
35
36
37
38
39
40
41
42
def register_event_handler(self, event_handler: EventHandler) -> None:
    """Add a handler to the end of the list of registered handlers.

    Args:
        event_handler: The handler to add.
    """
    with self._handlers_lock:
        registry = self._event_handlers
        registry.append(event_handler)
unregister_event_handler(event_handler: EventHandler) -> None

Unregister a previously-registered event handler.

No-op if the handler isn't registered.

Parameters:

Name Type Description Default
event_handler EventHandler

The event handler instance to remove.

required
Source code in src/zenml/dispatcher/dispatcher.py
44
45
46
47
48
49
50
51
52
53
54
55
56
def unregister_event_handler(self, event_handler: EventHandler) -> None:
    """Remove a handler from the list of registered handlers.

    Does nothing when the handler is not in the list.

    Args:
        event_handler: The event handler instance to remove.
    """
    with self._handlers_lock:
        registry = self._event_handlers
        try:
            registry.remove(event_handler)
        except ValueError:
            # Handler was never registered: documented no-op.
            return

EventHandler

Bases: ABC

Abstract base for handlers registered with EventDispatcher.

Functions
create() -> EventHandler async classmethod

Factory used by the source loader.

Needs to be implemented if a handler is specified via server_config.event_handler_sources.

Raises:

Type Description
NotImplementedError

Always.

Returns:

Type Description
EventHandler

Handler instance.

Source code in src/zenml/dispatcher/handler.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
async def create(cls) -> "EventHandler":
    """Build a handler instance on behalf of the source loader.

    Subclasses referenced through `server_config.event_handler_sources`
    need to override this factory; this base implementation only raises.

    Raises:
        NotImplementedError: Always.

    Returns:
        Handler instance.
    """
    message = f"{cls.__name__}.create() is not implemented."
    raise NotImplementedError(message)
handle_run_status_update(run: PipelineRunResponse) -> None abstractmethod

Handle a status update on a pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run whose status has changed.

required
Source code in src/zenml/dispatcher/handler.py
24
25
26
27
28
29
30
31
32
33
@abstractmethod
def handle_run_status_update(self, run: PipelineRunResponse) -> None:
    """React to a change in a pipeline run's status.

    Subclasses must override this method.

    Args:
        run: The pipeline run whose status has changed.
    """

Modules

dispatcher

Event dispatcher that fans out run lifecycle events to handlers.

Classes
EventDispatcher()

Process-wide broadcaster of pipeline run lifecycle events.

Initialize the dispatcher with an empty handler list.

Source code in src/zenml/dispatcher/dispatcher.py
30
31
32
33
def __init__(self) -> None:
    """Set up the dispatcher with no handlers registered."""
    # Lock held by the other methods whenever they touch the handler list.
    self._handlers_lock = threading.Lock()
    self._event_handlers: List[EventHandler] = []
Functions
handle_run_status_update(run: PipelineRunResponse) -> None

Handle a status update on a pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run whose status has changed.

required
Source code in src/zenml/dispatcher/dispatcher.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def handle_run_status_update(
    self,
    run: PipelineRunResponse,
) -> None:
    """Fan a run status change out to every registered handler.

    The handler list is copied while holding the lock and the handlers are
    invoked after the lock has been released. An exception raised while
    notifying one handler is logged and does not stop the remaining
    handlers from being notified.

    Args:
        run: The pipeline run whose status has changed.
    """
    # Snapshot under the lock; the callbacks themselves run outside it.
    with self._handlers_lock:
        snapshot = self._event_handlers[:]

    for handler in snapshot:
        try:
            logger.debug(
                "Event handler: %s picking up %s status change to %s",
                handler.__class__.__name__,
                run.id,
                run.status,
            )
            handler.handle_run_status_update(run)
        except Exception as error:
            # Log and carry on so the remaining handlers still get the event.
            logger.exception(
                "%s failed to handle update",
                handler.__class__.__name__,
                exc_info=error,
            )
has_handlers() -> bool

Return True if any handlers are registered.

Returns:

Type Description
bool

Whether the dispatcher would fan out to at least one handler.

Source code in src/zenml/dispatcher/dispatcher.py
58
59
60
61
62
63
64
65
def has_handlers(self) -> bool:
    """Report whether at least one handler is currently registered.

    Returns:
        True if dispatching an event would reach one or more handlers,
        False if the handler list is empty.
    """
    # Read the list under the lock, return once the lock is released.
    with self._handlers_lock:
        any_registered = bool(self._event_handlers)
    return any_registered
register_event_handler(event_handler: EventHandler) -> None

Register an event handler.

Parameters:

Name Type Description Default
event_handler EventHandler

An event handler to register.

required
Source code in src/zenml/dispatcher/dispatcher.py
35
36
37
38
39
40
41
42
def register_event_handler(self, event_handler: EventHandler) -> None:
    """Add a handler to the end of the list of registered handlers.

    Args:
        event_handler: The handler to add.
    """
    with self._handlers_lock:
        registry = self._event_handlers
        registry.append(event_handler)
unregister_event_handler(event_handler: EventHandler) -> None

Unregister a previously-registered event handler.

No-op if the handler isn't registered.

Parameters:

Name Type Description Default
event_handler EventHandler

The event handler instance to remove.

required
Source code in src/zenml/dispatcher/dispatcher.py
44
45
46
47
48
49
50
51
52
53
54
55
56
def unregister_event_handler(self, event_handler: EventHandler) -> None:
    """Remove a handler from the list of registered handlers.

    Does nothing when the handler is not in the list.

    Args:
        event_handler: The event handler instance to remove.
    """
    with self._handlers_lock:
        registry = self._event_handlers
        try:
            registry.remove(event_handler)
        except ValueError:
            # Handler was never registered: documented no-op.
            return
Functions

handler

Base classes for event handlers registered with the dispatcher.

Classes
EventHandler

Bases: ABC

Abstract base for handlers registered with EventDispatcher.

Functions
create() -> EventHandler async classmethod

Factory used by the source loader.

Needs to be implemented if a handler is specified via server_config.event_handler_sources.

Raises:

Type Description
NotImplementedError

Always.

Returns:

Type Description
EventHandler

Handler instance.

Source code in src/zenml/dispatcher/handler.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@classmethod
async def create(cls) -> "EventHandler":
    """Build a handler instance on behalf of the source loader.

    Subclasses referenced through `server_config.event_handler_sources`
    need to override this factory; this base implementation only raises.

    Raises:
        NotImplementedError: Always.

    Returns:
        Handler instance.
    """
    message = f"{cls.__name__}.create() is not implemented."
    raise NotImplementedError(message)
handle_run_status_update(run: PipelineRunResponse) -> None abstractmethod

Handle a status update on a pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The pipeline run whose status has changed.

required
Source code in src/zenml/dispatcher/handler.py
24
25
26
27
28
29
30
31
32
33
@abstractmethod
def handle_run_status_update(self, run: PipelineRunResponse) -> None:
    """React to a change in a pipeline run's status.

    Subclasses must override this method.

    Args:
        run: The pipeline run whose status has changed.
    """