Skip to content

Event Hub

zenml.event_hub

ZenML Event Hub module.

The Event Hub is responsible for receiving all Events and dispatching them to the relevant Subscribers/Triggers.

Modules

base_event_hub

Base class for event hub implementations.

Classes
BaseEventHub

Bases: ABC

Base class for event hub implementations.

The event hub is responsible for relaying events from event sources to action handlers. It functions similarly to a pub/sub system where event source handlers publish events and action handlers subscribe to them.

The event hub also serves to decouple event sources from action handlers, allowing them to be configured independently and their implementations to be unaware of each other.

Attributes
zen_store: SqlZenStore property

Returns the active zen store.

Returns:

Type Description
SqlZenStore

The active zen store.

Raises:

Type Description
ValueError

If the active zen store is not a SQL zen store.

Functions
activate_trigger(trigger: TriggerResponse) -> None abstractmethod

Add a trigger to the event hub.

Configure the event hub to trigger an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to activate.

required
Source code in src/zenml/event_hub/base_event_hub.py
163
164
165
166
167
168
169
170
171
@abstractmethod
def activate_trigger(self, trigger: TriggerResponse) -> None:
    """Add a trigger to the event hub.

    Configure the event hub to trigger an action when an event is received.

    Args:
        trigger: the trigger to activate.
    """
deactivate_trigger(trigger: TriggerResponse) -> None abstractmethod

Remove a trigger from the event hub.

Configure the event hub to stop triggering an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to deactivate.

required
Source code in src/zenml/event_hub/base_event_hub.py
173
174
175
176
177
178
179
180
181
182
@abstractmethod
def deactivate_trigger(self, trigger: TriggerResponse) -> None:
    """Remove a trigger from the event hub.

    Configure the event hub to stop triggering an action when an event is
    received.

    Args:
        trigger: the trigger to deactivate.
    """
publish_event(event: BaseEvent, event_source: EventSourceResponse) -> None abstractmethod

Publish an event to the event hub.

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in src/zenml/event_hub/base_event_hub.py
184
185
186
187
188
189
190
191
192
193
194
195
@abstractmethod
def publish_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Publish an event to the event hub.

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
subscribe_action_handler(action_flavor: str, action_subtype: str, callback: ActionHandlerCallback) -> None

Subscribe an action handler to the event hub.

Parameters:

Name Type Description Default
action_flavor str

the flavor of the action to trigger.

required
action_subtype str

the subtype of the action to trigger.

required
callback ActionHandlerCallback

the action to trigger when the trigger is activated.

required
Source code in src/zenml/event_hub/base_event_hub.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def subscribe_action_handler(
    self,
    action_flavor: str,
    action_subtype: str,
    callback: ActionHandlerCallback,
) -> None:
    """Subscribe an action handler to the event hub.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
        callback: the action to trigger when the trigger is activated.
    """
    self.action_handlers[(action_flavor, action_subtype)] = callback
trigger_action(event: BaseEvent, event_source: EventSourceResponse, trigger: TriggerResponse, action_callback: ActionHandlerCallback) -> None

Trigger an action.

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
trigger TriggerResponse

The trigger that was activated.

required
action_callback ActionHandlerCallback

The action to trigger.

required
Source code in src/zenml/event_hub/base_event_hub.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def trigger_action(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
    trigger: TriggerResponse,
    action_callback: ActionHandlerCallback,
) -> None:
    """Trigger an action.

    Args:
        event: The event.
        event_source: The event source that produced the event.
        trigger: The trigger that was activated.
        action_callback: The action to trigger.
    """
    request = TriggerExecutionRequest(
        trigger=trigger.id, event_metadata=event.model_dump()
    )

    action_config = trigger.action.configuration

    trigger_execution = self.zen_store.create_trigger_execution(request)

    # Generate an API token that can be used by external workloads
    # implementing the action to authenticate with the server. This token
    # is associated with the service account configured for the trigger
    # and has a validity defined by the trigger's authentication window.
    token = JWTToken(
        user_id=trigger.action.service_account.id,
    )
    expires: Optional[datetime] = None
    if trigger.action.auth_window:
        expires = utc_now() + timedelta(minutes=trigger.action.auth_window)
    encoded_token = token.encode(expires=expires)
    auth_context = AuthContext(
        user=trigger.action.service_account,
        access_token=token,
        encoded_access_token=encoded_token,
    )

    try:
        # TODO: We need to make this async, as this might take quite some
        # time per trigger. We can either use threads starting here, or
        # use fastapi background tasks that get passed here instead of
        # running the event hub as a background tasks in the webhook
        # endpoints
        action_callback(
            action_config,
            trigger_execution,
            auth_context,
        )
    except Exception:
        # Don't let action errors stop the event hub from working
        logger.exception(
            f"An error occurred while executing trigger {trigger}."
        )
unsubscribe_action_handler(action_flavor: str, action_subtype: str) -> None

Unsubscribe an action handler from the event hub.

Parameters:

Name Type Description Default
action_flavor str

the flavor of the action to trigger.

required
action_subtype str

the subtype of the action to trigger.

required
Source code in src/zenml/event_hub/base_event_hub.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def unsubscribe_action_handler(
    self,
    action_flavor: str,
    action_subtype: str,
) -> None:
    """Unsubscribe an action handler from the event hub.

    Args:
        action_flavor: the flavor of the action to trigger.
        action_subtype: the subtype of the action to trigger.
    """
    self.action_handlers.pop((action_flavor, action_subtype), None)
Functions

event_hub

Base class for all the Event Hub.

Classes
InternalEventHub

Bases: BaseEventHub

Internal in-server event hub implementation.

The internal in-server event hub uses the database as a source of truth for configured triggers and triggers actions by calling the action handlers directly.

Functions
activate_trigger(trigger: TriggerResponse) -> None

Add a trigger to the event hub.

Configure the event hub to trigger an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to activate.

required
Source code in src/zenml/event_hub/event_hub.py
48
49
50
51
52
53
54
55
def activate_trigger(self, trigger: TriggerResponse) -> None:
    """Add a trigger to the event hub.

    Configure the event hub to trigger an action when an event is received.

    Args:
        trigger: the trigger to activate.
    """
deactivate_trigger(trigger: TriggerResponse) -> None

Remove a trigger from the event hub.

Configure the event hub to stop triggering an action when an event is received.

Parameters:

Name Type Description Default
trigger TriggerResponse

the trigger to deactivate.

required
Source code in src/zenml/event_hub/event_hub.py
60
61
62
63
64
65
66
67
68
def deactivate_trigger(self, trigger: TriggerResponse) -> None:
    """Remove a trigger from the event hub.

    Configure the event hub to stop triggering an action when an event is
    received.

    Args:
        trigger: the trigger to deactivate.
    """
get_matching_active_triggers_for_event(event: BaseEvent, event_source: EventSourceResponse) -> List[TriggerResponse]

Get all triggers that match an incoming event.

Parameters:

Name Type Description Default
event BaseEvent

The inbound event.

required
event_source EventSourceResponse

The event source which emitted the event.

required

Returns:

Type Description
List[TriggerResponse]

The list of matching triggers.

Source code in src/zenml/event_hub/event_hub.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def get_matching_active_triggers_for_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> List[TriggerResponse]:
    """Get all triggers that match an incoming event.

    Args:
        event: The inbound event.
        event_source: The event source which emitted the event.

    Returns:
        The list of matching triggers.
    """
    # get all event sources configured for this flavor
    triggers: List[TriggerResponse] = depaginate(
        self.zen_store.list_triggers,
        trigger_filter_model=TriggerFilter(
            project=event_source.project.id,
            event_source_id=event_source.id,
            is_active=True,
        ),
        hydrate=True,
    )

    trigger_list: List[TriggerResponse] = []

    for trigger in triggers:
        # For now, the matching of trigger filters vs event is implemented
        # in each filter class. This is not ideal and should be refactored
        # to a more generic solution that doesn't require the plugin
        # implementation to be imported here.
        try:
            plugin_flavor = plugin_flavor_registry().get_flavor_class(
                name=event_source.flavor,
                _type=PluginType.EVENT_SOURCE,
                subtype=event_source.plugin_subtype,
            )
        except KeyError:
            logger.exception(
                f"Could not find plugin flavor for event source "
                f"{event_source.id} and flavor {event_source.flavor}. "
                f"Skipping trigger {trigger.id}."
            )
            continue

        assert issubclass(plugin_flavor, BaseEventSourceFlavor)

        event_filter_config_class = plugin_flavor.EVENT_FILTER_CONFIG_CLASS
        try:
            event_filter = event_filter_config_class(
                **trigger.event_filter if trigger.event_filter else {}
            )
        except ValidationError:
            logger.exception(
                f"Could not instantiate event filter config class for "
                f"event source {event_source.id}. Skipping trigger "
                f"{trigger.id}."
            )
            continue

        if event_filter.event_matches_filter(event=event):
            trigger_list.append(trigger)

    logger.debug(
        f"For event {event} and event source {event_source}, "
        f"the following triggers matched: {trigger_list}"
    )

    return trigger_list
publish_event(event: BaseEvent, event_source: EventSourceResponse) -> None

Process an incoming event and trigger all configured actions.

This will first check for any subscribers/triggers for this event, then log the event for later reference and finally perform the configured action(s).

Parameters:

Name Type Description Default
event BaseEvent

The event.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in src/zenml/event_hub/event_hub.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def publish_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Process an incoming event and trigger all configured actions.

    This will first check for any subscribers/triggers for this event,
    then log the event for later reference and finally perform the
    configured action(s).

    Args:
        event: The event.
        event_source: The event source that produced the event.
    """
    triggers = self.get_matching_active_triggers_for_event(
        event=event, event_source=event_source
    )

    for trigger in triggers:
        action_callback = self.action_handlers.get(
            (trigger.action_flavor, trigger.action_subtype)
        )
        if not action_callback:
            logger.error(
                f"The event hub could not deliver event to action handler "
                f"for flavor {trigger.action_flavor} and subtype "
                f"{trigger.action_subtype} because no handler was found."
            )
            continue

        self.trigger_action(
            event=event,
            event_source=event_source,
            trigger=trigger,
            action_callback=action_callback,
        )
Functions