Event Hub
zenml.event_hub
special
ZenML Event Hub module.
The Event Hub is responsible for receiving all Events and dispatching them to the relevant Subscribers/Triggers.
base_event_hub
Base class for event hub implementations.
BaseEventHub (ABC)
Base class for event hub implementations.
The event hub is responsible for relaying events from event sources to action handlers. It functions similarly to a pub/sub system where event source handlers publish events and action handlers subscribe to them.
The event hub also serves to decouple event sources from action handlers, allowing them to be configured independently and their implementations to be unaware of each other.
Source code in zenml/event_hub/base_event_hub.py
class BaseEventHub(ABC):
    """Abstract foundation shared by every event hub implementation.

    An event hub relays events from event sources to action handlers, much
    like a pub/sub system: event source handlers publish events and action
    handlers subscribe to them. Routing everything through the hub keeps
    event sources and action handlers decoupled, so each side can be
    configured independently and needs no knowledge of the other's
    implementation.
    """

    # Registry of subscribed callbacks, keyed by (action flavor, action
    # subtype). The dictionary is created on the class and the subscribe /
    # unsubscribe methods mutate it in place.
    action_handlers: Dict[Tuple[str, str], ActionHandlerCallback] = {}

    @property
    def zen_store(self) -> "SqlZenStore":
        """Fetch the zen store of the global configuration.

        Returns:
            The active zen store.

        Raises:
            ValueError: If the active zen store is not a SQL zen store.
        """
        # Imported at call time rather than at module import time.
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        active_store = GlobalConfiguration().zen_store
        if isinstance(active_store, SqlZenStore):
            return active_store
        raise ValueError("The event hub requires a SQL zen store.")

    def subscribe_action_handler(
        self,
        action_flavor: str,
        action_subtype: str,
        callback: ActionHandlerCallback,
    ) -> None:
        """Register an action handler callback with the event hub.

        A callback previously stored for the same flavor/subtype pair is
        replaced.

        Args:
            action_flavor: the flavor of the action to trigger.
            action_subtype: the subtype of the action to trigger.
            callback: the action to trigger when the trigger is activated.
        """
        handler_key = (action_flavor, action_subtype)
        self.action_handlers[handler_key] = callback

    def unsubscribe_action_handler(
        self,
        action_flavor: str,
        action_subtype: str,
    ) -> None:
        """Remove an action handler callback from the event hub.

        Nothing happens if no callback is stored for the given pair.

        Args:
            action_flavor: the flavor of the action to trigger.
            action_subtype: the subtype of the action to trigger.
        """
        handler_key = (action_flavor, action_subtype)
        self.action_handlers.pop(handler_key, None)

    def trigger_action(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
        trigger: TriggerResponse,
        action_callback: ActionHandlerCallback,
    ) -> None:
        """Record a trigger execution and run the action callback for it.

        Exceptions raised by the callback are logged and not re-raised.

        Args:
            event: The event.
            event_source: The event source that produced the event.
            trigger: The trigger that was activated.
            action_callback: The action to trigger.
        """
        execution_request = TriggerExecutionRequest(
            trigger=trigger.id, event_metadata=event.model_dump()
        )
        action_config = trigger.action.configuration
        trigger_execution = self.zen_store.create_trigger_execution(
            execution_request
        )

        # Build an API token that external workloads implementing the action
        # can use to authenticate with the server. The token is tied to the
        # service account configured for the trigger; its validity follows
        # the trigger's authentication window (in minutes), when one is set.
        access_token = JWTToken(
            user_id=trigger.action.service_account.id,
        )
        expires: Optional[datetime] = (
            datetime.utcnow() + timedelta(minutes=trigger.action.auth_window)
            if trigger.action.auth_window
            else None
        )
        encoded_access_token = access_token.encode(expires=expires)
        auth_context = AuthContext(
            user=trigger.action.service_account,
            access_token=access_token,
            encoded_access_token=encoded_access_token,
        )

        try:
            # TODO: We need to make this async, as this might take quite some
            # time per trigger. We can either use threads starting here, or
            # use fastapi background tasks that get passed here instead of
            # running the event hub as a background task in the webhook
            # endpoints
            action_callback(action_config, trigger_execution, auth_context)
        except Exception:
            # A failing action must not stop the event hub from working.
            logger.exception(
                f"An error occurred while executing trigger {trigger}."
            )

    @abstractmethod
    def activate_trigger(self, trigger: TriggerResponse) -> None:
        """Add a trigger to the event hub.

        Configures the event hub to trigger an action when an event is
        received.

        Args:
            trigger: the trigger to activate.
        """

    @abstractmethod
    def deactivate_trigger(self, trigger: TriggerResponse) -> None:
        """Remove a trigger from the event hub.

        Configures the event hub to stop triggering an action when an event
        is received.

        Args:
            trigger: the trigger to deactivate.
        """

    @abstractmethod
    def publish_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> None:
        """Publish an event to the event hub.

        Args:
            event: The event.
            event_source: The event source that produced the event.
        """
zen_store: SqlZenStore
property
readonly
Returns the active zen store.
Returns:
Type | Description |
---|---|
SqlZenStore |
The active zen store. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the active zen store is not a SQL zen store. |
activate_trigger(self, trigger)
Add a trigger to the event hub.
Configure the event hub to trigger an action when an event is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
TriggerResponse |
the trigger to activate. |
required |
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def activate_trigger(self, trigger: TriggerResponse) -> None:
    """Add a trigger to the event hub.

    Configures the event hub to trigger an action when an event is
    received.

    Args:
        trigger: the trigger to activate.
    """
deactivate_trigger(self, trigger)
Remove a trigger from the event hub.
Configure the event hub to stop triggering an action when an event is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
TriggerResponse |
the trigger to deactivate. |
required |
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def deactivate_trigger(self, trigger: TriggerResponse) -> None:
    """Remove a trigger from the event hub.

    Configures the event hub to stop triggering an action when an event
    is received.

    Args:
        trigger: the trigger to deactivate.
    """
publish_event(self, event, event_source)
Publish an event to the event hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event. |
required |
event_source |
EventSourceResponse |
The event source that produced the event. |
required |
Source code in zenml/event_hub/base_event_hub.py
@abstractmethod
def publish_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Publish an event to the event hub.

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
subscribe_action_handler(self, action_flavor, action_subtype, callback)
Subscribe an action handler to the event hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_flavor |
str |
the flavor of the action to trigger. |
required |
action_subtype |
str |
the subtype of the action to trigger. |
required |
callback |
Callable[[Dict[str, Any], zenml.models.v2.core.trigger_execution.TriggerExecutionResponse, zenml.zen_server.auth.AuthContext], NoneType] |
the action to trigger when the trigger is activated. |
required |
Source code in zenml/event_hub/base_event_hub.py
def subscribe_action_handler(
    self,
    action_flavor: str,
    action_subtype: str,
    callback: ActionHandlerCallback,
) -> None:
    """Register an action handler callback with the event hub.

    A callback previously stored for the same flavor/subtype pair is
    replaced.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
        callback: the action to trigger when the trigger is activated.
    """
    handler_key = (action_flavor, action_subtype)
    self.action_handlers[handler_key] = callback
trigger_action(self, event, event_source, trigger, action_callback)
Trigger an action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event. |
required |
event_source |
EventSourceResponse |
The event source that produced the event. |
required |
trigger |
TriggerResponse |
The trigger that was activated. |
required |
action_callback |
Callable[[Dict[str, Any], zenml.models.v2.core.trigger_execution.TriggerExecutionResponse, zenml.zen_server.auth.AuthContext], NoneType] |
The action to trigger. |
required |
Source code in zenml/event_hub/base_event_hub.py
def trigger_action(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
    trigger: TriggerResponse,
    action_callback: ActionHandlerCallback,
) -> None:
    """Record a trigger execution and run the action callback for it.

    Exceptions raised by the callback are logged and not re-raised.

    Args:
        event: The event.
        event_source: The event source that produced the event.
        trigger: The trigger that was activated.
        action_callback: The action to trigger.
    """
    execution_request = TriggerExecutionRequest(
        trigger=trigger.id, event_metadata=event.model_dump()
    )
    action_config = trigger.action.configuration
    trigger_execution = self.zen_store.create_trigger_execution(
        execution_request
    )

    # Build an API token that external workloads implementing the action
    # can use to authenticate with the server. The token is tied to the
    # service account configured for the trigger; its validity follows
    # the trigger's authentication window (in minutes), when one is set.
    access_token = JWTToken(
        user_id=trigger.action.service_account.id,
    )
    expires: Optional[datetime] = (
        datetime.utcnow() + timedelta(minutes=trigger.action.auth_window)
        if trigger.action.auth_window
        else None
    )
    encoded_access_token = access_token.encode(expires=expires)
    auth_context = AuthContext(
        user=trigger.action.service_account,
        access_token=access_token,
        encoded_access_token=encoded_access_token,
    )

    try:
        # TODO: We need to make this async, as this might take quite some
        # time per trigger. We can either use threads starting here, or
        # use fastapi background tasks that get passed here instead of
        # running the event hub as a background task in the webhook
        # endpoints
        action_callback(action_config, trigger_execution, auth_context)
    except Exception:
        # A failing action must not stop the event hub from working.
        logger.exception(
            f"An error occurred while executing trigger {trigger}."
        )
unsubscribe_action_handler(self, action_flavor, action_subtype)
Unsubscribe an action handler from the event hub.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_flavor |
str |
the flavor of the action to trigger. |
required |
action_subtype |
str |
the subtype of the action to trigger. |
required |
Source code in zenml/event_hub/base_event_hub.py
def unsubscribe_action_handler(
    self,
    action_flavor: str,
    action_subtype: str,
) -> None:
    """Remove an action handler callback from the event hub.

    Nothing happens if no callback is stored for the given pair.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
    """
    handler_key = (action_flavor, action_subtype)
    self.action_handlers.pop(handler_key, None)
event_hub
Internal, in-server implementation of the Event Hub.
InternalEventHub (BaseEventHub)
Internal in-server event hub implementation.
The internal in-server event hub uses the database as a source of truth for configured triggers and triggers actions by calling the action handlers directly.
Source code in zenml/event_hub/event_hub.py
class InternalEventHub(BaseEventHub):
    """Event hub implementation that runs inside the server.

    This hub treats the database as the source of truth for configured
    triggers and triggers actions by calling the action handlers directly.
    """

    def activate_trigger(self, trigger: TriggerResponse) -> None:
        """Add a trigger to the event hub.

        Configures the event hub to trigger an action when an event is
        received. This implementation intentionally does nothing.

        Args:
            trigger: the trigger to activate.
        """
        # Nothing to change in the event hub configuration: the in-server
        # event hub already uses the database as the source of truth
        # regarding configured active triggers.

    def deactivate_trigger(self, trigger: TriggerResponse) -> None:
        """Remove a trigger from the event hub.

        Configures the event hub to stop triggering an action when an event
        is received. This implementation intentionally does nothing.

        Args:
            trigger: the trigger to deactivate.
        """
        # Nothing to change in the event hub configuration: the in-server
        # event hub already uses the database as the source of truth
        # regarding configured active triggers.

    def publish_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> None:
        """Process an incoming event and trigger all configured actions.

        Looks up the active triggers matching the event, then runs the
        subscribed action handler of each one. A trigger whose action
        flavor/subtype has no subscribed handler is reported with an error
        log entry and skipped.

        Args:
            event: The event.
            event_source: The event source that produced the event.
        """
        matching_triggers = self.get_matching_active_triggers_for_event(
            event=event, event_source=event_source
        )

        for trigger in matching_triggers:
            handler_key = (trigger.action_flavor, trigger.action_subtype)
            action_callback = self.action_handlers.get(handler_key)
            if action_callback:
                self.trigger_action(
                    event=event,
                    event_source=event_source,
                    trigger=trigger,
                    action_callback=action_callback,
                )
            else:
                logger.error(
                    f"The event hub could not deliver event to action handler "
                    f"for flavor {trigger.action_flavor} and subtype "
                    f"{trigger.action_subtype} because no handler was found."
                )

    def get_matching_active_triggers_for_event(
        self,
        event: BaseEvent,
        event_source: EventSourceResponse,
    ) -> List[TriggerResponse]:
        """Get all triggers that match an incoming event.

        Triggers whose event source plugin flavor cannot be found, or whose
        event filter configuration fails validation, are logged and skipped.

        Args:
            event: The inbound event.
            event_source: The event source which emitted the event.

        Returns:
            The list of matching triggers.
        """
        # Fetch every active trigger configured for this event source.
        list_triggers = self.zen_store.list_triggers
        active_trigger_filter = TriggerFilter(
            event_source_id=event_source.id, is_active=True
        )
        active_triggers: List[TriggerResponse] = depaginate(
            partial(
                list_triggers,
                trigger_filter_model=active_trigger_filter,
                hydrate=True,
            )
        )

        trigger_list: List[TriggerResponse] = []

        for trigger in active_triggers:
            # For now, the matching of trigger filters vs event is implemented
            # in each filter class. This is not ideal and should be refactored
            # to a more generic solution that doesn't require the plugin
            # implementation to be imported here.
            try:
                plugin_flavor = plugin_flavor_registry().get_flavor_class(
                    name=event_source.flavor,
                    _type=PluginType.EVENT_SOURCE,
                    subtype=event_source.plugin_subtype,
                )
            except KeyError:
                logger.exception(
                    f"Could not find plugin flavor for event source "
                    f"{event_source.id} and flavor {event_source.flavor}. "
                    f"Skipping trigger {trigger.id}."
                )
                continue

            assert issubclass(plugin_flavor, BaseEventSourceFlavor)
            filter_config_class = plugin_flavor.EVENT_FILTER_CONFIG_CLASS

            try:
                # A trigger without an event filter yields a filter built
                # from no arguments.
                filter_kwargs = (
                    trigger.event_filter if trigger.event_filter else {}
                )
                event_filter = filter_config_class(**filter_kwargs)
            except ValidationError:
                logger.exception(
                    f"Could not instantiate event filter config class for "
                    f"event source {event_source.id}. Skipping trigger "
                    f"{trigger.id}."
                )
                continue

            if not event_filter.event_matches_filter(event=event):
                continue
            trigger_list.append(trigger)

        logger.debug(
            f"For event {event} and event source {event_source}, "
            f"the following triggers matched: {trigger_list}"
        )

        return trigger_list
activate_trigger(self, trigger)
Add a trigger to the event hub.
Configure the event hub to trigger an action when an event is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
TriggerResponse |
the trigger to activate. |
required |
Source code in zenml/event_hub/event_hub.py
def activate_trigger(self, trigger: TriggerResponse) -> None:
    """Add a trigger to the event hub.

    Configures the event hub to trigger an action when an event is
    received. This implementation intentionally does nothing.

    Args:
        trigger: the trigger to activate.
    """
    # Nothing to change in the event hub configuration: the in-server
    # event hub already uses the database as the source of truth
    # regarding configured active triggers.
deactivate_trigger(self, trigger)
Remove a trigger from the event hub.
Configure the event hub to stop triggering an action when an event is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trigger |
TriggerResponse |
the trigger to deactivate. |
required |
Source code in zenml/event_hub/event_hub.py
def deactivate_trigger(self, trigger: TriggerResponse) -> None:
    """Remove a trigger from the event hub.

    Configures the event hub to stop triggering an action when an event
    is received. This implementation intentionally does nothing.

    Args:
        trigger: the trigger to deactivate.
    """
    # Nothing to change in the event hub configuration: the in-server
    # event hub already uses the database as the source of truth
    # regarding configured active triggers.
get_matching_active_triggers_for_event(self, event, event_source)
Get all triggers that match an incoming event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The inbound event. |
required |
event_source |
EventSourceResponse |
The event source which emitted the event. |
required |
Returns:
Type | Description |
---|---|
List[zenml.models.v2.core.trigger.TriggerResponse] |
The list of matching triggers. |
Source code in zenml/event_hub/event_hub.py
def get_matching_active_triggers_for_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> List[TriggerResponse]:
    """Get all triggers that match an incoming event.

    Triggers whose event source plugin flavor cannot be found, or whose
    event filter configuration fails validation, are logged and skipped.

    Args:
        event: The inbound event.
        event_source: The event source which emitted the event.

    Returns:
        The list of matching triggers.
    """
    # Fetch every active trigger configured for this event source.
    list_triggers = self.zen_store.list_triggers
    active_trigger_filter = TriggerFilter(
        event_source_id=event_source.id, is_active=True
    )
    active_triggers: List[TriggerResponse] = depaginate(
        partial(
            list_triggers,
            trigger_filter_model=active_trigger_filter,
            hydrate=True,
        )
    )

    trigger_list: List[TriggerResponse] = []

    for trigger in active_triggers:
        # For now, the matching of trigger filters vs event is implemented
        # in each filter class. This is not ideal and should be refactored
        # to a more generic solution that doesn't require the plugin
        # implementation to be imported here.
        try:
            plugin_flavor = plugin_flavor_registry().get_flavor_class(
                name=event_source.flavor,
                _type=PluginType.EVENT_SOURCE,
                subtype=event_source.plugin_subtype,
            )
        except KeyError:
            logger.exception(
                f"Could not find plugin flavor for event source "
                f"{event_source.id} and flavor {event_source.flavor}. "
                f"Skipping trigger {trigger.id}."
            )
            continue

        assert issubclass(plugin_flavor, BaseEventSourceFlavor)
        filter_config_class = plugin_flavor.EVENT_FILTER_CONFIG_CLASS

        try:
            # A trigger without an event filter yields a filter built
            # from no arguments.
            filter_kwargs = (
                trigger.event_filter if trigger.event_filter else {}
            )
            event_filter = filter_config_class(**filter_kwargs)
        except ValidationError:
            logger.exception(
                f"Could not instantiate event filter config class for "
                f"event source {event_source.id}. Skipping trigger "
                f"{trigger.id}."
            )
            continue

        if not event_filter.event_matches_filter(event=event):
            continue
        trigger_list.append(trigger)

    logger.debug(
        f"For event {event} and event source {event_source}, "
        f"the following triggers matched: {trigger_list}"
    )

    return trigger_list
publish_event(self, event, event_source)
Process an incoming event and trigger all configured actions.
This will first check for any subscribers/triggers for this event, then log the event for later reference and finally perform the configured action(s).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
BaseEvent |
The event. |
required |
event_source |
EventSourceResponse |
The event source that produced the event. |
required |
Source code in zenml/event_hub/event_hub.py
def publish_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Process an incoming event and trigger all configured actions.

    Looks up the active triggers matching the event, then runs the
    subscribed action handler of each one. A trigger whose action
    flavor/subtype has no subscribed handler is reported with an error
    log entry and skipped.

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
    matching_triggers = self.get_matching_active_triggers_for_event(
        event=event, event_source=event_source
    )

    for trigger in matching_triggers:
        handler_key = (trigger.action_flavor, trigger.action_subtype)
        action_callback = self.action_handlers.get(handler_key)
        if action_callback:
            self.trigger_action(
                event=event,
                event_source=event_source,
                trigger=trigger,
                action_callback=action_callback,
            )
        else:
            logger.error(
                f"The event hub could not deliver event to action handler "
                f"for flavor {trigger.action_flavor} and subtype "
                f"{trigger.action_subtype} because no handler was found."
            )