Skip to content

Actions

zenml.actions special

Actions allow configuring a given action for later execution.

base_action

Base implementation of actions.

ActionConfig (BasePluginConfig) pydantic-model

Allows configuring the action configuration.

Source code in zenml/actions/base_action.py
class ActionConfig(BasePluginConfig):
    """Allows configuring the action configuration."""

BaseActionFlavor (BasePluginFlavor, ABC)

Base Action Flavor to register Action Configurations.

Source code in zenml/actions/base_action.py
class BaseActionFlavor(BasePluginFlavor, ABC):
    """Base Action Flavor to register Action Configurations."""

    TYPE: ClassVar[PluginType] = PluginType.ACTION

    # Action specific
    ACTION_CONFIG_CLASS: ClassVar[Type[ActionConfig]]

    @classmethod
    def get_action_config_schema(cls) -> Dict[str, Any]:
        """The config schema for a flavor.

        Returns:
            The config schema.
        """
        config_schema: Dict[str, Any] = json.loads(
            cls.ACTION_CONFIG_CLASS.schema_json()
        )
        return config_schema

    @classmethod
    def get_flavor_response_model(cls, hydrate: bool) -> ActionFlavorResponse:
        """Convert the Flavor into a Response Model.

        Args:
            hydrate: Whether the model should be hydrated.

        Returns:
            The response model.
        """
        metadata = None
        if hydrate:
            metadata = ActionFlavorResponseMetadata(
                config_schema=cls.get_action_config_schema(),
            )
        return ActionFlavorResponse(
            body=ActionFlavorResponseBody(),
            metadata=metadata,
            name=cls.FLAVOR,
            type=cls.TYPE,
            subtype=cls.SUBTYPE,
        )
get_action_config_schema() classmethod

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

Source code in zenml/actions/base_action.py
@classmethod
def get_action_config_schema(cls) -> Dict[str, Any]:
    """The config schema for a flavor.

    Returns:
        The config schema.
    """
    config_schema: Dict[str, Any] = json.loads(
        cls.ACTION_CONFIG_CLASS.schema_json()
    )
    return config_schema
get_flavor_response_model(hydrate) classmethod

Convert the Flavor into a Response Model.

Parameters:

Name Type Description Default
hydrate bool

Whether the model should be hydrated.

required

Returns:

Type Description
ActionFlavorResponse

The response model.

Source code in zenml/actions/base_action.py
@classmethod
def get_flavor_response_model(cls, hydrate: bool) -> ActionFlavorResponse:
    """Convert the Flavor into a Response Model.

    Args:
        hydrate: Whether the model should be hydrated.

    Returns:
        The response model.
    """
    metadata = None
    if hydrate:
        metadata = ActionFlavorResponseMetadata(
            config_schema=cls.get_action_config_schema(),
        )
    return ActionFlavorResponse(
        body=ActionFlavorResponseBody(),
        metadata=metadata,
        name=cls.FLAVOR,
        type=cls.TYPE,
        subtype=cls.SUBTYPE,
    )

BaseActionHandler (BasePlugin, ABC)

Implementation for an action handler.

Source code in zenml/actions/base_action.py
class BaseActionHandler(BasePlugin, ABC):
    """Implementation for an action handler."""

    _event_hub: Optional[BaseEventHub] = None

    def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
        """Event source handler initialization.

        Args:
            event_hub: Optional event hub to use to initialize the event source
                handler. If not set during initialization, it may be set
                at a later time by calling `set_event_hub`. An event hub must
                be configured before the event handler needs to dispatch events.
        """
        super().__init__()
        if event_hub is None:
            from zenml.event_hub.event_hub import (
                event_hub as default_event_hub,
            )

            # TODO: for now, we use the default internal event hub. In
            # the future, this should be configurable.
            event_hub = default_event_hub

        self.set_event_hub(event_hub)

    @property
    @abstractmethod
    def config_class(self) -> Type[ActionConfig]:
        """Returns the `BasePluginConfig` config.

        Returns:
            The configuration.
        """

    @property
    @abstractmethod
    def flavor_class(self) -> Type[BaseActionFlavor]:
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """

    @property
    def event_hub(self) -> BaseEventHub:
        """Get the event hub used to dispatch events.

        Returns:
            The event hub.

        Raises:
            RuntimeError: if an event hub isn't configured.
        """
        if self._event_hub is None:
            raise RuntimeError(
                f"An event hub is not configured for the "
                f"{self.flavor_class.FLAVOR} {self.flavor_class.SUBTYPE} "
                f"action handler."
            )

        return self._event_hub

    def set_event_hub(self, event_hub: BaseEventHub) -> None:
        """Configure an event hub for this event source plugin.

        Args:
            event_hub: Event hub to be used by this event handler to dispatch
                events.
        """
        self._event_hub = event_hub
        self._event_hub.subscribe_action_handler(
            action_flavor=self.flavor_class.FLAVOR,
            action_subtype=self.flavor_class.SUBTYPE,
            callback=self.event_hub_callback,
        )

    def event_hub_callback(
        self,
        config: Dict[str, Any],
        trigger_execution: TriggerExecutionResponse,
        auth_context: AuthContext,
    ) -> None:
        """Callback to be used by the event hub to dispatch events to the action handler.

        Args:
            config: The action configuration
            trigger_execution: The trigger execution
            auth_context: Authentication context with an API token that can be
                used by external workloads to authenticate with the server
                during the execution of the action. This API token is associated
                with the service account that was configured for the trigger
                that activated the action and has a validity defined by the
                trigger's authentication window.
        """
        try:
            config_obj = self.config_class(**config)
        except ValueError as e:
            logger.exception(
                f"Action handler {self.flavor_class.FLAVOR} of type "
                f"{self.flavor_class.SUBTYPE} received an invalid configuration "
                f"from the event hub: {e}."
            )
            return

        try:
            # TODO: this would be a great place to convert the event back into its
            # original form and pass it to the action handler.
            self.run(
                config=config_obj,
                trigger_execution=trigger_execution,
                auth_context=auth_context,
            )
        except Exception:
            # Don't let the event hub crash if the action handler fails
            # TODO: we might want to return a value here indicating to the event
            # hub that the action handler failed to execute the action. This can
            # be used by the event hub to retry the action handler or to log the
            # failure.
            logger.exception(
                f"Action handler {self.flavor_class.FLAVOR} of type "
                f"{self.flavor_class.SUBTYPE} failed to execute the action "
                f"with configuration {config}."
            )

    @abstractmethod
    def run(
        self,
        config: ActionConfig,
        trigger_execution: TriggerExecutionResponse,
        auth_context: AuthContext,
    ) -> None:
        """Execute an action.

        Args:
            config: The action configuration
            trigger_execution: The trigger execution
            auth_context: Authentication context with an API token that can be
                used by external workloads to authenticate with the server
                during the execution of the action. This API token is associated
                with the service account that was configured for the trigger
                that activated the action and has a validity defined by the
                trigger's authentication window.
        """

    def create_trigger(self, trigger: TriggerRequest) -> TriggerResponse:
        """Process a trigger request and create the trigger in the database.

        Args:
            trigger: Trigger request.

        Returns:
            The created trigger.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the request
        config = self.validate_action_configuration(trigger.action)
        # Call the implementation specific method to validate the request
        # before it is sent to the database
        self._validate_trigger_request(trigger=trigger, config=config)
        # Serialize the configuration back into the request
        trigger.action = config.dict(exclude_none=True)
        # Create the trigger in the database
        trigger_response = self.zen_store.create_trigger(trigger=trigger)
        try:
            # Instantiate the configuration from the response
            config = self.validate_action_configuration(trigger.action)
            # Call the implementation specific method to process the created
            # trigger
            self._process_trigger_request(
                trigger=trigger_response, config=config
            )
            # Add any implementation specific related resources to the trigger
            # response
            self._populate_trigger_response_resources(
                trigger=trigger_response, config=config
            )
            # Activate the trigger in the event hub to effectively start
            # dispatching events to the action handler
            self.event_hub.activate_trigger(trigger=trigger_response)
        except Exception:
            # If the trigger creation fails, delete the trigger from
            # the database
            logger.exception(
                f"Failed to create trigger {trigger_response}. "
                f"Deleting the trigger."
            )
            self.zen_store.delete_trigger(trigger_id=trigger_response.id)
            raise

        # Serialize the configuration back into the response
        trigger_response.set_action(config.dict(exclude_none=True))
        # Return the response to the user
        return trigger_response

    def update_trigger(
        self,
        trigger: TriggerResponse,
        trigger_update: TriggerUpdate,
    ) -> TriggerResponse:
        """Process a trigger update request and update the trigger in the database.

        Args:
            trigger: The trigger to update.
            trigger_update: The update to be applied to the trigger.

        Returns:
            The updated trigger.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the original event
        # source
        config = self.validate_action_configuration(trigger.action)
        # Validate and instantiate the configuration from the update request
        # NOTE: if supplied, the configuration update is a full replacement
        # of the original configuration
        config_update = config
        if trigger_update.action is not None:
            config_update = self.validate_action_configuration(
                trigger_update.action
            )
        # Call the implementation specific method to validate the update request
        # before it is sent to the database
        self._validate_trigger_update(
            trigger=trigger,
            config=config,
            trigger_update=trigger_update,
            config_update=config_update,
        )
        # Serialize the configuration update back into the update request
        trigger_update.action = config_update.dict(exclude_none=True)

        # Update the trigger in the database
        trigger_response = self.zen_store.update_trigger(
            trigger_id=trigger.id,
            trigger_update=trigger_update,
        )
        try:
            # Instantiate the configuration from the response
            response_config = self.validate_action_configuration(
                trigger_response.action
            )
            # Call the implementation specific method to process the update
            # request before it is sent to the database
            self._process_trigger_update(
                trigger=trigger_response,
                config=response_config,
                previous_trigger=trigger,
                previous_config=config,
            )
            # Add any implementation specific related resources to the trigger
            # response
            self._populate_trigger_response_resources(
                trigger=trigger_response, config=response_config
            )
            # Deactivate the previous trigger and activate the updated trigger
            # in the event hub
            self.event_hub.deactivate_trigger(trigger=trigger)
            self.event_hub.activate_trigger(trigger=trigger_response)
        except Exception:
            # If the trigger update fails, roll back the trigger in
            # the database to the original state
            logger.exception(
                f"Failed to update trigger {trigger}. "
                f"Rolling back the trigger to the previous state."
            )
            self.zen_store.update_trigger(
                trigger_id=trigger.id,
                trigger_update=TriggerUpdate.from_response(trigger),
            )
            raise

        # Serialize the configuration back into the response
        trigger_response.set_action(response_config.dict(exclude_none=True))
        # Return the response to the user
        return trigger_response

    def delete_trigger(
        self,
        trigger: TriggerResponse,
        force: bool = False,
    ) -> None:
        """Process a trigger delete request and delete the trigger in the database.

        Args:
            trigger: The trigger to delete.
            force: Whether to force delete the trigger from the database
                even if the trigger handler fails to delete the event
                source.

        # noqa: DAR401
        """
        # Validate and instantiate the configuration from the original event
        # source
        config = self.validate_action_configuration(trigger.action)
        try:
            # Call the implementation specific method to process the deleted
            # trigger before it is deleted from the database
            self._process_trigger_delete(
                trigger=trigger,
                config=config,
                force=force,
            )
        except Exception:
            logger.exception(f"Failed to delete trigger {trigger}. ")
            if not force:
                raise

            logger.warning(f"Force deleting trigger {trigger}.")

        # Deactivate the trigger in the event hub
        self.event_hub.deactivate_trigger(trigger=trigger)

        # Delete the trigger from the database
        self.zen_store.delete_trigger(
            trigger_id=trigger.id,
        )

    def get_trigger(
        self, trigger: TriggerResponse, hydrate: bool = False
    ) -> TriggerResponse:
        """Process a trigger response before it is returned to the user.

        Args:
            trigger: The trigger fetched from the database.
            hydrate: Whether to hydrate the trigger.

        Returns:
            The trigger.
        """
        if hydrate:
            # Instantiate the configuration from the response
            config = self.validate_action_configuration(trigger.action)
            # Call the implementation specific method to process the response
            self._process_trigger_response(trigger=trigger, config=config)
            # Serialize the configuration back into the response
            trigger.set_action(config.dict(exclude_none=True))
            # Add any implementation specific related resources to the trigger
            # response
            self._populate_trigger_response_resources(
                trigger=trigger, config=config
            )

        # Return the response to the user
        return trigger

    def validate_action_configuration(
        self, action_config: Dict[str, Any]
    ) -> ActionConfig:
        """Validate and return the action configuration.

        Args:
            action_config: The actino configuration to validate.

        Returns:
            The validated action configuration.

        Raises:
            ValueError: if the configuration is invalid.
        """
        try:
            return self.config_class(**action_config)
        except ValueError as e:
            raise ValueError(f"Invalid configuration for action: {e}.") from e

    def extract_resources(
        self,
        action_config: ActionConfig,
    ) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
        """Extract related resources for this action.

        Args:
            action_config: Action configuration from which to extract related
                resources.

        Returns:
            List of resources related to the action.
        """
        return {}

    def _validate_trigger_request(
        self, trigger: TriggerRequest, config: ActionConfig
    ) -> None:
        """Validate a trigger request before it is created in the database.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to the validation of
        a new trigger. The implementation may also modify the trigger
        request and/or configuration in place to apply implementation specific
        changes before the request is stored in the database.

        If validation is required, the implementation should raise a
        ValueError if the configuration is invalid. If any exceptions are raised
        during the validation, the trigger will not be created in the
        database.

        The resulted action configuration is serialized back into the trigger
        request before it is stored in the database.

        The implementation should not use this method to provision any external
        resources, as the trigger may not be created in the database if
        the database level validation fails.

        Args:
            trigger: Trigger request.
            config: Action configuration instantiated from the request.
        """
        pass

    def _process_trigger_request(
        self, trigger: TriggerResponse, config: ActionConfig
    ) -> None:
        """Process a trigger request after it is created in the database.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to the creation of
        a new trigger. The implementation may also modify the trigger
        response and/or configuration in place to apply implementation specific
        changes before the response is returned to the user.

        The resulted configuration is serialized back into the trigger
        response before it is returned to the user.

        The implementation should use this method to provision any external
        resources required for the trigger. If any of the provisioning
        fails, the implementation should raise an exception and the trigger
        will be deleted from the database.

        Args:
            trigger: Newly created trigger
            config: Action configuration instantiated from the response.
        """
        pass

    def _validate_trigger_update(
        self,
        trigger: TriggerResponse,
        config: ActionConfig,
        trigger_update: TriggerUpdate,
        config_update: ActionConfig,
    ) -> None:
        """Validate a trigger update before it is reflected in the database.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to validation of an
        trigger update request. The implementation may also modify the
        trigger update and/or configuration update in place to apply
        implementation specific changes.

        If validation is required, the implementation should raise a
        ValueError if the configuration update is invalid. If any exceptions are
        raised during the validation, the trigger will not be updated in
        the database.

        The resulted configuration update is serialized back into the event
        source update before it is stored in the database.

        The implementation should not use this method to provision any external
        resources, as the trigger may not be updated in the database if
        the database level validation fails.

        Args:
            trigger: Original trigger before the update.
            config: Action configuration instantiated from the original
                trigger.
            trigger_update: Trigger update request.
            config_update: Action configuration instantiated from the
                updated trigger.
        """
        pass

    def _process_trigger_update(
        self,
        trigger: TriggerResponse,
        config: ActionConfig,
        previous_trigger: TriggerResponse,
        previous_config: ActionConfig,
    ) -> None:
        """Process a trigger after it is updated in the database.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to updating an existing
        trigger. The implementation may also modify the trigger
        and/or configuration in place to apply implementation specific
        changes before the response is returned to the user.

        The resulted configuration is serialized back into the trigger
        response before it is returned to the user.

        The implementation should use this method to provision any external
        resources required for the trigger update. If any of the
        provisioning fails, the implementation should raise an exception and the
        trigger will be rolled back to the previous state in the database.

        Args:
            trigger: Trigger after the update.
            config: Action configuration instantiated from the updated
                trigger.
            previous_trigger: Original trigger before the update.
            previous_config: Action configuration instantiated from the
                original trigger.
        """
        pass

    def _process_trigger_delete(
        self,
        trigger: TriggerResponse,
        config: ActionConfig,
        force: Optional[bool] = False,
    ) -> None:
        """Process a trigger before it is deleted from the database.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to the deletion of an
        trigger.

        The implementation should use this method to deprovision any external
        resources required for the trigger. If any of the deprovisioning
        fails, the implementation should raise an exception and the
        trigger will kept in the database (unless force is True, in which
        case the trigger will be deleted from the database regardless of
        any deprovisioning failures).

        Args:
            trigger: Trigger before the deletion.
            config: Action configuration before the deletion.
            force: Whether to force deprovision the trigger.
        """
        pass

    def _process_trigger_response(
        self, trigger: TriggerResponse, config: ActionConfig
    ) -> None:
        """Process a trigger response before it is returned to the user.

        Concrete action handlers should override this method to add
        implementation specific functionality pertaining to how the trigger
        response is returned to the user. The implementation may modify the
        the trigger response and/or configuration in place.

        The resulted configuration is serialized back into the trigger
        response before it is returned to the user.

        This method is applied to all trigger responses fetched from the
        database before they are returned to the user, with the exception of
        those returned from the `create_trigger`, `update_trigger`,
        and `delete_trigger` methods, which have their own specific
        processing methods.

        Args:
            trigger: Trigger response.
            config: Action configuration instantiated from the response.
        """
        pass

    def _populate_trigger_response_resources(
        self, trigger: TriggerResponse, config: ActionConfig
    ) -> None:
        """Populate related resources for the trigger response.

        Concrete action handlers should override this method to add
        implementation specific related resources to the trigger response.

        This method is applied to all trigger responses fetched from the
        database before they are returned to the user, including
        those returned from the `create_trigger`, `update_trigger`,
        and `delete_trigger` methods.

        Args:
            trigger: Trigger response.
            config: Action configuration instantiated from the response.
        """
        if trigger.resources is None:
            # We only populate the resources if the resources field is already
            # set by the database by means of hydration.
            return
        extract_resources = self.extract_resources(config)
        for resource_type, resource in extract_resources.items():
            setattr(trigger.resources, str(resource_type), resource)
config_class: Type[zenml.actions.base_action.ActionConfig] property readonly

Returns the BasePluginConfig config.

Returns:

Type Description
Type[zenml.actions.base_action.ActionConfig]

The configuration.

event_hub: BaseEventHub property readonly

Get the event hub used to dispatch events.

Returns:

Type Description
BaseEventHub

The event hub.

Exceptions:

Type Description
RuntimeError

if an event hub isn't configured.

flavor_class: Type[zenml.actions.base_action.BaseActionFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[zenml.actions.base_action.BaseActionFlavor]

The flavor class of the plugin.

__init__(self, event_hub=None) special

Event source handler initialization.

Parameters:

Name Type Description Default
event_hub Optional[zenml.event_hub.base_event_hub.BaseEventHub]

Optional event hub to use to initialize the event source handler. If not set during initialization, it may be set at a later time by calling set_event_hub. An event hub must be configured before the event handler needs to dispatch events.

None
Source code in zenml/actions/base_action.py
def __init__(self, event_hub: Optional[BaseEventHub] = None) -> None:
    """Event source handler initialization.

    Args:
        event_hub: Optional event hub to use to initialize the event source
            handler. If not set during initialization, it may be set
            at a later time by calling `set_event_hub`. An event hub must
            be configured before the event handler needs to dispatch events.
    """
    super().__init__()
    if event_hub is None:
        from zenml.event_hub.event_hub import (
            event_hub as default_event_hub,
        )

        # TODO: for now, we use the default internal event hub. In
        # the future, this should be configurable.
        event_hub = default_event_hub

    self.set_event_hub(event_hub)
create_trigger(self, trigger)

Process a trigger request and create the trigger in the database.

Parameters:

Name Type Description Default
trigger TriggerRequest

Trigger request.

required

Returns:

Type Description
TriggerResponse

The created trigger.

noqa: DAR401
Source code in zenml/actions/base_action.py
def create_trigger(self, trigger: TriggerRequest) -> TriggerResponse:
    """Process a trigger request and create the trigger in the database.

    Args:
        trigger: Trigger request.

    Returns:
        The created trigger.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the request
    config = self.validate_action_configuration(trigger.action)
    # Call the implementation specific method to validate the request
    # before it is sent to the database
    self._validate_trigger_request(trigger=trigger, config=config)
    # Serialize the configuration back into the request
    trigger.action = config.dict(exclude_none=True)
    # Create the trigger in the database
    trigger_response = self.zen_store.create_trigger(trigger=trigger)
    try:
        # Instantiate the configuration from the response
        config = self.validate_action_configuration(trigger.action)
        # Call the implementation specific method to process the created
        # trigger
        self._process_trigger_request(
            trigger=trigger_response, config=config
        )
        # Add any implementation specific related resources to the trigger
        # response
        self._populate_trigger_response_resources(
            trigger=trigger_response, config=config
        )
        # Activate the trigger in the event hub to effectively start
        # dispatching events to the action handler
        self.event_hub.activate_trigger(trigger=trigger_response)
    except Exception:
        # If the trigger creation fails, delete the trigger from
        # the database
        logger.exception(
            f"Failed to create trigger {trigger_response}. "
            f"Deleting the trigger."
        )
        self.zen_store.delete_trigger(trigger_id=trigger_response.id)
        raise

    # Serialize the configuration back into the response
    trigger_response.set_action(config.dict(exclude_none=True))
    # Return the response to the user
    return trigger_response
delete_trigger(self, trigger, force=False)

Process a trigger delete request and delete the trigger in the database.

Parameters:

Name Type Description Default
trigger TriggerResponse

The trigger to delete.

required
force bool

Whether to force delete the trigger from the database even if the trigger handler fails to delete the event source.

False
noqa: DAR401
Source code in zenml/actions/base_action.py
def delete_trigger(
    self,
    trigger: TriggerResponse,
    force: bool = False,
) -> None:
    """Process a trigger delete request and delete the trigger in the database.

    Args:
        trigger: The trigger to delete.
        force: Whether to force delete the trigger from the database
            even if the trigger handler fails to delete the event
            source.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_action_configuration(trigger.action)
    try:
        # Call the implementation specific method to process the deleted
        # trigger before it is deleted from the database
        self._process_trigger_delete(
            trigger=trigger,
            config=config,
            force=force,
        )
    except Exception:
        logger.exception(f"Failed to delete trigger {trigger}. ")
        if not force:
            raise

        logger.warning(f"Force deleting trigger {trigger}.")

    # Deactivate the trigger in the event hub
    self.event_hub.deactivate_trigger(trigger=trigger)

    # Delete the trigger from the database
    self.zen_store.delete_trigger(
        trigger_id=trigger.id,
    )
event_hub_callback(self, config, trigger_execution, auth_context)

Callback to be used by the event hub to dispatch events to the action handler.

Parameters:

Name Type Description Default
config Dict[str, Any]

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in zenml/actions/base_action.py
def event_hub_callback(
    self,
    config: Dict[str, Any],
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Callback to be used by the event hub to dispatch events to the action handler.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
    try:
        config_obj = self.config_class(**config)
    except ValueError as e:
        logger.exception(
            f"Action handler {self.flavor_class.FLAVOR} of type "
            f"{self.flavor_class.SUBTYPE} received an invalid configuration "
            f"from the event hub: {e}."
        )
        return

    try:
        # TODO: this would be a great place to convert the event back into its
        # original form and pass it to the action handler.
        self.run(
            config=config_obj,
            trigger_execution=trigger_execution,
            auth_context=auth_context,
        )
    except Exception:
        # Don't let the event hub crash if the action handler fails
        # TODO: we might want to return a value here indicating to the event
        # hub that the action handler failed to execute the action. This can
        # be used by the event hub to retry the action handler or to log the
        # failure.
        logger.exception(
            f"Action handler {self.flavor_class.FLAVOR} of type "
            f"{self.flavor_class.SUBTYPE} failed to execute the action "
            f"with configuration {config}."
        )
extract_resources(self, action_config)

Extract related resources for this action.

Parameters:

Name Type Description Default
action_config ActionConfig

Action configuration from which to extract related resources.

required

Returns:

Type Description
Dict[zenml.zen_server.rbac.models.ResourceType, pydantic.generics.BaseResponse[Any, Any, Any]]

List of resources related to the action.

Source code in zenml/actions/base_action.py
def extract_resources(
    self,
    action_config: ActionConfig,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
    """Extract related resources for this action.

    Args:
        action_config: Action configuration from which to extract related
            resources.

    Returns:
        List of resources related to the action.
    """
    return {}
get_trigger(self, trigger, hydrate=False)

Process a trigger response before it is returned to the user.

Parameters:

Name Type Description Default
trigger TriggerResponse

The trigger fetched from the database.

required
hydrate bool

Whether to hydrate the trigger.

False

Returns:

Type Description
TriggerResponse

The trigger.

Source code in zenml/actions/base_action.py
def get_trigger(
    self, trigger: TriggerResponse, hydrate: bool = False
) -> TriggerResponse:
    """Process a trigger response before it is returned to the user.

    Args:
        trigger: The trigger fetched from the database.
        hydrate: Whether to hydrate the trigger.

    Returns:
        The trigger.
    """
    if hydrate:
        # Instantiate the configuration from the response
        config = self.validate_action_configuration(trigger.action)
        # Call the implementation specific method to process the response
        self._process_trigger_response(trigger=trigger, config=config)
        # Serialize the configuration back into the response
        trigger.set_action(config.dict(exclude_none=True))
        # Add any implementation specific related resources to the trigger
        # response
        self._populate_trigger_response_resources(
            trigger=trigger, config=config
        )

    # Return the response to the user
    return trigger
run(self, config, trigger_execution, auth_context)

Execute an action.

Parameters:

Name Type Description Default
config ActionConfig

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in zenml/actions/base_action.py
@abstractmethod
def run(
    self,
    config: ActionConfig,
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Execute an action.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
set_event_hub(self, event_hub)

Configure an event hub for this event source plugin.

Parameters:

Name Type Description Default
event_hub BaseEventHub

Event hub to be used by this event handler to dispatch events.

required
Source code in zenml/actions/base_action.py
def set_event_hub(self, event_hub: BaseEventHub) -> None:
    """Configure an event hub for this event source plugin.

    Args:
        event_hub: Event hub to be used by this event handler to dispatch
            events.
    """
    self._event_hub = event_hub
    self._event_hub.subscribe_action_handler(
        action_flavor=self.flavor_class.FLAVOR,
        action_subtype=self.flavor_class.SUBTYPE,
        callback=self.event_hub_callback,
    )
update_trigger(self, trigger, trigger_update)

Process a trigger update request and update the trigger in the database.

Parameters:

Name Type Description Default
trigger TriggerResponse

The trigger to update.

required
trigger_update TriggerUpdate

The update to be applied to the trigger.

required

Returns:

Type Description
TriggerResponse

The updated trigger.

noqa: DAR401
Source code in zenml/actions/base_action.py
def update_trigger(
    self,
    trigger: TriggerResponse,
    trigger_update: TriggerUpdate,
) -> TriggerResponse:
    """Process a trigger update request and update the trigger in the database.

    Args:
        trigger: The trigger to update.
        trigger_update: The update to be applied to the trigger.

    Returns:
        The updated trigger.

    # noqa: DAR401
    """
    # Validate and instantiate the configuration from the original event
    # source
    config = self.validate_action_configuration(trigger.action)
    # Validate and instantiate the configuration from the update request
    # NOTE: if supplied, the configuration update is a full replacement
    # of the original configuration
    config_update = config
    if trigger_update.action is not None:
        config_update = self.validate_action_configuration(
            trigger_update.action
        )
    # Call the implementation specific method to validate the update request
    # before it is sent to the database
    self._validate_trigger_update(
        trigger=trigger,
        config=config,
        trigger_update=trigger_update,
        config_update=config_update,
    )
    # Serialize the configuration update back into the update request
    trigger_update.action = config_update.dict(exclude_none=True)

    # Update the trigger in the database
    trigger_response = self.zen_store.update_trigger(
        trigger_id=trigger.id,
        trigger_update=trigger_update,
    )
    try:
        # Instantiate the configuration from the response
        response_config = self.validate_action_configuration(
            trigger_response.action
        )
        # Call the implementation specific method to process the update
        # request before it is sent to the database
        self._process_trigger_update(
            trigger=trigger_response,
            config=response_config,
            previous_trigger=trigger,
            previous_config=config,
        )
        # Add any implementation specific related resources to the trigger
        # response
        self._populate_trigger_response_resources(
            trigger=trigger_response, config=response_config
        )
        # Deactivate the previous trigger and activate the updated trigger
        # in the event hub
        self.event_hub.deactivate_trigger(trigger=trigger)
        self.event_hub.activate_trigger(trigger=trigger_response)
    except Exception:
        # If the trigger update fails, roll back the trigger in
        # the database to the original state
        logger.exception(
            f"Failed to update trigger {trigger}. "
            f"Rolling back the trigger to the previous state."
        )
        self.zen_store.update_trigger(
            trigger_id=trigger.id,
            trigger_update=TriggerUpdate.from_response(trigger),
        )
        raise

    # Serialize the configuration back into the response
    trigger_response.set_action(response_config.dict(exclude_none=True))
    # Return the response to the user
    return trigger_response
validate_action_configuration(self, action_config)

Validate and return the action configuration.

Parameters:

Name Type Description Default
action_config Dict[str, Any]

The actino configuration to validate.

required

Returns:

Type Description
ActionConfig

The validated action configuration.

Exceptions:

Type Description
ValueError

if the configuration is invalid.

Source code in zenml/actions/base_action.py
def validate_action_configuration(
    self, action_config: Dict[str, Any]
) -> ActionConfig:
    """Validate and return the action configuration.

    Args:
        action_config: The actino configuration to validate.

    Returns:
        The validated action configuration.

    Raises:
        ValueError: if the configuration is invalid.
    """
    try:
        return self.config_class(**action_config)
    except ValueError as e:
        raise ValueError(f"Invalid configuration for action: {e}.") from e

pipeline_run special

pipeline_run_action

Example file of what an action Plugin could look like.

PipelineRunActionConfiguration (ActionConfig) pydantic-model

Configuration class to configure a pipeline run action.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionConfiguration(ActionConfig):
    """Configuration class to configure a pipeline run action."""

    pipeline_deployment_id: UUID
    run_config: Optional[PipelineRunConfiguration] = None
PipelineRunActionFlavor (BaseActionFlavor)

Enables users to configure pipeline run action.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionFlavor(BaseActionFlavor):
    """Enables users to configure pipeline run action."""

    FLAVOR: ClassVar[str] = "builtin"
    SUBTYPE: ClassVar[PluginSubType] = PluginSubType.PIPELINE_RUN
    PLUGIN_CLASS: ClassVar[
        Type[PipelineRunActionHandler]
    ] = PipelineRunActionHandler

    # EventPlugin specific
    ACTION_CONFIG_CLASS: ClassVar[
        Type[PipelineRunActionConfiguration]
    ] = PipelineRunActionConfiguration
ACTION_CONFIG_CLASS (ActionConfig) pydantic-model

Configuration class to configure a pipeline run action.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionConfiguration(ActionConfig):
    """Configuration class to configure a pipeline run action."""

    pipeline_deployment_id: UUID
    run_config: Optional[PipelineRunConfiguration] = None
PLUGIN_CLASS (BaseActionHandler)

Action handler for running pipelines.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionHandler(BaseActionHandler):
    """Action handler for running pipelines."""

    @property
    def config_class(self) -> Type[PipelineRunActionConfiguration]:
        """Returns the `BasePluginConfig` config.

        Returns:
            The configuration.
        """
        return PipelineRunActionConfiguration

    @property
    def flavor_class(self) -> Type[BaseActionFlavor]:
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """
        return PipelineRunActionFlavor

    def run(
        self,
        config: ActionConfig,
        trigger_execution: TriggerExecutionResponse,
        auth_context: AuthContext,
    ) -> None:
        """Execute an action.

        Args:
            config: The action configuration
            trigger_execution: The trigger execution
            auth_context: Authentication context with an API token that can be
                used by external workloads to authenticate with the server
                during the execution of the action. This API token is associated
                with the service account that was configured for the trigger
                that activated the action and has a validity defined by the
                trigger's authentication window.
        """
        from zenml.zen_server.utils import zen_store

        assert isinstance(config, PipelineRunActionConfiguration)

        deployment = zen_store().get_deployment(config.pipeline_deployment_id)
        print("Running deployment:", deployment)
        run_pipeline(
            deployment=deployment,
            run_config=config.run_config,
            auth_context=auth_context,
        )

    def _validate_configuration(
        self, config: PipelineRunActionConfiguration
    ) -> None:
        """Validate a pipeline run action configuration.

        Args:
            config: Pipeline run action configuration.

        Raises:
            ValueError: In case no deployment can be found with the deployment_id
        """
        deployment_id = config.pipeline_deployment_id
        zen_store = GlobalConfiguration().zen_store

        try:
            zen_store.get_deployment(deployment_id=deployment_id)
        except KeyError:
            raise ValueError(f"No deployment found with id {deployment_id}.")

    def _validate_trigger_request(
        self, trigger: TriggerRequest, config: ActionConfig
    ) -> None:
        """Validate a trigger request before it is created in the database.

        Args:
            trigger: Trigger request.
            config: Action configuration instantiated from the request.
        """
        assert isinstance(config, PipelineRunActionConfiguration)

        self._validate_configuration(config)

        # If an expiration window is not set, we set it to the default value
        if trigger.auth_window is None:
            trigger.auth_window = server_config().pipeline_run_auth_window

    def _validate_trigger_update(
        self,
        trigger: TriggerResponse,
        config: ActionConfig,
        trigger_update: TriggerUpdate,
        config_update: ActionConfig,
    ) -> None:
        """Validate a trigger update before it is reflected in the database.

        Args:
            trigger: Original trigger before the update.
            config: Action configuration instantiated from the original
                trigger.
            trigger_update: Trigger update request.
            config_update: Action configuration instantiated from the
                updated trigger.
        """
        assert isinstance(config, PipelineRunActionConfiguration)

        self._validate_configuration(config)

    def extract_resources(
        self,
        action_config: ActionConfig,
    ) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
        """Extract related resources for this action.

        Args:
            action_config: Action configuration from which to extract related
                resources.

        Returns:
            List of resources related to the action.

        Raises:
            ValueError: In case the deployment_id does not exist.
        """
        assert isinstance(action_config, PipelineRunActionConfiguration)

        deployment_id = action_config.pipeline_deployment_id
        zen_store = GlobalConfiguration().zen_store

        try:
            deployment = zen_store.get_deployment(deployment_id=deployment_id)
        except KeyError:
            raise ValueError(f"No deployment found with id {deployment_id}.")

        resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
            ResourceType.PIPELINE_DEPLOYMENT: deployment
        }

        if deployment.pipeline is not None:
            pipeline = zen_store.get_pipeline(
                pipeline_id=deployment.pipeline.id
            )
            resources[ResourceType.PIPELINE] = pipeline

        return resources
config_class: Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration] property readonly

Returns the BasePluginConfig config.

Returns:

Type Description
Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration]

The configuration.

flavor_class: Type[zenml.actions.base_action.BaseActionFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[zenml.actions.base_action.BaseActionFlavor]

The flavor class of the plugin.

extract_resources(self, action_config)

Extract related resources for this action.

Parameters:

Name Type Description Default
action_config ActionConfig

Action configuration from which to extract related resources.

required

Returns:

Type Description
Dict[zenml.zen_server.rbac.models.ResourceType, pydantic.generics.BaseResponse[Any, Any, Any]]

List of resources related to the action.

Exceptions:

Type Description
ValueError

In case the deployment_id does not exist.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def extract_resources(
    self,
    action_config: ActionConfig,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
    """Extract related resources for this action.

    Args:
        action_config: Action configuration from which to extract related
            resources.

    Returns:
        List of resources related to the action.

    Raises:
        ValueError: In case the deployment_id does not exist.
    """
    assert isinstance(action_config, PipelineRunActionConfiguration)

    deployment_id = action_config.pipeline_deployment_id
    zen_store = GlobalConfiguration().zen_store

    try:
        deployment = zen_store.get_deployment(deployment_id=deployment_id)
    except KeyError:
        raise ValueError(f"No deployment found with id {deployment_id}.")

    resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
        ResourceType.PIPELINE_DEPLOYMENT: deployment
    }

    if deployment.pipeline is not None:
        pipeline = zen_store.get_pipeline(
            pipeline_id=deployment.pipeline.id
        )
        resources[ResourceType.PIPELINE] = pipeline

    return resources
run(self, config, trigger_execution, auth_context)

Execute an action.

Parameters:

Name Type Description Default
config ActionConfig

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def run(
    self,
    config: ActionConfig,
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Execute an action.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
    from zenml.zen_server.utils import zen_store

    assert isinstance(config, PipelineRunActionConfiguration)

    deployment = zen_store().get_deployment(config.pipeline_deployment_id)
    print("Running deployment:", deployment)
    run_pipeline(
        deployment=deployment,
        run_config=config.run_config,
        auth_context=auth_context,
    )
PipelineRunActionHandler (BaseActionHandler)

Action handler for running pipelines.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
class PipelineRunActionHandler(BaseActionHandler):
    """Action handler for running pipelines."""

    @property
    def config_class(self) -> Type[PipelineRunActionConfiguration]:
        """Returns the `BasePluginConfig` config.

        Returns:
            The configuration.
        """
        return PipelineRunActionConfiguration

    @property
    def flavor_class(self) -> Type[BaseActionFlavor]:
        """Returns the flavor class of the plugin.

        Returns:
            The flavor class of the plugin.
        """
        return PipelineRunActionFlavor

    def run(
        self,
        config: ActionConfig,
        trigger_execution: TriggerExecutionResponse,
        auth_context: AuthContext,
    ) -> None:
        """Execute an action.

        Args:
            config: The action configuration
            trigger_execution: The trigger execution
            auth_context: Authentication context with an API token that can be
                used by external workloads to authenticate with the server
                during the execution of the action. This API token is associated
                with the service account that was configured for the trigger
                that activated the action and has a validity defined by the
                trigger's authentication window.
        """
        from zenml.zen_server.utils import zen_store

        assert isinstance(config, PipelineRunActionConfiguration)

        deployment = zen_store().get_deployment(config.pipeline_deployment_id)
        print("Running deployment:", deployment)
        run_pipeline(
            deployment=deployment,
            run_config=config.run_config,
            auth_context=auth_context,
        )

    def _validate_configuration(
        self, config: PipelineRunActionConfiguration
    ) -> None:
        """Validate a pipeline run action configuration.

        Args:
            config: Pipeline run action configuration.

        Raises:
            ValueError: In case no deployment can be found with the deployment_id
        """
        deployment_id = config.pipeline_deployment_id
        zen_store = GlobalConfiguration().zen_store

        try:
            zen_store.get_deployment(deployment_id=deployment_id)
        except KeyError:
            raise ValueError(f"No deployment found with id {deployment_id}.")

    def _validate_trigger_request(
        self, trigger: TriggerRequest, config: ActionConfig
    ) -> None:
        """Validate a trigger request before it is created in the database.

        Args:
            trigger: Trigger request.
            config: Action configuration instantiated from the request.
        """
        assert isinstance(config, PipelineRunActionConfiguration)

        self._validate_configuration(config)

        # If an expiration window is not set, we set it to the default value
        if trigger.auth_window is None:
            trigger.auth_window = server_config().pipeline_run_auth_window

    def _validate_trigger_update(
        self,
        trigger: TriggerResponse,
        config: ActionConfig,
        trigger_update: TriggerUpdate,
        config_update: ActionConfig,
    ) -> None:
        """Validate a trigger update before it is reflected in the database.

        Args:
            trigger: Original trigger before the update.
            config: Action configuration instantiated from the original
                trigger.
            trigger_update: Trigger update request.
            config_update: Action configuration instantiated from the
                updated trigger.
        """
        assert isinstance(config, PipelineRunActionConfiguration)

        self._validate_configuration(config)

    def extract_resources(
        self,
        action_config: ActionConfig,
    ) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
        """Extract related resources for this action.

        Args:
            action_config: Action configuration from which to extract related
                resources.

        Returns:
            List of resources related to the action.

        Raises:
            ValueError: In case the deployment_id does not exist.
        """
        assert isinstance(action_config, PipelineRunActionConfiguration)

        deployment_id = action_config.pipeline_deployment_id
        zen_store = GlobalConfiguration().zen_store

        try:
            deployment = zen_store.get_deployment(deployment_id=deployment_id)
        except KeyError:
            raise ValueError(f"No deployment found with id {deployment_id}.")

        resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
            ResourceType.PIPELINE_DEPLOYMENT: deployment
        }

        if deployment.pipeline is not None:
            pipeline = zen_store.get_pipeline(
                pipeline_id=deployment.pipeline.id
            )
            resources[ResourceType.PIPELINE] = pipeline

        return resources
config_class: Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration] property readonly

Returns the BasePluginConfig config.

Returns:

Type Description
Type[zenml.actions.pipeline_run.pipeline_run_action.PipelineRunActionConfiguration]

The configuration.

flavor_class: Type[zenml.actions.base_action.BaseActionFlavor] property readonly

Returns the flavor class of the plugin.

Returns:

Type Description
Type[zenml.actions.base_action.BaseActionFlavor]

The flavor class of the plugin.

extract_resources(self, action_config)

Extract related resources for this action.

Parameters:

Name Type Description Default
action_config ActionConfig

Action configuration from which to extract related resources.

required

Returns:

Type Description
Dict[zenml.zen_server.rbac.models.ResourceType, pydantic.generics.BaseResponse[Any, Any, Any]]

List of resources related to the action.

Exceptions:

Type Description
ValueError

In case the deployment_id does not exist.

Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def extract_resources(
    self,
    action_config: ActionConfig,
) -> Dict[ResourceType, BaseResponse[Any, Any, Any]]:
    """Extract related resources for this action.

    Args:
        action_config: Action configuration from which to extract related
            resources.

    Returns:
        List of resources related to the action.

    Raises:
        ValueError: In case the deployment_id does not exist.
    """
    assert isinstance(action_config, PipelineRunActionConfiguration)

    deployment_id = action_config.pipeline_deployment_id
    zen_store = GlobalConfiguration().zen_store

    try:
        deployment = zen_store.get_deployment(deployment_id=deployment_id)
    except KeyError:
        raise ValueError(f"No deployment found with id {deployment_id}.")

    resources: Dict[ResourceType, BaseResponse[Any, Any, Any]] = {
        ResourceType.PIPELINE_DEPLOYMENT: deployment
    }

    if deployment.pipeline is not None:
        pipeline = zen_store.get_pipeline(
            pipeline_id=deployment.pipeline.id
        )
        resources[ResourceType.PIPELINE] = pipeline

    return resources
run(self, config, trigger_execution, auth_context)

Execute an action.

Parameters:

Name Type Description Default
config ActionConfig

The action configuration

required
trigger_execution TriggerExecutionResponse

The trigger execution

required
auth_context AuthContext

Authentication context with an API token that can be used by external workloads to authenticate with the server during the execution of the action. This API token is associated with the service account that was configured for the trigger that activated the action and has a validity defined by the trigger's authentication window.

required
Source code in zenml/actions/pipeline_run/pipeline_run_action.py
def run(
    self,
    config: ActionConfig,
    trigger_execution: TriggerExecutionResponse,
    auth_context: AuthContext,
) -> None:
    """Execute an action.

    Args:
        config: The action configuration
        trigger_execution: The trigger execution
        auth_context: Authentication context with an API token that can be
            used by external workloads to authenticate with the server
            during the execution of the action. This API token is associated
            with the service account that was configured for the trigger
            that activated the action and has a validity defined by the
            trigger's authentication window.
    """
    from zenml.zen_server.utils import zen_store

    assert isinstance(config, PipelineRunActionConfiguration)

    deployment = zen_store().get_deployment(config.pipeline_deployment_id)
    print("Running deployment:", deployment)
    run_pipeline(
        deployment=deployment,
        run_config=config.run_config,
        auth_context=auth_context,
    )