Skip to content

Actions

zenml.actions

Actions allow configuring a given action for later execution.

Modules

base_action

Base implementation of actions.

Classes
ActionConfig

Bases: BasePluginConfig

Allows configuring the action configuration.

BaseActionFlavor

Bases: BasePluginFlavor, ABC

Base Action Flavor to register Action Configurations.

Functions
get_action_config_schema() -> Dict[str, Any] classmethod

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in src/zenml/actions/base_action.py
61
62
63
64
65
66
67
68
@classmethod
def get_action_config_schema(cls) -> Dict[str, Any]:
    """The config schema for a flavor.

    Returns:
        The config schema.
    """
    return cls.ACTION_CONFIG_CLASS.model_json_schema()
get_flavor_response_model(hydrate: bool) -> ActionFlavorResponse classmethod

Convert the Flavor into a Response Model.

Parameters:

Name Type Description Default
hydrate bool

Whether the model should be hydrated.

required

Returns:

Type Description
ActionFlavorResponse

The response model.

Source code in src/zenml/actions/base_action.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@classmethod
def get_flavor_response_model(cls, hydrate: bool) -> ActionFlavorResponse:
    """Convert the Flavor into a Response Model.

    Args:
        hydrate: Whether the model should be hydrated.

    Returns:
        The response model.
    """
    metadata = None
    if hydrate:
        metadata = ActionFlavorResponseMetadata(
            config_schema=cls.get_action_config_schema(),
        )
    return ActionFlavorResponse(
        body=ActionFlavorResponseBody(),
        metadata=metadata,
        name=cls.FLAVOR,
        type=cls.TYPE,
        subtype=cls.SUBTYPE,
    )
BaseActionHandler(event_hub: Optional[BaseEventHub] = None)

Bases: BasePlugin, ABC

Implementation for an action handler.

Event source handler initialization.

Parameters:

Name Type Description Default
event_hub Optional[BaseEventHub]

Optional event hub to use to initialize the event source handler. If not set during initialization, it may be set at a later time by calling set_event_hub. An event hub must be configured before the event handler needs to dispatch events.

None
Source code in src/zenml/actions/base_action.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
Attributes
config_class: Type[ActionConfig] abstractmethod property

Returns the BasePluginConfig config.

Returns:

Type Description
Type[ActionConfig]

The configuration.

event_hub: BaseEventHub property

Get the event hub used to dispatch events.

Returns:

Type Description
BaseEventHub

The event hub.

Raises:

Type Description
RuntimeError

if an event hub isn't configured.

flavor_class: Type[BaseActionFlavor] abstractmethod property

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseActionFlavor]

The flavor class of the plugin.

Functions
create_action(action: ActionRequest) -> ActionResponse

Process a action request and create the action in the database.

Parameters:

Name Type Description Default
action ActionRequest

Action request.

required

Raises:

Type Description
Exception

If the implementation specific processing before creating the action fails.

Returns:

Type Description
ActionResponse

The created action.

Source code in src/zenml/actions/base_action.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def create_action(self, action: ActionRequest) -> ActionResponse:
    """Process a action request and create the action in the database.

    Args:
        action: Action request.

    Raises:
        Exception: If the implementation specific processing before creating
            the action fails.

    Returns:
        The created action.
    """
    # Validate and instantiate the configuration from the request
    config = self.validate_action_configuration(action.configuration)
    # Call the implementation specific method to validate the request
    # before it is sent to the database
    self._validate_action_request(action=action, config=config)
    # Serialize the configuration back into the request
    action.configuration = config.model_dump(exclude_none=True)
    # Create the action in the database
    action_response = self.zen_store.create_action(action=action)
    try:
        # Instantiate the configuration from the response
        config = self.validate_action_configuration(action.configuration)
        # Call the implementation specific method to process the created
        # action
        self._process_action_request(action=action_response, config=config)
        # Add any implementation specific related resources to the action
        # response
        self._populate_action_response_resources(
            action=action_response,
            config=config,
        )
    except Exception:
        # If the action creation fails, delete the action from
        # the database
        logger.exception(
            f"Failed to create action {action_response}. "
            f"Deleting the action."
        )
        self.zen_store.delete_action(action_id=action_response.id)
        raise

    # Serialize the configuration back into the response
    action_response.set_configuration(config.model_dump(exclude_none=True))
    # Return the response to the user
    return action_response
delete_action(action: ActionResponse, force: bool = False) -> None

Process action delete request and delete the action in the database.

Parameters:

Name Type Description Default
action ActionResponse

The action to delete.

required
force bool

Whether to force delete the action from the database even if the action handler fails to delete the event source.

False

Raises:

Type Description
Exception

If the implementation specific processing before deleting the action fails.

Source code in src/zenml/actions/base_action.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
def delete_action(
    self,
    action: ActionResponse,
    force: bool = False,
) -> None:
    """Process action delete request and delete the action in the database.

    Args:
        action: The action to delete.
        force: Whether to force delete the action from the database
            even if the action handler fails to delete the event
            source.

    Raises:
        Exception: If the implementation specific processing before deleting
            the action fails.
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_action_configuration(action.configuration)
    try:
        # Call the implementation specific method to process the deleted
        # action before it is deleted from the database
        self._process_action_delete(
            action=action,
            config=config,
            force=force,
        )
    except Exception:
        logger.exception(f"Failed to delete action {action}. ")
        if not force:
            raise

        logger.warning(f"Force deleting action {action}.")

    # Delete the action from the database
    self.zen_store.delete_action(
        action_id=action.id,
    )
event_hub_callback(config: Dict[str, Any], trigger_execution: TriggerExecutionResponse, auth_context: AuthContext) -> None

Callback to be used by the event hub to dispatch events to the action handler.

Parameters:

Name Type Description Default
config Dict[str, Any]

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in src/zenml/actions/base_action.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def event_hub_callback(
    self,
    config: Dict[str, Any],
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Callback to be used by the event hub to dispatch events to the action handler.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
    try:
        config_obj = self.config_class(**config)
    except ValueError as e:
        logger.exception(
            f"Action handler {self.flavor_class.FLAVOR} of type "
            f"{self.flavor_class.SUBTYPE} received an invalid configuration "
            f"from the event hub: {e}."
        )
        return

    try:
        # TODO: this would be a great place to convert the event back into its
        # original form and pass it to the action handler.
        self.run(
            config=config_obj,
            trigger_execution=trigger_execution,
            auth_context=auth_context,
        )
    except Exception:
        # Don't let the event hub crash if the action handler fails
        # TODO: we might want to return a value here indicating to the event
        # hub that the action handler failed to execute the action. This can
        # be used by the event hub to retry the action handler or to log the
        # failure.
        logger.exception(
            f"Action handler {self.flavor_class.FLAVOR} of type "
            f"{self.flavor_class.SUBTYPE} failed to execute the action "
            f"with configuration {config}."
        )
extract_resources(action_config: ActionConfig, hydrate: bool = False) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]

Extract related resources for this action.

Parameters:

Name Type Description Default
action_config ActionConfig

Action configuration from which to extract related resources.

required
hydrate bool

Whether to hydrate the resource models.

False

Returns:

Type Description
Dict[ResourceType, BaseResponse[Any, Any, Any]]

List of resources related to the action.

Source code in src/zenml/actions/base_action.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
def extract_resources(
    self,
    action_config: ActionConfig,
    hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
    """Extract related resources for this action.

    Args:
        action_config: Action configuration from which to extract related
            resources.
        hydrate: Whether to hydrate the resource models.

    Returns:
        List of resources related to the action.
    """
    return {}
get_action(action: ActionResponse, hydrate: bool = False) -> ActionResponse

Process a action response before it is returned to the user.

Parameters:

Name Type Description Default
action ActionResponse

The action fetched from the database.

required
hydrate bool

Whether to hydrate the action.

False

Returns:

Type Description
ActionResponse

The action.

Source code in src/zenml/actions/base_action.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def get_action(
    self, action: ActionResponse, hydrate: bool = False
) -> ActionResponse:
    """Process a action response before it is returned to the user.

    Args:
        action: The action fetched from the database.
        hydrate: Whether to hydrate the action.

    Returns:
        The action.
    """
    if hydrate:
        # Instantiate the configuration from the response
        config = self.validate_action_configuration(action.configuration)
        # Call the implementation specific method to process the response
        self._process_action_response(action=action, config=config)
        # Serialize the configuration back into the response
        action.set_configuration(config.model_dump(exclude_none=True))
        # Add any implementation specific related resources to the action
        # response
        self._populate_action_response_resources(
            action=action, config=config
        )

    # Return the response to the user
    return action
run(config: ActionConfig, trigger_execution: TriggerExecutionResponse, auth_context: AuthContext) -> None abstractmethod

Execute an action.

Parameters:

Name Type Description Default
config ActionConfig

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in src/zenml/actions/base_action.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@abstractmethod
def run(
    self,
    config: ActionConfig,
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Execute an action.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
set_event_hub(event_hub: BaseEventHub) -> None

Configure an event hub for this event source plugin.

Parameters:

Name Type Description Default
event_hub BaseEventHub

Event hub to be used by this event handler to dispatch events.

required
Source code in src/zenml/actions/base_action.py
160
161
162
163
164
165
166
167
168
169
170
171
172
def set_event_hub(self, event_hub: BaseEventHub) -> None:
    """Configure an event hub for this event source plugin.

    Args:
        event_hub: Event hub to be used by this event handler to dispatch
            events.
    """
    self._event_hub = event_hub
    self._event_hub.subscribe_action_handler(
        action_flavor=self.flavor_class.FLAVOR,
        action_subtype=self.flavor_class.SUBTYPE,
        callback=self.event_hub_callback,
    )
update_action(action: ActionResponse, action_update: ActionUpdate) -> ActionResponse

Process action update and update the action in the database.

Parameters:

Name Type Description Default
action ActionResponse

The action to update.

required
action_update ActionUpdate

The update to be applied to the action.

required

Raises:

Type Description
Exception

If the implementation specific processing before updating the action fails.

Returns:

Type Description
ActionResponse

The updated action.

Source code in src/zenml/actions/base_action.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
def update_action(
    self,
    action: ActionResponse,
    action_update: ActionUpdate,
) -> ActionResponse:
    """Process action update and update the action in the database.

    Args:
        action: The action to update.
        action_update: The update to be applied to the action.

    Raises:
        Exception: If the implementation specific processing before updating
            the action fails.

    Returns:
        The updated action.
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_action_configuration(action.configuration)
    # Validate and instantiate the configuration from the update request
    # NOTE: if supplied, the configuration update is a full replacement
    # of the original configuration
    config_update = config
    if action_update.configuration is not None:
        config_update = self.validate_action_configuration(
            action_update.configuration
        )
    # Call the implementation specific method to validate the update request
    # before it is sent to the database
    self._validate_action_update(
        action=action,
        config=config,
        action_update=action_update,
        config_update=config_update,
    )
    # Serialize the configuration update back into the update request
    action_update.configuration = config_update.model_dump(
        exclude_none=True
    )

    # Update the action in the database
    action_response = self.zen_store.update_action(
        action_id=action.id,
        action_update=action_update,
    )
    try:
        # Instantiate the configuration from the response
        response_config = self.validate_action_configuration(
            action_response.configuration
        )
        # Call the implementation specific method to process the update
        # request before it is sent to the database
        self._process_action_update(
            action=action_response,
            config=response_config,
            previous_action=action,
            previous_config=config,
        )
        # Add any implementation specific related resources to the action
        # response
        self._populate_action_response_resources(
            action=action_response, config=response_config
        )
    except Exception:
        # If the action update fails, roll back the action in
        # the database to the original state
        logger.exception(
            f"Failed to update action {action}. "
            f"Rolling back the action to the previous state."
        )
        self.zen_store.update_action(
            action_id=action.id,
            action_update=ActionUpdate.from_response(action),
        )
        raise

    # Serialize the configuration back into the response
    action_response.set_configuration(
        response_config.model_dump(exclude_none=True)
    )
    # Return the response to the user
    return action_response
validate_action_configuration(action_config: Dict[str, Any]) -> ActionConfig

Validate and return the action configuration.

Parameters:

Name Type Description Default
action_config Dict[str, Any]

The action configuration to validate.

required

Returns:

Type Description
ActionConfig

The validated action configuration.

Raises:

Type Description
ValueError

if the configuration is invalid.

Source code in src/zenml/actions/base_action.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def validate_action_configuration(
    self, action_config: Dict[str, Any]
) -> ActionConfig:
    """Validate and return the action configuration.

    Args:
        action_config: The action configuration to validate.

    Returns:
        The validated action configuration.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.config_class(**action_config)
    except ValueError as e:
        raise ValueError(f"Invalid configuration for action: {e}.") from e
Functions

pipeline_run

Modules
pipeline_run_action

Example file of what an action Plugin could look like.

Classes
PipelineRunActionConfiguration

Bases: ActionConfig

Configuration class to configure a pipeline run action.

PipelineRunActionFlavor

Bases: BaseActionFlavor

Enables users to configure pipeline run action.

PipelineRunActionHandler(event_hub: Optional[BaseEventHub] = None)

Bases: BaseActionHandler

Action handler for running pipelines.

Source code in src/zenml/actions/base_action.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
Attributes
config_class: Type[PipelineRunActionConfiguration] property

Returns the BasePluginConfig config.

Returns:

Type Description
Type[PipelineRunActionConfiguration]

The configuration.

flavor_class: Type[BaseActionFlavor] property

Returns the flavor class of the plugin.

Returns:

Type Description
Type[BaseActionFlavor]

The flavor class of the plugin.

Functions
extract_resources(action_config: ActionConfig, hydrate: bool = False) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]

Extract related resources for this action.

Parameters:

Name Type Description Default
action_config ActionConfig

Action configuration from which to extract related resources.

required
hydrate bool

Flag deciding whether to hydrate the resources.

False

Returns:

Type Description
Dict[ResourceType, BaseResponse[Any, Any, Any]]

List of resources related to the action.

Raises:

Type Description
ValueError

In case the specified template does not exist.

Source code in src/zenml/actions/pipeline_run/pipeline_run_action.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def extract_resources(
    self,
    action_config: ActionConfig,
    hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
    """Extract related resources for this action.

    Args:
        action_config: Action configuration from which to extract related
            resources.
        hydrate: Flag deciding whether to hydrate the resources.

    Returns:
        List of resources related to the action.

    Raises:
        ValueError: In case the specified template does not exist.
    """
    assert isinstance(action_config, PipelineRunActionConfiguration)

    zen_store = GlobalConfiguration().zen_store

    try:
        template = zen_store.get_run_template(
            template_id=action_config.template_id, hydrate=hydrate
        )
    except KeyError:
        raise ValueError(
            f"No template found with id {action_config.template_id}."
        )

    resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
        ResourceType.RUN_TEMPLATE: template
    }

    if template.pipeline is not None:
        pipeline = zen_store.get_pipeline(
            pipeline_id=template.pipeline.id, hydrate=hydrate
        )
        resources[ResourceType.PIPELINE] = pipeline

    return resources
run(config: ActionConfig, trigger_execution: TriggerExecutionResponse, auth_context: AuthContext) -> None

Execute an action.

Parameters:

Name Type Description Default
config ActionConfig

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in src/zenml/actions/pipeline_run/pipeline_run_action.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def run(
    self,
    config: ActionConfig,
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Execute an action.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
    from zenml.zen_server.utils import zen_store

    assert isinstance(config, PipelineRunActionConfiguration)

    template = zen_store().get_run_template(config.template_id)
    logger.debug("Running template:", template)
    run_template(
        template=template,
        run_config=config.run_config,
        auth_context=auth_context,
    )
Functions