Event Sources
zenml.event_sources
special
Base Classes for Event Sources.
base_event
Base implementation for events.
BaseEvent (BaseModel)
Base class for all inbound events.
Source code in zenml/event_sources/base_event.py
class BaseEvent(BaseModel):
"""Base class for all inbound events."""
base_event_source
Base implementation for event sources.
BaseEventSourceFlavor (BasePluginFlavor, ABC)
Base Event Plugin Flavor to access an event plugin along with its configurations.
Source code in zenml/event_sources/base_event_source.py
class BaseEventSourceFlavor(BasePluginFlavor, ABC):
"""Base Event Plugin Flavor to access an event plugin along with its configurations."""
TYPE: ClassVar[PluginType] = PluginType.EVENT_SOURCE
# EventPlugin specific
EVENT_SOURCE_CONFIG_CLASS: ClassVar[Type[EventSourceConfig]]
EVENT_FILTER_CONFIG_CLASS: ClassVar[Type[EventFilterConfig]]
@classmethod
def get_event_filter_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.EVENT_SOURCE_CONFIG_CLASS.model_json_schema()
@classmethod
def get_event_source_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.EVENT_FILTER_CONFIG_CLASS.model_json_schema()
@classmethod
def get_flavor_response_model(
cls, hydrate: bool
) -> EventSourceFlavorResponse:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
Returns:
The Flavor Response model for the Event Source implementation
"""
metadata = None
if hydrate:
metadata = EventSourceFlavorResponseMetadata(
source_config_schema=cls.get_event_source_config_schema(),
filter_config_schema=cls.get_event_filter_config_schema(),
)
return EventSourceFlavorResponse(
body=EventSourceFlavorResponseBody(),
metadata=metadata,
name=cls.FLAVOR,
type=cls.TYPE,
subtype=cls.SUBTYPE,
)
get_event_filter_config_schema()
classmethod
The config schema for a flavor.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The config schema. |
Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_event_filter_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.EVENT_SOURCE_CONFIG_CLASS.model_json_schema()
get_event_source_config_schema()
classmethod
The config schema for a flavor.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The config schema. |
Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_event_source_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.EVENT_FILTER_CONFIG_CLASS.model_json_schema()
get_flavor_response_model(hydrate)
classmethod
Convert the Flavor into a Response Model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hydrate |
bool |
Whether the model should be hydrated. |
required |
Returns:
Type | Description |
---|---|
EventSourceFlavorResponse |
The Flavor Response model for the Event Source implementation |
Source code in zenml/event_sources/base_event_source.py
@classmethod
def get_flavor_response_model(
cls, hydrate: bool
) -> EventSourceFlavorResponse:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
Returns:
The Flavor Response model for the Event Source implementation
"""
metadata = None
if hydrate:
metadata = EventSourceFlavorResponseMetadata(
source_config_schema=cls.get_event_source_config_schema(),
filter_config_schema=cls.get_event_filter_config_schema(),
)
return EventSourceFlavorResponse(
body=EventSourceFlavorResponseBody(),
metadata=metadata,
name=cls.FLAVOR,
type=cls.TYPE,
subtype=cls.SUBTYPE,
)
BaseEventSourceHandler (BasePlugin, ABC)
Base event source handler implementation.
This class provides a base implementation for event source handlers.
The base event source handler acts as an intermediary between the REST API endpoints and the ZenML store for all operations related to event sources. It implements all operations related creating, updating, deleting, and fetching event sources from the database. It also provides a set of methods that can be overridden by concrete event source handlers that need to react to or participate in the lifecycle of an event source:
_validate_event_source_request
: validate and/or modify an event source before it is created (before it is persisted in the database)_process_event_source_request
: react to a new event source being created (after it is persisted in the database)_validate_event_source_update
: validate and/or modify an event source update before it is applied (before the update is saved in the database)_process_event_source_update
: react to an event source being updated (after the update is saved in the database)_process_event_source_delete
: react to an event source being deleted (before it is deleted from the database)_process_event_source_response
: modify an event source before it is returned to the user
In addition to optionally overriding these methods, every event source handler must define and provide configuration classes for the event source and the event filter. These are used to validate and instantiate the configuration from the user requests.
Finally, since event source handlers are also sources of events, the base class provides methods that implementations can use to dispatch events to the central event hub where they can be processed and eventually trigger actions.
Source code in zenml/event_sources/base_event_source.py
class BaseEventSourceHandler(BasePlugin, ABC):
"""Base event source handler implementation.
This class provides a base implementation for event source handlers.
The base event source handler acts as an intermediary between the REST API
endpoints and the ZenML store for all operations related to event sources.
It implements all operations related creating, updating, deleting, and
fetching event sources from the database. It also provides a set of methods
that can be overridden by concrete event source handlers that need to
react to or participate in the lifecycle of an event source:
* `_validate_event_source_request`: validate and/or modify an event source
before it is created (before it is persisted in the database)
* `_process_event_source_request`: react to a new event source being created
(after it is persisted in the database)
* `_validate_event_source_update`: validate and/or modify an event source
update before it is applied (before the update is saved in the database)
* `_process_event_source_update`: react to an event source being updated
(after the update is saved in the database)
* `_process_event_source_delete`: react to an event source being deleted
(before it is deleted from the database)
* `_process_event_source_response`: modify an event source before it is
returned to the user
In addition to optionally overriding these methods, every event source
handler must define and provide configuration classes for the event source
and the event filter. These are used to validate and instantiate the
configuration from the user requests.
Finally, since event source handlers are also sources of events, the base
class provides methods that implementations can use to dispatch events to
the central event hub where they can be processed and eventually trigger
actions.
"""
_event_hub: Optional[BaseEventHub] = None
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
"""Event source handler initialization.
Args:
event_hub: Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling `set_event_hub`. An event hub must
be configured before the event handler needs to dispatch events.
"""
super().__init__()
if event_hub is None:
from zenml.event_hub.event_hub import (
event_hub as default_event_hub,
)
# TODO: for now, we use the default internal event hub. In
# the future, this should be configurable.
event_hub = default_event_hub
self.set_event_hub(event_hub)
@property
@abstractmethod
def config_class(self) -> Type[EventSourceConfig]:
"""Returns the event source configuration class.
Returns:
The configuration.
"""
@property
@abstractmethod
def filter_class(self) -> Type[EventFilterConfig]:
"""Returns the event filter configuration class.
Returns:
The event filter configuration class.
"""
@property
@abstractmethod
def flavor_class(self) -> "Type[BaseEventSourceFlavor]":
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
@property
def event_hub(self) -> BaseEventHub:
"""Get the event hub used to dispatch events.
Returns:
The event hub.
Raises:
RuntimeError: if an event hub isn't configured.
"""
if self._event_hub is None:
raise RuntimeError(
f"An event hub is not configured for the "
f"{self.flavor_class.FLAVOR} {self.flavor_class.SUBTYPE} "
f"event source handler."
)
return self._event_hub
def set_event_hub(self, event_hub: BaseEventHub) -> None:
"""Configure an event hub for this event source plugin.
Args:
event_hub: Event hub to be used by this event handler to dispatch
events.
"""
self._event_hub = event_hub
def create_event_source(
self, event_source: EventSourceRequest
) -> EventSourceResponse:
"""Process an event source request and create the event source in the database.
Args:
event_source: Event source request.
Returns:
The created event source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the request
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to validate the request
# before it is sent to the database
self._validate_event_source_request(
event_source=event_source, config=config
)
# Serialize the configuration back into the request
event_source.configuration = config.model_dump(exclude_none=True)
# Create the event source in the database
event_source_response = self.zen_store.create_event_source(
event_source=event_source
)
try:
# Instantiate the configuration from the response
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to process the created
# event source
self._process_event_source_request(
event_source=event_source_response, config=config
)
except Exception:
# If the event source creation fails, delete the event source from
# the database
logger.exception(
f"Failed to create event source {event_source_response}. "
f"Deleting the event source."
)
self.zen_store.delete_event_source(
event_source_id=event_source_response.id
)
raise
# Serialize the configuration back into the response
event_source_response.set_configuration(
config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source_response
def update_event_source(
self,
event_source: EventSourceResponse,
event_source_update: EventSourceUpdate,
) -> EventSourceResponse:
"""Process an event source update request and update the event source in the database.
Args:
event_source: The event source to update.
event_source_update: The update to be applied to the event source.
Returns:
The updated event source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_event_source_configuration(
event_source.configuration
)
# Validate and instantiate the configuration from the update request
# NOTE: if supplied, the configuration update is a full replacement
# of the original configuration
config_update = config
if event_source_update.configuration is not None:
config_update = self.validate_event_source_configuration(
event_source_update.configuration
)
# Call the implementation specific method to validate the update request
# before it is sent to the database
self._validate_event_source_update(
event_source=event_source,
config=config,
event_source_update=event_source_update,
config_update=config_update,
)
# Serialize the configuration update back into the update request
event_source_update.configuration = config_update.model_dump(
exclude_none=True
)
# Update the event source in the database
event_source_response = self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
try:
# Instantiate the configuration from the response
response_config = self.validate_event_source_configuration(
event_source_response.configuration
)
# Call the implementation specific method to process the update
# request before it is sent to the database
self._process_event_source_update(
event_source=event_source_response,
config=response_config,
previous_event_source=event_source,
previous_config=config,
)
except Exception:
# If the event source update fails, roll back the event source in
# the database to the original state
logger.exception(
f"Failed to update event source {event_source}. "
f"Rolling back the event source to the previous state."
)
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=EventSourceUpdate.from_response(
event_source
),
)
raise
# Serialize the configuration back into the response
event_source_response.set_configuration(
response_config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source_response
def delete_event_source(
self,
event_source: EventSourceResponse,
force: bool = False,
) -> None:
"""Process an event source delete request and delete the event source in the database.
Args:
event_source: The event source to delete.
force: Whether to force delete the event source from the database
even if the event source handler fails to delete the event
source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_event_source_configuration(
event_source.configuration
)
try:
# Call the implementation specific method to process the deleted
# event source before it is deleted from the database
self._process_event_source_delete(
event_source=event_source,
config=config,
force=force,
)
except Exception:
logger.exception(f"Failed to delete event source {event_source}. ")
if not force:
raise
logger.warning(f"Force deleting event source {event_source}.")
# Delete the event source from the database
self.zen_store.delete_event_source(
event_source_id=event_source.id,
)
def get_event_source(
self, event_source: EventSourceResponse, hydrate: bool = False
) -> EventSourceResponse:
"""Process an event source response before it is returned to the user.
Args:
event_source: The event source fetched from the database.
hydrate: Whether to hydrate the event source.
Returns:
The event source.
"""
if hydrate:
# Instantiate the configuration from the response
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to process the response
self._process_event_source_response(
event_source=event_source, config=config
)
# Serialize the configuration back into the response
event_source.set_configuration(
config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source
def validate_event_source_configuration(
self, event_source_config: Dict[str, Any]
) -> EventSourceConfig:
"""Validate and return the event source configuration.
Args:
event_source_config: The event source configuration to validate.
Returns:
The validated event source configuration.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.config_class(**event_source_config)
except ValueError as e:
raise ValueError(
f"Invalid configuration for event source: {e}."
) from e
def validate_event_filter_configuration(
self,
configuration: Dict[str, Any],
) -> EventFilterConfig:
"""Validate and return the configuration of an event filter.
Args:
configuration: The configuration to validate.
Returns:
Instantiated event filter config.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.filter_class(**configuration)
except ValueError as e:
raise ValueError(
f"Invalid configuration for event filter: {e}."
) from e
def dispatch_event(
self,
event: BaseEvent,
event_source: EventSourceResponse,
) -> None:
"""Dispatch an event to all active triggers that match the event.
Args:
event: The event to dispatch.
event_source: The event source that produced the event.
"""
if not event_source.is_active:
logger.debug(
f"Event source {event_source.id} is not active. Skipping event "
f"dispatch."
)
return
self.event_hub.publish_event(
event=event,
event_source=event_source,
)
def _validate_event_source_request(
self, event_source: EventSourceRequest, config: EventSourceConfig
) -> None:
"""Validate an event source request before it is created in the database.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to the validation of
a new event source. The implementation may also modify the event source
request and/or configuration in place to apply implementation specific
changes before the request is stored in the database.
If validation is required, the implementation should raise a
ValueError if the configuration is invalid. If any exceptions are raised
during the validation, the event source will not be created in the
database.
The resulted configuration is serialized back into the event source
request before it is stored in the database.
The implementation should not use this method to provision any external
resources, as the event source may not be created in the database if
the database level validation fails.
Args:
event_source: Event source request.
config: Event source configuration instantiated from the request.
"""
pass
def _process_event_source_request(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source request after it is created in the database.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to the creation of
a new event source. The implementation may also modify the event source
response and/or configuration in place to apply implementation specific
changes before the response is returned to the user.
The resulted configuration is serialized back into the event source
response before it is returned to the user.
The implementation should use this method to provision any external
resources required for the event source. If any of the provisioning
fails, the implementation should raise an exception and the event source
will be deleted from the database.
Args:
event_source: Newly created event source
config: Event source configuration instantiated from the response.
"""
pass
def _validate_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
event_source_update: EventSourceUpdate,
config_update: EventSourceConfig,
) -> None:
"""Validate an event source update before it is reflected in the database.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to validation of an
event source update request. The implementation may also modify the
event source update and/or configuration update in place to apply
implementation specific changes.
If validation is required, the implementation should raise a
ValueError if the configuration update is invalid. If any exceptions are
raised during the validation, the event source will not be updated in
the database.
The resulted configuration update is serialized back into the event
source update before it is stored in the database.
The implementation should not use this method to provision any external
resources, as the event source may not be updated in the database if
the database level validation fails.
Args:
event_source: Original event source before the update.
config: Event source configuration instantiated from the original
event source.
event_source_update: Event source update request.
config_update: Event source configuration instantiated from the
updated event source.
"""
pass
def _process_event_source_update(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
previous_event_source: EventSourceResponse,
previous_config: EventSourceConfig,
) -> None:
"""Process an event source after it is updated in the database.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to updating an existing
event source. The implementation may also modify the event source
and/or configuration in place to apply implementation specific
changes before the response is returned to the user.
The resulted configuration is serialized back into the event source
response before it is returned to the user.
The implementation should use this method to provision any external
resources required for the event source update. If any of the
provisioning fails, the implementation should raise an exception and the
event source will be rolled back to the previous state in the database.
Args:
event_source: Event source after the update.
config: Event source configuration instantiated from the updated
event source.
previous_event_source: Original event source before the update.
previous_config: Event source configuration instantiated from the
original event source.
"""
pass
def _process_event_source_delete(
self,
event_source: EventSourceResponse,
config: EventSourceConfig,
force: Optional[bool] = False,
) -> None:
"""Process an event source before it is deleted from the database.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to the deletion of an
event source.
The implementation should use this method to deprovision any external
resources required for the event source. If any of the deprovisioning
fails, the implementation should raise an exception and the
event source will kept in the database (unless force is True, in which
case the event source will be deleted from the database regardless of
any deprovisioning failures).
Args:
event_source: Event source before the deletion.
config: Validated instantiated event source configuration before
the deletion.
force: Whether to force deprovision the event source.
"""
pass
def _process_event_source_response(
self, event_source: EventSourceResponse, config: EventSourceConfig
) -> None:
"""Process an event source response before it is returned to the user.
Concrete event source handlers should override this method to add
implementation specific functionality pertaining to how the event source
response is returned to the user. The implementation may modify the
the event source response and/or configuration in place.
The resulted configuration is serialized back into the event source
response before it is returned to the user.
This method is applied to all event source responses fetched from the
database before they are returned to the user, with the exception of
those returned from the `create_event_source`, `update_event_source`,
and `delete_event_source` methods, which have their own specific
processing methods.
Args:
event_source: Event source response.
config: Event source configuration instantiated from the response.
"""
pass
config_class: Type[zenml.event_sources.base_event_source.EventSourceConfig]
property
readonly
Returns the event source configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.base_event_source.EventSourceConfig] |
The configuration. |
event_hub: BaseEventHub
property
readonly
Get the event hub used to dispatch events.
Returns:
Type | Description |
---|---|
BaseEventHub |
The event hub. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if an event hub isn't configured. |
filter_class: Type[zenml.event_sources.base_event_source.EventFilterConfig]
property
readonly
Returns the event filter configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.base_event_source.EventFilterConfig] |
The event filter configuration class. |
flavor_class: Type[BaseEventSourceFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[BaseEventSourceFlavor] |
The flavor class of the plugin. |
__init__(self, event_hub=None)
special
Event source handler initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_hub |
Optional[zenml.event_hub.base_event_hub.BaseEventHub] |
Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling |
None |
Source code in zenml/event_sources/base_event_source.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
"""Event source handler initialization.
Args:
event_hub: Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling `set_event_hub`. An event hub must
be configured before the event handler needs to dispatch events.
"""
super().__init__()
if event_hub is None:
from zenml.event_hub.event_hub import (
event_hub as default_event_hub,
)
# TODO: for now, we use the default internal event hub. In
# the future, this should be configurable.
event_hub = default_event_hub
self.set_event_hub(event_hub)
create_event_source(self, event_source)
Process an event source request and create the event source in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source |
EventSourceRequest |
Event source request. |
required |
Returns:
Type | Description |
---|---|
EventSourceResponse |
The created event source. |
noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def create_event_source(
self, event_source: EventSourceRequest
) -> EventSourceResponse:
"""Process an event source request and create the event source in the database.
Args:
event_source: Event source request.
Returns:
The created event source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the request
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to validate the request
# before it is sent to the database
self._validate_event_source_request(
event_source=event_source, config=config
)
# Serialize the configuration back into the request
event_source.configuration = config.model_dump(exclude_none=True)
# Create the event source in the database
event_source_response = self.zen_store.create_event_source(
event_source=event_source
)
try:
# Instantiate the configuration from the response
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to process the created
# event source
self._process_event_source_request(
event_source=event_source_response, config=config
)
except Exception:
# If the event source creation fails, delete the event source from
# the database
logger.exception(
f"Failed to create event source {event_source_response}. "
f"Deleting the event source."
)
self.zen_store.delete_event_source(
event_source_id=event_source_response.id
)
raise
# Serialize the configuration back into the response
event_source_response.set_configuration(
config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source_response
delete_event_source(self, event_source, force=False)
Process an event source delete request and delete the event source in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source |
EventSourceResponse |
The event source to delete. |
required |
force |
bool |
Whether to force delete the event source from the database even if the event source handler fails to delete the event source. |
False |
noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def delete_event_source(
self,
event_source: EventSourceResponse,
force: bool = False,
) -> None:
"""Process an event source delete request and delete the event source in the database.
Args:
event_source: The event source to delete.
force: Whether to force delete the event source from the database
even if the event source handler fails to delete the event
source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_event_source_configuration(
event_source.configuration
)
try:
# Call the implementation specific method to process the deleted
# event source before it is deleted from the database
self._process_event_source_delete(
event_source=event_source,
config=config,
force=force,
)
except Exception:
logger.exception(f"Failed to delete event source {event_source}. ")
if not force:
raise
logger.warning(f"Force deleting event source {event_source}.")
# Delete the event source from the database
self.zen_store.delete_event_source(
event_source_id=event_source.id,
)
dispatch_event(self, event, event_source)
Dispatch an event to all active triggers that match the event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event to dispatch. |
required |
event_source |
EventSourceResponse |
The event source that produced the event. |
required |
Source code in zenml/event_sources/base_event_source.py
def dispatch_event(
self,
event: BaseEvent,
event_source: EventSourceResponse,
) -> None:
"""Dispatch an event to all active triggers that match the event.
Args:
event: The event to dispatch.
event_source: The event source that produced the event.
"""
if not event_source.is_active:
logger.debug(
f"Event source {event_source.id} is not active. Skipping event "
f"dispatch."
)
return
self.event_hub.publish_event(
event=event,
event_source=event_source,
)
get_event_source(self, event_source, hydrate=False)
Process an event source response before it is returned to the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source |
EventSourceResponse |
The event source fetched from the database. |
required |
hydrate |
bool |
Whether to hydrate the event source. |
False |
Returns:
Type | Description |
---|---|
EventSourceResponse |
The event source. |
Source code in zenml/event_sources/base_event_source.py
def get_event_source(
self, event_source: EventSourceResponse, hydrate: bool = False
) -> EventSourceResponse:
"""Process an event source response before it is returned to the user.
Args:
event_source: The event source fetched from the database.
hydrate: Whether to hydrate the event source.
Returns:
The event source.
"""
if hydrate:
# Instantiate the configuration from the response
config = self.validate_event_source_configuration(
event_source.configuration
)
# Call the implementation specific method to process the response
self._process_event_source_response(
event_source=event_source, config=config
)
# Serialize the configuration back into the response
event_source.set_configuration(
config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source
set_event_hub(self, event_hub)
Configure an event hub for this event source plugin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_hub |
BaseEventHub |
Event hub to be used by this event handler to dispatch events. |
required |
Source code in zenml/event_sources/base_event_source.py
def set_event_hub(self, event_hub: BaseEventHub) -> None:
"""Configure an event hub for this event source plugin.
Args:
event_hub: Event hub to be used by this event handler to dispatch
events.
"""
self._event_hub = event_hub
update_event_source(self, event_source, event_source_update)
Process an event source update request and update the event source in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source |
EventSourceResponse |
The event source to update. |
required |
event_source_update |
EventSourceUpdate |
The update to be applied to the event source. |
required |
Returns:
Type | Description |
---|---|
EventSourceResponse |
The updated event source. |
noqa: DAR401
Source code in zenml/event_sources/base_event_source.py
def update_event_source(
self,
event_source: EventSourceResponse,
event_source_update: EventSourceUpdate,
) -> EventSourceResponse:
"""Process an event source update request and update the event source in the database.
Args:
event_source: The event source to update.
event_source_update: The update to be applied to the event source.
Returns:
The updated event source.
# noqa: DAR401
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_event_source_configuration(
event_source.configuration
)
# Validate and instantiate the configuration from the update request
# NOTE: if supplied, the configuration update is a full replacement
# of the original configuration
config_update = config
if event_source_update.configuration is not None:
config_update = self.validate_event_source_configuration(
event_source_update.configuration
)
# Call the implementation specific method to validate the update request
# before it is sent to the database
self._validate_event_source_update(
event_source=event_source,
config=config,
event_source_update=event_source_update,
config_update=config_update,
)
# Serialize the configuration update back into the update request
event_source_update.configuration = config_update.model_dump(
exclude_none=True
)
# Update the event source in the database
event_source_response = self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=event_source_update,
)
try:
# Instantiate the configuration from the response
response_config = self.validate_event_source_configuration(
event_source_response.configuration
)
# Call the implementation specific method to process the update
# request before it is sent to the database
self._process_event_source_update(
event_source=event_source_response,
config=response_config,
previous_event_source=event_source,
previous_config=config,
)
except Exception:
# If the event source update fails, roll back the event source in
# the database to the original state
logger.exception(
f"Failed to update event source {event_source}. "
f"Rolling back the event source to the previous state."
)
self.zen_store.update_event_source(
event_source_id=event_source.id,
event_source_update=EventSourceUpdate.from_response(
event_source
),
)
raise
# Serialize the configuration back into the response
event_source_response.set_configuration(
response_config.model_dump(exclude_none=True)
)
# Return the response to the user
return event_source_response
validate_event_filter_configuration(self, configuration)
Validate and return the configuration of an event filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
configuration |
Dict[str, Any] |
The configuration to validate. |
required |
Returns:
Type | Description |
---|---|
EventFilterConfig |
Instantiated event filter config. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the configuration is invalid. |
Source code in zenml/event_sources/base_event_source.py
def validate_event_filter_configuration(
self,
configuration: Dict[str, Any],
) -> EventFilterConfig:
"""Validate and return the configuration of an event filter.
Args:
configuration: The configuration to validate.
Returns:
Instantiated event filter config.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.filter_class(**configuration)
except ValueError as e:
raise ValueError(
f"Invalid configuration for event filter: {e}."
) from e
validate_event_source_configuration(self, event_source_config)
Validate and return the event source configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source_config |
Dict[str, Any] |
The event source configuration to validate. |
required |
Returns:
Type | Description |
---|---|
EventSourceConfig |
The validated event source configuration. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the configuration is invalid. |
Source code in zenml/event_sources/base_event_source.py
def validate_event_source_configuration(
self, event_source_config: Dict[str, Any]
) -> EventSourceConfig:
"""Validate and return the event source configuration.
Args:
event_source_config: The event source configuration to validate.
Returns:
The validated event source configuration.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.config_class(**event_source_config)
except ValueError as e:
raise ValueError(
f"Invalid configuration for event source: {e}."
) from e
EventFilterConfig (BaseModel, ABC)
The Event Filter configuration.
Source code in zenml/event_sources/base_event_source.py
class EventFilterConfig(BaseModel, ABC):
"""The Event Filter configuration."""
@abstractmethod
def event_matches_filter(self, event: BaseEvent) -> bool:
"""All implementations need to implement this check.
If the filter matches the inbound event instance, this should
return True, else False.
Args:
event: The inbound event instance.
Returns:
Whether the filter matches the event.
"""
event_matches_filter(self, event)
All implementations need to implement this check.
If the filter matches the inbound event instance, this should return True, else False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The inbound event instance. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the filter matches the event. |
Source code in zenml/event_sources/base_event_source.py
@abstractmethod
def event_matches_filter(self, event: BaseEvent) -> bool:
"""All implementations need to implement this check.
If the filter matches the inbound event instance, this should
return True, else False.
Args:
event: The inbound event instance.
Returns:
Whether the filter matches the event.
"""
EventSourceConfig (BasePluginConfig)
The Event Source configuration.
Source code in zenml/event_sources/base_event_source.py
class EventSourceConfig(BasePluginConfig):
"""The Event Source configuration."""
webhooks
special
Base Classes for Webhook Event Sources.
base_webhook_event_source
Abstract BaseEvent class that all Event implementations must implement.
BaseWebhookEvent (BaseEvent)
Base class for all inbound events.
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEvent(BaseEvent):
"""Base class for all inbound events."""
BaseWebhookEventSourceFlavor (BaseEventSourceFlavor, ABC)
Base Event Plugin Flavor to access an event plugin along with its configurations.
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEventSourceFlavor(BaseEventSourceFlavor, ABC):
"""Base Event Plugin Flavor to access an event plugin along with its configurations."""
SUBTYPE: ClassVar[PluginSubType] = PluginSubType.WEBHOOK
BaseWebhookEventSourceHandler (BaseEventSourceHandler, ABC)
Base implementation for all Webhook event sources.
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class BaseWebhookEventSourceHandler(BaseEventSourceHandler, ABC):
"""Base implementation for all Webhook event sources."""
@property
@abstractmethod
def config_class(self) -> Type[WebhookEventSourceConfig]:
"""Returns the webhook event source configuration class.
Returns:
The configuration.
"""
@property
@abstractmethod
def filter_class(self) -> Type[WebhookEventFilterConfig]:
"""Returns the webhook event filter configuration class.
Returns:
The event filter configuration class.
"""
@property
@abstractmethod
def flavor_class(self) -> "Type[BaseWebhookEventSourceFlavor]":
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
def is_valid_signature(
self, raw_body: bytes, secret_token: str, signature_header: str
) -> bool:
"""Verify the SHA256 signature of the payload.
Args:
raw_body: original request body to verify
secret_token: secret token used to generate the signature
signature_header: signature header to verify (x-hub-signature-256)
Returns:
Whether if the signature is valid.
"""
hash_object = hmac.new(
secret_token.encode("utf-8"),
msg=raw_body,
digestmod=hashlib.sha256,
)
expected_signature = "sha256=" + hash_object.hexdigest()
if not hmac.compare_digest(expected_signature, signature_header):
return False
return True
@abstractmethod
def _interpret_event(self, event: Dict[str, Any]) -> BaseEvent:
"""Converts the generic event body into a event-source specific pydantic model.
Args:
event: The generic event body
Return:
An instance of the event source specific pydantic model.
"""
@abstractmethod
def _get_webhook_secret(
self, event_source: EventSourceResponse
) -> Optional[str]:
"""Get the webhook secret for the event source.
Inheriting classes should implement this method to retrieve the webhook
secret associated with an event source. If a webhook secret is not
applicable for the event source, this method should return None.
Args:
event_source: The event source to retrieve the secret for.
Return:
The webhook secret associated with the event source, or None if a
secret is not applicable.
"""
def _validate_webhook_event_signature(
self, raw_body: bytes, headers: Dict[str, str], webhook_secret: str
) -> None:
"""Validate the signature of an incoming webhook event.
Args:
raw_body: The raw inbound webhook event.
headers: The headers of the inbound webhook event.
webhook_secret: The webhook secret to use for signature validation.
Raises:
AuthorizationException: If the signature validation fails.
"""
signature_header = headers.get("x-hub-signature-256") or headers.get(
"x-hub-signature"
)
if not signature_header:
raise AuthorizationException(
"x-hub-signature-256 or x-hub-signature header is missing!"
)
if not self.is_valid_signature(
raw_body=raw_body,
secret_token=webhook_secret,
signature_header=signature_header,
):
raise AuthorizationException(
"Webhook signature verification failed!"
)
def _load_payload(
self, raw_body: bytes, headers: Dict[str, str]
) -> Dict[Any, Any]:
"""Converts the raw body of the request into a python dictionary.
Args:
raw_body: The raw event body.
headers: The request headers.
Returns:
An instance of the event source specific pydantic model.
Raises:
ValueError: In case the body can not be parsed.
"""
# For now assume all webhook events are json encoded and parse
# the body as such.
try:
body_dict: Dict[Any, Any] = json.loads(raw_body)
return body_dict
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON body received: {e}")
def process_webhook_event(
self,
event_source: EventSourceResponse,
raw_body: bytes,
headers: Dict[str, str],
) -> None:
"""Process an incoming webhook event.
Args:
event_source: The event source that the event belongs to.
raw_body: The raw inbound webhook event.
headers: The headers of the inbound webhook event.
"""
json_body = self._load_payload(raw_body=raw_body, headers=headers)
webhook_secret = self._get_webhook_secret(event_source)
if webhook_secret:
self._validate_webhook_event_signature(
raw_body=raw_body,
headers=headers,
webhook_secret=webhook_secret,
)
event = self._interpret_event(json_body)
self.dispatch_event(
event=event,
event_source=event_source,
)
config_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventSourceConfig]
property
readonly
Returns the webhook event source configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventSourceConfig] |
The configuration. |
filter_class: Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventFilterConfig]
property
readonly
Returns the webhook event filter configuration class.
Returns:
Type | Description |
---|---|
Type[zenml.event_sources.webhooks.base_webhook_event_source.WebhookEventFilterConfig] |
The event filter configuration class. |
flavor_class: Type[BaseWebhookEventSourceFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[BaseWebhookEventSourceFlavor] |
The flavor class of the plugin. |
is_valid_signature(self, raw_body, secret_token, signature_header)
Verify the SHA256 signature of the payload.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raw_body |
bytes |
original request body to verify |
required |
secret_token |
str |
secret token used to generate the signature |
required |
signature_header |
str |
signature header to verify (x-hub-signature-256) |
required |
Returns:
Type | Description |
---|---|
bool |
Whether if the signature is valid. |
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
def is_valid_signature(
self, raw_body: bytes, secret_token: str, signature_header: str
) -> bool:
"""Verify the SHA256 signature of the payload.
Args:
raw_body: original request body to verify
secret_token: secret token used to generate the signature
signature_header: signature header to verify (x-hub-signature-256)
Returns:
Whether if the signature is valid.
"""
hash_object = hmac.new(
secret_token.encode("utf-8"),
msg=raw_body,
digestmod=hashlib.sha256,
)
expected_signature = "sha256=" + hash_object.hexdigest()
if not hmac.compare_digest(expected_signature, signature_header):
return False
return True
process_webhook_event(self, event_source, raw_body, headers)
Process an incoming webhook event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_source |
EventSourceResponse |
The event source that the event belongs to. |
required |
raw_body |
bytes |
The raw inbound webhook event. |
required |
headers |
Dict[str, str] |
The headers of the inbound webhook event. |
required |
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
def process_webhook_event(
self,
event_source: EventSourceResponse,
raw_body: bytes,
headers: Dict[str, str],
) -> None:
"""Process an incoming webhook event.
Args:
event_source: The event source that the event belongs to.
raw_body: The raw inbound webhook event.
headers: The headers of the inbound webhook event.
"""
json_body = self._load_payload(raw_body=raw_body, headers=headers)
webhook_secret = self._get_webhook_secret(event_source)
if webhook_secret:
self._validate_webhook_event_signature(
raw_body=raw_body,
headers=headers,
webhook_secret=webhook_secret,
)
event = self._interpret_event(json_body)
self.dispatch_event(
event=event,
event_source=event_source,
)
WebhookEventFilterConfig (EventFilterConfig)
The Event Filter configuration.
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class WebhookEventFilterConfig(EventFilterConfig):
"""The Event Filter configuration."""
WebhookEventSourceConfig (EventSourceConfig)
The Event Source configuration.
Source code in zenml/event_sources/webhooks/base_webhook_event_source.py
class WebhookEventSourceConfig(EventSourceConfig):
"""The Event Source configuration."""