Skip to content

Event Hub

zenml.event_hub special

ZenML Event Hub module.

The Event Hub is responsible for receiving all Events and dispatching them to the relevant Subscribers/Triggers.

base_event_hub

Base class for event hub implementations.

BaseEventHub (ABC)

Base class for event hub implementations.

The event hub is responsible for relaying events from event sources to action handlers. It functions similarly to a pub/sub system where event source handlers publish events and action handlers subscribe to them.

The event hub also serves to decouple event sources from action handlers, allowing them to be configured independently and their implementations to be unaware of each other.

Source code in zenml/event_hub/base_event_hub.py
class BaseEventHub(ABC):
    """Base class for event hub implementations.

    The event hub is responsible for relaying events from event sources to
    action handlers. It functions similarly to a pub/sub system where
    event source handlers publish events and action handlers subscribe to them.

    The event hub also serves to decouple event sources from action handlers,
    allowing them to be configured independently and their implementations to be
    unaware of each other.
    """

    # Registry of action handler callbacks keyed by (flavor, subtype). The
    # dictionary is created once on the class, so it is shared by all event
    # hub instances that do not rebind the attribute.
    action_handlers: Dict[Tuple[str, str], ActionHandlerCallback] = {}

    @property
    def zen_store(self) -> "SqlZenStore":
        """Returns the active zen store.

        Returns:
            The active zen store.

        Raises:
            ValueError: If the active zen store is not a SQL zen store.
        """
        # Imported locally rather than at module level.
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        zen_store = GlobalConfiguration().zen_store

        if not isinstance(zen_store, SqlZenStore):
            raise ValueError("The event hub requires a SQL zen store.")

        return zen_store

    def subscribe_action_handler(
        self,
        action_flavor: str,
        action_subtype: str,
        callback: ActionHandlerCallback,
    ) -> None:
        """Subscribe an action handler to the event hub.

        A handler that is already registered for the same flavor and subtype
        is replaced.

        Args:
            action_flavor: the flavor of the action to trigger.
            action_subtype: the subtype of the action to trigger.
            callback: the action to trigger when the trigger is activated.
        """
        self.action_handlers[(action_flavor, action_subtype)] = callback

    def unsubscribe_action_handler(
        self,
        action_flavor: str,
        action_subtype: str,
    ) -> None:
        """Unsubscribe an action handler from the event hub.

        Nothing happens if no handler is registered for the given flavor and
        subtype.

        Args:
            action_flavor: the flavor of the action to trigger.
            action_subtype: the subtype of the action to trigger.
        """
        self.action_handlers.pop((action_flavor, action_subtype), None)

    def trigger_action(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
        trigger: TriggerResponse,
        action_callback: ActionHandlerCallback,
    ) -> None:
        """Trigger an action.

        Records a trigger execution in the zen store, builds an
        authentication context for the service account configured on the
        trigger's action and then invokes the action callback. Exceptions
        raised by the callback are logged and swallowed.

        Args:
            event: The event.
            event_source: The event source that produced the event. It is
                not used by this method.
            trigger: The trigger that was activated.
            action_callback: The action to trigger.
        """
        from datetime import timezone

        request = TriggerExecutionRequest(
            trigger=trigger.id, event_metadata=event.model_dump()
        )

        action_config = trigger.action.configuration

        trigger_execution = self.zen_store.create_trigger_execution(request)

        # Generate an API token that can be used by external workloads
        # implementing the action to authenticate with the server. This token
        # is associated with the service account configured for the trigger
        # and has a validity defined by the trigger's authentication window.
        token = JWTToken(
            user_id=trigger.action.service_account.id,
        )
        expires: Optional[datetime] = None
        if trigger.action.auth_window:
            # `datetime.utcnow()` is deprecated since Python 3.12. Derive the
            # same naive UTC timestamp from an aware "now" so that the value
            # handed to the token encoder keeps its original form.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            expires = now_utc + timedelta(minutes=trigger.action.auth_window)
        encoded_token = token.encode(expires=expires)
        auth_context = AuthContext(
            user=trigger.action.service_account,
            access_token=token,
            encoded_access_token=encoded_token,
        )

        try:
            # TODO: We need to make this async, as this might take quite some
            # time per trigger. We can either use threads starting here, or
            # use fastapi background tasks that get passed here instead of
            # running the event hub as a background tasks in the webhook
            # endpoints
            action_callback(
                action_config,
                trigger_execution,
                auth_context,
            )
        except Exception:
            # Don't let action errors stop the event hub from working
            logger.exception(
                f"An error occurred while executing trigger {trigger}."
            )

    @abstractmethod
    def activate_trigger(self, trigger: TriggerResponse) -> None:
        """Add a trigger to the event hub.

        Configure the event hub to trigger an action when an event is received.

        Args:
            trigger: the trigger to activate.
        """

    @abstractmethod
    def deactivate_trigger(self, trigger: TriggerResponse) -> None:
        """Remove a trigger from the event hub.

        Configure the event hub to stop triggering an action when an event is
        received.

        Args:
            trigger: the trigger to deactivate.
        """

    @abstractmethod
    def publish_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> None:
        """Publish an event to the event hub.

        Args:
            event: The event.
            event_source: The event source that produced the event.
        """
zen_store: SqlZenStore property readonly

Returns the active zen store.

Returns:

Type Description
SqlZenStore

The active zen store.

Exceptions:

Type Description
ValueError

If the active zen store is not a SQL zen store.

activate_trigger(self, trigger)

Add a trigger to the event hub.

Configure the event hub to trigger an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to activate.

required
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def activate_trigger(
    self,
    trigger: TriggerResponse,
) -> None:
    """Register a trigger with the event hub.

    Implementations set up the event hub so that an action is triggered
    when an event is received.

    Args:
        trigger: the trigger to activate.
    """
deactivate_trigger(self, trigger)

Remove a trigger from the event hub.

Configure the event hub to stop triggering an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to deactivate.

required
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def deactivate_trigger(
    self,
    trigger: TriggerResponse,
) -> None:
    """Unregister a trigger from the event hub.

    Implementations set up the event hub so that it stops triggering an
    action when an event is received.

    Args:
        trigger: the trigger to deactivate.
    """
publish_event(self, event, event_source)

Publish an event to the event hub.

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def publish_event(
    self, event: BaseEvent, event_source: EventSourceResponse
) -> None:
    """Hand an event over to the event hub.

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
subscribe_action_handler(self, action_flavor, action_subtype, callback)

Subscribe an action handler to the event hub.

Parameters:

Name Type Description Default
action_flavor str

the flavor of the action to trigger.

required
action_subtype str

the subtype of the action to trigger.

required
callback Callable[[Dict[str, Any], zenml.models.v2.core.trigger_execution.TriggerExecutionResponse, zenml.zen_server.auth.AuthContext], NoneType]

the action to trigger when the trigger is activated.

required
Source code in zenml/event_hub/base_event_hub.py
def subscribe_action_handler(
    self,
    action_flavor: str,
    action_subtype: str,
    callback: ActionHandlerCallback,
) -> None:
    """Register an action handler callback with the event hub.

    The callback is stored under the (flavor, subtype) pair; a callback
    already stored under that pair is overwritten.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
        callback: the action to trigger when the trigger is activated.
    """
    handler_key = (action_flavor, action_subtype)
    self.action_handlers[handler_key] = callback
trigger_action(self, event, event_source, trigger, action_callback)

Trigger an action.

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
trigger TriggerResponse

The trigger that was activated.

required
action_callback Callable[[Dict[str, Any], zenml.models.v2.core.trigger_execution.TriggerExecutionResponse, zenml.zen_server.auth.AuthContext], NoneType]

The action to trigger.

required
Source code in zenml/event_hub/base_event_hub.py
def trigger_action(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
    trigger: TriggerResponse,
    action_callback: ActionHandlerCallback,
) -> None:
    """Trigger an action.

    Records a trigger execution in the zen store, builds an authentication
    context for the service account configured on the trigger's action and
    then invokes the action callback. Exceptions raised by the callback are
    logged and swallowed.

    Args:
        event: The event.
        event_source: The event source that produced the event. It is not
            used by this method.
        trigger: The trigger that was activated.
        action_callback: The action to trigger.
    """
    from datetime import timezone

    request = TriggerExecutionRequest(
        trigger=trigger.id, event_metadata=event.model_dump()
    )

    action_config = trigger.action.configuration

    trigger_execution = self.zen_store.create_trigger_execution(request)

    # Generate an API token that can be used by external workloads
    # implementing the action to authenticate with the server. This token
    # is associated with the service account configured for the trigger
    # and has a validity defined by the trigger's authentication window.
    token = JWTToken(
        user_id=trigger.action.service_account.id,
    )
    expires: Optional[datetime] = None
    if trigger.action.auth_window:
        # `datetime.utcnow()` is deprecated since Python 3.12. Derive the
        # same naive UTC timestamp from an aware "now" so that the value
        # handed to the token encoder keeps its original form.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        expires = now_utc + timedelta(minutes=trigger.action.auth_window)
    encoded_token = token.encode(expires=expires)
    auth_context = AuthContext(
        user=trigger.action.service_account,
        access_token=token,
        encoded_access_token=encoded_token,
    )

    try:
        # TODO: We need to make this async, as this might take quite some
        # time per trigger. We can either use threads starting here, or
        # use fastapi background tasks that get passed here instead of
        # running the event hub as a background tasks in the webhook
        # endpoints
        action_callback(
            action_config,
            trigger_execution,
            auth_context,
        )
    except Exception:
        # Don't let action errors stop the event hub from working
        logger.exception(
            f"An error occurred while executing trigger {trigger}."
        )
unsubscribe_action_handler(self, action_flavor, action_subtype)

Unsubscribe an action handler from the event hub.

Parameters:

Name Type Description Default
action_flavor str

the flavor of the action to trigger.

required
action_subtype str

the subtype of the action to trigger.

required
Source code in zenml/event_hub/base_event_hub.py
def unsubscribe_action_handler(
    self, action_flavor: str, action_subtype: str
) -> None:
    """Remove an action handler callback from the event hub.

    Does nothing when no callback is stored under the (flavor, subtype)
    pair.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
    """
    handler_key = (action_flavor, action_subtype)
    self.action_handlers.pop(handler_key, None)

event_hub

Internal, in-server implementation of the Event Hub.

InternalEventHub (BaseEventHub)

Internal in-server event hub implementation.

The internal in-server event hub uses the database as a source of truth for configured triggers and triggers actions by calling the action handlers directly.

Source code in zenml/event_hub/event_hub.py
class InternalEventHub(BaseEventHub):
    """Internal in-server event hub implementation.

    The internal in-server event hub uses the database as a source of truth for
    configured triggers and triggers actions by calling the action handlers
    directly.
    """

    def activate_trigger(self, trigger: TriggerResponse) -> None:
        """Add a trigger to the event hub.

        Configure the event hub to trigger an action when an event is received.

        Args:
            trigger: the trigger to activate.
        """
        # We don't need to do anything here to change the event hub
        # configuration. The in-server event hub already uses the database
        # as the source of truth regarding configured active triggers.

    def deactivate_trigger(self, trigger: TriggerResponse) -> None:
        """Remove a trigger from the event hub.

        Configure the event hub to stop triggering an action when an event is
        received.

        Args:
            trigger: the trigger to deactivate.
        """
        # We don't need to do anything here to change the event hub
        # configuration. The in-server event hub already uses the database
        # as the source of truth regarding configured active triggers.

    def publish_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> None:
        """Process an incoming event and trigger all configured actions.

        Looks up the active triggers matching the event and, for each of
        them, calls the action handler subscribed for the trigger's action
        flavor and subtype. Triggers without a subscribed handler are
        skipped after logging an error.

        Args:
            event: The event.
            event_source: The event source that produced the event.
        """
        triggers = self.get_matching_active_triggers_for_event(
            event=event, event_source=event_source
        )

        for trigger in triggers:
            action_callback = self.action_handlers.get(
                (trigger.action_flavor, trigger.action_subtype)
            )
            if not action_callback:
                logger.error(
                    f"The event hub could not deliver event to action handler "
                    f"for flavor {trigger.action_flavor} and subtype "
                    f"{trigger.action_subtype} because no handler was found."
                )
                continue

            self.trigger_action(
                event=event,
                event_source=event_source,
                trigger=trigger,
                action_callback=action_callback,
            )

    def get_matching_active_triggers_for_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> List[TriggerResponse]:
        """Get all triggers that match an incoming event.

        The active triggers of the event source are loaded from the zen
        store and each trigger's event filter is evaluated against the
        event. If the plugin flavor of the event source cannot be found, an
        error is logged once and no trigger matches. Triggers whose event
        filter fails validation are logged and skipped.

        Args:
            event: The inbound event.
            event_source: The event source which emitted the event.

        Returns:
            The list of matching triggers.
        """
        # get all active triggers configured for this event source
        triggers: List[TriggerResponse] = depaginate(
            partial(
                self.zen_store.list_triggers,
                trigger_filter_model=TriggerFilter(
                    event_source_id=event_source.id, is_active=True
                ),
                hydrate=True,
            )
        )

        trigger_list: List[TriggerResponse] = []

        if triggers:
            # For now, the matching of trigger filters vs event is implemented
            # in each filter class. This is not ideal and should be refactored
            # to a more generic solution that doesn't require the plugin
            # implementation to be imported here.
            #
            # The flavor only depends on the event source, so it is resolved
            # once here instead of once per trigger.
            try:
                plugin_flavor = plugin_flavor_registry().get_flavor_class(
                    name=event_source.flavor,
                    _type=PluginType.EVENT_SOURCE,
                    subtype=event_source.plugin_subtype,
                )
            except KeyError:
                logger.exception(
                    f"Could not find plugin flavor for event source "
                    f"{event_source.id} and flavor {event_source.flavor}. "
                    f"Skipping all {len(triggers)} trigger(s)."
                )
            else:
                assert issubclass(plugin_flavor, BaseEventSourceFlavor)

                event_filter_config_class = (
                    plugin_flavor.EVENT_FILTER_CONFIG_CLASS
                )

                for trigger in triggers:
                    try:
                        event_filter = event_filter_config_class(
                            **(trigger.event_filter or {})
                        )
                    except ValidationError:
                        logger.exception(
                            f"Could not instantiate event filter config "
                            f"class for event source {event_source.id}. "
                            f"Skipping trigger {trigger.id}."
                        )
                        continue

                    if event_filter.event_matches_filter(event=event):
                        trigger_list.append(trigger)

        logger.debug(
            f"For event {event} and event source {event_source}, "
            f"the following triggers matched: {trigger_list}"
        )

        return trigger_list
activate_trigger(self, trigger)

Add a trigger to the event hub.

Configure the event hub to trigger an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to activate.

required
Source code in zenml/event_hub/event_hub.py
def activate_trigger(
    self,
    trigger: TriggerResponse,
) -> None:
    """Register a trigger with the event hub.

    This implementation performs no work.

    Args:
        trigger: the trigger to activate.
    """
    # Nothing to reconfigure: the in-server event hub already treats the
    # database as the source of truth for the configured active triggers.
    return None
deactivate_trigger(self, trigger)

Remove a trigger from the event hub.

Configure the event hub to stop triggering an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to deactivate.

required
Source code in zenml/event_hub/event_hub.py
def deactivate_trigger(
    self,
    trigger: TriggerResponse,
) -> None:
    """Unregister a trigger from the event hub.

    This implementation performs no work.

    Args:
        trigger: the trigger to deactivate.
    """
    # Nothing to reconfigure: the in-server event hub already treats the
    # database as the source of truth for the configured active triggers.
    return None
get_matching_active_triggers_for_event(self, event, event_source)

Get all triggers that match an incoming event.

Parameters:

Name Type Description Default
event BaseEvent

The inbound event.

required
event_source EventSourceResponse

The event source which emitted the event.

required

Returns:

Type Description
List[zenml.models.v2.core.trigger.TriggerResponse]

The list of matching triggers.

Source code in zenml/event_hub/event_hub.py
def get_matching_active_triggers_for_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> List[TriggerResponse]:
    """Get all triggers that match an incoming event.

    The active triggers of the event source are loaded from the zen store
    and each trigger's event filter is evaluated against the event. If the
    plugin flavor of the event source cannot be found, an error is logged
    once and no trigger matches. Triggers whose event filter fails
    validation are logged and skipped.

    Args:
        event: The inbound event.
        event_source: The event source which emitted the event.

    Returns:
        The list of matching triggers.
    """
    # get all active triggers configured for this event source
    triggers: List[TriggerResponse] = depaginate(
        partial(
            self.zen_store.list_triggers,
            trigger_filter_model=TriggerFilter(
                event_source_id=event_source.id, is_active=True
            ),
            hydrate=True,
        )
    )

    trigger_list: List[TriggerResponse] = []

    if triggers:
        # For now, the matching of trigger filters vs event is implemented
        # in each filter class. This is not ideal and should be refactored
        # to a more generic solution that doesn't require the plugin
        # implementation to be imported here.
        #
        # The flavor only depends on the event source, so it is resolved
        # once here instead of once per trigger.
        try:
            plugin_flavor = plugin_flavor_registry().get_flavor_class(
                name=event_source.flavor,
                _type=PluginType.EVENT_SOURCE,
                subtype=event_source.plugin_subtype,
            )
        except KeyError:
            logger.exception(
                f"Could not find plugin flavor for event source "
                f"{event_source.id} and flavor {event_source.flavor}. "
                f"Skipping all {len(triggers)} trigger(s)."
            )
        else:
            assert issubclass(plugin_flavor, BaseEventSourceFlavor)

            event_filter_config_class = plugin_flavor.EVENT_FILTER_CONFIG_CLASS

            for trigger in triggers:
                try:
                    event_filter = event_filter_config_class(
                        **(trigger.event_filter or {})
                    )
                except ValidationError:
                    logger.exception(
                        f"Could not instantiate event filter config class for "
                        f"event source {event_source.id}. Skipping trigger "
                        f"{trigger.id}."
                    )
                    continue

                if event_filter.event_matches_filter(event=event):
                    trigger_list.append(trigger)

    logger.debug(
        f"For event {event} and event source {event_source}, "
        f"the following triggers matched: {trigger_list}"
    )

    return trigger_list
publish_event(self, event, event_source)

Process an incoming event and trigger all configured actions.

This will first check for any subscribers/triggers for this event, then log the event for later reference and finally perform the configured action(s).

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in zenml/event_hub/event_hub.py
def publish_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Dispatch an incoming event to the handlers of all matching triggers.

    The matching active triggers are looked up first. For each of them the
    action handler subscribed for the trigger's action flavor and subtype
    is invoked through `trigger_action`. A trigger without a subscribed
    handler is skipped after an error is logged.

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
    matching_triggers = self.get_matching_active_triggers_for_event(
        event=event, event_source=event_source
    )

    for trigger in matching_triggers:
        handler_key = (trigger.action_flavor, trigger.action_subtype)
        handler = self.action_handlers.get(handler_key)

        if handler:
            self.trigger_action(
                event=event,
                event_source=event_source,
                trigger=trigger,
                action_callback=handler,
            )
        else:
            logger.error(
                f"The event hub could not deliver event to action handler "
                f"for flavor {trigger.action_flavor} and subtype "
                f"{trigger.action_subtype} because no handler was found."
            )