Event Sources

zenml.event_sources

Base Classes for Event Sources.

Modules

base_event

Base implementation for events.

Classes
BaseEvent

Bases: BaseModel

Base class for all inbound events.

base_event_source

Base implementation for event sources.

Classes
BaseEventSourceFlavor

Bases: BasePluginFlavor, ABC

Base Event Plugin Flavor to access an event plugin along with its configurations.

Functions
get_event_filter_config_schema() -> Dict[str, Any] classmethod

The event filter config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in src/zenml/event_sources/base_event_source.py
@classmethod
def get_event_filter_config_schema(cls) -> Dict[str, Any]:
    """The event filter config schema for a flavor.

    Returns:
        The event filter config schema.
    """
    return cls.EVENT_FILTER_CONFIG_CLASS.model_json_schema()
get_event_source_config_schema() -> Dict[str, Any] classmethod

The event source config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in src/zenml/event_sources/base_event_source.py
@classmethod
def get_event_source_config_schema(cls) -> Dict[str, Any]:
    """The event source config schema for a flavor.

    Returns:
        The event source config schema.
    """
    return cls.EVENT_SOURCE_CONFIG_CLASS.model_json_schema()
get_flavor_response_model(hydrate: bool) -> EventSourceFlavorResponse classmethod

Convert the Flavor into a Response Model.

Parameters:

Name Type Description Default
hydrate bool

Whether the model should be hydrated.

required

Returns:

Type Description
EventSourceFlavorResponse

The Flavor Response model for the Event Source implementation

Source code in src/zenml/event_sources/base_event_source.py
@classmethod
def get_flavor_response_model(
    cls, hydrate: bool
) -> EventSourceFlavorResponse:
    """Convert the Flavor into a Response Model.

    Args:
        hydrate: Whether the model should be hydrated.

    Returns:
        The Flavor Response model for the Event Source implementation
    """
    metadata = None
    if hydrate:
        metadata = EventSourceFlavorResponseMetadata(
            source_config_schema=cls.get_event_source_config_schema(),
            filter_config_schema=cls.get_event_filter_config_schema(),
        )
    return EventSourceFlavorResponse(
        body=EventSourceFlavorResponseBody(),
        metadata=metadata,
        name=cls.FLAVOR,
        type=cls.TYPE,
        subtype=cls.SUBTYPE,
    )
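
The effect of the hydrate flag can be illustrated with a small stand-in. This is only a sketch: SketchFlavorResponse, the "github" flavor name, and the schema dictionaries are hypothetical placeholders rather than ZenML classes or real schemas.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchFlavorResponse:
    """Hypothetical stand-in for EventSourceFlavorResponse."""

    name: str
    metadata: Optional[Dict[str, Any]] = None


def get_flavor_response_model(name: str, hydrate: bool) -> SketchFlavorResponse:
    # Schemas are only computed and attached when hydration is requested.
    metadata = None
    if hydrate:
        metadata = {
            "source_config_schema": {"title": "SourceConfig", "type": "object"},
            "filter_config_schema": {"title": "FilterConfig", "type": "object"},
        }
    return SketchFlavorResponse(name=name, metadata=metadata)


light = get_flavor_response_model("github", hydrate=False)
full = get_flavor_response_model("github", hydrate=True)
```

A non-hydrated response carries no metadata, so callers that only need the flavor name, type, and subtype avoid generating the schemas.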
BaseEventSourceHandler(event_hub: Optional[BaseEventHub] = None)

Bases: BasePlugin, ABC

Base event source handler implementation.

This class provides a base implementation for event source handlers.

The base event source handler acts as an intermediary between the REST API endpoints and the ZenML store for all operations related to event sources. It implements all operations related to creating, updating, deleting, and fetching event sources from the database. It also provides a set of methods that can be overridden by concrete event source handlers that need to react to or participate in the lifecycle of an event source:

  • _validate_event_source_request: validate and/or modify an event source before it is created (before it is persisted in the database)
  • _process_event_source_request: react to a new event source being created (after it is persisted in the database)
  • _validate_event_source_update: validate and/or modify an event source update before it is applied (before the update is saved in the database)
  • _process_event_source_update: react to an event source being updated (after the update is saved in the database)
  • _process_event_source_delete: react to an event source being deleted (before it is deleted from the database)
  • _process_event_source_response: modify an event source before it is returned to the user

In addition to optionally overriding these methods, every event source handler must define and provide configuration classes for the event source and the event filter. These are used to validate and instantiate the configuration from the user requests.

Finally, since event source handlers are also sources of events, the base class provides methods that implementations can use to dispatch events to the central event hub where they can be processed and eventually trigger actions.
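
The order in which the creation hooks run relative to persistence can be sketched as follows. SketchHandler is a hypothetical stand-in that uses plain dictionaries instead of the ZenML store and request models; only the hook names are taken from the list above.

```python
from typing import Any, Dict, List


class SketchHandler:
    """Hypothetical stand-in; it only mimics the create-time hook order."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.db: Dict[int, Dict[str, Any]] = {}

    def _validate_event_source_request(self, config: Dict[str, Any]) -> None:
        # Runs before anything is persisted; may reject or adjust the config.
        self.calls.append("validate")
        if "repo" not in config:
            raise ValueError("missing 'repo'")

    def _process_event_source_request(self, config: Dict[str, Any]) -> None:
        # Runs after the record exists, e.g. to register an external webhook.
        self.calls.append("process")

    def create(self, config: Dict[str, Any]) -> int:
        self._validate_event_source_request(config)
        source_id = len(self.db) + 1
        self.db[source_id] = dict(config)
        self.calls.append("persist")
        self._process_event_source_request(config)
        return source_id


handler = SketchHandler()
new_id = handler.create({"repo": "acme/models"})
```

After the call, handler.calls records the validate hook, then the database write, then the process hook.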

Event source handler initialization.

Parameters:

Name Type Description Default
event_hub Optional[BaseEventHub]

Optional event hub to use to initialize the event source handler. If not set during initialization, it may be set at a later time by calling set_event_hub. An event hub must be configured before the event handler needs to dispatch events.

None
Source code in src/zenml/event_sources/base_event_source.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
Attributes
config_class: Type[EventSourceConfig] abstractmethod property

Returns the event source configuration class.

Returns:

Type Description
Type[EventSourceConfig]

The configuration.

event_hub: BaseEventHub property

Get the event hub used to dispatch events.

Returns:

Type Description
BaseEventHub

The event hub.

Raises:

Type Description
RuntimeError

if an event hub isn't configured.
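
A minimal sketch of the documented contract, assuming the hub is kept on a private attribute that may still be unset. SketchHub and SketchHandler are hypothetical stand-ins; unlike this sketch, the real base class falls back to a default hub in its initializer.

```python
from typing import Optional


class SketchHub:
    """Hypothetical stand-in for an event hub."""


class SketchHandler:
    def __init__(self, event_hub: Optional[SketchHub] = None) -> None:
        # Assumption: no default hub is installed in this sketch.
        self._event_hub = event_hub

    def set_event_hub(self, event_hub: SketchHub) -> None:
        self._event_hub = event_hub

    @property
    def event_hub(self) -> SketchHub:
        if self._event_hub is None:
            raise RuntimeError("No event hub configured.")
        return self._event_hub


handler = SketchHandler()
try:
    handler.event_hub
    state = "configured"
except RuntimeError:
    state = "missing"

# Late injection makes the property usable.
handler.set_event_hub(SketchHub())
```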

filter_class: Type[EventFilterConfig] abstractmethod property

Returns the event filter configuration class.

Returns:

Type Description
Type[EventFilterConfig]

The event filter configuration class.

flavor_class: Type[BaseEventSourceFlavor] abstractmethod property

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseEventSourceFlavor]

The flavor class of the plugin.

Functions
create_event_source(event_source: EventSourceRequest) -> EventSourceResponse

Process an event source request and create the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceRequest

Event source request.

required

Returns:

Type Description
EventSourceResponse

The created event source.

Source code in src/zenml/event_sources/base_event_source.py
def create_event_source(
    self, event_source: EventSourceRequest
) -> EventSourceResponse:
    """Process an event source request and create the event source in the database.

    Args:
        event_source: Event source request.

    Returns:
        The created event source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the request
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    # Call the implementation specific method to validate the request
    # before it is sent to the database
    self._validate_event_source_request(
        event_source=event_source, config=config
    )
    # Serialize the configuration back into the request
    event_source.configuration = config.model_dump(exclude_none=True)
    # Create the event source in the database
    event_source_response = self.zen_store.create_event_source(
        event_source=event_source
    )
    try:
        # Instantiate the configuration from the response
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Call the implementation specific method to process the created
        # event source
        self._process_event_source_request(
            event_source=event_source_response, config=config
        )
    except Exception:
        # If the event source creation fails, delete the event source from
        # the database
        logger.exception(
            f"Failed to create event source {event_source_response}. "
            f"Deleting the event source."
        )
        self.zen_store.delete_event_source(
            event_source_id=event_source_response.id
        )
        raise

    # Serialize the configuration back into the response
    event_source_response.set_configuration(
        config.model_dump(exclude_none=True)
    )

    # Return the response to the user
    return event_source_response
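
The compensating delete in the except branch can be shown in isolation. This is a sketch with a hypothetical FailingHandler and an in-memory dictionary in place of the ZenML store; the ConnectionError is an arbitrary example failure.

```python
from typing import Any, Dict


class FailingHandler:
    """Hypothetical handler whose post-creation hook always fails."""

    def __init__(self) -> None:
        self.db: Dict[int, Dict[str, Any]] = {}

    def _process_event_source_request(self, config: Dict[str, Any]) -> None:
        raise ConnectionError("external service unreachable")

    def create(self, config: Dict[str, Any]) -> int:
        source_id = len(self.db) + 1
        self.db[source_id] = dict(config)  # persist first
        try:
            self._process_event_source_request(config)
        except Exception:
            # Undo the insert, then let the original error propagate.
            del self.db[source_id]
            raise
        return source_id


handler = FailingHandler()
try:
    handler.create({"repo": "acme/models"})
    outcome = "created"
except ConnectionError:
    outcome = "rolled back"
```

The caller still sees the original exception, but no orphaned record is left behind.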
delete_event_source(event_source: EventSourceResponse, force: bool = False) -> None

Process an event source delete request and delete the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source to delete.

required
force bool

Whether to force delete the event source from the database even if the event source handler fails to delete the event source.

False
Source code in src/zenml/event_sources/base_event_source.py
def delete_event_source(
    self,
    event_source: EventSourceResponse,
    force: bool = False,
) -> None:
    """Process an event source delete request and delete the event source in the database.

    Args:
        event_source: The event source to delete.
        force: Whether to force delete the event source from the database
            even if the event source handler fails to delete the event
            source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    try:
        # Call the implementation specific method to process the deleted
        # event source before it is deleted from the database
        self._process_event_source_delete(
            event_source=event_source,
            config=config,
            force=force,
        )
    except Exception:
        logger.exception(f"Failed to delete event source {event_source}. ")
        if not force:
            raise

        logger.warning(f"Force deleting event source {event_source}.")

    # Delete the event source from the database
    self.zen_store.delete_event_source(
        event_source_id=event_source.id,
    )
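
The difference between a plain and a forced delete can be sketched as below. SketchHandler and its in-memory database are hypothetical; the cleanup hook is made to fail on purpose.

```python
from typing import Dict


class SketchHandler:
    """Hypothetical handler whose cleanup hook always fails."""

    def __init__(self) -> None:
        self.db: Dict[int, str] = {1: "source-a", 2: "source-b"}

    def _process_event_source_delete(self, source_id: int) -> None:
        raise ConnectionError("could not remove remote webhook")

    def delete(self, source_id: int, force: bool = False) -> None:
        try:
            self._process_event_source_delete(source_id)
        except Exception:
            if not force:
                raise  # the record stays in the database
        del self.db[source_id]


handler = SketchHandler()
try:
    handler.delete(1)
except ConnectionError:
    pass  # without force, the failure propagates

handler.delete(2, force=True)
```

Source 1 survives because the hook failure aborted the delete, while source 2 is removed despite the same failure.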
dispatch_event(event: BaseEvent, event_source: EventSourceResponse) -> None

Dispatch an event to all active triggers that match the event.

Parameters:

Name Type Description Default
event BaseEvent

The event to dispatch.

required
event_source EventSourceResponse

The event source that produced the event.

required
Source code in src/zenml/event_sources/base_event_source.py
def dispatch_event(
    self,
    event: BaseEvent,
    event_source: EventSourceResponse,
) -> None:
    """Dispatch an event to all active triggers that match the event.

    Args:
        event: The event to dispatch.
        event_source: The event source that produced the event.
    """
    if not event_source.is_active:
        logger.debug(
            f"Event source {event_source.id} is not active. Skipping event "
            f"dispatch."
        )
        return

    self.event_hub.publish_event(
        event=event,
        event_source=event_source,
    )
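
A small sketch of the active-source guard, using hypothetical SketchSource and SketchHub stand-ins and plain strings as events:

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class SketchSource:
    id: str
    is_active: bool


class SketchHub:
    """Hypothetical hub that just records what was published."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, str]] = []

    def publish_event(self, event: str, event_source: SketchSource) -> None:
        self.published.append((event_source.id, event))


def dispatch_event(hub: SketchHub, event: str, event_source: SketchSource) -> None:
    # Events from inactive sources are dropped before reaching the hub.
    if not event_source.is_active:
        return
    hub.publish_event(event=event, event_source=event_source)


hub = SketchHub()
dispatch_event(hub, "push", SketchSource(id="active", is_active=True))
dispatch_event(hub, "push", SketchSource(id="paused", is_active=False))
```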
get_event_source(event_source: EventSourceResponse, hydrate: bool = False) -> EventSourceResponse

Process an event source response before it is returned to the user.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source fetched from the database.

required
hydrate bool

Whether to hydrate the event source.

False

Returns:

Type Description
EventSourceResponse

The event source.

Source code in src/zenml/event_sources/base_event_source.py
def get_event_source(
    self, event_source: EventSourceResponse, hydrate: bool = False
) -> EventSourceResponse:
    """Process an event source response before it is returned to the user.

    Args:
        event_source: The event source fetched from the database.
        hydrate: Whether to hydrate the event source.

    Returns:
        The event source.
    """
    if hydrate:
        # Instantiate the configuration from the response
        config = self.validate_event_source_configuration(
            event_source.configuration
        )
        # Call the implementation specific method to process the response
        self._process_event_source_response(
            event_source=event_source, config=config
        )
        # Serialize the configuration back into the response
        event_source.set_configuration(
            config.model_dump(exclude_none=True)
        )

    # Return the response to the user
    return event_source
set_event_hub(event_hub: BaseEventHub) -> None

Configure an event hub for this event source plugin.

Parameters:

Name Type Description Default
event_hub BaseEventHub

Event hub to be used by this event handler to dispatch events.

required
Source code in src/zenml/event_sources/base_event_source.py
def set_event_hub(self, event_hub: BaseEventHub) -> None:
    """Configure an event hub for this event source plugin.

    Args:
        event_hub: Event hub to be used by this event handler to dispatch
            events.
    """
    self._event_hub = event_hub
update_event_source(event_source: EventSourceResponse, event_source_update: EventSourceUpdate) -> EventSourceResponse

Process an event source update request and update the event source in the database.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source to update.

required
event_source_update EventSourceUpdate

The update to be applied to the event source.

required

Returns:

Type Description
EventSourceResponse

The updated event source.

Source code in src/zenml/event_sources/base_event_source.py
def update_event_source(
    self,
    event_source: EventSourceResponse,
    event_source_update: EventSourceUpdate,
) -> EventSourceResponse:
    """Process an event source update request and update the event source in the database.

    Args:
        event_source: The event source to update.
        event_source_update: The update to be applied to the event source.

    Returns:
        The updated event source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_event_source_configuration(
        event_source.configuration
    )
    # Validate and instantiate the configuration from the update request
    # NOTE: if supplied, the configuration update is a full replacement
    # of the original configuration
    config_update = config
    if event_source_update.configuration is not None:
        config_update = self.validate_event_source_configuration(
            event_source_update.configuration
        )
    # Call the implementation specific method to validate the update request
    # before it is sent to the database
    self._validate_event_source_update(
        event_source=event_source,
        config=config,
        event_source_update=event_source_update,
        config_update=config_update,
    )
    # Serialize the configuration update back into the update request
    event_source_update.configuration = config_update.model_dump(
        exclude_none=True
    )

    # Update the event source in the database
    event_source_response = self.zen_store.update_event_source(
        event_source_id=event_source.id,
        event_source_update=event_source_update,
    )
    try:
        # Instantiate the configuration from the response
        response_config = self.validate_event_source_configuration(
            event_source_response.configuration
        )
        # Call the implementation specific method to process the update
        # after it has been saved in the database
        self._process_event_source_update(
            event_source=event_source_response,
            config=response_config,
            previous_event_source=event_source,
            previous_config=config,
        )
    except Exception:
        # If the event source update fails, roll back the event source in
        # the database to the original state
        logger.exception(
            f"Failed to update event source {event_source}. "
            f"Rolling back the event source to the previous state."
        )
        self.zen_store.update_event_source(
            event_source_id=event_source.id,
            event_source_update=EventSourceUpdate.from_response(
                event_source
            ),
        )
        raise

    # Serialize the configuration back into the response
    event_source_response.set_configuration(
        response_config.model_dump(exclude_none=True)
    )
    # Return the response to the user
    return event_source_response
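
The NOTE in the listing above says that a supplied configuration replaces the stored one in full. A minimal sketch of that rule, with a hypothetical apply_update helper and plain dictionaries standing in for the config models:

```python
from typing import Any, Dict, Optional


def apply_update(
    stored: Dict[str, Any], update: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
    # A supplied configuration replaces the stored one wholesale;
    # keys missing from the update are not carried over.
    return dict(stored) if update is None else dict(update)


stored = {"repo": "acme/models", "branch": "main"}
replaced = apply_update(stored, {"repo": "acme/data"})
unchanged = apply_update(stored, None)
```

Because nothing is merged, a caller who wants to change one key has to resend every other key it wants to keep.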
validate_event_filter_configuration(configuration: Dict[str, Any]) -> EventFilterConfig

Validate and return the configuration of an event filter.

Parameters:

Name Type Description Default
configuration Dict[str, Any]

The configuration to validate.

required

Returns:

Type Description
EventFilterConfig

Instantiated event filter config.

Raises:

Type Description
ValueError

if the configuration is invalid.

Source code in src/zenml/event_sources/base_event_source.py
def validate_event_filter_configuration(
    self,
    configuration: Dict[str, Any],
) -> EventFilterConfig:
    """Validate and return the configuration of an event filter.

    Args:
        configuration: The configuration to validate.

    Returns:
        Instantiated event filter config.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.filter_class(**configuration)
    except ValueError as e:
        raise ValueError(
            f"Invalid configuration for event filter: {e}."
        ) from e
validate_event_source_configuration(event_source_config: Dict[str, Any]) -> EventSourceConfig

Validate and return the event source configuration.

Parameters:

Name Type Description Default
event_source_config Dict[str, Any]

The event source configuration to validate.

required

Returns:

Type Description
EventSourceConfig

The validated event source configuration.

Raises:

Type Description
ValueError

if the configuration is invalid.

Source code in src/zenml/event_sources/base_event_source.py
def validate_event_source_configuration(
    self, event_source_config: Dict[str, Any]
) -> EventSourceConfig:
    """Validate and return the event source configuration.

    Args:
        event_source_config: The event source configuration to validate.

    Returns:
        The validated event source configuration.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.config_class(**event_source_config)
    except ValueError as e:
        raise ValueError(
            f"Invalid configuration for event source: {e}."
        ) from e
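
The re-raise pattern used by both validation methods can be sketched with a dataclass in place of the Pydantic config class. SketchConfig and its owner/name rule are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SketchConfig:
    """Hypothetical config with a single validated field."""

    repo: str

    def __post_init__(self) -> None:
        if "/" not in self.repo:
            raise ValueError("repo must look like 'owner/name'")


def validate_event_source_configuration(raw: Dict[str, Any]) -> SketchConfig:
    try:
        return SketchConfig(**raw)
    except ValueError as e:
        # Add context while keeping the original error as the cause.
        raise ValueError(f"Invalid configuration for event source: {e}.") from e


good = validate_event_source_configuration({"repo": "acme/models"})
try:
    validate_event_source_configuration({"repo": "models"})
    message, cause = "", None
except ValueError as err:
    message, cause = str(err), err.__cause__
```

Chaining with `from e` keeps the underlying validation error available to callers through `__cause__`.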
EventFilterConfig

Bases: BaseModel, ABC

The Event Filter configuration.

Functions
event_matches_filter(event: BaseEvent) -> bool abstractmethod

All implementations need to implement this check.

If the filter matches the inbound event instance, this should return True, else False.

Parameters:

Name Type Description Default
event BaseEvent

The inbound event instance.

required

Returns:

Type Description
bool

Whether the filter matches the event.

Source code in src/zenml/event_sources/base_event_source.py
@abstractmethod
def event_matches_filter(self, event: BaseEvent) -> bool:
    """All implementations need to implement this check.

    If the filter matches the inbound event instance, this should
    return True, else False.

    Args:
        event: The inbound event instance.

    Returns:
        Whether the filter matches the event.
    """
EventSourceConfig

Bases: BasePluginConfig

The Event Source configuration.

webhooks

Base Classes for Webhook Event Sources.

Modules
base_webhook_event_source

Abstract BaseEvent class that all Event implementations must implement.

Classes
BaseWebhookEvent

Bases: BaseEvent

Base class for all inbound webhook events.

BaseWebhookEventSourceFlavor

Bases: BaseEventSourceFlavor, ABC

Base Event Plugin Flavor to access an event plugin along with its configurations.

BaseWebhookEventSourceHandler(event_hub: Optional[BaseEventHub] = None)

Bases: BaseEventSourceHandler, ABC

Base implementation for all Webhook event sources.

Source code in src/zenml/event_sources/base_event_source.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
Attributes
config_class: Type[WebhookEventSourceConfig] abstractmethod property

Returns the webhook event source configuration class.

Returns:

Type Description
Type[WebhookEventSourceConfig]

The configuration.

filter_class: Type[WebhookEventFilterConfig] abstractmethod property

Returns the webhook event filter configuration class.

Returns:

Type Description
Type[WebhookEventFilterConfig]

The event filter configuration class.

flavor_class: Type[BaseWebhookEventSourceFlavor] abstractmethod property

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseWebhookEventSourceFlavor]

The flavor class of the plugin.

Functions
is_valid_signature(raw_body: bytes, secret_token: str, signature_header: str) -> bool

Verify the SHA256 signature of the payload.

Parameters:

Name Type Description Default
raw_body bytes

original request body to verify

required
secret_token str

secret token used to generate the signature

required
signature_header str

signature header to verify (x-hub-signature-256)

required

Returns:

Type Description
bool

Whether the signature is valid.

Source code in src/zenml/event_sources/webhooks/base_webhook_event_source.py
def is_valid_signature(
    self, raw_body: bytes, secret_token: str, signature_header: str
) -> bool:
    """Verify the SHA256 signature of the payload.

    Args:
        raw_body: original request body to verify
        secret_token: secret token used to generate the signature
        signature_header: signature header to verify (x-hub-signature-256)

    Returns:
        Whether the signature is valid.
    """
    hash_object = hmac.new(
        secret_token.encode("utf-8"),
        msg=raw_body,
        digestmod=hashlib.sha256,
    )
    expected_signature = "sha256=" + hash_object.hexdigest()

    if not hmac.compare_digest(expected_signature, signature_header):
        return False
    return True
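
To see both sides of the check, the sketch below builds the header value a sender would attach and then verifies it. The sign helper, the secret, and the payload are hypothetical; the HMAC-SHA256 scheme and the sha256= prefix follow the listing above.

```python
import hashlib
import hmac


def sign(raw_body: bytes, secret_token: str) -> str:
    # What a sender would place in the x-hub-signature-256 header.
    digest = hmac.new(
        secret_token.encode("utf-8"),
        msg=raw_body,
        digestmod=hashlib.sha256,
    ).hexdigest()
    return "sha256=" + digest


def is_valid_signature(
    raw_body: bytes, secret_token: str, signature_header: str
) -> bool:
    # compare_digest runs in constant time to avoid timing side channels.
    return hmac.compare_digest(sign(raw_body, secret_token), signature_header)


body = b'{"action": "push"}'
header = sign(body, "s3cret")
valid = is_valid_signature(body, "s3cret", header)
tampered = is_valid_signature(body + b" ", "s3cret", header)
wrong_key = is_valid_signature(body, "other", header)
```

The signature covers the raw bytes, so any change to the body, including whitespace, invalidates it.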
process_webhook_event(event_source: EventSourceResponse, raw_body: bytes, headers: Dict[str, str]) -> None

Process an incoming webhook event.

Parameters:

Name Type Description Default
event_source EventSourceResponse

The event source that the event belongs to.

required
raw_body bytes

The raw inbound webhook event.

required
headers Dict[str, str]

The headers of the inbound webhook event.

required
Source code in src/zenml/event_sources/webhooks/base_webhook_event_source.py
def process_webhook_event(
    self,
    event_source: EventSourceResponse,
    raw_body: bytes,
    headers: Dict[str, str],
) -> None:
    """Process an incoming webhook event.

    Args:
        event_source: The event source that the event belongs to.
        raw_body: The raw inbound webhook event.
        headers: The headers of the inbound webhook event.
    """
    json_body = self._load_payload(raw_body=raw_body, headers=headers)

    webhook_secret = self._get_webhook_secret(event_source)
    if webhook_secret:
        self._validate_webhook_event_signature(
            raw_body=raw_body,
            headers=headers,
            webhook_secret=webhook_secret,
        )

    event = self._interpret_event(json_body)

    self.dispatch_event(
        event=event,
        event_source=event_source,
    )
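
The steps above (parse, optionally verify, interpret, dispatch) can be sketched as a single function. This is an illustration only: the list used as a sink, the dictionary event shape, and the PermissionError raised on a mismatch are assumptions, since the private helpers called in the listing are not shown here.

```python
import hashlib
import hmac
import json
from typing import Any, Dict, List, Optional


def process_webhook_event(
    raw_body: bytes,
    headers: Dict[str, str],
    secret: Optional[str],
    sink: List[Dict[str, Any]],
) -> None:
    # 1. Parse the payload.
    payload = json.loads(raw_body)
    # 2. Verify the signature only when a secret is configured.
    if secret:
        expected = "sha256=" + hmac.new(
            secret.encode("utf-8"), raw_body, hashlib.sha256
        ).hexdigest()
        received = headers.get("x-hub-signature-256", "")
        if not hmac.compare_digest(expected, received):
            raise PermissionError("signature mismatch")
    # 3. Interpret the payload as an event (hypothetical shape).
    event = {"kind": payload.get("action", "unknown"), "payload": payload}
    # 4. Dispatch; here the event is simply appended to a list.
    sink.append(event)


events: List[Dict[str, Any]] = []
body = b'{"action": "push"}'
process_webhook_event(body, {}, None, events)
try:
    process_webhook_event(
        body, {"x-hub-signature-256": "sha256=bad"}, "s3cret", events
    )
    rejected = False
except PermissionError:
    rejected = True
```

A source without a secret accepts unsigned deliveries, which mirrors the `if webhook_secret:` guard in the listing.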
WebhookEventFilterConfig

Bases: EventFilterConfig

The Event Filter configuration.

WebhookEventSourceConfig

Bases: EventSourceConfig

The Event Source configuration.
