Actions
zenml.actions
special
Actions allow configuring a given action for later execution.
base_action
Base implementation of actions.
ActionConfig (BasePluginConfig)
Allows configuring the action configuration.
Source code in zenml/actions/base_action.py
class ActionConfig(BasePluginConfig):
"""Allows configuring the action configuration."""
BaseActionFlavor (BasePluginFlavor, ABC)
Base Action Flavor to register Action Configurations.
Source code in zenml/actions/base_action.py
class BaseActionFlavor(BasePluginFlavor, ABC):
"""Base Action Flavor to register Action Configurations."""
TYPE: ClassVar[PluginType] = PluginType.ACTION
# Action specific
ACTION_CONFIG_CLASS: ClassVar[Type[ActionConfig]]
@classmethod
def get_action_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.ACTION_CONFIG_CLASS.model_json_schema()
@classmethod
def get_flavor_response_model(cls, hydrate: bool) -> ActionFlavorResponse:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
Returns:
The response model.
"""
metadata = None
if hydrate:
metadata = ActionFlavorResponseMetadata(
config_schema=cls.get_action_config_schema(),
)
return ActionFlavorResponse(
body=ActionFlavorResponseBody(),
metadata=metadata,
name=cls.FLAVOR,
type=cls.TYPE,
subtype=cls.SUBTYPE,
)
get_action_config_schema()
classmethod
The config schema for a flavor.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The config schema. |
Source code in zenml/actions/base_action.py
@classmethod
def get_action_config_schema(cls) -> Dict[str, Any]:
"""The config schema for a flavor.
Returns:
The config schema.
"""
return cls.ACTION_CONFIG_CLASS.model_json_schema()
get_flavor_response_model(hydrate)
classmethod
Convert the Flavor into a Response Model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hydrate |
bool |
Whether the model should be hydrated. |
required |
Returns:
Type | Description |
---|---|
ActionFlavorResponse |
The response model. |
Source code in zenml/actions/base_action.py
@classmethod
def get_flavor_response_model(cls, hydrate: bool) -> ActionFlavorResponse:
"""Convert the Flavor into a Response Model.
Args:
hydrate: Whether the model should be hydrated.
Returns:
The response model.
"""
metadata = None
if hydrate:
metadata = ActionFlavorResponseMetadata(
config_schema=cls.get_action_config_schema(),
)
return ActionFlavorResponse(
body=ActionFlavorResponseBody(),
metadata=metadata,
name=cls.FLAVOR,
type=cls.TYPE,
subtype=cls.SUBTYPE,
)
BaseActionHandler (BasePlugin, ABC)
Implementation for an action handler.
Source code in zenml/actions/base_action.py
class BaseActionHandler(BasePlugin, ABC):
"""Implementation for an action handler."""
_event_hub: Optional[BaseEventHub] = None
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
"""Event source handler initialization.
Args:
event_hub: Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling `set_event_hub`. An event hub must
be configured before the event handler needs to dispatch events.
"""
super().__init__()
if event_hub is None:
from zenml.event_hub.event_hub import (
event_hub as default_event_hub,
)
# TODO: for now, we use the default internal event hub. In
# the future, this should be configurable.
event_hub = default_event_hub
self.set_event_hub(event_hub)
@property
@abstractmethod
def config_class(self) -> Type[ActionConfig]:
"""Returns the `BasePluginConfig` config.
Returns:
The configuration.
"""
@property
@abstractmethod
def flavor_class(self) -> Type[BaseActionFlavor]:
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
@property
def event_hub(self) -> BaseEventHub:
"""Get the event hub used to dispatch events.
Returns:
The event hub.
Raises:
RuntimeError: if an event hub isn't configured.
"""
if self._event_hub is None:
raise RuntimeError(
f"An event hub is not configured for the "
f"{self.flavor_class.FLAVOR} {self.flavor_class.SUBTYPE} "
f"action handler."
)
return self._event_hub
def set_event_hub(self, event_hub: BaseEventHub) -> None:
"""Configure an event hub for this event source plugin.
Args:
event_hub: Event hub to be used by this event handler to dispatch
events.
"""
self._event_hub = event_hub
self._event_hub.subscribe_action_handler(
action_flavor=self.flavor_class.FLAVOR,
action_subtype=self.flavor_class.SUBTYPE,
callback=self.event_hub_callback,
)
def event_hub_callback(
self,
config: Dict[str, Any],
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Callback to be used by the event hub to dispatch events to the action handler.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
try:
config_obj = self.config_class(**config)
except ValueError as e:
logger.exception(
f"Action handler {self.flavor_class.FLAVOR} of type "
f"{self.flavor_class.SUBTYPE} received an invalid configuration "
f"from the event hub: {e}."
)
return
try:
# TODO: this would be a great place to convert the event back into its
# original form and pass it to the action handler.
self.run(
config=config_obj,
trigger_execution=trigger_execution,
auth_context=auth_context,
)
except Exception:
# Don't let the event hub crash if the action handler fails
# TODO: we might want to return a value here indicating to the event
# hub that the action handler failed to execute the action. This can
# be used by the event hub to retry the action handler or to log the
# failure.
logger.exception(
f"Action handler {self.flavor_class.FLAVOR} of type "
f"{self.flavor_class.SUBTYPE} failed to execute the action "
f"with configuration {config}."
)
@abstractmethod
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
def create_action(self, action: ActionRequest) -> ActionResponse:
"""Process a action request and create the action in the database.
Args:
action: Action request.
Raises:
Exception: If the implementation specific processing before creating
the action fails.
Returns:
The created action.
"""
# Validate and instantiate the configuration from the request
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to validate the request
# before it is sent to the database
self._validate_action_request(action=action, config=config)
# Serialize the configuration back into the request
action.configuration = config.model_dump(exclude_none=True)
# Create the action in the database
action_response = self.zen_store.create_action(action=action)
try:
# Instantiate the configuration from the response
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to process the created
# action
self._process_action_request(action=action_response, config=config)
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action_response,
config=config,
)
except Exception:
# If the action creation fails, delete the action from
# the database
logger.exception(
f"Failed to create action {action_response}. "
f"Deleting the action."
)
self.zen_store.delete_action(action_id=action_response.id)
raise
# Serialize the configuration back into the response
action_response.set_configuration(config.model_dump(exclude_none=True))
# Return the response to the user
return action_response
def update_action(
self,
action: ActionResponse,
action_update: ActionUpdate,
) -> ActionResponse:
"""Process action update and update the action in the database.
Args:
action: The action to update.
action_update: The update to be applied to the action.
Raises:
Exception: If the implementation specific processing before updating
the action fails.
Returns:
The updated action.
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_action_configuration(action.configuration)
# Validate and instantiate the configuration from the update request
# NOTE: if supplied, the configuration update is a full replacement
# of the original configuration
config_update = config
if action_update.configuration is not None:
config_update = self.validate_action_configuration(
action_update.configuration
)
# Call the implementation specific method to validate the update request
# before it is sent to the database
self._validate_action_update(
action=action,
config=config,
action_update=action_update,
config_update=config_update,
)
# Serialize the configuration update back into the update request
action_update.configuration = config_update.model_dump(
exclude_none=True
)
# Update the action in the database
action_response = self.zen_store.update_action(
action_id=action.id,
action_update=action_update,
)
try:
# Instantiate the configuration from the response
response_config = self.validate_action_configuration(
action_response.configuration
)
# Call the implementation specific method to process the update
# request before it is sent to the database
self._process_action_update(
action=action_response,
config=response_config,
previous_action=action,
previous_config=config,
)
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action_response, config=response_config
)
except Exception:
# If the action update fails, roll back the action in
# the database to the original state
logger.exception(
f"Failed to update action {action}. "
f"Rolling back the action to the previous state."
)
self.zen_store.update_action(
action_id=action.id,
action_update=ActionUpdate.from_response(action),
)
raise
# Serialize the configuration back into the response
action_response.set_configuration(
response_config.model_dump(exclude_none=True)
)
# Return the response to the user
return action_response
def delete_action(
self,
action: ActionResponse,
force: bool = False,
) -> None:
"""Process action delete request and delete the action in the database.
Args:
action: The action to delete.
force: Whether to force delete the action from the database
even if the action handler fails to delete the event
source.
Raises:
Exception: If the implementation specific processing before deleting
the action fails.
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_action_configuration(action.configuration)
try:
# Call the implementation specific method to process the deleted
# action before it is deleted from the database
self._process_action_delete(
action=action,
config=config,
force=force,
)
except Exception:
logger.exception(f"Failed to delete action {action}. ")
if not force:
raise
logger.warning(f"Force deleting action {action}.")
# Delete the action from the database
self.zen_store.delete_action(
action_id=action.id,
)
def get_action(
self, action: ActionResponse, hydrate: bool = False
) -> ActionResponse:
"""Process a action response before it is returned to the user.
Args:
action: The action fetched from the database.
hydrate: Whether to hydrate the action.
Returns:
The action.
"""
if hydrate:
# Instantiate the configuration from the response
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to process the response
self._process_action_response(action=action, config=config)
# Serialize the configuration back into the response
action.set_configuration(config.model_dump(exclude_none=True))
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action, config=config
)
# Return the response to the user
return action
def validate_action_configuration(
self, action_config: Dict[str, Any]
) -> ActionConfig:
"""Validate and return the action configuration.
Args:
action_config: The action configuration to validate.
Returns:
The validated action configuration.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.config_class(**action_config)
except ValueError as e:
raise ValueError(f"Invalid configuration for action: {e}.") from e
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Whether to hydrate the resource models.
Returns:
List of resources related to the action.
"""
return {}
def _validate_action_request(
self, action: ActionRequest, config: ActionConfig
) -> None:
"""Validate an action request before it is created in the database.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to the validation of
a new action. The implementation may also modify the action
request and/or configuration in place to apply implementation specific
changes before the request is stored in the database.
If validation is required, the implementation should raise a
ValueError if the configuration is invalid. If any exceptions are raised
during the validation, the action will not be created in the
database.
The resulted action configuration is serialized back into the action
request before it is stored in the database.
The implementation should not use this method to provision any external
resources, as the action may not be created in the database if
the database level validation fails.
Args:
action: Action request.
config: Action configuration instantiated from the request.
"""
pass
def _process_action_request(
self, action: ActionResponse, config: ActionConfig
) -> None:
"""Process an action request after it is created in the database.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to the creation of
a new action. The implementation may also modify the action
response and/or configuration in place to apply implementation specific
changes before the response is returned to the user.
The resulted configuration is serialized back into the action
response before it is returned to the user.
The implementation should use this method to provision any external
resources required for the action. If any of the provisioning
fails, the implementation should raise an exception and the action
will be deleted from the database.
Args:
action: Newly created action
config: Action configuration instantiated from the response.
"""
pass
def _validate_action_update(
self,
action: ActionResponse,
config: ActionConfig,
action_update: ActionUpdate,
config_update: ActionConfig,
) -> None:
"""Validate an action update before it is reflected in the database.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to validation of an
action update request. The implementation may also modify the
action update and/or configuration update in place to apply
implementation specific changes.
If validation is required, the implementation should raise a
ValueError if the configuration update is invalid. If any exceptions are
raised during the validation, the action will not be updated in
the database.
The resulted configuration update is serialized back into the event
source update before it is stored in the database.
The implementation should not use this method to provision any external
resources, as the action may not be updated in the database if
the database level validation fails.
Args:
action: Original action before the update.
config: Action configuration instantiated from the original
trigger.
action_update: Action update request.
config_update: Action configuration instantiated from the
updated action.
"""
pass
def _process_action_update(
self,
action: ActionResponse,
config: ActionConfig,
previous_action: ActionResponse,
previous_config: ActionConfig,
) -> None:
"""Process an action after it is updated in the database.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to updating an existing
action. The implementation may also modify the action
and/or configuration in place to apply implementation specific
changes before the response is returned to the user.
The resulted configuration is serialized back into the action
response before it is returned to the user.
The implementation should use this method to provision any external
resources required for the action update. If any of the
provisioning fails, the implementation should raise an exception and the
action will be rolled back to the previous state in the database.
Args:
action: Action after the update.
config: Action configuration instantiated from the updated
action.
previous_action: Original action before the update.
previous_config: Action configuration instantiated from the
original action.
"""
pass
def _process_action_delete(
self,
action: ActionResponse,
config: ActionConfig,
force: Optional[bool] = False,
) -> None:
"""Process an action before it is deleted from the database.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to the deletion of an
action.
The implementation should use this method to deprovision any external
resources required for the action. If any of the deprovisioning
fails, the implementation should raise an exception and the
action will kept in the database (unless force is True, in which
case the action will be deleted from the database regardless of
any deprovisioning failures).
Args:
action: Action before the deletion.
config: Action configuration before the deletion.
force: Whether to force deprovision the action.
"""
pass
def _process_action_response(
self, action: ActionResponse, config: ActionConfig
) -> None:
"""Process an action response before it is returned to the user.
Concrete action handlers should override this method to add
implementation specific functionality pertaining to how the action
response is returned to the user. The implementation may modify the
the action response and/or configuration in place.
The resulted configuration is serialized back into the action
response before it is returned to the user.
This method is applied to all action responses fetched from the
database before they are returned to the user, with the exception of
those returned from the `create_action`, `update_action`,
and `delete_action` methods, which have their own specific
processing methods.
Args:
action: Action response.
config: Action configuration instantiated from the response.
"""
pass
def _populate_action_response_resources(
self,
action: ActionResponse,
config: ActionConfig,
hydrate: bool = False,
) -> None:
"""Populate related resources for the action response.
Concrete action handlers should override this method to add
implementation specific related resources to the action response.
This method is applied to all action responses fetched from the
database before they are returned to the user, including
those returned from the `create_action`, `update_action`,
and `delete_action` methods.
Args:
action: Action response.
config: Action configuration instantiated from the response.
hydrate: Whether to hydrate the resource models.
"""
if action.resources is None:
# We only populate the resources if the resources field is already
# set by the database by means of hydration.
return
extract_resources = self.extract_resources(config, hydrate=hydrate)
for resource_type, resource in extract_resources.items():
setattr(action.resources, str(resource_type), resource)
config_class: Type[zenml.actions.base_action.ActionConfig]
property
readonly
Returns the BasePluginConfig
config.
Returns:
Type | Description |
---|---|
Type[zenml.actions.base_action.ActionConfig] |
The configuration. |
event_hub: BaseEventHub
property
readonly
Get the event hub used to dispatch events.
Returns:
Type | Description |
---|---|
BaseEventHub |
The event hub. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if an event hub isn't configured. |
flavor_class: Type[zenml.actions.base_action.BaseActionFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[zenml.actions.base_action.BaseActionFlavor] |
The flavor class of the plugin. |
__init__(self, event_hub=None)
special
Event source handler initialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_hub |
Optional[zenml.event_hub.base_event_hub.BaseEventHub] |
Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling |
None |
Source code in zenml/actions/base_action.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
"""Event source handler initialization.
Args:
event_hub: Optional event hub to use to initialize the event source
handler. If not set during initialization, it may be set
at a later time by calling `set_event_hub`. An event hub must
be configured before the event handler needs to dispatch events.
"""
super().__init__()
if event_hub is None:
from zenml.event_hub.event_hub import (
event_hub as default_event_hub,
)
# TODO: for now, we use the default internal event hub. In
# the future, this should be configurable.
event_hub = default_event_hub
self.set_event_hub(event_hub)
create_action(self, action)
Process a action request and create the action in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action |
ActionRequest |
Action request. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
If the implementation specific processing before creating the action fails. |
Returns:
Type | Description |
---|---|
ActionResponse |
The created action. |
Source code in zenml/actions/base_action.py
def create_action(self, action: ActionRequest) -> ActionResponse:
"""Process a action request and create the action in the database.
Args:
action: Action request.
Raises:
Exception: If the implementation specific processing before creating
the action fails.
Returns:
The created action.
"""
# Validate and instantiate the configuration from the request
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to validate the request
# before it is sent to the database
self._validate_action_request(action=action, config=config)
# Serialize the configuration back into the request
action.configuration = config.model_dump(exclude_none=True)
# Create the action in the database
action_response = self.zen_store.create_action(action=action)
try:
# Instantiate the configuration from the response
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to process the created
# action
self._process_action_request(action=action_response, config=config)
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action_response,
config=config,
)
except Exception:
# If the action creation fails, delete the action from
# the database
logger.exception(
f"Failed to create action {action_response}. "
f"Deleting the action."
)
self.zen_store.delete_action(action_id=action_response.id)
raise
# Serialize the configuration back into the response
action_response.set_configuration(config.model_dump(exclude_none=True))
# Return the response to the user
return action_response
delete_action(self, action, force=False)
Process action delete request and delete the action in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action |
ActionResponse |
The action to delete. |
required |
force |
bool |
Whether to force delete the action from the database even if the action handler fails to delete the event source. |
False |
Exceptions:
Type | Description |
---|---|
Exception |
If the implementation specific processing before deleting the action fails. |
Source code in zenml/actions/base_action.py
def delete_action(
self,
action: ActionResponse,
force: bool = False,
) -> None:
"""Process action delete request and delete the action in the database.
Args:
action: The action to delete.
force: Whether to force delete the action from the database
even if the action handler fails to delete the event
source.
Raises:
Exception: If the implementation specific processing before deleting
the action fails.
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_action_configuration(action.configuration)
try:
# Call the implementation specific method to process the deleted
# action before it is deleted from the database
self._process_action_delete(
action=action,
config=config,
force=force,
)
except Exception:
logger.exception(f"Failed to delete action {action}. ")
if not force:
raise
logger.warning(f"Force deleting action {action}.")
# Delete the action from the database
self.zen_store.delete_action(
action_id=action.id,
)
event_hub_callback(self, config, trigger_execution, auth_context)
Callback to be used by the event hub to dispatch events to the action handler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Dict[str, Any] |
The action configuration |
required |
trigger_execution |
TriggerExecutionResponse |
The trigger execution |
required |
auth_context |
AuthContext |
Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window. |
required |
Source code in zenml/actions/base_action.py
def event_hub_callback(
self,
config: Dict[str, Any],
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Callback to be used by the event hub to dispatch events to the action handler.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
try:
config_obj = self.config_class(**config)
except ValueError as e:
logger.exception(
f"Action handler {self.flavor_class.FLAVOR} of type "
f"{self.flavor_class.SUBTYPE} received an invalid configuration "
f"from the event hub: {e}."
)
return
try:
# TODO: this would be a great place to convert the event back into its
# original form and pass it to the action handler.
self.run(
config=config_obj,
trigger_execution=trigger_execution,
auth_context=auth_context,
)
except Exception:
# Don't let the event hub crash if the action handler fails
# TODO: we might want to return a value here indicating to the event
# hub that the action handler failed to execute the action. This can
# be used by the event hub to retry the action handler or to log the
# failure.
logger.exception(
f"Action handler {self.flavor_class.FLAVOR} of type "
f"{self.flavor_class.SUBTYPE} failed to execute the action "
f"with configuration {config}."
)
extract_resources(self, action_config, hydrate=False)
Extract related resources for this action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_config |
ActionConfig |
Action configuration from which to extract related resources. |
required |
hydrate |
bool |
Whether to hydrate the resource models. |
False |
Returns:
Type | Description |
---|---|
Dict[zenml.zen_server.rbac.models.ResourceType, zenml.models.v2.base.base.BaseResponse[Any, Any, Any]] |
List of resources related to the action. |
Source code in zenml/actions/base_action.py
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Whether to hydrate the resource models.
Returns:
List of resources related to the action.
"""
return {}
get_action(self, action, hydrate=False)
Process a action response before it is returned to the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action |
ActionResponse |
The action fetched from the database. |
required |
hydrate |
bool |
Whether to hydrate the action. |
False |
Returns:
Type | Description |
---|---|
ActionResponse |
The action. |
Source code in zenml/actions/base_action.py
def get_action(
self, action: ActionResponse, hydrate: bool = False
) -> ActionResponse:
"""Process a action response before it is returned to the user.
Args:
action: The action fetched from the database.
hydrate: Whether to hydrate the action.
Returns:
The action.
"""
if hydrate:
# Instantiate the configuration from the response
config = self.validate_action_configuration(action.configuration)
# Call the implementation specific method to process the response
self._process_action_response(action=action, config=config)
# Serialize the configuration back into the response
action.set_configuration(config.model_dump(exclude_none=True))
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action, config=config
)
# Return the response to the user
return action
run(self, config, trigger_execution, auth_context)
Execute an action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ActionConfig |
The action configuration |
required |
trigger_execution |
TriggerExecutionResponse |
The trigger execution |
required |
auth_context |
AuthContext |
Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window. |
required |
Source code in zenml/actions/base_action.py
@abstractmethod
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
set_event_hub(self, event_hub)
Configure an event hub for this event source plugin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_hub |
BaseEventHub |
Event hub to be used by this event handler to dispatch events. |
required |
Source code in zenml/actions/base_action.py
def set_event_hub(self, event_hub: BaseEventHub) -> None:
"""Configure an event hub for this event source plugin.
Args:
event_hub: Event hub to be used by this event handler to dispatch
events.
"""
self._event_hub = event_hub
self._event_hub.subscribe_action_handler(
action_flavor=self.flavor_class.FLAVOR,
action_subtype=self.flavor_class.SUBTYPE,
callback=self.event_hub_callback,
)
update_action(self, action, action_update)
Process action update and update the action in the database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action |
ActionResponse |
The action to update. |
required |
action_update |
ActionUpdate |
The update to be applied to the action. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
If the implementation specific processing before updating the action fails. |
Returns:
Type | Description |
---|---|
ActionResponse |
The updated action. |
Source code in zenml/actions/base_action.py
def update_action(
self,
action: ActionResponse,
action_update: ActionUpdate,
) -> ActionResponse:
"""Process action update and update the action in the database.
Args:
action: The action to update.
action_update: The update to be applied to the action.
Raises:
Exception: If the implementation specific processing before updating
the action fails.
Returns:
The updated action.
"""
# Validate and instantiate the configuration from the original event
# source
config = self.validate_action_configuration(action.configuration)
# Validate and instantiate the configuration from the update request
# NOTE: if supplied, the configuration update is a full replacement
# of the original configuration
config_update = config
if action_update.configuration is not None:
config_update = self.validate_action_configuration(
action_update.configuration
)
# Call the implementation specific method to validate the update request
# before it is sent to the database
self._validate_action_update(
action=action,
config=config,
action_update=action_update,
config_update=config_update,
)
# Serialize the configuration update back into the update request
action_update.configuration = config_update.model_dump(
exclude_none=True
)
# Update the action in the database
action_response = self.zen_store.update_action(
action_id=action.id,
action_update=action_update,
)
try:
# Instantiate the configuration from the response
response_config = self.validate_action_configuration(
action_response.configuration
)
# Call the implementation specific method to process the update
# request before it is sent to the database
self._process_action_update(
action=action_response,
config=response_config,
previous_action=action,
previous_config=config,
)
# Add any implementation specific related resources to the action
# response
self._populate_action_response_resources(
action=action_response, config=response_config
)
except Exception:
# If the action update fails, roll back the action in
# the database to the original state
logger.exception(
f"Failed to update action {action}. "
f"Rolling back the action to the previous state."
)
self.zen_store.update_action(
action_id=action.id,
action_update=ActionUpdate.from_response(action),
)
raise
# Serialize the configuration back into the response
action_response.set_configuration(
response_config.model_dump(exclude_none=True)
)
# Return the response to the user
return action_response
validate_action_configuration(self, action_config)
Validate and return the action configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_config |
Dict[str, Any] |
The action configuration to validate. |
required |
Returns:
Type | Description |
---|---|
ActionConfig |
The validated action configuration. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the configuration is invalid. |
Source code in zenml/actions/base_action.py
def validate_action_configuration(
self, action_config: Dict[str, Any]
) -> ActionConfig:
"""Validate and return the action configuration.
Args:
action_config: The action configuration to validate.
Returns:
The validated action configuration.
Raises:
ValueError: if the configuration is invalid.
"""
try:
return self.config_class(**action_config)
except ValueError as e:
raise ValueError(f"Invalid configuration for action: {e}.") from e
pipeline_run
special
pipeline_run_action
Example file of what an action Plugin could look like.
PipelineRunActionConfiguration (ActionConfig)
Configuration class to configure a pipeline run action.
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionConfiguration(ActionConfig):
"""Configuration class to configure a pipeline run action."""
pipeline_deployment_id: UUID
run_config: Optional[PipelineRunConfiguration] = None
PipelineRunActionFlavor (BaseActionFlavor)
Enables users to configure pipeline run action.
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionFlavor(BaseActionFlavor):
"""Enables users to configure pipeline run action."""
FLAVOR: ClassVar[str] = "builtin"
SUBTYPE: ClassVar[PluginSubType] = PluginSubType.PIPELINE_RUN
PLUGIN_CLASS: ClassVar[Type[PipelineRunActionHandler]] = (
PipelineRunActionHandler
)
# EventPlugin specific
ACTION_CONFIG_CLASS: ClassVar[Type[PipelineRunActionConfiguration]] = (
PipelineRunActionConfiguration
)
ACTION_CONFIG_CLASS (ActionConfig)
Configuration class to configure a pipeline run action.
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionConfiguration(ActionConfig):
"""Configuration class to configure a pipeline run action."""
pipeline_deployment_id: UUID
run_config: Optional[PipelineRunConfiguration] = None
PLUGIN_CLASS (BaseActionHandler)
Action handler for running pipelines.
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionHandler(BaseActionHandler):
"""Action handler for running pipelines."""
@property
def config_class(self) -> Type[PipelineRunActionConfiguration]:
"""Returns the `BasePluginConfig` config.
Returns:
The configuration.
"""
return PipelineRunActionConfiguration
@property
def flavor_class(self) -> Type[BaseActionFlavor]:
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
return PipelineRunActionFlavor
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
from zenml.zen_server.utils import zen_store
assert isinstance(config, PipelineRunActionConfiguration)
deployment = zen_store().get_deployment(config.pipeline_deployment_id)
logger.debug("Running deployment:", deployment)
run_pipeline(
deployment=deployment,
run_config=config.run_config,
auth_context=auth_context,
)
def _validate_configuration(
self, config: PipelineRunActionConfiguration
) -> None:
"""Validate a pipeline run action configuration.
Args:
config: Pipeline run action configuration.
Raises:
ValueError: In case no deployment can be found with the deployment_id
"""
deployment_id = config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
zen_store.get_deployment(deployment_id=deployment_id)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
def _validate_action_request(
self, action: ActionRequest, config: ActionConfig
) -> None:
"""Validate an action request before it is created in the database.
Args:
action: Action request.
config: Action configuration instantiated from the request.
"""
assert isinstance(config, PipelineRunActionConfiguration)
self._validate_configuration(config)
# If an expiration window is not set, we set it to the default value
if action.auth_window is None:
action.auth_window = server_config().pipeline_run_auth_window
def _validate_action_update(
self,
action: ActionResponse,
config: ActionConfig,
action_update: ActionUpdate,
config_update: ActionConfig,
) -> None:
"""Validate an action update before it is reflected in the database.
Args:
action: Original action before the update.
config: Action configuration instantiated from the original
action.
action_update: Action update request.
config_update: Action configuration instantiated from the
updated action.
"""
assert isinstance(config, PipelineRunActionConfiguration)
self._validate_configuration(config)
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Flag deciding whether to hydrate the resources.
Returns:
List of resources related to the action.
Raises:
ValueError: In case the deployment_id does not exist.
"""
assert isinstance(action_config, PipelineRunActionConfiguration)
deployment_id = action_config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
deployment = zen_store.get_deployment(
deployment_id=deployment_id, hydrate=hydrate
)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
ResourceType.PIPELINE_DEPLOYMENT: deployment
}
if deployment.pipeline is not None:
pipeline = zen_store.get_pipeline(
pipeline_id=deployment.pipeline.id, hydrate=hydrate
)
resources[ResourceType.PIPELINE] = pipeline
return resources
config_class: Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration]
property
readonly
Returns the BasePluginConfig
config.
Returns:
Type | Description |
---|---|
Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration] |
The configuration. |
flavor_class: Type[zenml.actions.base_action.BaseActionFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[zenml.actions.base_action.BaseActionFlavor] |
The flavor class of the plugin. |
extract_resources(self, action_config, hydrate=False)
Extract related resources for this action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_config |
ActionConfig |
Action configuration from which to extract related resources. |
required |
hydrate |
bool |
Flag deciding whether to hydrate the resources. |
False |
Returns:
Type | Description |
---|---|
Dict[zenml.zen_server.rbac.models.ResourceType, zenml.models.v2.base.base.BaseResponse[Any, Any, Any]] |
List of resources related to the action. |
Exceptions:
Type | Description |
---|---|
ValueError |
In case the deployment_id does not exist. |
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Flag deciding whether to hydrate the resources.
Returns:
List of resources related to the action.
Raises:
ValueError: In case the deployment_id does not exist.
"""
assert isinstance(action_config, PipelineRunActionConfiguration)
deployment_id = action_config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
deployment = zen_store.get_deployment(
deployment_id=deployment_id, hydrate=hydrate
)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
ResourceType.PIPELINE_DEPLOYMENT: deployment
}
if deployment.pipeline is not None:
pipeline = zen_store.get_pipeline(
pipeline_id=deployment.pipeline.id, hydrate=hydrate
)
resources[ResourceType.PIPELINE] = pipeline
return resources
run(self, config, trigger_execution, auth_context)
Execute an action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ActionConfig |
The action configuration |
required |
trigger_execution |
TriggerExecutionResponse |
The trigger execution |
required |
auth_context |
AuthContext |
Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window. |
required |
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
from zenml.zen_server.utils import zen_store
assert isinstance(config, PipelineRunActionConfiguration)
deployment = zen_store().get_deployment(config.pipeline_deployment_id)
logger.debug("Running deployment:", deployment)
run_pipeline(
deployment=deployment,
run_config=config.run_config,
auth_context=auth_context,
)
PipelineRunActionHandler (BaseActionHandler)
Action handler for running pipelines.
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionHandler(BaseActionHandler):
"""Action handler for running pipelines."""
@property
def config_class(self) -> Type[PipelineRunActionConfiguration]:
"""Returns the `BasePluginConfig` config.
Returns:
The configuration.
"""
return PipelineRunActionConfiguration
@property
def flavor_class(self) -> Type[BaseActionFlavor]:
"""Returns the flavor class of the plugin.
Returns:
The flavor class of the plugin.
"""
return PipelineRunActionFlavor
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
from zenml.zen_server.utils import zen_store
assert isinstance(config, PipelineRunActionConfiguration)
deployment = zen_store().get_deployment(config.pipeline_deployment_id)
logger.debug("Running deployment:", deployment)
run_pipeline(
deployment=deployment,
run_config=config.run_config,
auth_context=auth_context,
)
def _validate_configuration(
self, config: PipelineRunActionConfiguration
) -> None:
"""Validate a pipeline run action configuration.
Args:
config: Pipeline run action configuration.
Raises:
ValueError: In case no deployment can be found with the deployment_id
"""
deployment_id = config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
zen_store.get_deployment(deployment_id=deployment_id)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
def _validate_action_request(
self, action: ActionRequest, config: ActionConfig
) -> None:
"""Validate an action request before it is created in the database.
Args:
action: Action request.
config: Action configuration instantiated from the request.
"""
assert isinstance(config, PipelineRunActionConfiguration)
self._validate_configuration(config)
# If an expiration window is not set, we set it to the default value
if action.auth_window is None:
action.auth_window = server_config().pipeline_run_auth_window
def _validate_action_update(
self,
action: ActionResponse,
config: ActionConfig,
action_update: ActionUpdate,
config_update: ActionConfig,
) -> None:
"""Validate an action update before it is reflected in the database.
Args:
action: Original action before the update.
config: Action configuration instantiated from the original
action.
action_update: Action update request.
config_update: Action configuration instantiated from the
updated action.
"""
assert isinstance(config, PipelineRunActionConfiguration)
self._validate_configuration(config)
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Flag deciding whether to hydrate the resources.
Returns:
List of resources related to the action.
Raises:
ValueError: In case the deployment_id does not exist.
"""
assert isinstance(action_config, PipelineRunActionConfiguration)
deployment_id = action_config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
deployment = zen_store.get_deployment(
deployment_id=deployment_id, hydrate=hydrate
)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
ResourceType.PIPELINE_DEPLOYMENT: deployment
}
if deployment.pipeline is not None:
pipeline = zen_store.get_pipeline(
pipeline_id=deployment.pipeline.id, hydrate=hydrate
)
resources[ResourceType.PIPELINE] = pipeline
return resources
config_class: Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration]
property
readonly
Returns the BasePluginConfig
config.
Returns:
Type | Description |
---|---|
Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration] |
The configuration. |
flavor_class: Type[zenml.actions.base_action.BaseActionFlavor]
property
readonly
Returns the flavor class of the plugin.
Returns:
Type | Description |
---|---|
Type[zenml.actions.base_action.BaseActionFlavor] |
The flavor class of the plugin. |
extract_resources(self, action_config, hydrate=False)
Extract related resources for this action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
action_config |
ActionConfig |
Action configuration from which to extract related resources. |
required |
hydrate |
bool |
Flag deciding whether to hydrate the resources. |
False |
Returns:
Type | Description |
---|---|
Dict[zenml.zen_server.rbac.models.ResourceType, zenml.models.v2.base.base.BaseResponse[Any, Any, Any]] |
List of resources related to the action. |
Exceptions:
Type | Description |
---|---|
ValueError |
In case the deployment_id does not exist. |
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def extract_resources(
self,
action_config: ActionConfig,
hydrate: bool = False,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
"""Extract related resources for this action.
Args:
action_config: Action configuration from which to extract related
resources.
hydrate: Flag deciding whether to hydrate the resources.
Returns:
List of resources related to the action.
Raises:
ValueError: In case the deployment_id does not exist.
"""
assert isinstance(action_config, PipelineRunActionConfiguration)
deployment_id = action_config.pipeline_deployment_id
zen_store = GlobalConfiguration().zen_store
try:
deployment = zen_store.get_deployment(
deployment_id=deployment_id, hydrate=hydrate
)
except KeyError:
raise ValueError(f"No deployment found with id {deployment_id}.")
resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
ResourceType.PIPELINE_DEPLOYMENT: deployment
}
if deployment.pipeline is not None:
pipeline = zen_store.get_pipeline(
pipeline_id=deployment.pipeline.id, hydrate=hydrate
)
resources[ResourceType.PIPELINE] = pipeline
return resources
run(self, config, trigger_execution, auth_context)
Execute an action.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ActionConfig |
The action configuration |
required |
trigger_execution |
TriggerExecutionResponse |
The trigger execution |
required |
auth_context |
AuthContext |
Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window. |
required |
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def run(
self,
config: ActionConfig,
trigger_execution: TriggerExecutionResponse,
auth_context: AuthContext,
) -> None:
"""Execute an action.
Args:
config: The action configuration
trigger_execution: The trigger execution
auth_context: Authentication context with an API token that can be
used by external workloads to authenticate with the server
during the execution of the action. This API token is associated
with the service account that was configured for the trigger
that activated the action and has a validity defined by the
trigger's authentication window.
"""
from zenml.zen_server.utils import zen_store
assert isinstance(config, PipelineRunActionConfiguration)
deployment = zen_store().get_deployment(config.pipeline_deployment_id)
logger.debug("Running deployment:", deployment)
run_pipeline(
deployment=deployment,
run_config=config.run_config,
auth_context=auth_context,
)