Skip to content

Event Sources

zenml.event_sources special

Base Classes for Event Sources.

base_event

Base implementation for events.

BaseEvent (BaseModel)

Base class for all inbound events.

Source code in zenml/event_sources/base_event.py
class BaseEvent(BaseModel):
    """Base class for all inbound events."""

base_event_source

Base implementation for event sources.

BaseEventSourceFlavor (BasePluginFlavor, ABC)

Base Event Plugin Flavor to access an event plugin along with its configurations.

Source code in zenml/event_sources/base_event_source.py
class BaseEventSourceFlavor(BasePluginFlavor, ABC):
    """Base Event Plugin Flavor to access an event plugin along with its configurations."""

    TYPE: ClassVar[PluginType] = PluginType.EVENT_SOURCE

    # EventPlugin specific
    EVENT_SOURCE_CONFIG_CLASS: ClassVar[Type[EventSourceConfig]]
    EVENT_FILTER_CONFIG_CLASS: ClassVar[Type[EventFilterConfig]]

    @classmethod
    def get_event_filter_config_schema(cls) -> Dict[str, Any]:
        """The config schema for a flavor.

        Returns:
            The config schema.
        """
        return cls.EVENT_SOURCE_CONFIG_CLASS.model_json_schema()

    @classmethod
    def get_event_source_config_schema(cls) -> Dict[str, Any]:
        """The config schema for a flavor.

        Returns:
            The config schema.
        """
        return cls.EVENT_FILTER_CONFIG_CLASS.model_json_schema()

    @classmethod
    def get_flavor_response_model(
        cls, hydrate: bool
    ) -> EventSourceFlavorResponse:
        """Convert the Flavor into a Response Model.

        Args:
            hydrate: Whether the model should be hydrated.

        Returns:
            The Flavor Response model for the Event Source implementation
        """
        metadata = None
        if hydrate:
            metadata = EventSourceFlavorResponseMetadata(
                source_config_schema=cls.get_event_source_config_schema(),
                filter_config_schema=cls.get_event_filter_config_schema(),
            )
        return EventSourceFlavorResponse(
            body=EventSourceFlavorResponseBody(),
            metadata=metadata,
            name=cls.FLAVOR,
            type=cls.TYPE,
            subtype=cls.SUBTYPE,
        )
get_event_filter_config_schema() classmethod

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_event_filter_config_schema(cls) -> Dict[str, Any]:
    """The config schema for a flavor.

    Returns:
        The config schema.
    """
    return cls.EVENT_SOURCE_CONFIG_CLASS.model_json_schema()
get_event_source_config_schema() classmethod

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_event_source_config_schema(cls) -> Dict[str, Any]:
    """The config schema for a flavor.

    Returns:
        The config schema.
    """
    return cls.EVENT_FILTER_CONFIG_CLASS.model_json_schema()
get_flavor_response_model(hydrate) classmethod

Convert the Flavor into a Response Model.

Parameters:

Name Type Description Default
hydrate bool

Whether the model should be hydrated.

required

Returns:

Type Description
EventSourceFlavorResponse

The Flavor Response model for the Event Source implementation

Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_flavor_response_model(
    cls, hydrate: bool
) -> EventSourceFlavorResponse:
    """Convert the Flavor into a Response Model.

    Args:
        hydrate: Whether the model should be hydrated.

    Returns:
        The Flavor Response model for the Event Source implementation
    """
    metadata = None
    if hydrate:
        metadata = EventSourceFlavorResponseMetadata(
            source_config_schema=cls.get_event_source_config_schema(),
            filter_config_schema=cls.get_event_filter_config_schema(),
        )
    return EventSourceFlavorResponse(
        body=EventSourceFlavorResponseBody(),
        metadata=metadata,
        name=cls.FLAVOR,
        type=cls.TYPE,
        subtype=cls.SUBTYPE,
    )

BaseEventSourceHandler (BasePlugin, ABC)

Base event source handler implementation.

This class provides a base implementation for event source handlers.

The base event source handler acts as an intermediary between the REST API endpoints and the ZenML store for all operations related to event sources. It implements all operations related creating, updating, deleting, and fetching event sources from the database. It also provides a set of methods that can be overridden by concrete event source handlers that need to react to or participate in the lifecycle of an event source:

  • _validate_event_source_request: validate and/or modify an event source before it is created (before it is persisted in the database)
  • _process_event_source_request: react to a new event source being created (after it is persisted in the database)
  • _validate_event_source_update: validate and/or modify an event source update before it is applied (before the update is saved in the database)
  • _process_event_source_update: react to an event source being updated (after the update is saved in the database)
  • _process_event_source_delete: react to an event source being deleted (before it is deleted from the database)
  • _process_event_source_response: modify an event source before it is returned to the user

In addition to optionally overriding these methods, every event source handler must define and provide configuration classes for the event source and the event filter. These are used to validate and instantiate the configuration from the user requests.

Finally, since event source handlers are also sources of events, the base class provides methods that implementations can use to dispatch events to the central event hub where they can be processed and eventually trigger actions.

Source code in zenml/event_sources/base_event_source.py
class BaseEventSourceHandler(BasePlugin, ABC):
    """Base event source handler implementation.

    This class provides a base implementation for event source handlers.

    The base event source handler acts as an intermediary between the REST API
    endpoints and the ZenML store for all operations related to event sources.
    It implements all operations related creating, updating, deleting, and
    fetching event sources from the database. It also provides a set of methods
    that can be overridden by concrete event source handlers that need to
    react to or participate in the lifecycle of an event source:

    * `_validate_event_source_request`: validate and/or modify an event source
    before it is created (before it is persisted in the database)
    * `_process_event_source_request`: react to a new event source being created
    (after it is persisted in the database)
    * `_validate_event_source_update`: validate and/or modify an event source
    update before it is applied (before the update is saved in the database)
    * `_process_event_source_update`: react to an event source being updated
    (after the update is saved in the database)
    * `_process_event_source_delete`: react to an event source being deleted
    (before it is deleted from the database)
    * `_process_event_source_response`: modify an event source before it is
    returned to the user

    In addition to optionally overriding these methods, every event source
    handler must define and provide configuration classes for the event source
    and the event filter. These are used to validate and instantiate the
    configuration from the user requests.

    Finally, since event source handlers are also sources of events, the base
    class provides methods that implementations can use to dispatch events to
    the central event hub where they can be processed and eventually trigger
    actions.
    """

    _event_hub: Optional[BaseEventHub] = None

    def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
        """Event source handler initialization.

        Args:
            event_hub: Optional event hub to use to initialize the event source
                handler. If not set during initialization, it may be set
                at a later time by calling `set_event_hub`. An event hub must
                be configured before the event handler needs to dispatch events.
        """
        super().__init__()
        if event_hub is None:
            from zenml.event_hub.event_hub import (
                event_hub as default_event_hub,
            )

            # TODO: for now, we use the default internal event hub. In
            # the future, this should be configurable.
            event_hub = default_event_hub

        self.set_event_hub(event_hub)

    @property
    @abstractmethod
    def config_class(self) -> Type[EventSourceConfig]:
        """Returns the event source configuration class.

        Returns:
            The configuration.
        """

    @property
    @abstractmethod
    def filter_class(self) -> Type[EventFilterConfig]:
        """Returns the event filter configuration class.

        Returns:
            The event filter configuration class.
        """

    @property
    @abstractmethod
    def flavor_class(self) -> "Type[BaseEventSourceFlavor]":
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """

    @property
    def event_hub(self) -> BaseEventHub:
        """Get the event hub used to dispatch events.

        Returns:
            The event hub.

        Raises:
            RuntimeError: if an event hub isn't configured.
        """
        if self._event_hub is None:
            raise RuntimeError(
                f"An event hub is not configured for the "
                f"{self.flavor_class.FLAVOR} {self.flavor_class.SUBTYPE} "
                f"event source handler."
            )

        return self._event_hub

    def set_event_hub(self, event_hub: BaseEventHub) -> None:
        """Configure an event hub for this event source plugin.

        Args:
            event_hub: Event hub to be used by this event handler to dispatch
                events.
        """
        self._event_hub = event_hub

    def create_event_source(
        self, event_source: EventSourceRequest
    ) -> EventSourceResponse:
        """Process an event source request and create the event source in the database.

        Args:
            event_source: Event source request.

        Returns:
            The created event source.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the request
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Call the implementation specific method to validate the request
        # before it is sent to the database
        self._validate_event_source_request(
            event_source=event_source, config=config
        )
        # Serialize the configuration back into the request
        event_source.configuration = config.model_dump(exclude_none=True)
        # Create the event source in the database
        event_source_response = self.zen_store.create_event_source(
            event_source=event_source
        )
        try:
            # Instantiate the configuration from the response
            config = self.validate_event_source_configuration(
                event_source.configuration
            )
            # Call the implementation specific method to process the created
            # event source
            self._process_event_source_request(
                event_source=event_source_response, config=config
            )
        except Exception:
            # If the event source creation fails, delete the event source from
            # the database
            logger.exception(
                f"Failed to create event source {event_source_response}. "
                f"Deleting the event source."
            )
            self.zen_store.delete_event_source(
                event_source_id=event_source_response.id
            )
            raise

        # Serialize the configuration back into the response
        event_source_response.set_configuration(
            config.model_dump(exclude_none=True)
        )

        # Return the response to the user
        return event_source_response

    def update_event_source(
        self,
        event_source: EventSourceResponse,
        event_source_update: EventSourceUpdate,
    ) -> EventSourceResponse:
        """Process an event source update request and update the event source in the database.

        Args:
            event_source: The event source to update.
            event_source_update: The update to be applied to the event source.

        Returns:
            The updated event source.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the original event
        # source
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Validate and instantiate the configuration from the update request
        # NOTE: if supplied, the configuration update is a full replacement
        # of the original configuration
        config_update = config
        if event_source_update.configuration is not None:
            config_update = self.validate_event_source_configuration(
                event_source_update.configuration
            )
        # Call the implementation specific method to validate the update request
        # before it is sent to the database
        self._validate_event_source_update(
            event_source=event_source,
            config=config,
            event_source_update=event_source_update,
            config_update=config_update,
        )
        # Serialize the configuration update back into the update request
        event_source_update.configuration = config_update.model_dump(
            exclude_none=True
        )

        # Update the event source in the database
        event_source_response = self.zen_store.update_event_source(
            event_source_id=event_source.id,
            event_source_update=event_source_update,
        )
        try:
            # Instantiate the configuration from the response
            response_config = self.validate_event_source_configuration(
                event_source_response.configuration
            )
            # Call the implementation specific method to process the update
            # request before it is sent to the database
            self._process_event_source_update(
                event_source=event_source_response,
                config=response_config,
                previous_event_source=event_source,
                previous_config=config,
            )
        except Exception:
            # If the event source update fails, roll back the event source in
            # the database to the original state
            logger.exception(
                f"Failed to update event source {event_source}. "
                f"Rolling back the event source to the previous state."
            )
            self.zen_store.update_event_source(
                event_source_id=event_source.id,
                event_source_update=EventSourceUpdate.from_response(
                    event_source
                ),
            )
            raise

        # Serialize the configuration back into the response
        event_source_response.set_configuration(
            response_config.model_dump(exclude_none=True)
        )
        # Return the response to the user
        return event_source_response

    def delete_event_source(
        self,
        event_source: EventSourceResponse,
        force: bool = False,
    ) -> None:
        """Process an event source delete request and delete the event source in the database.

        Args:
            event_source: The event source to delete.
            force: Whether to force delete the event source from the database
                even if the event source handler fails to delete the event
                source.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the original event
        # source
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        try:
            # Call the implementation specific method to process the deleted
            # event source before it is deleted from the database
            self._process_event_source_delete(
                event_source=event_source,
                config=config,
                force=force,
            )
        except Exception:
            logger.exception(f"Failed to delete event source {event_source}. ")
            if not force:
                raise

            logger.warning(f"Force deleting event source {event_source}.")

        # Delete the event source from the database
        self.zen_store.delete_event_source(
            event_source_id=event_source.id,
        )

    def get_event_source(
        self, event_source: EventSourceResponse, hydrate: bool = False
    ) -> EventSourceResponse:
        """Process an event source response before it is returned to the user.

        Args:
            event_source: The event source fetched from the database.
            hydrate: Whether to hydrate the event source.

        Returns:
            The event source.
        """
        if hydrate:
            # Instantiate the configuration from the response
            config = self.validate_event_source_configuration(
                event_source.configuration
            )
            # Call the implementation specific method to process the response
            self._process_event_source_response(
                event_source=event_source, config=config
            )
            # Serialize the configuration back into the response
            event_source.set_configuration(
                config.model_dump(exclude_none=True)
            )

        # Return the response to the user
        return event_source

    def validate_event_source_configuration(
        self, event_source_config: Dict[str, Any]
    ) -> EventSourceConfig:
        """Validate and return the event source configuration.

        Args:
            event_source_config: The event source configuration to validate.

        Returns:
            The validated event source configuration.

        Raises:
            ValueError: if the configuration is invalid.
        """
        try:
            return self.config_class(**event_source_config)
        except ValueError as e:
            raise ValueError(
                f"Invalid configuration for event source: {e}."
            ) from e

    def validate_event_filter_configuration(
        self,
        configuration: Dict[str, Any],
    ) -> EventFilterConfig:
        """Validate and return the configuration of an event filter.

        Args:
            configuration: The configuration to validate.

        Returns:
            Instantiated event filter config.

        Raises:
            ValueError: if the configuration is invalid.
        """
        try:
            return self.filter_class(**configuration)
        except ValueError as e:
            raise ValueError(
                f"Invalid configuration for event filter: {e}."
            ) from e

    def dispatch_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> None:
        """Dispatch an event to all active triggers that match the event.

        Args:
            event: The event to dispatch.
            event_source: The event source that produced the event.
        """
        if not event_source.is_active:
            logger.debug(
                f"Event source {event_source.id} is not active. Skipping event "
                f"dispatch."
            )
            return

        self.event_hub.publish_event(
            event=event,
            event_source=event_source,
        )

    def _validate_event_source_request(
        self, event_source: EventSourceRequest, config: EventSourceConfig
    ) -> None:
        """Validate an event source request before it is created in the database.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to the validation of
        a new event source. The implementation may also modify the event source
        request and/or configuration in place to apply implementation specific
        changes before the request is stored in the database.

        If validation is required, the implementation should raise a
        ValueError if the configuration is invalid. If any exceptions are raised
        during the validation, the event source will not be created in the
        database.

        The resulted configuration is serialized back into the event source
        request before it is stored in the database.

        The implementation should not use this method to provision any external
        resources, as the event source may not be created in the database if
        the database level validation fails.

        Args:
            event_source: Event source request.
            config: Event source configuration instantiated from the request.
        """
        pass

    def _process_event_source_request(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source request after it is created in the database.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to the creation of
        a new event source. The implementation may also modify the event source
        response and/or configuration in place to apply implementation specific
        changes before the response is returned to the user.

        The resulted configuration is serialized back into the event source
        response before it is returned to the user.

        The implementation should use this method to provision any external
        resources required for the event source. If any of the provisioning
        fails, the implementation should raise an exception and the event source
        will be deleted from the database.

        Args:
            event_source: Newly created event source
            config: Event source configuration instantiated from the response.
        """
        pass

    def _validate_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        event_source_update: EventSourceUpdate,
        config_update: EventSourceConfig,
    ) -> None:
        """Validate an event source update before it is reflected in the database.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to validation of an
        event source update request. The implementation may also modify the
        event source update and/or configuration update in place to apply
        implementation specific changes.

        If validation is required, the implementation should raise a
        ValueError if the configuration update is invalid. If any exceptions are
        raised during the validation, the event source will not be updated in
        the database.

        The resulted configuration update is serialized back into the event
        source update before it is stored in the database.

        The implementation should not use this method to provision any external
        resources, as the event source may not be updated in the database if
        the database level validation fails.

        Args:
            event_source: Original event source before the update.
            config: Event source configuration instantiated from the original
                event source.
            event_source_update: Event source update request.
            config_update: Event source configuration instantiated from the
                updated event source.
        """
        pass

    def _process_event_source_update(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        previous_event_source: EventSourceResponse,
        previous_config: EventSourceConfig,
    ) -> None:
        """Process an event source after it is updated in the database.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to updating an existing
        event source. The implementation may also modify the event source
        and/or configuration in place to apply implementation specific
        changes before the response is returned to the user.

        The resulted configuration is serialized back into the event source
        response before it is returned to the user.

        The implementation should use this method to provision any external
        resources required for the event source update. If any of the
        provisioning fails, the implementation should raise an exception and the
        event source will be rolled back to the previous state in the database.

        Args:
            event_source: Event source after the update.
            config: Event source configuration instantiated from the updated
                event source.
            previous_event_source: Original event source before the update.
            previous_config: Event source configuration instantiated from the
                original event source.
        """
        pass

    def _process_event_source_delete(
        self,
        event_source: EventSourceResponse,
        config: EventSourceConfig,
        force: Optional[bool] = False,
    ) -> None:
        """Process an event source before it is deleted from the database.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to the deletion of an
        event source.

        The implementation should use this method to deprovision any external
        resources required for the event source. If any of the deprovisioning
        fails, the implementation should raise an exception and the
        event source will kept in the database (unless force is True, in which
        case the event source will be deleted from the database regardless of
        any deprovisioning failures).

        Args:
            event_source: Event source before the deletion.
            config: Validated instantiated event source configuration before
                the deletion.
            force: Whether to force deprovision the event source.
        """
        pass

    def _process_event_source_response(
        self, event_source: EventSourceResponse, config: EventSourceConfig
    ) -> None:
        """Process an event source response before it is returned to the user.

        Concrete event source handlers should override this method to add
        implementation specific functionality pertaining to how the event source
        response is returned to the user. The implementation may modify the
        the event source response and/or configuration in place.

        The resulted configuration is serialized back into the event source
        response before it is returned to the user.

        This method is applied to all event source responses fetched from the
        database before they are returned to the user, with the exception of
        those returned from the `create_event_source`, `update_event_source`,
        and `delete_event_source` methods, which have their own specific
        processing methods.

        Args:
            event_source: Event source response.
            config: Event source configuration instantiated from the response.
        """
        pass
config_class: Type[zenml.event_sources.base_event_source.EventSourceConfig] property readonly

Returns the event source configuration class.

Returns:

Type Description
Type[zenml.event_sources.base_event_source.EventSourceConfig]

The configuration.

event_hub: BaseEventHub property readonly

Get the event hub used to dispatch events.

Returns:

Type Description
BaseEventHub

The event hub.

Exceptions:

Type Description
RuntimeError

if an event hub isn't configured.

filter_class: Type[zenml.event_sources.base_event_source.EventFilterConfig] property readonly

Returns the event filter configuration class.

Returns:

Type Description
Type[zenml.event_sources.base_event_source.EventFilterConfig]

The event filter configuration class.

flavor_class: Type[BaseEventSourceFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseEventSourceFlavor]

The flavor class of the plugin.

__init__(self, event_hub=None) special

Event source handler initialization.

Parameters:

Name Type Description Default
event_hub Optional[zenml.event_hub.base_event_hub.BaseEventHub]

Optional event hub to use to initialize the event source handler. If not set during initialization, it may be set at a later time by calling set_event_hub. An event hub must be configured before the event handler needs to dispatch events.

None
Source code in zenml/event_sources/base_event_source.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
create_event_source(self, event_source)

Process an event source request and create the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceRequest

Event source request.

required

Returns:

Type Description
EventSourceResponse

The created event source.

noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def create_event_source(
    self, event_source: EventSourceRequest
) -> EventSourceResponse:
    """Process an event source request and create the event source in the database.

    Args:
        event_source: Event source request.

    Returns:
        The created event source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the request
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    # Call the implementation specific method to validate the request
    # before it is sent to the database
    self._validate_event_source_request(
        event_source=event_source, config=config
    )
    # Serialize the configuration back into the request
    event_source.configuration = config.model_dump(exclude_none=True)
    # Create the event source in the database
    event_source_response = self.zen_store.create_event_source(
        event_source=event_source
    )
    try:
        # Instantiate the configuration from the response
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Call the implementation specific method to process the created
        # event source
        self._process_event_source_request(
            event_source=event_source_response, config=config
        )
    except Exception:
        # If the event source creation fails, delete the event source from
        # the database
        logger.exception(
            f"Failed to create event source {event_source_response}. "
            f"Deleting the event source."
        )
        self.zen_store.delete_event_source(
            event_source_id=event_source_response.id
        )
        raise

    # Serialize the configuration back into the response
    event_source_response.set_configuration(
        config.model_dump(exclude_none=True)
    )

    # Return the response to the user
    return event_source_response
delete_event_source(self, event_source, force=False)

Process an event source delete request and delete the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source to delete.

required
force bool

Whether to force delete the event source from the database even if the event source handler fails to delete the event source.

False
noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def delete_event_source(
    self,
    event_source: EventSourceResponse,
    force: bool = False,
) -> None:
    """Process an event source delete request and delete the event source in the database.

    Args:
        event_source: The event source to delete.
        force: Whether to force delete the event source from the database
            even if the event source handler fails to delete the event
            source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    try:
        # Call the implementation specific method to process the deleted
        # event source before it is deleted from the database
        self._process_event_source_delete(
            event_source=event_source,
            config=config,
            force=force,
        )
    except Exception:
        logger.exception(f"Failed to delete event source {event_source}. ")
        if not force:
            raise

        logger.warning(f"Force deleting event source {event_source}.")

    # Delete the event source from the database
    self.zen_store.delete_event_source(
        event_source_id=event_source.id,
    )
dispatch_event(self, event, event_source)

Dispatch an event to all active triggers that match the event.

Parameters:

Name Type Description Default
event BaseEvent

The event to dispatch.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in zenml/event_sources/base_event_source.py
def dispatch_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Dispatch an event to all active triggers that match the event.

    Args:
        event: The event to dispatch.
        event_source: The event source that produced the event.
    """
    if not event_source.is_active:
        logger.debug(
            f"Event source {event_source.id} is not active. Skipping event "
            f"dispatch."
        )
        return

    self.event_hub.publish_event(
        event=event,
        event_source=event_source,
    )
get_event_source(self, event_source, hydrate=False)

Process an event source response before it is returned to the user.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source fetched from the database.

required
hydrate bool

Whether to hydrate the event source.

False

Returns:

Type Description
EventSourceResponse

The event source.

Source code in zenml/event_sources/base_event_source.py
def get_event_source(
    self, event_source: EventSourceResponse, hydrate: bool = False
) -> EventSourceResponse:
    """Process an event source response before it is returned to the user.

    Args:
        event_source: The event source fetched from the database.
        hydrate: Whether to hydrate the event source.

    Returns:
        The event source.
    """
    if hydrate:
        # Instantiate the configuration from the response
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Call the implementation specific method to process the response
        self._process_event_source_response(
            event_source=event_source, config=config
        )
        # Serialize the configuration back into the response
        event_source.set_configuration(
            config.model_dump(exclude_none=True)
        )

    # Return the response to the user
    return event_source
set_event_hub(self, event_hub)

Configure an event hub for this event source plugin.

Parameters:

Name Type Description Default
event_hub BaseEventHub

Event hub to be used by this event handler to dispatch events.

required
Source code in zenml/event_sources/base_event_source.py
def set_event_hub(self, event_hub: BaseEventHub) -> None:
    """Configure an event hub for this event source plugin.

    Args:
        event_hub: Event hub to be used by this event handler to dispatch
            events.
    """
    self._event_hub = event_hub
update_event_source(self, event_source, event_source_update)

Process an event source update request and update the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source to update.

required
event_source_update EventSourceUpdate

The update to be applied to the event source.

required

Returns:

Type Description
EventSourceResponse

The updated event source.

noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def update_event_source(
    self,
    event_source: EventSourceResponse,
    event_source_update: EventSourceUpdate,
) -> EventSourceResponse:
    """Process an event source update request and update the event source in the database.

    Args:
        event_source: The event source to update.
        event_source_update: The update to be applied to the event source.

    Returns:
        The updated event source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    # Validate and instantiate the configuration from the update request
    # NOTE: if supplied, the configuration update is a full replacement
    # of the original configuration
    config_update = config
    if event_source_update.configuration is not None:
        config_update = self.validate_event_source_configuration(
            event_source_update.configuration
        )
    # Call the implementation specific method to validate the update request
    # before it is sent to the database
    self._validate_event_source_update(
        event_source=event_source,
        config=config,
        event_source_update=event_source_update,
        config_update=config_update,
    )
    # Serialize the configuration update back into the update request
    event_source_update.configuration = config_update.model_dump(
        exclude_none=True
    )

    # Update the event source in the database
    event_source_response = self.zen_store.update_event_source(
        event_source_id=event_source.id,
        event_source_update=event_source_update,
    )
    try:
        # Instantiate the configuration from the response
        response_config = self.validate_event_source_configuration(
            event_source_response.configuration
        )
        # Call the implementation specific method to process the update
        # request before it is sent to the database
        self._process_event_source_update(
            event_source=event_source_response,
            config=response_config,
            previous_event_source=event_source,
            previous_config=config,
        )
    except Exception:
        # If the event source update fails, roll back the event source in
        # the database to the original state
        logger.exception(
            f"Failed to update event source {event_source}. "
            f"Rolling back the event source to the previous state."
        )
        self.zen_store.update_event_source(
            event_source_id=event_source.id,
            event_source_update=EventSourceUpdate.from_response(
                event_source
            ),
        )
        raise

    # Serialize the configuration back into the response
    event_source_response.set_configuration(
        response_config.model_dump(exclude_none=True)
    )
    # Return the response to the user
    return event_source_response
validate_event_filter_configuration(self, configuration)

Validate and return the configuration of an event filter.

Parameters:

Name Type Description Default
configuration Dict[str, Any]

The configuration to validate.

required

Returns:

Type Description
EventFilterConfig

Instantiated event filter config.

Exceptions:

Type Description
ValueError

if the configuration is invalid.

Source code in zenml/event_sources/base_event_source.py
def validate_event_filter_configuration(
    self,
    configuration: Dict[str, Any],
) -> EventFilterConfig:
    """Validate and return the configuration of an event filter.

    Args:
        configuration: The configuration to validate.

    Returns:
        Instantiated event filter config.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.filter_class(**configuration)
    except ValueError as e:
        raise ValueError(
            f"Invalid configuration for event filter: {e}."
        ) from e
validate_event_source_configuration(self, event_source_config)

Validate and return the event source configuration.

Parameters:

Name Type Description Default
event_source_config Dict[str, Any]

The event source configuration to validate.

required

Returns:

Type Description
EventSourceConfig

The validated event source configuration.

Exceptions:

Type Description
ValueError

if the configuration is invalid.

Source code in zenml/event_sources/base_event_source.py
def validate_event_source_configuration(
    self, event_source_config: Dict[str, Any]
) -> EventSourceConfig:
    """Validate and return the event source configuration.

    Args:
        event_source_config: The event source configuration to validate.

    Returns:
        The validated event source configuration.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.config_class(**event_source_config)
    except ValueError as e:
        raise ValueError(
            f"Invalid configuration for event source: {e}."
        ) from e

EventFilterConfig (BaseModel, ABC)

The Event Filter configuration.

Source code in zenml/event_sources/base_event_source.py
class EventFilterConfig(BaseModel, ABC):
    """The Event Filter configuration."""

    @abstractmethod
    def event_matches_filter(self, event: BaseEvent) -> bool:
        """All implementations need to implement this check.

        If the filter matches the inbound event instance, this should
        return True, else False.

        Args:
            event: The inbound event instance.

        Returns:
            Whether the filter matches the event.
        """
event_matches_filter(self, event)

All implementations need to implement this check.

If the filter matches the inbound event instance, this should return True, else False.

Parameters:

Name Type Description Default
event BaseEvent

The inbound event instance.

required

Returns:

Type Description
bool

Whether the filter matches the event.

Source code in zenml/event_sources/base_event_source.py
@abstractmethod
def event_matches_filter(self, event: BaseEvent) -> bool:
    """All implementations need to implement this check.

    If the filter matches the inbound event instance, this should
    return True, else False.

    Args:
        event: The inbound event instance.

    Returns:
        Whether the filter matches the event.
    """

EventSourceConfig (BasePluginConfig)

The Event Source configuration.

Source code in zenml/event_sources/base_event_source.py
class EventSourceConfig(BasePluginConfig):
    """The Event Source configuration."""

webhooks special

Base Classes for Webhook Event Sources.

base_webhook_event_source

Abstract BaseEvent class that all Event implementations must implement.

BaseWebhookEvent (BaseEvent)

Base class for all inbound events.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEvent(BaseEvent):
    """Base class for all inbound events."""
BaseWebhookEventSourceFlavor (BaseEventSourceFlavor, ABC)

Base Event Plugin Flavor to access an event plugin along with its configurations.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEventSourceFlavor(BaseEventSourceFlavor, ABC):
    """Base Event Plugin Flavor to access an event plugin along with its configurations."""

    SUBTYPE: ClassVar[PluginSubType] = PluginSubType.WEBHOOK
BaseWebhookEventSourceHandler (BaseEventSourceHandler, ABC)

Base implementation for all Webhook event sources.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEventSourceHandler(BaseEventSourceHandler, ABC):
    """Base implementation for all Webhook event sources."""

    @property
    @abstractmethod
    def config_class(self) -> Type[WebhookEventSourceConfig]:
        """Returns the webhook event source configuration class.

        Returns:
            The configuration.
        """

    @property
    @abstractmethod
    def filter_class(self) -> Type[WebhookEventFilterConfig]:
        """Returns the webhook event filter configuration class.

        Returns:
            The event filter configuration class.
        """

    @property
    @abstractmethod
    def flavor_class(self) -> "Type[BaseWebhookEventSourceFlavor]":
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """

    def is_valid_signature(
        self, raw_body: bytes, secret_token: str, signature_header: str
    ) -> bool:
        """Verify the SHA256 signature of the payload.

        Args:
            raw_body: original request body to verify
            secret_token: secret token used to generate the signature
            signature_header: signature header to verify (x-hub-signature-256)

        Returns:
            Whether if the signature is valid.
        """
        hash_object = hmac.new(
            secret_token.encode("utf-8"),
            msg=raw_body,
            digestmod=hashlib.sha256,
        )
        expected_signature = "sha256=" + hash_object.hexdigest()

        if not hmac.compare_digest(expected_signature, signature_header):
            return False
        return True

    @abstractmethod
    def _interpret_event(self, event: Dict[str, Any]) -> BaseEvent:
        """Converts the generic event body into a event-source specific pydantic model.

        Args:
            event: The generic event body

        Return:
            An instance of the event source specific pydantic model.
        """

    @abstractmethod
    def _get_webhook_secret(
        self, event_source: EventSourceResponse
    ) -> Optional[str]:
        """Get the webhook secret for the event source.

        Inheriting classes should implement this method to retrieve the webhook
        secret associated with an event source. If a webhook secret is not
        applicable for the event source, this method should return None.

        Args:
            event_source: The event source to retrieve the secret for.

        Return:
            The webhook secret associated with the event source, or None if a
            secret is not applicable.
        """

    def _validate_webhook_event_signature(
        self, raw_body: bytes, headers: Dict[str, str], webhook_secret: str
    ) -> None:
        """Validate the signature of an incoming webhook event.

        Args:
            raw_body: The raw inbound webhook event.
            headers: The headers of the inbound webhook event.
            webhook_secret: The webhook secret to use for signature validation.

        Raises:
            AuthorizationException: If the signature validation fails.
        """
        signature_header = headers.get("x-hub-signature-256") or headers.get(
            "x-hub-signature"
        )
        if not signature_header:
            raise AuthorizationException(
                "x-hub-signature-256 or x-hub-signature header is missing!"
            )

        if not self.is_valid_signature(
            raw_body=raw_body,
            secret_token=webhook_secret,
            signature_header=signature_header,
        ):
            raise AuthorizationException(
                "Webhook signature verification failed!"
            )

    def _load_payload(
        self, raw_body: bytes, headers: Dict[str, str]
    ) -> Dict[Any, Any]:
        """Converts the raw body of the request into a python dictionary.

        Args:
            raw_body: The raw event body.
            headers: The request headers.

        Returns:
            An instance of the event source specific pydantic model.

        Raises:
            ValueError: In case the body can not be parsed.
        """
        # For now assume all webhook events are json encoded and parse
        # the body as such.
        try:
            body_dict: Dict[Any, Any] = json.loads(raw_body)
            return body_dict
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON body received: {e}")

    def process_webhook_event(
        self,
        event_source: EventSourceResponse,
        raw_body: bytes,
        headers: Dict[str, str],
    ) -> None:
        """Process an incoming webhook event.

        Args:
            event_source: The event source that the event belongs to.
            raw_body: The raw inbound webhook event.
            headers: The headers of the inbound webhook event.
        """
        json_body = self._load_payload(raw_body=raw_body, headers=headers)

        webhook_secret = self._get_webhook_secret(event_source)
        if webhook_secret:
            self._validate_webhook_event_signature(
                raw_body=raw_body,
                headers=headers,
                webhook_secret=webhook_secret,
            )

        event = self._interpret_event(json_body)

        self.dispatch_event(
            event=event,
            event_source=event_source,
        )
config_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventSourceConfig] property readonly

Returns the webhook event source configuration class.

Returns:

Type Description
Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventSourceConfig]

The configuration.

filter_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventFilterConfig] property readonly

Returns the webhook event filter configuration class.

Returns:

Type Description
Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventFilterConfig]

The event filter configuration class.

flavor_class: Type[BaseWebhookEventSourceFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseWebhookEventSourceFlavor]

The flavor class of the plugin.

is_valid_signature(self, raw_body, secret_token, signature_header)

Verify the SHA256 signature of the payload.

Parameters:

Name Type Description Default
raw_body bytes

original request body to verify

required
secret_token str

secret token used to generate the signature

required
signature_header str

signature header to verify (x-hub-signature-256)

required

Returns:

Type Description
bool

Whether if the signature is valid.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
def is_valid_signature(
    self, raw_body: bytes, secret_token: str, signature_header: str
) -> bool:
    """Verify the SHA256 signature of the payload.

    Args:
        raw_body: original request body to verify
        secret_token: secret token used to generate the signature
        signature_header: signature header to verify (x-hub-signature-256)

    Returns:
        Whether if the signature is valid.
    """
    hash_object = hmac.new(
        secret_token.encode("utf-8"),
        msg=raw_body,
        digestmod=hashlib.sha256,
    )
    expected_signature = "sha256=" + hash_object.hexdigest()

    if not hmac.compare_digest(expected_signature, signature_header):
        return False
    return True
process_webhook_event(self, event_source, raw_body, headers)

Process an incoming webhook event.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source that the event belongs to.

required
raw_body bytes

The raw inbound webhook event.

required
headers Dict[str, str]

The headers of the inbound webhook event.

required
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
def process_webhook_event(
    self,
    event_source: EventSourceResponse,
    raw_body: bytes,
    headers: Dict[str, str],
) -> None:
    """Process an incoming webhook event.

    Args:
        event_source: The event source that the event belongs to.
        raw_body: The raw inbound webhook event.
        headers: The headers of the inbound webhook event.
    """
    json_body = self._load_payload(raw_body=raw_body, headers=headers)

    webhook_secret = self._get_webhook_secret(event_source)
    if webhook_secret:
        self._validate_webhook_event_signature(
            raw_body=raw_body,
            headers=headers,
            webhook_secret=webhook_secret,
        )

    event = self._interpret_event(json_body)

    self.dispatch_event(
        event=event,
        event_source=event_source,
    )
WebhookEventFilterConfig (EventFilterConfig)

The Event Filter configuration.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class WebhookEventFilterConfig(EventFilterConfig):
    """The Event Filter configuration."""
WebhookEventSourceConfig (EventSourceConfig)

The Event Source configuration.

Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class WebhookEventSourceConfig(EventSourceConfig):
    """The Event Source configuration."""