Skip to content

Stack

zenml.stack special

Initialization of the ZenML Stack.

The stack is essentially all the configuration for the infrastructure of your MLOps platform.

A stack is made up of multiple components. Some examples are:

  • An Artifact Store
  • An Orchestrator
  • A Step Operator (Optional)
  • A Container Registry (Optional)

authentication_mixin

Stack component mixin for authentication.

AuthenticationConfigMixin (StackComponentConfig) pydantic-model

Base config for authentication mixins.

Any stack component that implements AuthenticationMixin should have a config that inherits from this class.

Attributes:

Name Type Description
authentication_secret Optional[str]

Name of the secret that stores the authentication credentials.

Source code in zenml/stack/authentication_mixin.py
class AuthenticationConfigMixin(StackComponentConfig):
    """Base config for authentication mixins.

    Any stack component that implements `AuthenticationMixin` should have a
    config that inherits from this class.

    Attributes:
        authentication_secret: Name of the secret that stores the
            authentication credentials. Optional; defaults to `None`, in
            which case no authentication secret is configured.
    """

    # Left as `None` when the component has no authentication secret.
    authentication_secret: Optional[str] = None

AuthenticationMixin (StackComponent)

Stack component mixin for authentication.

Any stack component that implements this mixin should have a config that inherits from AuthenticationConfigMixin.

Source code in zenml/stack/authentication_mixin.py
class AuthenticationMixin(StackComponent):
    """Mixin that lets a stack component load its authentication secret.

    Components using this mixin are expected to have a config class that
    derives from `AuthenticationConfigMixin`.
    """

    @property
    def config(self) -> AuthenticationConfigMixin:
        """The component config, typed as `AuthenticationConfigMixin`.

        Returns:
            The configuration.
        """
        return cast(AuthenticationConfigMixin, self._config)

    def get_authentication_secret(
        self, expected_schema_type: Type[T]
    ) -> Optional[T]:
        """Fetches the secret named by the `authentication_secret` config.

        The secret store of the client is queried first. If that lookup
        raises `KeyError` or `NotImplementedError`, the secrets manager of
        the active stack is used instead.

        Args:
            expected_schema_type: The expected secret schema class.

        Returns:
            The secret object if the `authentication_secret` attribute is set,
            `None` otherwise.

        Raises:
            RuntimeError: If no secrets manager exists in the active stack.
            TypeError: If the secret is not of the expected schema type.
        """
        if not self.config.authentication_secret:
            return None

        # The secret store has priority over the secrets manager.
        try:
            stored = Client().get_secret_by_name_and_scope(
                name=self.config.authentication_secret,
            )
        except (KeyError, NotImplementedError):
            # Not available through the secret store; use the secrets
            # manager of the active stack below.
            pass
        else:
            # Secrets coming from the store are wrapped into the expected
            # schema instead of being type-checked.
            return expected_schema_type(
                name=self.config.authentication_secret,
                **stored.secret_values,
            )

        manager = Client().active_stack.secrets_manager
        if not manager:
            raise RuntimeError(
                f"Unable to retrieve secret '{self.config.authentication_secret}' "
                "because the active stack does not have a secrets manager."
            )

        secret = manager.get_secret(self.config.authentication_secret)
        if isinstance(secret, expected_schema_type):
            return secret

        raise TypeError(
            f"Authentication secret has type {secret.TYPE} but a secret of "
            f"type {expected_schema_type.TYPE} was expected. To solve this "
            f"issue, register a secret with name "
            f"{self.config.authentication_secret} of type "
            f"{expected_schema_type.TYPE} using the following command: \n "
            f"`zenml secrets-manager secret register {self.config.authentication_secret} "
            f"--schema={expected_schema_type.TYPE} ...`"
        )
config: AuthenticationConfigMixin property readonly

Returns the AuthenticationConfigMixin config.

Returns:

Type Description
AuthenticationConfigMixin

The configuration.

get_authentication_secret(self, expected_schema_type)

Gets the secret referred to by the authentication secret attribute.

Parameters:

Name Type Description Default
expected_schema_type Type[~T]

The expected secret schema class.

required

Returns:

Type Description
Optional[~T]

The secret object if the authentication_secret attribute is set, None otherwise.

Exceptions:

Type Description
RuntimeError

If no secrets manager exists in the active stack.

TypeError

If the secret is not of the expected schema type.

Source code in zenml/stack/authentication_mixin.py
def get_authentication_secret(
    self, expected_schema_type: Type[T]
) -> Optional[T]:
    """Fetches the secret named by the `authentication_secret` config.

    The secret store of the client is queried first. If that lookup raises
    `KeyError` or `NotImplementedError`, the secrets manager of the active
    stack is used instead.

    Args:
        expected_schema_type: The expected secret schema class.

    Returns:
        The secret object if the `authentication_secret` attribute is set,
        `None` otherwise.

    Raises:
        RuntimeError: If no secrets manager exists in the active stack.
        TypeError: If the secret is not of the expected schema type.
    """
    if not self.config.authentication_secret:
        return None

    # The secret store has priority over the secrets manager.
    try:
        stored = Client().get_secret_by_name_and_scope(
            name=self.config.authentication_secret,
        )
    except (KeyError, NotImplementedError):
        # Not available through the secret store; use the secrets manager
        # of the active stack below.
        pass
    else:
        # Secrets coming from the store are wrapped into the expected
        # schema instead of being type-checked.
        return expected_schema_type(
            name=self.config.authentication_secret,
            **stored.secret_values,
        )

    manager = Client().active_stack.secrets_manager
    if not manager:
        raise RuntimeError(
            f"Unable to retrieve secret '{self.config.authentication_secret}' "
            "because the active stack does not have a secrets manager."
        )

    secret = manager.get_secret(self.config.authentication_secret)
    if isinstance(secret, expected_schema_type):
        return secret

    raise TypeError(
        f"Authentication secret has type {secret.TYPE} but a secret of "
        f"type {expected_schema_type.TYPE} was expected. To solve this "
        f"issue, register a secret with name "
        f"{self.config.authentication_secret} of type "
        f"{expected_schema_type.TYPE} using the following command: \n "
        f"`zenml secrets-manager secret register {self.config.authentication_secret} "
        f"--schema={expected_schema_type.TYPE} ...`"
    )

flavor

Base ZenML Flavor implementation.

Flavor

Class for ZenML Flavors.

Source code in zenml/stack/flavor.py
class Flavor:
    """Describes one flavor of a ZenML stack component.

    Subclasses provide the name, type, implementation class and config class
    of the flavor; the optional URL properties default to `None`.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The flavor name.
        """

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the documentation page of this flavor.

        Returns:
            A flavor docs url. The base implementation returns `None`.
        """
        return None

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            A flavor SDK docs url. The base implementation returns `None`.
        """
        return None

    @property
    def logo_url(self) -> Optional[str]:
        """Link to the logo representing this flavor in the dashboard.

        Returns:
            The flavor logo. The base implementation returns `None`.
        """
        return None

    @property
    @abstractmethod
    def type(self) -> StackComponentType:
        """Type of stack component this flavor belongs to.

        Returns:
            The stack component type.
        """

    @property
    @abstractmethod
    def implementation_class(self) -> Type[StackComponent]:
        """Stack component class that implements this flavor.

        Returns:
            The implementation class for this flavor.
        """

    @property
    @abstractmethod
    def config_class(self) -> Type[StackComponentConfig]:
        """`StackComponentConfig` subclass used to configure this flavor.

        Returns:
            The config class.
        """

    @property
    def config_schema(self) -> Dict[str, Any]:
        """Schema of the flavor config, parsed from the config class JSON.

        Returns:
            The config schema.
        """
        raw_schema = self.config_class.schema_json()
        parsed_schema: Dict[str, Any] = json.loads(raw_schema)
        return parsed_schema

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements for service connectors of this flavor.

        The requirements are used to filter the available service connector
        types down to those that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor. The base implementation
            returns `None`.
        """
        return None

    @classmethod
    def from_model(cls, flavor_model: FlavorResponseModel) -> "Flavor":
        """Instantiates the flavor class referenced by a flavor model.

        Args:
            flavor_model: The model to load from.

        Returns:
            The loaded flavor.
        """
        flavor_class = source_utils.load(flavor_model.source)
        return cast(Flavor, flavor_class())

    def to_model(
        self,
        integration: Optional[str] = None,
        scoped_by_workspace: bool = True,
        is_custom: bool = True,
    ) -> FlavorRequestModel:
        """Builds the request model describing this flavor.

        Args:
            integration: The integration to use for the model.
            scoped_by_workspace: Whether this flavor should live in the scope
                of the active workspace. Note: this implementation does not
                read the value.
            is_custom: Whether the flavor is a custom flavor. Custom flavors
                are then scoped by user and workspace

        Returns:
            The model.
        """
        from zenml.client import Client

        client = Client()

        # Connector details are only filled in when the flavor declares
        # service connector requirements.
        requirements = self.service_connector_requirements
        if requirements:
            connector_type = requirements.connector_type
            resource_type = requirements.resource_type
            resource_id_attr = requirements.resource_id_attr
        else:
            connector_type = resource_type = resource_id_attr = None

        # Only custom flavors are tied to the active user and workspace.
        if is_custom:
            user_id = client.active_user.id
            workspace_id = client.active_workspace.id
        else:
            user_id = workspace_id = None

        return FlavorRequestModel(
            user=user_id,
            workspace=workspace_id,
            name=self.name,
            type=self.type,
            source=source_utils.resolve(self.__class__).import_path,
            config_schema=self.config_schema,
            connector_type=connector_type,
            connector_resource_type=resource_type,
            connector_resource_id_attr=resource_id_attr,
            integration=integration,
            logo_url=self.logo_url,
            docs_url=self.docs_url,
            sdk_docs_url=self.sdk_docs_url,
            is_custom=is_custom,
        )

    def generate_default_docs_url(self, component_name: str = "") -> str:
        """Builds the docs url for an inbuilt or integration flavor.

        Custom flavors have no page in the main zenml docs, so the result is
        not useful for them.

        Args:
            component_name: The name of the component for docs generation. Used
                for legacy documentation before ZenML v0.34.0. When empty, the
                flavor name with underscores replaced by dashes is used.

        Returns:
            The complete url to the zenml documentation
        """
        from zenml import __version__

        base = f"https://docs.zenml.io/v/{__version__}"
        component_type = self.type.plural.replace("_", "-")
        flavor_slug = self.name.replace("_", "-")
        docs_component_name = component_name if component_name else flavor_slug
        return f"{base}/user-guide/component-guide/{component_type}/{docs_component_name}"

    def generate_default_sdk_docs_url(self) -> str:
        """Builds the SDK docs url for a flavor.

        Returns:
            The complete url to the zenml SDK docs
        """
        from zenml import __version__

        base = f"https://sdkdocs.zenml.io/{__version__}"

        component_type = self.type.plural

        if "zenml.integrations" not in self.__module__:
            # Everything outside the integrations package is documented in
            # the core code docs.
            return (
                f"{base}/core_code_docs/core-{component_type}/"
                f"#{self.__module__}"
            )

        # The module path looks like "zenml.integrations.<integration>...."
        # so the integration name is the first segment after the prefix.
        module_tail = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1]
        integration = module_tail.split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )
config_class: Type[zenml.stack.stack_component.StackComponentConfig] property readonly

Returns StackComponentConfig config class.

Returns:

Type Description
Type[zenml.stack.stack_component.StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property readonly

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[zenml.stack.stack_component.StackComponent] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[zenml.stack.stack_component.StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str property readonly

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType property readonly

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

from_model(flavor_model) classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponseModel

The model to load from.

required

Returns:

Type Description
Flavor

The loaded flavor.

Source code in zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponseModel) -> "Flavor":
    """Instantiates the flavor class referenced by a flavor model.

    Args:
        flavor_model: The model to load from.

    Returns:
        The loaded flavor.
    """
    flavor_class = source_utils.load(flavor_model.source)
    return cast(Flavor, flavor_class())
generate_default_docs_url(self, component_name='')

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Parameters:

Name Type Description Default
component_name str

The name of the component for docs generation. Used for legacy documentation before ZenML v0.34.0.

''

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in zenml/stack/flavor.py
def generate_default_docs_url(self, component_name: str = "") -> str:
    """Builds the docs url for an inbuilt or integration flavor.

    Custom flavors have no page in the main zenml docs, so the result is not
    useful for them.

    Args:
        component_name: The name of the component for docs generation. Used
            for legacy documentation before ZenML v0.34.0. When empty, the
            flavor name with underscores replaced by dashes is used.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    base = f"https://docs.zenml.io/v/{__version__}"
    component_type = self.type.plural.replace("_", "-")
    flavor_slug = self.name.replace("_", "-")
    docs_component_name = component_name if component_name else flavor_slug
    return f"{base}/user-guide/component-guide/{component_type}/{docs_component_name}"
generate_default_sdk_docs_url(self)

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Builds the SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" not in self.__module__:
        # Everything outside the integrations package is documented in the
        # core code docs.
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )

    # The module path looks like "zenml.integrations.<integration>...." so
    # the integration name is the first segment after the prefix.
    module_tail = self.__module__.split(
        "zenml.integrations.", maxsplit=1
    )[1]
    integration = module_tail.split(".")[0]

    return (
        f"{base}/integration_code_docs"
        f"/integrations-{integration}/#{self.__module__}"
    )
to_model(self, integration=None, scoped_by_workspace=True, is_custom=True)

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
scoped_by_workspace bool

Whether this flavor should live in the scope of the active workspace

True
is_custom bool

Whether the flavor is a custom flavor. Custom flavors are then scoped by user and workspace

True

Returns:

Type Description
FlavorRequestModel

The model.

Source code in zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    scoped_by_workspace: bool = True,
    is_custom: bool = True,
) -> FlavorRequestModel:
    """Builds the request model describing this flavor.

    Args:
        integration: The integration to use for the model.
        scoped_by_workspace: Whether this flavor should live in the scope
            of the active workspace. Note: this implementation does not
            read the value.
        is_custom: Whether the flavor is a custom flavor. Custom flavors
            are then scoped by user and workspace

    Returns:
        The model.
    """
    from zenml.client import Client

    client = Client()

    # Connector details are only filled in when the flavor declares service
    # connector requirements.
    requirements = self.service_connector_requirements
    if requirements:
        connector_type = requirements.connector_type
        resource_type = requirements.resource_type
        resource_id_attr = requirements.resource_id_attr
    else:
        connector_type = resource_type = resource_id_attr = None

    # Only custom flavors are tied to the active user and workspace.
    if is_custom:
        user_id = client.active_user.id
        workspace_id = client.active_workspace.id
    else:
        user_id = workspace_id = None

    return FlavorRequestModel(
        user=user_id,
        workspace=workspace_id,
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )

validate_flavor_source(source, component_type)

Import a Flavor class from a given source and validate it against the expected component type.

Parameters:

Name Type Description Default
source str

source path of the implementation

required
component_type StackComponentType

the type of the stack component

required

Returns:

Type Description
Type[Flavor]

the imported class

Exceptions:

Type Description
ValueError

If ZenML cannot find the given module path

TypeError

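If the given source does not point to a subclass of Flavor, if the flavor's implementation class is not a subclass of StackComponent, if its config class is not a subclass of StackComponentConfig, or if the flavor does not have the right component type.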

Source code in zenml/stack/flavor.py
def validate_flavor_source(
    source: str, component_type: StackComponentType
) -> Type["Flavor"]:
    """Import a flavor class from a given source and validate it.

    The loaded object must be a `Flavor` subclass whose implementation class
    derives from `StackComponent`, whose config class derives from
    `StackComponentConfig` and whose type equals `component_type`.

    Args:
        source: source path of the flavor class
        component_type: the expected type of the stack component

    Returns:
        the imported flavor class

    Raises:
        ValueError: If the flavor class, or the implementation or config
            class it refers to, cannot be imported.
        TypeError: If the source does not point to a `Flavor` subclass, if
            the implementation or config class has the wrong base class, or
            if the flavor type does not match `component_type`.
    """
    from zenml.stack.stack_component import (
        StackComponent,
        StackComponentConfig,
    )
    from zenml.utils import source_utils

    try:
        flavor_class = source_utils.load(source)
    except (ValueError, AttributeError, ImportError) as e:
        # Chain the original error so the root cause stays in the traceback.
        raise ValueError(
            f"ZenML can not import the flavor class '{source}': {e}"
        ) from e

    if not (
        isinstance(flavor_class, type) and issubclass(flavor_class, Flavor)
    ):
        raise TypeError(
            f"The source '{source}' does not point to a subclass of the ZenML "
            f"Flavor."
        )

    flavor = flavor_class()
    try:
        impl_class = flavor.implementation_class
    except (ModuleNotFoundError, ImportError, NotImplementedError) as e:
        raise ValueError(
            f"The implementation class defined within the "
            f"'{flavor_class.__name__}' can not be imported: {e}"
        ) from e

    if not issubclass(impl_class, StackComponent):
        raise TypeError(
            f"The implementation class '{impl_class.__name__}' of a flavor "
            f"needs to be a subclass of the ZenML StackComponent."
        )

    if flavor.type != component_type:  # noqa
        # Report the value that was actually compared (the flavor's type)
        # rather than an attribute of the implementation class object.
        raise TypeError(
            f"The source points to a {flavor.type}, not a "
            f"{component_type}."
        )

    try:
        conf_class = flavor.config_class
    except (ModuleNotFoundError, ImportError, NotImplementedError) as e:
        raise ValueError(
            f"The config class defined within the "
            f"'{flavor_class.__name__}' can not be imported: {e}"
        ) from e

    if not issubclass(conf_class, StackComponentConfig):
        raise TypeError(
            f"The config class '{conf_class.__name__}' of a flavor "
            f"needs to be a subclass of the ZenML StackComponentConfig."
        )

    return flavor_class

flavor_registry

Implementation of the ZenML flavor registry.

FlavorRegistry

Registry for stack component flavors.

The flavors defined by ZenML must be registered here.

Source code in zenml/stack/flavor_registry.py
class FlavorRegistry:
    """Registry for stack component flavors.

    The flavors defined by ZenML must be registered here.
    """

    def __init__(self) -> None:
        """Initialization of the flavors."""
        self._flavors: DefaultDict[
            StackComponentType, Dict[str, FlavorResponseModel]
        ] = defaultdict(dict)

    def register_flavors(self, store: BaseZenStore) -> None:
        """Register all flavors to the DB.

        Built-in flavors are registered first, followed by the flavors of
        all integrations.

        Args:
            store: The instance of a store to use for persistence
        """
        self.register_builtin_flavors(store=store)
        self.register_integration_flavors(store=store)

    @property
    def builtin_flavors(self) -> List[Type[Flavor]]:
        """A list of all default in-built flavors.

        Returns:
            A list of builtin flavors.
        """
        from zenml.artifact_stores import LocalArtifactStoreFlavor
        from zenml.container_registries import (
            AzureContainerRegistryFlavor,
            DefaultContainerRegistryFlavor,
            DockerHubContainerRegistryFlavor,
            GCPContainerRegistryFlavor,
            GitHubContainerRegistryFlavor,
        )
        from zenml.image_builders import LocalImageBuilderFlavor
        from zenml.orchestrators import (
            LocalDockerOrchestratorFlavor,
            LocalOrchestratorFlavor,
        )
        from zenml.secrets_managers import LocalSecretsManagerFlavor

        flavors = [
            LocalArtifactStoreFlavor,
            LocalOrchestratorFlavor,
            LocalDockerOrchestratorFlavor,
            DefaultContainerRegistryFlavor,
            AzureContainerRegistryFlavor,
            DockerHubContainerRegistryFlavor,
            GCPContainerRegistryFlavor,
            GitHubContainerRegistryFlavor,
            LocalSecretsManagerFlavor,
            LocalImageBuilderFlavor,
        ]
        return flavors

    @property
    def integration_flavors(self) -> List[Type[Flavor]]:
        """A list of all default integration flavors.

        Returns:
            A list of integration flavors.
        """
        return [
            flavor
            for integration in integration_registry.integrations.values()
            for flavor in integration.flavors()
        ]

    @staticmethod
    def _register_flavor(
        store: BaseZenStore, flavor_class: Type[Flavor], integration: str
    ) -> None:
        """Creates a non-custom flavor in the store or updates it in place.

        A flavor with the same name and type is looked up first. If none
        exists the flavor is created, otherwise the first match is updated.

        Args:
            store: The instance of the zen_store to use
            flavor_class: The flavor class to register
            integration: The integration name stored with the flavor
        """
        flavor_request_model = flavor_class().to_model(
            integration=integration,
            scoped_by_workspace=False,
            is_custom=False,
        )
        existing_flavor = store.list_flavors(
            FlavorFilterModel(
                name=flavor_request_model.name,
                type=flavor_request_model.type,
            )
        )
        if len(existing_flavor) == 0:
            store.create_flavor(flavor_request_model)
        else:
            flavor_update_model = FlavorUpdateModel.parse_obj(
                flavor_request_model
            )
            store.update_flavor(existing_flavor[0].id, flavor_update_model)

    def register_builtin_flavors(self, store: BaseZenStore) -> None:
        """Registers the default built-in flavors.

        Args:
            store: The instance of the zen_store to use
        """
        for flavor in self.builtin_flavors:
            self._register_flavor(
                store=store, flavor_class=flavor, integration="built-in"
            )

    @staticmethod
    def register_integration_flavors(store: BaseZenStore) -> None:
        """Registers the flavors implemented by integrations.

        Any error raised while handling an integration is logged as a
        warning and the remaining integrations are still processed.

        Args:
            store: The instance of the zen_store to use
        """
        for name, integration in integration_registry.integrations.items():
            # Deliberately broad: one failing integration must not prevent
            # the others from being registered.
            try:
                for flavor in integration.flavors():
                    FlavorRegistry._register_flavor(
                        store=store, flavor_class=flavor, integration=name
                    )
            except Exception as e:
                logger.warning(
                    f"Integration {name} failed to register flavors. "
                    f"Error: {e}"
                )
builtin_flavors: List[Type[zenml.stack.flavor.Flavor]] property readonly

A list of all default in-built flavors.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

A list of builtin flavors.

integration_flavors: List[Type[zenml.stack.flavor.Flavor]] property readonly

A list of all default integration flavors.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

A list of integration flavors.

__init__(self) special

Initialization of the flavors.

Source code in zenml/stack/flavor_registry.py
def __init__(self) -> None:
    """Sets up the empty flavor mapping of the registry."""
    # Outer key: stack component type; inner dict: string key -> flavor
    # model. Missing component types get an empty inner dict on first access.
    flavors_by_type: DefaultDict[
        StackComponentType, Dict[str, FlavorResponseModel]
    ] = defaultdict(dict)
    self._flavors = flavors_by_type
register_builtin_flavors(self, store)

Registers the default built-in flavors.

Parameters:

Name Type Description Default
store BaseZenStore

The instance of the zen_store to use

required
Source code in zenml/stack/flavor_registry.py
def register_builtin_flavors(self, store: BaseZenStore) -> None:
    """Creates or updates every built-in flavor in the given store.

    Args:
        store: The instance of the zen_store to use
    """
    for flavor_class in self.builtin_flavors:
        request = flavor_class().to_model(
            integration="built-in",
            scoped_by_workspace=False,
            is_custom=False,
        )

        # Look for an already registered flavor with the same name and type.
        flavor_filter = FlavorFilterModel(
            name=request.name,
            type=request.type,
        )
        matches = store.list_flavors(flavor_filter)

        if len(matches) > 0:
            # Already registered: refresh the first match in place.
            update = FlavorUpdateModel.parse_obj(request)
            store.update_flavor(matches[0].id, update)
        else:
            store.create_flavor(request)
register_flavors(self, store)

Register all flavors to the DB.

Parameters:

Name Type Description Default
store BaseZenStore

The instance of a store to use for persistence

required
Source code in zenml/stack/flavor_registry.py
def register_flavors(self, store: BaseZenStore) -> None:
    """Registers built-in flavors, then integration flavors, in the store.

    Args:
        store: The instance of a store to use for persistence
    """
    registration_steps = (
        self.register_builtin_flavors,
        self.register_integration_flavors,
    )
    for register in registration_steps:
        register(store=store)
register_integration_flavors(store) staticmethod

Registers the flavors implemented by integrations.

Parameters:

Name Type Description Default
store BaseZenStore

The instance of the zen_store to use

required
Source code in zenml/stack/flavor_registry.py
@staticmethod
def register_integration_flavors(store: BaseZenStore) -> None:
    """Creates or updates the flavors of every integration in the store.

    Any error raised while handling an integration is logged as a warning
    and the remaining integrations are still processed.

    Args:
        store: The instance of the zen_store to use
    """
    for name, integration in integration_registry.integrations.items():
        try:
            for flavor_class in integration.flavors():
                request = flavor_class().to_model(
                    integration=name,
                    scoped_by_workspace=False,
                    is_custom=False,
                )

                # Look for an already registered flavor with the same name
                # and type.
                flavor_filter = FlavorFilterModel(
                    name=request.name,
                    type=request.type,
                )
                matches = store.list_flavors(flavor_filter)

                if len(matches) > 0:
                    # Already registered: refresh the first match in place.
                    update = FlavorUpdateModel.parse_obj(request)
                    store.update_flavor(matches[0].id, update)
                else:
                    store.create_flavor(request)
        except Exception as e:
            # Best effort: report the failure and move on to the next
            # integration.
            logger.warning(
                f"Integration {name} failed to register flavors. "
                f"Error: {e}"
            )

stack

Implementation of the ZenML Stack class.

Stack

ZenML stack class.

A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (the orchestrator and the artifact store) are required to run any kind of pipeline; other components, like the container registry, are only required if other stack components depend on them.

Source code in zenml/stack/stack.py
class Stack:
    """ZenML stack class.

    A ZenML stack is a collection of multiple stack components that are
    required to run ZenML pipelines. Some of these components (orchestrator,
    and artifact store) are required to run any kind of
    pipeline, other components like the container registry are only required
    if other stack components depend on them.
    """

    def __init__(
        self,
        id: UUID,
        name: str,
        *,
        orchestrator: "BaseOrchestrator",
        artifact_store: "BaseArtifactStore",
        container_registry: Optional["BaseContainerRegistry"] = None,
        secrets_manager: Optional["BaseSecretsManager"] = None,
        step_operator: Optional["BaseStepOperator"] = None,
        feature_store: Optional["BaseFeatureStore"] = None,
        model_deployer: Optional["BaseModelDeployer"] = None,
        experiment_tracker: Optional["BaseExperimentTracker"] = None,
        alerter: Optional["BaseAlerter"] = None,
        annotator: Optional["BaseAnnotator"] = None,
        data_validator: Optional["BaseDataValidator"] = None,
        image_builder: Optional["BaseImageBuilder"] = None,
        model_registry: Optional["BaseModelRegistry"] = None,
    ):
        """Initializes and validates a stack instance.

        Args:
            id: Unique ID of the stack.
            name: Name of the stack.
            orchestrator: Orchestrator component of the stack.
            artifact_store: Artifact store component of the stack.
            container_registry: Container registry component of the stack.
            secrets_manager: Secrets manager component of the stack.
            step_operator: Step operator component of the stack.
            feature_store: Feature store component of the stack.
            model_deployer: Model deployer component of the stack.
            experiment_tracker: Experiment tracker component of the stack.
            alerter: Alerter component of the stack.
            annotator: Annotator component of the stack.
            data_validator: Data validator component of the stack.
            image_builder: Image builder component of the stack.
            model_registry: Model registry component of the stack.
        """
        self._id = id
        self._name = name
        self._orchestrator = orchestrator
        self._artifact_store = artifact_store
        self._container_registry = container_registry
        self._step_operator = step_operator
        self._secrets_manager = secrets_manager
        self._feature_store = feature_store
        self._model_deployer = model_deployer
        self._experiment_tracker = experiment_tracker
        self._alerter = alerter
        self._annotator = annotator
        self._data_validator = data_validator
        self._model_registry = model_registry
        self._image_builder = image_builder

    @classmethod
    def from_model(cls, stack_model: StackResponseModel) -> "Stack":
        """Builds a Stack instance out of a stack response model.

        For every component type of the model, only the first component
        model in its list is instantiated.

        Args:
            stack_model: The StackModel to create the Stack from.

        Returns:
            The created Stack instance.
        """
        from zenml.stack import StackComponent

        instantiated = {}
        for component_type, models in stack_model.components.items():
            instantiated[component_type] = StackComponent.from_model(
                models[0]
            )

        return Stack.from_components(
            id=stack_model.id,
            name=stack_model.name,
            components=instantiated,
        )

    @classmethod
    def from_components(
        cls,
        id: UUID,
        name: str,
        components: Dict[StackComponentType, "StackComponent"],
    ) -> "Stack":
        """Builds a stack from a mapping of component type to component.

        Every component is checked against the base class expected for its
        slot. The orchestrator and the artifact store must be present;
        all other components may be missing.

        # noqa: DAR402

        Args:
            id: Unique ID of the stack.
            name: The name of the stack.
            components: The components of the stack.

        Returns:
            A stack instance consisting of the given components.

        Raises:
            TypeError: If a required component is missing or a component
                doesn't inherit from the expected base class.
        """
        from zenml.alerter import BaseAlerter
        from zenml.annotators import BaseAnnotator
        from zenml.artifact_stores import BaseArtifactStore
        from zenml.container_registries import BaseContainerRegistry
        from zenml.data_validators import BaseDataValidator
        from zenml.experiment_trackers import BaseExperimentTracker
        from zenml.feature_stores import BaseFeatureStore
        from zenml.image_builders import BaseImageBuilder
        from zenml.model_deployers import BaseModelDeployer
        from zenml.model_registries import BaseModelRegistry
        from zenml.orchestrators import BaseOrchestrator
        from zenml.secrets_managers import BaseSecretsManager
        from zenml.step_operators import BaseStepOperator

        # Slots that must be filled; a missing one is reported as a
        # `NoneType` component of the wrong type.
        required_slots = ("orchestrator", "artifact_store")

        # (constructor keyword, component type, expected base class), listed
        # in the order in which the components are validated.
        slot_specs = (
            (
                "orchestrator",
                StackComponentType.ORCHESTRATOR,
                BaseOrchestrator,
            ),
            (
                "artifact_store",
                StackComponentType.ARTIFACT_STORE,
                BaseArtifactStore,
            ),
            (
                "container_registry",
                StackComponentType.CONTAINER_REGISTRY,
                BaseContainerRegistry,
            ),
            (
                "secrets_manager",
                StackComponentType.SECRETS_MANAGER,
                BaseSecretsManager,
            ),
            (
                "step_operator",
                StackComponentType.STEP_OPERATOR,
                BaseStepOperator,
            ),
            (
                "feature_store",
                StackComponentType.FEATURE_STORE,
                BaseFeatureStore,
            ),
            (
                "model_deployer",
                StackComponentType.MODEL_DEPLOYER,
                BaseModelDeployer,
            ),
            (
                "experiment_tracker",
                StackComponentType.EXPERIMENT_TRACKER,
                BaseExperimentTracker,
            ),
            (
                "alerter",
                StackComponentType.ALERTER,
                BaseAlerter,
            ),
            (
                "annotator",
                StackComponentType.ANNOTATOR,
                BaseAnnotator,
            ),
            (
                "data_validator",
                StackComponentType.DATA_VALIDATOR,
                BaseDataValidator,
            ),
            (
                "image_builder",
                StackComponentType.IMAGE_BUILDER,
                BaseImageBuilder,
            ),
            (
                "model_registry",
                StackComponentType.MODEL_REGISTRY,
                BaseModelRegistry,
            ),
        )

        validated: Dict[str, Any] = {}
        for keyword, component_type, expected_class in slot_specs:
            component = components.get(component_type)
            if component is None and keyword not in required_slots:
                # Optional slot that is simply not configured.
                validated[keyword] = None
                continue
            if not isinstance(component, expected_class):
                raise TypeError(
                    f"Unable to create stack: Wrong stack component type "
                    f"`{component.__class__.__name__}` (expected: subclass "
                    f"of `{expected_class.__name__}`)"
                )
            validated[keyword] = component

        return Stack(id=id, name=name, **validated)

    @property
    def components(self) -> Dict[StackComponentType, "StackComponent"]:
        """All components of the stack.

        Returns:
            A dictionary of all components of the stack.
        """
        return {
            component.type: component
            for component in [
                self.orchestrator,
                self.artifact_store,
                self.container_registry,
                self.secrets_manager,
                self.step_operator,
                self.feature_store,
                self.model_deployer,
                self.experiment_tracker,
                self.alerter,
                self.annotator,
                self.data_validator,
                self.image_builder,
                self.model_registry,
            ]
            if component is not None
        }

    @property
    def id(self) -> UUID:
        """The unique ID of the stack.

        Returns:
            The ID that was passed to the constructor.
        """
        return self._id

    @property
    def name(self) -> str:
        """The name of the stack.

        Returns:
            The name of the stack.
        """
        return self._name

    @property
    def orchestrator(self) -> "BaseOrchestrator":
        """The orchestrator of the stack.

        Returns:
            The orchestrator of the stack. This component is mandatory.
        """
        return self._orchestrator

    @property
    def artifact_store(self) -> "BaseArtifactStore":
        """The artifact store of the stack.

        Returns:
            The artifact store of the stack. This component is mandatory.
        """
        return self._artifact_store

    @property
    def container_registry(self) -> Optional["BaseContainerRegistry"]:
        """The container registry of the stack.

        Returns:
            The container registry of the stack or `None` if the stack does
            not have a container registry.
        """
        return self._container_registry

    @property
    def secrets_manager(self) -> Optional["BaseSecretsManager"]:
        """The secrets manager of the stack.

        Returns:
            The secrets manager of the stack or `None` if the stack does
            not have a secrets manager.
        """
        return self._secrets_manager

    @property
    def step_operator(self) -> Optional["BaseStepOperator"]:
        """The step operator of the stack.

        Returns:
            The step operator of the stack or `None` if the stack does not
            have a step operator.
        """
        return self._step_operator

    @property
    def feature_store(self) -> Optional["BaseFeatureStore"]:
        """The feature store of the stack.

        Returns:
            The feature store of the stack or `None` if the stack does not
            have a feature store.
        """
        return self._feature_store

    @property
    def model_deployer(self) -> Optional["BaseModelDeployer"]:
        """The model deployer of the stack.

        Returns:
            The model deployer of the stack or `None` if the stack does not
            have a model deployer.
        """
        return self._model_deployer

    @property
    def experiment_tracker(self) -> Optional["BaseExperimentTracker"]:
        """The experiment tracker of the stack.

        Returns:
            The experiment tracker of the stack or `None` if the stack does
            not have an experiment tracker.
        """
        return self._experiment_tracker

    @property
    def alerter(self) -> Optional["BaseAlerter"]:
        """The alerter of the stack.

        Returns:
            The alerter of the stack or `None` if the stack does not have
            an alerter.
        """
        return self._alerter

    @property
    def annotator(self) -> Optional["BaseAnnotator"]:
        """The annotator of the stack.

        Returns:
            The annotator of the stack or `None` if the stack does not have
            an annotator.
        """
        return self._annotator

    @property
    def data_validator(self) -> Optional["BaseDataValidator"]:
        """The data validator of the stack.

        Returns:
            The data validator of the stack or `None` if the stack does not
            have a data validator.
        """
        return self._data_validator

    @property
    def image_builder(self) -> Optional["BaseImageBuilder"]:
        """The image builder of the stack.

        Returns:
            The image builder of the stack or `None` if the stack does not
            have an image builder.
        """
        return self._image_builder

    @property
    def model_registry(self) -> Optional["BaseModelRegistry"]:
        """The model registry of the stack.

        Returns:
            The model registry of the stack or `None` if the stack does not
            have a model registry.
        """
        return self._model_registry

    def dict(self) -> Dict[str, str]:
        """Converts the stack into a dictionary.

        Returns:
            A dictionary containing the stack components.
        """
        component_dict = {
            component_type.value: component.config.json(sort_keys=True)
            for component_type, component in self.components.items()
        }
        component_dict.update({"name": self.name})
        return component_dict

    def requirements(
        self,
        exclude_components: Optional[AbstractSet[StackComponentType]] = None,
    ) -> Set[str]:
        """Set of PyPI requirements for the stack.

        This method combines the requirements of all stack components (except
        the ones specified in `exclude_components`).

        Args:
            exclude_components: Set of component types for which the
                requirements should not be included in the output.

        Returns:
            Set of PyPI requirements.
        """
        exclude_components = exclude_components or set()
        requirements = [
            component.requirements
            for component in self.components.values()
            if component.type not in exclude_components
        ]
        return set.union(*requirements) if requirements else set()

    @property
    def apt_packages(self) -> List[str]:
        """List of APT package requirements for the stack.

        Returns:
            A list of APT package requirements for the stack.
        """
        return [
            package
            for component in self.components.values()
            for package in component.apt_packages
        ]

    def check_local_paths(self) -> bool:
        """Checks if the stack has local paths.

        Every component that advertises a non-empty `local_path` must keep
        that path inside the local stores directory of the global
        configuration. Containment is checked on normalized absolute paths,
        so a sibling directory that merely shares the textual prefix (for
        example `<local_stores>_backup`) is rejected.

        Returns:
            True if the stack has local paths, False otherwise.

        Raises:
            ValueError: If the stack has local paths that do not conform to
                the convention that all local path must be relative to the
                local stores directory.
        """
        import os

        from zenml.config.global_config import GlobalConfiguration

        local_stores_path = GlobalConfiguration().local_stores_path
        # Normalize once; every component path is compared against this.
        stores_root = os.path.normcase(os.path.abspath(local_stores_path))

        # go through all stack components and identify those that advertise
        # a local path where they persist information that they need to be
        # available when running pipelines.
        has_local_paths = False
        for stack_comp in self.components.values():
            local_path = stack_comp.local_path
            if not local_path:
                continue
            # double-check this convention, just in case it wasn't respected
            # as documented in `StackComponent.local_path`
            resolved_path = os.path.normcase(os.path.abspath(local_path))
            try:
                is_inside = (
                    os.path.commonpath([resolved_path, stores_root])
                    == stores_root
                )
            except ValueError:
                # `commonpath` refuses paths without a common root (e.g.
                # different drives on Windows): definitely not inside.
                is_inside = False
            if not is_inside:
                raise ValueError(
                    f"Local path {local_path} for component "
                    f"{stack_comp.name} is not in the local stores "
                    f"directory ({local_stores_path})."
                )
            has_local_paths = True

        return has_local_paths

    @property
    def required_secrets(self) -> Set["secret_utils.SecretReference"]:
        """All required secrets for this stack.

        Returns:
            The required secrets of this stack.
        """
        secrets = [
            component.config.required_secrets
            for component in self.components.values()
        ]
        return set.union(*secrets) if secrets else set()

    @property
    def setting_classes(self) -> Dict[str, Type["BaseSettings"]]:
        """Settings classes declared by the components of this stack.

        Components without a settings class are skipped.

        Returns:
            All setting classes and their respective keys.
        """
        return {
            settings_utils.get_stack_component_setting_key(
                component
            ): component.settings_class
            for component in self.components.values()
            if component.settings_class
        }

    @property
    def requires_remote_server(self) -> bool:
        """If the stack requires a remote ZenServer to run.

        This is the case if any code is getting executed remotely. This is the
        case for both remote orchestrators as well as remote step operators.

        Returns:
            If the stack requires a remote ZenServer to run.
        """
        return self.orchestrator.config.is_remote or (
            self.step_operator is not None
            and self.step_operator.config.is_remote
        )

    def _validate_secrets(self, raise_exception: bool) -> None:
        """Validates that all secrets of the stack exists.

        The validation level is read from the environment variable named by
        `ENV_ZENML_SECRET_VALIDATION_LEVEL` and defaults to
        `SecretValidationLevel.SECRET_AND_KEY_EXISTS`. With level `NONE`, or
        when no secrets are required, nothing is checked.

        Required secrets are first looked up through the client; only the
        ones that could not be resolved that way are checked against the
        secrets manager of the stack.

        Args:
            raise_exception: If `True`, raises an exception if the stack has
                no secrets manager or a secret is missing. Otherwise a
                warning is logged.

        # noqa: DAR402
        Raises:
            StackValidationError: If the stack has no secrets manager or a
                secret is missing.
        """
        env_value = os.getenv(
            ENV_ZENML_SECRET_VALIDATION_LEVEL,
            default=SecretValidationLevel.SECRET_AND_KEY_EXISTS.value,
        )
        secret_validation_level = SecretValidationLevel(env_value)

        required_secrets = self.required_secrets
        if (
            secret_validation_level != SecretValidationLevel.NONE
            and required_secrets
        ):

            def _handle_error(message: str) -> None:
                """Handles the error by raising an exception or logging.

                Args:
                    message: The error message.

                Raises:
                    StackValidationError: If called and `raise_exception` of
                        the outer method is `True`.
                """
                if raise_exception:
                    raise StackValidationError(message)
                else:
                    message += (
                        "\nYou need to solve this issue before running "
                        "a pipeline on this stack."
                    )
                    logger.warning(message)

            missing = []

            client = Client()

            # First, attempt to resolve secrets through the secrets store
            # (iterate over a copy because resolved entries are removed from
            # `required_secrets` inside the loop)
            for secret_ref in required_secrets.copy():
                try:
                    store_secret = client.get_secret(secret_ref.name)
                    if (
                        secret_validation_level
                        == SecretValidationLevel.SECRET_AND_KEY_EXISTS
                    ):
                        # Only the lookup matters: a missing key raises
                        # KeyError and leaves the secret unresolved.
                        _ = store_secret.values[secret_ref.key]
                except (KeyError, NotImplementedError):
                    # Not resolved here; the reference stays in
                    # `required_secrets` for the secrets manager check below.
                    pass
                else:
                    # Drop this secret from the list of required secrets
                    required_secrets.remove(secret_ref)

            # If there are still required secrets, continue with the secrets
            # manager
            if not required_secrets:
                return

            if not self.secrets_manager:
                _handle_error(
                    f"Some component in stack `{self.name}` reference secret "
                    "values, but there is no secrets manager in this stack."
                )
                # Only reached when `_handle_error` logged instead of raising.
                return

            # Only consulted for the `SECRET_EXISTS` validation level.
            existing_secrets = set(self.secrets_manager.get_all_secret_keys())
            for secret_ref in required_secrets:
                if (
                    secret_validation_level
                    == SecretValidationLevel.SECRET_AND_KEY_EXISTS
                ):
                    try:
                        _ = self.secrets_manager.get_secret(
                            secret_ref.name
                        ).content[secret_ref.key]
                    except KeyError:
                        missing.append(secret_ref)
                elif (
                    secret_validation_level
                    == SecretValidationLevel.SECRET_EXISTS
                ):
                    if secret_ref.name not in existing_secrets:
                        missing.append(secret_ref)

            if missing:
                _handle_error(
                    f"Missing secrets for stack: {missing}.\nTo register the "
                    "missing secrets for this stack, run `zenml stack "
                    f"register-secrets {self.name}`\nIf you want to "
                    "adjust the degree to which ZenML validates the existence "
                    "of secrets in your stack, you can do so by setting the "
                    f"environment variable {ENV_ZENML_SECRET_VALIDATION_LEVEL} "
                    "to one of the following values: "
                    f"{SecretValidationLevel.values()}."
                )

    def validate(
        self,
        fail_if_secrets_missing: bool = False,
    ) -> None:
        """Checks whether the stack configuration is valid.

        The checks run in this order:
        - the image builder check (see `validate_image_builder`)
        - the `StackValidator` of each stack component that defines one
            validates the stack to make sure all the components are
            compatible with each other
        - the required secrets of all components need to exist

        Args:
            fail_if_secrets_missing: If this is `True`, an error will be raised
                if a secret for a component is missing. Otherwise, only a
                warning will be logged.
        """
        self.validate_image_builder()

        for component in self.components.values():
            if not component.validator:
                continue
            component.validator.validate(stack=self)

        self._validate_secrets(raise_exception=fail_if_secrets_missing)

    def validate_image_builder(self) -> None:
        """Ensures the stack has an image builder when it needs one.

        An image builder is considered necessary if the orchestrator flavor
        is not "local", if the stack has a step operator, or if it has a
        model deployer whose flavor is not "mlflow". If it is necessary but
        missing, and the default is not disabled through the
        `ENV_ZENML_SKIP_IMAGE_BUILDER_DEFAULT` environment variable, a
        temporary local image builder is created and assigned to the stack
        to ensure backwards compatibility, and a warning is logged.
        """
        needs_image_builder = (
            self.orchestrator.flavor != "local"
            or self.step_operator
            or (self.model_deployer and self.model_deployer.flavor != "mlflow")
        )
        skip_default = handle_bool_env_var(
            ENV_ZENML_SKIP_IMAGE_BUILDER_DEFAULT, default=False
        )
        if not needs_image_builder or skip_default or self.image_builder:
            # Nothing to do: not needed, opted out, or already configured.
            return

        from datetime import datetime
        from uuid import uuid4

        from zenml.image_builders import (
            LocalImageBuilder,
            LocalImageBuilderConfig,
            LocalImageBuilderFlavor,
        )

        local_flavor = LocalImageBuilderFlavor()

        fallback_builder = LocalImageBuilder(
            id=uuid4(),
            name="temporary_default",
            flavor=local_flavor.name,
            type=local_flavor.type,
            config=LocalImageBuilderConfig(),
            user=Client().active_user.id,
            workspace=Client().active_workspace.id,
            created=datetime.utcnow(),
            updated=datetime.utcnow(),
        )

        logger.warning(
            "The stack `%s` contains components that require building "
            "Docker images. Older versions of ZenML always built these "
            "images locally, but since version 0.32.0 this behavior can be "
            "configured using the `image_builder` stack component. This "
            "stack will temporarily default to a local image builder that "
            "mirrors the previous behavior, but this will be removed in "
            "future versions of ZenML. Please add an image builder to this "
            "stack:\n"
            "`zenml image-builder register <NAME> ...\n"
            "zenml stack update %s -i <NAME>`",
            self.name,
            self.id,
        )

        self._image_builder = fallback_builder

    def prepare_pipeline_deployment(
        self, deployment: "PipelineDeploymentResponseModel"
    ) -> None:
        """Prepares the stack for a pipeline deployment.

        This method is called before a pipeline is deployed. It validates
        the stack (treating missing secrets as an error), checks that every
        component is running, checks that a remote server is used if the
        stack requires one, and finally lets every component prepare itself
        for the deployment.

        Args:
            deployment: The pipeline deployment

        Raises:
            StackValidationError: If the stack component is not running.
            RuntimeError: If trying to deploy a pipeline that requires a remote
                ZenML server with a local one.
        """
        self.validate(fail_if_secrets_missing=True)

        for component in self.components.values():
            if not component.is_running:
                # NOTE: adjacent literals are concatenated verbatim, so each
                # fragment carries its own trailing space; the documentation
                # URL must stay separated from the surrounding words.
                raise StackValidationError(
                    f"The '{component.name}' {component.type} stack component "
                    "is not currently running. Please run the following "
                    "command to provision and start the component:\n\n"
                    "    `zenml stack up`\n"
                    "It is worth noting that the provision command will "
                    "be deprecated in the future. ZenML will no longer "
                    "be responsible for provisioning infrastructure, "
                    "or port-forwarding directly. Instead of managing "
                    "the state of the components, ZenML will be utilizing "
                    "the already running stack or stack components directly. "
                    "Additionally, we are also providing a variety of "
                    "deployment recipes for popular Kubernetes-based "
                    "integrations such as Kubeflow, Tekton, and Seldon etc. "
                    "Check out https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/deploy-and-set-up-a-cloud-stack/deploy-a-stack-using-stack-recipes "
                    "for more information."
                )

        if self.requires_remote_server and Client().zen_store.is_local_store():
            raise RuntimeError(
                "Stacks with remote components such as remote orchestrators "
                "and step operators require a remote "
                "ZenML server. To run a pipeline with this stack you need to "
                "connect to a remote ZenML server first. Check out "
                "https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/deploy-zenml for "
                "more information on how to deploy ZenML."
            )

        for component in self.components.values():
            component.prepare_pipeline_deployment(
                deployment=deployment, stack=self
            )

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBaseModel"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the stack.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        return list(
            itertools.chain.from_iterable(
                component.get_docker_builds(deployment=deployment)
                for component in self.components.values()
            )
        )

    def deploy_pipeline(
        self,
        deployment: "PipelineDeploymentResponseModel",
    ) -> Any:
        """Deploys a pipeline on this stack.

        The deployment is handed to the orchestrator of the stack together
        with the stack itself.

        Args:
            deployment: The pipeline deployment.

        Returns:
            The return value of the call to `orchestrator.run(...)`.
        """
        return self.orchestrator.run(deployment=deployment, stack=self)

    def _get_active_components_for_step(
        self, step_config: "StepConfiguration"
    ) -> Dict[StackComponentType, "StackComponent"]:
        """Gets all the stack components that are active for a step.

        A step operator or an experiment tracker is only active if its name
        matches the one configured for the step. All other components are
        always active.

        Args:
            step_config: Configuration of the step for which to get the active
                components.

        Returns:
            Dictionary of active stack components.
        """
        active: Dict[StackComponentType, "StackComponent"] = {}
        for component_type, component in self.components.items():
            if component.type == StackComponentType.STEP_OPERATOR:
                is_active = component.name == step_config.step_operator
            elif component.type == StackComponentType.EXPERIMENT_TRACKER:
                is_active = component.name == step_config.experiment_tracker
            else:
                is_active = True

            if is_active:
                active[component_type] = component
        return active

    def prepare_step_run(self, info: "StepRunInfo") -> None:
        """Lets every component that is active for the step prepare its run.

        Args:
            info: Info about the step that will be executed.
        """
        active_components = self._get_active_components_for_step(info.config)
        for component in active_components.values():
            component.prepare_step_run(info=info)

    def get_pipeline_run_metadata(
        self, run_id: UUID
    ) -> Dict[UUID, Dict[str, MetadataType]]:
        """Get general component-specific metadata for a pipeline run.

        Args:
            run_id: ID of the pipeline run.

        Returns:
            A dictionary mapping component IDs to the metadata they created.
        """
        pipeline_run_metadata: Dict[UUID, Dict[str, MetadataType]] = {}
        for component in self.components.values():
            try:
                component_metadata = component.get_pipeline_run_metadata(
                    run_id=run_id
                )
                if component_metadata:
                    pipeline_run_metadata[component.id] = component_metadata
            except Exception as e:
                logger.warning(
                    f"Extracting pipeline run metadata failed for component "
                    f"'{component.name}' of type '{component.type}': {e}"
                )
        return pipeline_run_metadata

    def get_step_run_metadata(
        self, info: "StepRunInfo"
    ) -> Dict[UUID, Dict[str, MetadataType]]:
        """Collects the metadata each active component reports for a step run.

        Only components that are active for the step are asked. Components
        that report no metadata are left out, and a component that raises
        while extracting its metadata is skipped with a warning.

        Args:
            info: Info about the step that was executed.

        Returns:
            A dictionary mapping component IDs to the metadata they created.
        """
        collected: Dict[UUID, Dict[str, MetadataType]] = {}
        active_components = self._get_active_components_for_step(info.config)
        for component in active_components.values():
            try:
                metadata = component.get_step_run_metadata(info=info)
                if metadata:
                    collected[component.id] = metadata
            except Exception as e:
                logger.warning(
                    f"Extracting step run metadata failed for component "
                    f"'{component.name}' of type '{component.type}': {e}"
                )
        return collected

    def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
        """Lets every component that was active for the step clean up.

        The components are the ones returned by
        `_get_active_components_for_step` for the step configuration.

        Args:
            info: Info about the step that was executed.
            step_failed: Whether the step failed.
        """
        active_components = self._get_active_components_for_step(info.config)
        for active_component in active_components.values():
            active_component.cleanup_step_run(
                info=info, step_failed=step_failed
            )

    @property
    def is_provisioned(self) -> bool:
        """If the stack provisioned resources to run locally.

        The stack counts as provisioned only when every one of its
        components reports `is_provisioned`.

        Returns:
            True if the stack provisioned resources to run locally.
        """
        for stack_component in self.components.values():
            if not stack_component.is_provisioned:
                return False
        return True

    @property
    def is_running(self) -> bool:
        """If the stack is running locally.

        The stack counts as running only when every one of its components
        reports `is_running`.

        Returns:
            True if the stack is running locally, False otherwise.
        """
        for stack_component in self.components.values():
            if not stack_component.is_running:
                return False
        return True

    def provision(self) -> None:
        """Provisions resources to run the stack locally.

        The stack is validated first (with `fail_if_secrets_missing=True`).
        Components that are already provisioned are left untouched.
        """
        self.validate(fail_if_secrets_missing=True)
        logger.info("Provisioning resources for stack '%s'.", self.name)
        for stack_component in self.components.values():
            if stack_component.is_provisioned:
                continue
            stack_component.provision()
            logger.info("Provisioned resources for %s.", stack_component)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the stack.

        Only components that are currently provisioned are deprovisioned. A
        component raising `NotImplementedError` is logged as a warning and
        does not stop the remaining components from being processed.
        """
        logger.info("Deprovisioning resources for stack '%s'.", self.name)
        for stack_component in self.components.values():
            if not stack_component.is_provisioned:
                continue
            try:
                stack_component.deprovision()
                logger.info(
                    "Deprovisioned resources for %s.", stack_component
                )
            except NotImplementedError as error:
                logger.warning(error)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the stack.

        Components that are already running are skipped; provisioned
        components that are not running are resumed.

        Raises:
            ProvisioningError: If any stack component is missing provisioned
                resources.
        """
        logger.info("Resuming provisioned resources for stack %s.", self.name)
        for stack_component in self.components.values():
            if stack_component.is_running:
                # Already running, nothing to resume.
                continue
            if not stack_component.is_provisioned:
                raise ProvisioningError(
                    f"Unable to resume resources for {stack_component}: No "
                    f"resources have been provisioned for this component."
                )
            stack_component.resume()
            logger.info("Resumed resources for %s.", stack_component)

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the stack.

        Components that are already suspended are skipped. A component
        raising `NotImplementedError` is logged as a warning and does not
        stop the remaining components from being processed.
        """
        logger.info(
            "Suspending provisioned resources for stack '%s'.", self.name
        )
        for stack_component in self.components.values():
            if stack_component.is_suspended:
                continue
            try:
                stack_component.suspend()
                logger.info("Suspended resources for %s.", stack_component)
            except NotImplementedError:
                logger.warning(
                    "Suspending provisioned resources not implemented "
                    "for %s. Continuing without suspending resources...",
                    stack_component,
                )
alerter: Optional[BaseAlerter] property readonly

The alerter of the stack.

Returns:

Type Description
Optional[BaseAlerter]

The alerter of the stack.

annotator: Optional[BaseAnnotator] property readonly

The annotator of the stack.

Returns:

Type Description
Optional[BaseAnnotator]

The annotator of the stack.

apt_packages: List[str] property readonly

List of APT package requirements for the stack.

Returns:

Type Description
List[str]

A list of APT package requirements for the stack.

artifact_store: BaseArtifactStore property readonly

The artifact store of the stack.

Returns:

Type Description
BaseArtifactStore

The artifact store of the stack.

components: Dict[zenml.enums.StackComponentType, StackComponent] property readonly

All components of the stack.

Returns:

Type Description
Dict[zenml.enums.StackComponentType, StackComponent]

A dictionary of all components of the stack.

container_registry: Optional[BaseContainerRegistry] property readonly

The container registry of the stack.

Returns:

Type Description
Optional[BaseContainerRegistry]

The container registry of the stack or None if the stack does not have a container registry.

data_validator: Optional[BaseDataValidator] property readonly

The data validator of the stack.

Returns:

Type Description
Optional[BaseDataValidator]

The data validator of the stack.

experiment_tracker: Optional[BaseExperimentTracker] property readonly

The experiment tracker of the stack.

Returns:

Type Description
Optional[BaseExperimentTracker]

The experiment tracker of the stack.

feature_store: Optional[BaseFeatureStore] property readonly

The feature store of the stack.

Returns:

Type Description
Optional[BaseFeatureStore]

The feature store of the stack.

id: UUID property readonly

The ID of the stack.

Returns:

Type Description
UUID

The ID of the stack.

image_builder: Optional[BaseImageBuilder] property readonly

The image builder of the stack.

Returns:

Type Description
Optional[BaseImageBuilder]

The image builder of the stack.

is_provisioned: bool property readonly

If the stack provisioned resources to run locally.

Returns:

Type Description
bool

True if the stack provisioned resources to run locally.

is_running: bool property readonly

If the stack is running locally.

Returns:

Type Description
bool

True if the stack is running locally, False otherwise.

model_deployer: Optional[BaseModelDeployer] property readonly

The model deployer of the stack.

Returns:

Type Description
Optional[BaseModelDeployer]

The model deployer of the stack.

model_registry: Optional[BaseModelRegistry] property readonly

The model registry of the stack.

Returns:

Type Description
Optional[BaseModelRegistry]

The model registry of the stack.

name: str property readonly

The name of the stack.

Returns:

Type Description
str

The name of the stack.

orchestrator: BaseOrchestrator property readonly

The orchestrator of the stack.

Returns:

Type Description
BaseOrchestrator

The orchestrator of the stack.

required_secrets: Set[secret_utils.SecretReference] property readonly

All required secrets for this stack.

Returns:

Type Description
Set[secret_utils.SecretReference]

The required secrets of this stack.

requires_remote_server: bool property readonly

If the stack requires a remote ZenServer to run.

This is the case if any code is executed remotely, which applies to both remote orchestrators and remote step operators.

Returns:

Type Description
bool

If the stack requires a remote ZenServer to run.

secrets_manager: Optional[BaseSecretsManager] property readonly

The secrets manager of the stack.

Returns:

Type Description
Optional[BaseSecretsManager]

The secrets manager of the stack.

setting_classes: Dict[str, Type[BaseSettings]] property readonly

Setting classes of all components of this stack.

Returns:

Type Description
Dict[str, Type[BaseSettings]]

All setting classes and their respective keys.

step_operator: Optional[BaseStepOperator] property readonly

The step operator of the stack.

Returns:

Type Description
Optional[BaseStepOperator]

The step operator of the stack.

__init__(self, id, name, *, orchestrator, artifact_store, container_registry=None, secrets_manager=None, step_operator=None, feature_store=None, model_deployer=None, experiment_tracker=None, alerter=None, annotator=None, data_validator=None, image_builder=None, model_registry=None) special

Initializes and validates a stack instance.

Parameters:

Name Type Description Default
id UUID

Unique ID of the stack.

required
name str

Name of the stack.

required
orchestrator BaseOrchestrator

Orchestrator component of the stack.

required
artifact_store BaseArtifactStore

Artifact store component of the stack.

required
container_registry Optional[BaseContainerRegistry]

Container registry component of the stack.

None
secrets_manager Optional[BaseSecretsManager]

Secrets manager component of the stack.

None
step_operator Optional[BaseStepOperator]

Step operator component of the stack.

None
feature_store Optional[BaseFeatureStore]

Feature store component of the stack.

None
model_deployer Optional[BaseModelDeployer]

Model deployer component of the stack.

None
experiment_tracker Optional[BaseExperimentTracker]

Experiment tracker component of the stack.

None
alerter Optional[BaseAlerter]

Alerter component of the stack.

None
annotator Optional[BaseAnnotator]

Annotator component of the stack.

None
data_validator Optional[BaseDataValidator]

Data validator component of the stack.

None
image_builder Optional[BaseImageBuilder]

Image builder component of the stack.

None
model_registry Optional[BaseModelRegistry]

Model registry component of the stack.

None
Source code in zenml/stack/stack.py
def __init__(
    self,
    id: UUID,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    secrets_manager: Optional["BaseSecretsManager"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
    feature_store: Optional["BaseFeatureStore"] = None,
    model_deployer: Optional["BaseModelDeployer"] = None,
    experiment_tracker: Optional["BaseExperimentTracker"] = None,
    alerter: Optional["BaseAlerter"] = None,
    annotator: Optional["BaseAnnotator"] = None,
    data_validator: Optional["BaseDataValidator"] = None,
    image_builder: Optional["BaseImageBuilder"] = None,
    model_registry: Optional["BaseModelRegistry"] = None,
):
    """Stores the ID, the name and the components of the stack.

    Every argument is kept on the instance under the private attribute of
    the same name (for example `orchestrator` becomes `_orchestrator`).
    All components have to be passed as keyword arguments.

    Args:
        id: Unique ID of the stack.
        name: Name of the stack.
        orchestrator: Orchestrator component of the stack.
        artifact_store: Artifact store component of the stack.
        container_registry: Container registry component of the stack.
        secrets_manager: Secrets manager component of the stack.
        step_operator: Step operator component of the stack.
        feature_store: Feature store component of the stack.
        model_deployer: Model deployer component of the stack.
        experiment_tracker: Experiment tracker component of the stack.
        alerter: Alerter component of the stack.
        annotator: Annotator component of the stack.
        data_validator: Data validator component of the stack.
        image_builder: Image builder component of the stack.
        model_registry: Model registry component of the stack.
    """
    # Identity of the stack.
    self._id = id
    self._name = name

    # Mandatory components.
    self._orchestrator = orchestrator
    self._artifact_store = artifact_store

    # Optional components, in the order of the signature.
    self._container_registry = container_registry
    self._secrets_manager = secrets_manager
    self._step_operator = step_operator
    self._feature_store = feature_store
    self._model_deployer = model_deployer
    self._experiment_tracker = experiment_tracker
    self._alerter = alerter
    self._annotator = annotator
    self._data_validator = data_validator
    self._image_builder = image_builder
    self._model_registry = model_registry
check_local_paths(self)

Checks if the stack has local paths.

Returns:

Type Description
bool

True if the stack has local paths, False otherwise.

Exceptions:

Type Description
ValueError

If the stack has local paths that do not conform to the convention that all local paths must be relative to the local stores directory.

Source code in zenml/stack/stack.py
def check_local_paths(self) -> bool:
    """Checks if the stack has local paths.

    Every stack component that advertises a non-empty `local_path` must
    keep that path inside the local stores directory of the global
    configuration. The containment check compares normalized path
    components, so a sibling directory that merely shares a name prefix
    with the local stores directory (e.g. `<local_stores>_backup`) is
    rejected.

    Returns:
        True if the stack has local paths, False otherwise.

    Raises:
        ValueError: If the stack has local paths that do not conform to
            the convention that all local paths must be relative to the
            local stores directory.
    """
    import os

    from zenml.config.global_config import GlobalConfiguration

    local_stores_path = GlobalConfiguration().local_stores_path
    # Normalize once so that the comparison below is not thrown off by
    # trailing separators or redundant path segments.
    stores_root = os.path.abspath(local_stores_path)

    # go through all stack components and identify those that advertise
    # a local path where they persist information that they need to be
    # available when running pipelines.
    has_local_paths = False
    for stack_comp in self.components.values():
        local_path = stack_comp.local_path
        if not local_path:
            continue
        # double-check this convention, just in case it wasn't respected
        # as documented in `StackComponent.local_path`
        try:
            is_inside_stores = (
                os.path.commonpath(
                    [stores_root, os.path.abspath(local_path)]
                )
                == stores_root
            )
        except ValueError:
            # `commonpath` raises when the paths cannot share an ancestor
            # at all (e.g. different drives on Windows).
            is_inside_stores = False
        if not is_inside_stores:
            raise ValueError(
                f"Local path {local_path} for component "
                f"{stack_comp.name} is not in the local stores "
                f"directory ({local_stores_path})."
            )
        has_local_paths = True

    return has_local_paths
cleanup_step_run(self, info, step_failed)

Cleans up resources after the step run is finished.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required
step_failed bool

Whether the step failed.

required
Source code in zenml/stack/stack.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Lets every component that was active for the step clean up.

    The components are the ones returned by
    `_get_active_components_for_step` for the step configuration.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.cleanup_step_run(info=info, step_failed=step_failed)
deploy_pipeline(self, deployment)

Deploys a pipeline on this stack.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponseModel

The pipeline deployment.

required

Returns:

Type Description
Any

The return value of the call to orchestrator.run(...).

Source code in zenml/stack/stack.py
def deploy_pipeline(
    self,
    deployment: "PipelineDeploymentResponseModel",
) -> Any:
    """Deploys a pipeline on this stack.

    The deployment is handed to the orchestrator of the stack, together
    with the stack itself.

    Args:
        deployment: The pipeline deployment.

    Returns:
        The return value of the call to `orchestrator.run(...)`.
    """
    stack_orchestrator = self.orchestrator
    return stack_orchestrator.run(deployment=deployment, stack=self)
deprovision(self)

Deprovisions all local resources of the stack.

Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack.

    Only components that are currently provisioned are deprovisioned. A
    component raising `NotImplementedError` is logged as a warning and does
    not stop the remaining components from being processed.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)
    for stack_component in self.components.values():
        if not stack_component.is_provisioned:
            continue
        try:
            stack_component.deprovision()
            logger.info("Deprovisioned resources for %s.", stack_component)
        except NotImplementedError as error:
            logger.warning(error)
dict(self)

Converts the stack into a dictionary.

Returns:

Type Description
Dict[str, str]

A dictionary containing the stack components.

Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
    """Converts the stack into a dictionary.

    Each component is stored under the value of its component type as the
    JSON dump of its config (with sorted keys). The name of the stack is
    stored under the `name` key.

    Returns:
        A dictionary containing the stack components.
    """
    serialized: Dict[str, str] = {}
    for component_type, component in self.components.items():
        serialized[component_type.value] = component.config.json(
            sort_keys=True
        )
    serialized["name"] = self.name
    return serialized
from_components(id, name, components) classmethod

Creates a stack instance from a dict of stack components.

noqa: DAR402

Parameters:

Name Type Description Default
id UUID

Unique ID of the stack.

required
name str

The name of the stack.

required
components Dict[zenml.enums.StackComponentType, StackComponent]

The components of the stack.

required

Returns:

Type Description
Stack

A stack instance consisting of the given components.

Exceptions:

Type Description
TypeError

If a required component is missing or a component doesn't inherit from the expected base class.

Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls,
    id: UUID,
    name: str,
    components: Dict[StackComponentType, "StackComponent"],
) -> "Stack":
    """Creates a stack instance from a dict of stack components.

    The orchestrator and the artifact store are mandatory and must be
    instances of their respective base classes. All other components are
    optional, but have to be instances of the matching base class when
    they are present.

    # noqa: DAR402

    Args:
        id: Unique ID of the stack.
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.alerter import BaseAlerter
    from zenml.annotators import BaseAnnotator
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.data_validators import BaseDataValidator
    from zenml.experiment_trackers import BaseExperimentTracker
    from zenml.feature_stores import BaseFeatureStore
    from zenml.image_builders import BaseImageBuilder
    from zenml.model_deployers import BaseModelDeployer
    from zenml.model_registries import BaseModelRegistry
    from zenml.orchestrators import BaseOrchestrator
    from zenml.secrets_managers import BaseSecretsManager
    from zenml.step_operators import BaseStepOperator

    def _raise_type_error(
        component: Optional["StackComponent"], expected_class: Type[Any]
    ) -> NoReturn:
        """Raises a TypeError that the component has an unexpected type.

        Args:
            component: The component that has an unexpected type.
            expected_class: The expected type of the component.

        Raises:
            TypeError: If the component has an unexpected type.
        """
        actual_name = component.__class__.__name__
        expected_name = expected_class.__name__
        raise TypeError(
            f"Unable to create stack: Wrong stack component type "
            f"`{actual_name}` (expected: subclass "
            f"of `{expected_name}`)"
        )

    # One entry per constructor keyword, listed in the order in which the
    # components are validated:
    # (keyword, component type, expected base class, is required)
    expectations = [
        (
            "orchestrator",
            StackComponentType.ORCHESTRATOR,
            BaseOrchestrator,
            True,
        ),
        (
            "artifact_store",
            StackComponentType.ARTIFACT_STORE,
            BaseArtifactStore,
            True,
        ),
        (
            "container_registry",
            StackComponentType.CONTAINER_REGISTRY,
            BaseContainerRegistry,
            False,
        ),
        (
            "secrets_manager",
            StackComponentType.SECRETS_MANAGER,
            BaseSecretsManager,
            False,
        ),
        (
            "step_operator",
            StackComponentType.STEP_OPERATOR,
            BaseStepOperator,
            False,
        ),
        (
            "feature_store",
            StackComponentType.FEATURE_STORE,
            BaseFeatureStore,
            False,
        ),
        (
            "model_deployer",
            StackComponentType.MODEL_DEPLOYER,
            BaseModelDeployer,
            False,
        ),
        (
            "experiment_tracker",
            StackComponentType.EXPERIMENT_TRACKER,
            BaseExperimentTracker,
            False,
        ),
        ("alerter", StackComponentType.ALERTER, BaseAlerter, False),
        ("annotator", StackComponentType.ANNOTATOR, BaseAnnotator, False),
        (
            "data_validator",
            StackComponentType.DATA_VALIDATOR,
            BaseDataValidator,
            False,
        ),
        (
            "image_builder",
            StackComponentType.IMAGE_BUILDER,
            BaseImageBuilder,
            False,
        ),
        (
            "model_registry",
            StackComponentType.MODEL_REGISTRY,
            BaseModelRegistry,
            False,
        ),
    ]

    validated: Dict[str, Any] = {}
    for keyword, component_type, base_class, is_required in expectations:
        candidate = components.get(component_type)
        # A required component must always pass the type check (so a
        # missing one fails as well); an optional component is only
        # checked when it is present.
        needs_check = is_required or candidate is not None
        if needs_check and not isinstance(candidate, base_class):
            _raise_type_error(candidate, base_class)
        validated[keyword] = candidate

    return Stack(id=id, name=name, **validated)
from_model(stack_model) classmethod

Creates a Stack instance from a StackModel.

Parameters:

Name Type Description Default
stack_model StackResponseModel

The StackModel to create the Stack from.

required

Returns:

Type Description
Stack

The created Stack instance.

Source code in zenml/stack/stack.py
@classmethod
def from_model(cls, stack_model: StackResponseModel) -> "Stack":
    """Creates a Stack instance from a StackModel.

    For every component type of the model, only the first component model
    in its list is converted into a stack component.

    Args:
        stack_model: The StackModel to create the Stack from.

    Returns:
        The created Stack instance.
    """
    from zenml.stack import StackComponent

    stack_components: Dict[StackComponentType, StackComponent] = {}
    for component_type, component_models in stack_model.components.items():
        stack_components[component_type] = StackComponent.from_model(
            component_models[0]
        )

    return Stack.from_components(
        id=stack_model.id,
        name=stack_model.name,
        components=stack_components,
    )
get_docker_builds(self, deployment)

Gets the Docker builds required for the stack.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/stack/stack.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBaseModel"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the stack.

    The builds requested by the individual components are concatenated in
    the iteration order of the stack components.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    required_builds: List["BuildConfiguration"] = []
    for stack_component in self.components.values():
        required_builds.extend(
            stack_component.get_docker_builds(deployment=deployment)
        )
    return required_builds
get_pipeline_run_metadata(self, run_id)

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run.

required

Returns:

Type Description
Dict[uuid.UUID, Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]]

A dictionary mapping component IDs to the metadata they created.

Source code in zenml/stack/stack.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[UUID, Dict[str, MetadataType]]:
    """Collects the pipeline run metadata reported by each component.

    Components that return no metadata are left out of the result. If
    extracting the metadata of a component raises, a warning is logged and
    the remaining components are still processed.

    Args:
        run_id: ID of the pipeline run.

    Returns:
        A dictionary mapping component IDs to the metadata they created.
    """
    metadata_by_component: Dict[UUID, Dict[str, MetadataType]] = {}
    for stack_component in self.components.values():
        try:
            metadata = stack_component.get_pipeline_run_metadata(
                run_id=run_id
            )
            if not metadata:
                continue
            metadata_by_component[stack_component.id] = metadata
        except Exception as error:
            # Metadata collection is best-effort: log and move on.
            logger.warning(
                f"Extracting pipeline run metadata failed for component "
                f"'{stack_component.name}' of type "
                f"'{stack_component.type}': {error}"
            )
    return metadata_by_component
get_step_run_metadata(self, info)

Get component-specific metadata for a step run.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required

Returns:

Type Description
Dict[uuid.UUID, Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]]

A dictionary mapping component IDs to the metadata they created.

Source code in zenml/stack/stack.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[UUID, Dict[str, MetadataType]]:
    """Collects the step run metadata reported by the active components.

    Only the components returned by `_get_active_components_for_step` are
    queried. Components that return no metadata are left out of the
    result. If extracting the metadata of a component raises, a warning is
    logged and the remaining components are still processed.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary mapping component IDs to the metadata they created.
    """
    metadata_by_component: Dict[UUID, Dict[str, MetadataType]] = {}
    active_components = self._get_active_components_for_step(info.config)
    for stack_component in active_components.values():
        try:
            metadata = stack_component.get_step_run_metadata(info=info)
            if not metadata:
                continue
            metadata_by_component[stack_component.id] = metadata
        except Exception as error:
            # Metadata collection is best-effort: log and move on.
            logger.warning(
                f"Extracting step run metadata failed for component "
                f"'{stack_component.name}' of type "
                f"'{stack_component.type}': {error}"
            )
    return metadata_by_component
prepare_pipeline_deployment(self, deployment)

Prepares the stack for a pipeline deployment.

This method is called before a pipeline is deployed.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponseModel

The pipeline deployment

required

Exceptions:

Type Description
StackValidationError

If the stack component is not running.

RuntimeError

If trying to deploy a pipeline that requires a remote ZenML server with a local one.

Source code in zenml/stack/stack.py
def prepare_pipeline_deployment(
    self, deployment: "PipelineDeploymentResponseModel"
) -> None:
    """Prepares the stack for a pipeline deployment.

    This method is called before a pipeline is deployed. It validates the
    stack, checks that every component is running and that a remote ZenML
    server is connected if the stack requires one, and finally lets each
    component prepare for the deployment.

    Args:
        deployment: The pipeline deployment.

    Raises:
        StackValidationError: If a stack component is not running.
        RuntimeError: If trying to deploy a pipeline that requires a remote
            ZenML server with a local one.
    """
    self.validate(fail_if_secrets_missing=True)

    for component in self.components.values():
        if not component.is_running:
            # NOTE: the fragments below are concatenated, so every fragment
            # that continues a sentence has to end with a single space.
            raise StackValidationError(
                f"The '{component.name}' {component.type} stack component "
                "is not currently running. Please run the following "
                "command to provision and start the component:\n\n"
                "    `zenml stack up`\n"
                "It is worth noting that the provision command will "
                "be deprecated in the future. ZenML will no longer "
                "be responsible for provisioning infrastructure, "
                "or port-forwarding directly. Instead of managing "
                "the state of the components, ZenML will be utilizing "
                "the already running stack or stack components directly. "
                "Additionally, we are also providing a variety of "
                "deployment recipes for popular Kubernetes-based "
                "integrations such as Kubeflow, Tekton, and Seldon etc. "
                "Check out https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/deploy-and-set-up-a-cloud-stack/deploy-a-stack-using-stack-recipes "
                "for more information."
            )

    # The store is only inspected when the stack actually needs a remote
    # server.
    if self.requires_remote_server and Client().zen_store.is_local_store():
        raise RuntimeError(
            "Stacks with remote components such as remote orchestrators "
            "and step operators require a remote "
            "ZenML server. To run a pipeline with this stack you need to "
            "connect to a remote ZenML server first. Check out "
            "https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/deploy-zenml for "
            "more information on how to deploy ZenML."
        )

    for component in self.components.values():
        component.prepare_pipeline_deployment(
            deployment=deployment, stack=self
        )
prepare_step_run(self, info)

Prepares running a step.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that will be executed.

required
Source code in zenml/stack/stack.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Lets every component that is active for the step prepare for it.

    The components are the ones returned by
    `_get_active_components_for_step` for the step configuration.

    Args:
        info: Info about the step that will be executed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.prepare_step_run(info=info)
provision(self)

Provisions resources to run the stack locally.

Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provisions resources to run the stack locally.

    The stack is validated first (with `fail_if_secrets_missing=True`).
    Components that are already provisioned are left untouched.
    """
    self.validate(fail_if_secrets_missing=True)
    logger.info("Provisioning resources for stack '%s'.", self.name)
    for stack_component in self.components.values():
        if stack_component.is_provisioned:
            continue
        stack_component.provision()
        logger.info("Provisioned resources for %s.", stack_component)
requirements(self, exclude_components=None)

Set of PyPI requirements for the stack.

This method combines the requirements of all stack components (except the ones specified in exclude_components).

Parameters:

Name Type Description Default
exclude_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Set of component types for which the requirements should not be included in the output.

None

Returns:

Type Description
Set[str]

Set of PyPI requirements.

Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Collects the PyPI requirements of the stack's components.

    The requirements of all stack components are merged into a single set,
    leaving out any component whose type is listed in
    `exclude_components`.

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        Set of PyPI requirements.
    """
    excluded_types = exclude_components or set()
    combined: Set[str] = set()
    for component in self.components.values():
        if component.type in excluded_types:
            continue
        combined.update(component.requirements)
    return combined
resume(self)

Resumes the provisioned local resources of the stack.

Exceptions:

Type Description
ProvisioningError

If any stack component is missing provisioned resources.

Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the stack.

    Components that are already running are skipped; components that are
    provisioned but not running are resumed.

    Raises:
        ProvisioningError: If any stack component is missing provisioned
            resources.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)
    for component in self.components.values():
        if component.is_running:
            # Already running, so there is nothing to resume.
            continue

        if not component.is_provisioned:
            raise ProvisioningError(
                f"Unable to resume resources for {component}: No "
                f"resources have been provisioned for this component."
            )

        component.resume()
        logger.info("Resumed resources for %s.", component)
suspend(self)

Suspends the provisioned local resources of the stack.

Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspends the provisioned local resources of the stack.

    Components that are already suspended are skipped. A component that
    does not implement suspending only triggers a warning; the remaining
    components are still processed.
    """
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )
    for component in self.components.values():
        if component.is_suspended:
            continue
        try:
            component.suspend()
            logger.info("Suspended resources for %s.", component)
        except NotImplementedError:
            # Best effort: not every component supports suspending.
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                component,
            )
validate(self, fail_if_secrets_missing=False)

Checks whether the stack configuration is valid.

To check if a stack configuration is valid, the following criteria must be met:

  • the stack must have an image builder if other components require it
  • the StackValidator of each stack component has to validate the stack to make sure all the components are compatible with each other
  • the required secrets of all components need to exist

Parameters:

Name Type Description Default
fail_if_secrets_missing bool

If this is True, an error will be raised if a secret for a component is missing. Otherwise, only a warning will be logged.

False
Source code in zenml/stack/stack.py
def validate(
    self,
    fail_if_secrets_missing: bool = False,
) -> None:
    """Verifies that the stack configuration is valid.

    The checks run in this order:
    - the image builder requirement is validated
    - every component that defines a `StackValidator` validates the stack
        to make sure all the components are compatible with each other
    - the secrets required by the components are validated

    Args:
        fail_if_secrets_missing: If this is `True`, an error will be raised
            if a secret for a component is missing. Otherwise, only a
            warning will be logged.
    """
    self.validate_image_builder()

    # Lazily filter so that each component is checked and validated in turn.
    components_with_validator = (
        candidate
        for candidate in self.components.values()
        if candidate.validator
    )
    for component in components_with_validator:
        component.validator.validate(stack=self)

    self._validate_secrets(raise_exception=fail_if_secrets_missing)
validate_image_builder(self)

Validates that the stack has an image builder if required.

If the stack requires an image builder, but none is specified, a local image builder will be created and assigned to the stack to ensure backwards compatibility.

Source code in zenml/stack/stack.py
def validate_image_builder(self) -> None:
    """Ensures the stack has an image builder whenever one is required.

    An image builder is considered required if the orchestrator flavor is
    not "local", if the stack has a step operator, or if it has a model
    deployer whose flavor is not "mlflow". When one is required but
    missing, a temporary local image builder is created and assigned to
    the stack to ensure backwards compatibility, unless this default is
    disabled through the `ENV_ZENML_SKIP_IMAGE_BUILDER_DEFAULT`
    environment variable.
    """
    needs_image_builder = (
        self.orchestrator.flavor != "local"
        or self.step_operator
        or (self.model_deployer and self.model_deployer.flavor != "mlflow")
    )
    skip_default = handle_bool_env_var(
        ENV_ZENML_SKIP_IMAGE_BUILDER_DEFAULT, default=False
    )

    # Nothing to do if no builder is needed, the default is disabled, or
    # the stack already has an image builder.
    if not needs_image_builder or skip_default or self.image_builder:
        return

    from datetime import datetime
    from uuid import uuid4

    from zenml.image_builders import (
        LocalImageBuilder,
        LocalImageBuilderConfig,
        LocalImageBuilderFlavor,
    )

    local_flavor = LocalImageBuilderFlavor()
    default_builder = LocalImageBuilder(
        id=uuid4(),
        name="temporary_default",
        flavor=local_flavor.name,
        type=local_flavor.type,
        config=LocalImageBuilderConfig(),
        user=Client().active_user.id,
        workspace=Client().active_workspace.id,
        created=datetime.utcnow(),
        updated=datetime.utcnow(),
    )

    logger.warning(
        "The stack `%s` contains components that require building "
        "Docker images. Older versions of ZenML always built these "
        "images locally, but since version 0.32.0 this behavior can be "
        "configured using the `image_builder` stack component. This "
        "stack will temporarily default to a local image builder that "
        "mirrors the previous behavior, but this will be removed in "
        "future versions of ZenML. Please add an image builder to this "
        "stack:\n"
        "`zenml image-builder register <NAME> ...\n"
        "zenml stack update %s -i <NAME>`",
        self.name,
        self.id,
    )

    self._image_builder = default_builder

stack_component

Implementation of the ZenML Stack Component class.

StackComponent

Abstract StackComponent class for all components of a ZenML stack.

Source code in zenml/stack/stack_component.py
class StackComponent:
    """Abstract StackComponent class for all components of a ZenML stack."""

    def __init__(
        self,
        name: str,
        id: UUID,
        config: StackComponentConfig,
        flavor: str,
        type: StackComponentType,
        user: Optional[UUID],
        workspace: UUID,
        created: datetime,
        updated: datetime,
        labels: Optional[Dict[str, Any]] = None,
        connector_requirements: Optional[ServiceConnectorRequirements] = None,
        connector: Optional[UUID] = None,
        connector_resource_id: Optional[str] = None,
        *args: Any,
        **kwargs: Any,
    ):
        """Initializes a StackComponent.

        Args:
            name: The name of the component.
            id: The unique ID of the component.
            config: The config of the component.
            flavor: The flavor of the component.
            type: The type of the component.
            user: The ID of the user who created the component.
            workspace: The ID of the workspace the component belongs to.
            created: The creation time of the component.
            updated: The last update time of the component.
            labels: The labels of the component.
            connector_requirements: The requirements for the connector.
            connector: The ID of a connector linked to the component.
            connector_resource_id: The custom resource ID to access through
                the connector.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Raises:
            ValueError: If a secret reference is passed as name.
        """
        if secret_utils.is_secret_reference(name):
            raise ValueError(
                "Passing the `name` attribute of a stack component as a "
                "secret reference is not allowed."
            )

        self.id = id
        self.name = name
        self._config = config
        self.flavor = flavor
        self.type = type
        self.user = user
        self.workspace = workspace
        self.created = created
        self.updated = updated
        self.labels = labels
        self.connector_requirements = connector_requirements
        self.connector = connector
        self.connector_resource_id = connector_resource_id
        # Populated lazily by `get_connector`.
        self._connector_instance: Optional[ServiceConnector] = None

    @classmethod
    def from_model(
        cls, component_model: "ComponentResponseModel"
    ) -> "StackComponent":
        """Creates a StackComponent from a ComponentModel.

        Args:
            component_model: The ComponentModel to create the StackComponent
                from.

        Returns:
            The created StackComponent.

        Raises:
            ImportError: If the flavor can't be imported.
        """
        from zenml.client import Client

        flavor_model = Client().get_flavor_by_name_and_type(
            name=component_model.flavor,
            component_type=component_model.type,
        )

        try:
            from zenml.stack import Flavor

            flavor = Flavor.from_model(flavor_model)
        except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
            # Chain the original error so the root cause stays visible in
            # the traceback.
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            ) from err

        configuration = flavor.config_class(**component_model.configuration)

        if component_model.user is not None:
            user_id = component_model.user.id
        else:
            user_id = None

        return flavor.implementation_class(
            user=user_id,
            workspace=component_model.workspace.id,
            name=component_model.name,
            id=component_model.id,
            config=configuration,
            labels=component_model.labels,
            flavor=component_model.flavor,
            type=component_model.type,
            created=component_model.created,
            updated=component_model.updated,
            connector_requirements=flavor.service_connector_requirements,
            connector=component_model.connector.id
            if component_model.connector
            else None,
            connector_resource_id=component_model.connector_resource_id,
        )

    @property
    def config(self) -> StackComponentConfig:
        """Returns the configuration of the stack component.

        This should be overwritten by any subclasses that define custom configs
        to return the correct config class.

        Returns:
            The configuration of the stack component.
        """
        return self._config

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Class specifying available settings for this component.

        Returns:
            Optional settings class.
        """
        return None

    def get_settings(
        self,
        container: Union["Step", "StepRunInfo", "PipelineDeploymentBaseModel"],
    ) -> "BaseSettings":
        """Gets settings for this stack component.

        If the container doesn't contain runtime options for this
        component, a default instance of the settings class is returned.

        Args:
            container: The `Step`, `StepRunInfo` or `PipelineDeployment` from
                which to get the settings.

        Returns:
            Settings for this stack component.

        Raises:
            RuntimeError: If the stack component does not specify a settings
                class.
        """
        if not self.settings_class:
            raise RuntimeError(
                f"Unable to get settings for component {self} because this "
                "component does not have an associated settings class. "
                "Return a settings class from the `@settings_class` property "
                "and try again."
            )

        key = settings_utils.get_stack_component_setting_key(self)

        all_settings = (
            container.config.settings
            if isinstance(container, (Step, StepRunInfo))
            else container.pipeline_configuration.settings
        )

        if key in all_settings:
            return self.settings_class.parse_obj(all_settings[key])
        else:
            return self.settings_class()

    def connector_has_expired(self) -> bool:
        """Checks whether the connector linked to this stack component has expired.

        Returns:
            `False` if the stack component isn't linked to a connector.
            `True` if a connector is linked but no connector instance has
            been created yet. Otherwise, whether the connector instance has
            expired.
        """
        if self.connector is None:
            # The stack component isn't linked to a connector
            return False

        if self._connector_instance is None:
            return True

        return self._connector_instance.has_expired()

    def get_connector(self) -> Optional["ServiceConnector"]:
        """Returns the connector linked to this stack component.

        Returns:
            The connector linked to this stack component, or `None` if the
            component is not linked to a connector.

        Raises:
            RuntimeError: If the stack component does not specify connector
                requirements or if the connector linked to the component is not
                compatible or not found.
        """
        from zenml.client import Client

        if self.connector is None:
            return None

        if self._connector_instance is not None:
            # If the connector instance is still valid, return it. Otherwise,
            # we'll try to get a new one.
            if not self._connector_instance.has_expired():
                return self._connector_instance

        if self.connector_requirements is None:
            raise RuntimeError(
                f"Unable to get connector for component {self} because this "
                "component does not declare any connector requirements in its "
                "flavor specification. Override the "
                "`service_connector_requirements` method in its flavor class "
                "to return a connector requirements specification and try "
                "again."
            )

        if self.connector_requirements.resource_id_attr is not None:
            # Check if an attribute is set in the component configuration
            resource_id = getattr(
                self.config, self.connector_requirements.resource_id_attr
            )
        else:
            # Otherwise, use the resource ID configured in the component
            resource_id = self.connector_resource_id

        client = Client()
        try:
            self._connector_instance = client.get_service_connector_client(
                name_id_or_prefix=self.connector,
                resource_type=self.connector_requirements.resource_type,
                resource_id=resource_id,
            )
        except KeyError as e:
            raise RuntimeError(
                f"The connector with ID {self.connector} linked "
                f"to the '{self.name}' {self.type} stack component could not "
                f"be found or is not accessible. Please verify that the "
                f"connector exists and that you have access to it."
            ) from e
        except ValueError as e:
            raise RuntimeError(
                f"The connector with ID {self.connector} linked "
                f"to the '{self.name}' {self.type} stack component could not "
                f"be correctly configured: {e}."
            ) from e
        except AuthorizationException as e:
            raise RuntimeError(
                f"The connector with ID {self.connector} linked "
                f"to the '{self.name}' {self.type} stack component could not "
                f"be accessed due to an authorization error: {e}. Please "
                f"verify that you have access to the connector and try again."
            ) from e

        return self._connector_instance

    @property
    def log_file(self) -> Optional[str]:
        """Optional path to a log file for the stack component.

        Returns:
            Optional path to a log file for the stack component.
        """
        # TODO [ENG-136]: Add support for multiple log files for a stack
        #  component. E.g. let each component return a generator that yields
        #  logs instead of specifying a single file path.
        return None

    @property
    def requirements(self) -> Set[str]:
        """Set of PyPI requirements for the component.

        Returns:
            A set of PyPI requirements for the component.
        """
        from zenml.integrations.utils import get_requirements_for_module

        return set(get_requirements_for_module(self.__module__))

    @property
    def apt_packages(self) -> List[str]:
        """List of APT package requirements for the component.

        Returns:
            A list of APT package requirements for the component.
        """
        from zenml.integrations.utils import get_integration_for_module

        integration = get_integration_for_module(self.__module__)
        return integration.APT_PACKAGES if integration else []

    @property
    def local_path(self) -> Optional[str]:
        """Path to a local directory to store persistent information.

        This property should only be implemented by components that need to
        store persistent information in a directory on the local machine and
        also need that information to be available during pipeline runs.

        IMPORTANT: the path returned by this property must always be a path
        that is relative to the ZenML local store's directory. The local
        orchestrators rely on this convention to correctly mount the
        local folders in the containers. This is an example of a valid
        path:

        ```python
        from zenml.config.global_config import GlobalConfiguration

        ...

        @property
        def local_path(self) -> Optional[str]:

            return os.path.join(
                GlobalConfiguration().local_stores_path,
                str(self.id),
            )
        ```

        Returns:
            A path to a local directory used by the component to store
            persistent information.
        """
        return None

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBaseModel"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        return []

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
    ) -> None:
        """Prepares deploying the pipeline.

        This method gets called immediately before a pipeline is deployed.
        Subclasses should override it if they require runtime configuration
        options or if they need to run code before the pipeline deployment.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """

    def get_pipeline_run_metadata(
        self, run_id: UUID
    ) -> Dict[str, "MetadataType"]:
        """Get general component-specific metadata for a pipeline run.

        Args:
            run_id: The ID of the pipeline run.

        Returns:
            A dictionary of metadata.
        """
        return {}

    def prepare_step_run(self, info: "StepRunInfo") -> None:
        """Prepares running a step.

        Args:
            info: Info about the step that will be executed.
        """

    def get_step_run_metadata(
        self, info: "StepRunInfo"
    ) -> Dict[str, "MetadataType"]:
        """Get component- and step-specific metadata after a step ran.

        Args:
            info: Info about the step that was executed.

        Returns:
            A dictionary of metadata.
        """
        return {}

    def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
        """Cleans up resources after the step run is finished.

        Args:
            info: Info about the step that was executed.
            step_failed: Whether the step failed.
        """

    @property
    def post_registration_message(self) -> Optional[str]:
        """Optional message printed after the stack component is registered.

        Returns:
            An optional message.
        """
        return None

    @property
    def validator(self) -> Optional["StackValidator"]:
        """The optional validator of the stack component.

        This validator will be called each time a stack with the stack
        component is initialized. Subclasses should override this property
        and return a `StackValidator` that makes sure they're not included in
        any stack that they're not compatible with.

        Returns:
            An optional `StackValidator` instance.
        """
        return None

    @property
    def is_provisioned(self) -> bool:
        """If the component provisioned resources to run.

        Returns:
            True if the component provisioned resources to run.
        """
        return True

    @property
    def is_running(self) -> bool:
        """If the component is running.

        Returns:
            True if the component is running.
        """
        return True

    @property
    def is_suspended(self) -> bool:
        """If the component is suspended.

        Returns:
            True if the component is suspended.
        """
        return not self.is_running

    def provision(self) -> None:
        """Provisions resources to run the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )

    def deprovision(self) -> None:
        """Deprovisions all resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Deprovisioning resource not implemented for {self}."
        )

    def resume(self) -> None:
        """Resumes the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Resuming provisioned resources not implemented for {self}."
        )

    def suspend(self) -> None:
        """Suspends the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Suspending provisioned resources not implemented for {self}."
        )

    def __repr__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        attribute_representation = ", ".join(
            f"{key}={value}" for key, value in self.config.dict().items()
        )
        return (
            f"{self.__class__.__qualname__}(type={self.type}, "
            f"flavor={self.flavor}, {attribute_representation})"
        )

    def __str__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        return self.__repr__()
apt_packages: List[str] property readonly

List of APT package requirements for the component.

Returns:

Type Description
List[str]

A list of APT package requirements for the component.

config: StackComponentConfig property readonly

Returns the configuration of the stack component.

This should be overwritten by any subclasses that define custom configs to return the correct config class.

Returns:

Type Description
StackComponentConfig

The configuration of the stack component.

is_provisioned: bool property readonly

If the component provisioned resources to run.

Returns:

Type Description
bool

True if the component provisioned resources to run.

is_running: bool property readonly

If the component is running.

Returns:

Type Description
bool

True if the component is running.

is_suspended: bool property readonly

If the component is suspended.

Returns:

Type Description
bool

True if the component is suspended.

local_path: Optional[str] property readonly

Path to a local directory to store persistent information.

This property should only be implemented by components that need to store persistent information in a directory on the local machine and also need that information to be available during pipeline runs.

IMPORTANT: the path returned by this property must always be a path that is relative to the ZenML local store's directory. The local orchestrators rely on this convention to correctly mount the local folders in the containers. This is an example of a valid path:

from zenml.config.global_config import GlobalConfiguration

...

@property
def local_path(self) -> Optional[str]:
    # Example override: a per-component directory inside the ZenML local
    # stores path, named after the component's `id` attribute.
    return os.path.join(
        GlobalConfiguration().local_stores_path,
        str(self.id),
    )

Returns:

Type Description
Optional[str]

A path to a local directory used by the component to store persistent information.

log_file: Optional[str] property readonly

Optional path to a log file for the stack component.

Returns:

Type Description
Optional[str]

Optional path to a log file for the stack component.

post_registration_message: Optional[str] property readonly

Optional message printed after the stack component is registered.

Returns:

Type Description
Optional[str]

An optional message.

requirements: Set[str] property readonly

Set of PyPI requirements for the component.

Returns:

Type Description
Set[str]

A set of PyPI requirements for the component.

settings_class: Optional[Type[BaseSettings]] property readonly

Class specifying available settings for this component.

Returns:

Type Description
Optional[Type[BaseSettings]]

Optional settings class.

validator: Optional[StackValidator] property readonly

The optional validator of the stack component.

This validator will be called each time a stack with the stack component is initialized. Subclasses should override this property and return a StackValidator that makes sure they're not included in any stack that they're not compatible with.

Returns:

Type Description
Optional[StackValidator]

An optional StackValidator instance.

__init__(self, name, id, config, flavor, type, user, workspace, created, updated, labels=None, connector_requirements=None, connector=None, connector_resource_id=None, *args, **kwargs) special

Initializes a StackComponent.

Parameters:

Name Type Description Default
name str

The name of the component.

required
id UUID

The unique ID of the component.

required
config StackComponentConfig

The config of the component.

required
flavor str

The flavor of the component.

required
type StackComponentType

The type of the component.

required
user Optional[uuid.UUID]

The ID of the user who created the component.

required
workspace UUID

The ID of the workspace the component belongs to.

required
created datetime

The creation time of the component.

required
updated datetime

The last update time of the component.

required
labels Optional[Dict[str, Any]]

The labels of the component.

None
connector_requirements Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]

The requirements for the connector.

None
connector Optional[uuid.UUID]

The ID of a connector linked to the component.

None
connector_resource_id Optional[str]

The custom resource ID to access through the connector.

None
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Exceptions:

Type Description
ValueError

If a secret reference is passed as name.

Source code in zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    workspace: UUID,
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Sets up the stack component from its metadata and configuration.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        workspace: The ID of the workspace the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    name_is_secret_reference = secret_utils.is_secret_reference(name)
    if name_is_secret_reference:
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    # Identity of the component.
    self.name = name
    self.id = id
    self.type = type
    self.flavor = flavor
    self.labels = labels

    # Ownership and timestamps.
    self.workspace = workspace
    self.user = user
    self.updated = updated
    self.created = created

    # Configuration; exposed through the `config` property.
    self._config = config

    # Service connector linkage; no connector instance is stored yet.
    self.connector_resource_id = connector_resource_id
    self.connector = connector
    self.connector_requirements = connector_requirements
    self._connector_instance: Optional[ServiceConnector] = None
__repr__(self) special

String representation of the stack component.

Returns:

Type Description
str

A string representation of the stack component.

Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
    """Builds a readable representation of the stack component.

    The representation contains the class name, the component type and
    flavor, followed by every `key=value` pair of the component config.

    Returns:
        A string representation of the stack component.
    """
    config_fields = [
        f"{field}={field_value}"
        for field, field_value in self.config.dict().items()
    ]
    joined_fields = ", ".join(config_fields)
    class_name = self.__class__.__qualname__
    return f"{class_name}(type={self.type}, flavor={self.flavor}, {joined_fields})"
__str__(self) special

String representation of the stack component.

Returns:

Type Description
str

A string representation of the stack component.

Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
    """Returns the same text as `__repr__`.

    Returns:
        A string representation of the stack component.
    """
    return repr(self)
cleanup_step_run(self, info, step_failed)

Cleans up resources after the step run is finished.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required
step_failed bool

Whether the step failed.

required
Source code in zenml/stack/stack_component.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Hook for releasing resources once a step run has finished.

    This implementation does nothing.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    return None
connector_has_expired(self)

Checks whether the connector linked to this stack component has expired.

Returns:

Type Description
bool

True if the connector linked to this stack component has expired or has not been instantiated yet; False if the connector is still valid or the component isn't linked to a connector.

Source code in zenml/stack/stack_component.py
def connector_has_expired(self) -> bool:
    """Checks whether the connector linked to this stack component has expired.

    Returns:
        `False` if the component isn't linked to a connector. `True` if a
        connector is linked but no connector instance has been created
        yet. Otherwise, whatever the connector instance reports through
        `has_expired()`.
    """
    # Without a linked connector there is nothing that could expire.
    if self.connector is None:
        return False

    instance = self._connector_instance
    # A linked connector that was never instantiated counts as expired.
    return True if instance is None else instance.has_expired()
deprovision(self)

Deprovisions all resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Deprovisions all resources of the component.

    This implementation only signals that deprovisioning is unsupported.

    Raises:
        NotImplementedError: Always raised by this implementation, i.e.
            when the component does not implement this method.
    """
    message = f"Deprovisioning resource not implemented for {self}."
    raise NotImplementedError(message)
from_model(component_model) classmethod

Creates a StackComponent from a ComponentModel.

Parameters:

Name Type Description Default
component_model ComponentResponseModel

The ComponentModel from which to create the StackComponent.

required

Returns:

Type Description
StackComponent

The created StackComponent.

Exceptions:

Type Description
ImportError

If the flavor can't be imported.

Source code in zenml/stack/stack_component.py
@classmethod
def from_model(
    cls, component_model: "ComponentResponseModel"
) -> "StackComponent":
    """Creates a StackComponent from a ComponentModel.

    The flavor registered under the model's flavor name and component type
    is looked up through the ZenML client. Its config class is used to
    build the configuration and its implementation class is used to build
    the component itself.

    Args:
        component_model: The ComponentModel from which to create the
            StackComponent.

    Returns:
        The created StackComponent.

    Raises:
        ImportError: If the flavor can't be imported.
    """
    from zenml.client import Client

    flavor_model = Client().get_flavor_by_name_and_type(
        name=component_model.flavor,
        component_type=component_model.type,
    )

    try:
        from zenml.stack import Flavor

        flavor = Flavor.from_model(flavor_model)
    # `ModuleNotFoundError` is a subclass of `ImportError`, so it does not
    # need to be listed separately.
    except (ImportError, NotImplementedError) as err:
        # Chain explicitly so the original failure stays attached as the
        # cause of the re-raised error.
        raise ImportError(
            f"Couldn't import flavor {flavor_model.name}: {err}"
        ) from err

    configuration = flavor.config_class(**component_model.configuration)

    # The model may have no user attached; in that case no user ID is set.
    if component_model.user is not None:
        user_id = component_model.user.id
    else:
        user_id = None

    return flavor.implementation_class(
        user=user_id,
        workspace=component_model.workspace.id,
        name=component_model.name,
        id=component_model.id,
        config=configuration,
        labels=component_model.labels,
        flavor=component_model.flavor,
        type=component_model.type,
        created=component_model.created,
        updated=component_model.updated,
        connector_requirements=flavor.service_connector_requirements,
        connector=component_model.connector.id
        if component_model.connector
        else None,
        connector_resource_id=component_model.connector_resource_id,
    )
get_connector(self)

Returns the connector linked to this stack component.

Returns:

Type Description
Optional[ServiceConnector]

The connector linked to this stack component.

Exceptions:

Type Description
RuntimeError

If the stack component does not specify connector requirements or if the connector linked to the component is not compatible or not found.

Source code in zenml/stack/stack_component.py
def get_connector(self) -> Optional["ServiceConnector"]:
    """Returns the connector linked to this stack component.

    A previously created connector instance is reused for as long as it
    has not expired. Otherwise a new connector client is requested from
    the ZenML client and cached on the component.

    Returns:
        The connector linked to this stack component, or `None` if the
        component is not linked to a connector.

    Raises:
        RuntimeError: If the stack component does not specify connector
            requirements or if the connector linked to the component is not
            compatible or not found.
    """
    if self.connector is None:
        return None

    # If the cached connector instance is still valid, return it.
    # Otherwise, we'll try to get a new one.
    cached_instance = self._connector_instance
    if cached_instance is not None and not cached_instance.has_expired():
        return cached_instance

    requirements = self.connector_requirements
    if requirements is None:
        raise RuntimeError(
            f"Unable to get connector for component {self} because this "
            "component does not declare any connector requirements in its "
            "flavor specification. Override the "
            "`service_connector_requirements` method in its flavor class "
            "to return a connector requirements specification and try "
            "again."
        )

    if requirements.resource_id_attr is not None:
        # Check if an attribute is set in the component configuration
        resource_id = getattr(self.config, requirements.resource_id_attr)
    else:
        # Otherwise, use the resource ID configured in the component
        resource_id = self.connector_resource_id

    # Deferred until a client is actually needed, so the early-return and
    # validation paths above do not depend on importing the client module.
    from zenml.client import Client

    client = Client()
    try:
        self._connector_instance = client.get_service_connector_client(
            name_id_or_prefix=self.connector,
            resource_type=requirements.resource_type,
            resource_id=resource_id,
        )
    except KeyError as e:
        raise RuntimeError(
            f"The connector with ID {self.connector} linked "
            f"to the '{self.name}' {self.type} stack component could not "
            "be found or is not accessible. Please verify that the "
            "connector exists and that you have access to it."
        ) from e
    except ValueError as e:
        raise RuntimeError(
            f"The connector with ID {self.connector} linked "
            f"to the '{self.name}' {self.type} stack component could not "
            f"be correctly configured: {e}."
        ) from e
    except AuthorizationException as e:
        raise RuntimeError(
            f"The connector with ID {self.connector} linked "
            f"to the '{self.name}' {self.type} stack component could not "
            f"be accessed due to an authorization error: {e}. Please "
            "verify that you have access to the connector and try again."
        ) from e

    return self._connector_instance
get_docker_builds(self, deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/stack/stack_component.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBaseModel"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    This implementation requires no builds.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds, which is an empty list here.
    """
    required_builds: List["BuildConfiguration"] = []
    return required_builds
get_pipeline_run_metadata(self, run_id)

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in zenml/stack/stack_component.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    This implementation reports no metadata.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata, which is empty here.
    """
    run_metadata: Dict[str, "MetadataType"] = {}
    return run_metadata
get_settings(self, container)

Gets settings for this stack component.

This will raise a RuntimeError if the stack component doesn't specify a settings class, and return default settings if the container doesn't contain runtime options for this component.

Parameters:

Name Type Description Default
container Union[Step, StepRunInfo, PipelineDeploymentBaseModel]

The Step, StepRunInfo or PipelineDeployment from which to get the settings.

required

Returns:

Type Description
BaseSettings

Settings for this stack component.

Exceptions:

Type Description
RuntimeError

If the stack component does not specify a settings class.

Source code in zenml/stack/stack_component.py
def get_settings(
    self,
    container: Union["Step", "StepRunInfo", "PipelineDeploymentBaseModel"],
) -> "BaseSettings":
    """Gets settings for this stack component.

    If the container holds settings under this component's setting key,
    they are parsed with the component's settings class. Otherwise a
    settings object with default values is returned.

    Args:
        container: The `Step`, `StepRunInfo` or `PipelineDeployment` from
            which to get the settings.

    Returns:
        Settings for this stack component.

    Raises:
        RuntimeError: If the stack component does not specify a settings
            class.
    """
    if not self.settings_class:
        message = (
            f"Unable to get settings for component {self} because this "
            "component does not have an associated settings class. "
            "Return a settings class from the `@settings_class` property "
            "and try again."
        )
        raise RuntimeError(message)

    key = settings_utils.get_stack_component_setting_key(self)

    # Steps and step run infos keep their settings on `config`, while a
    # deployment keeps them on its pipeline configuration.
    if isinstance(container, (Step, StepRunInfo)):
        all_settings = container.config.settings
    else:
        all_settings = container.pipeline_configuration.settings

    if key not in all_settings:
        return self.settings_class()
    return self.settings_class.parse_obj(all_settings[key])
get_step_run_metadata(self, info)

Get component- and step-specific metadata after a step ran.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in zenml/stack/stack_component.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    This implementation reports no metadata.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata, which is empty here.
    """
    step_metadata: Dict[str, "MetadataType"] = {}
    return step_metadata
prepare_pipeline_deployment(self, deployment, stack)

Prepares deploying the pipeline.

This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponseModel

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeploymentResponseModel",
    stack: "Stack",
) -> None:
    """Hook that runs right before a pipeline is deployed.

    This method gets called immediately before a pipeline is deployed.
    Subclasses should override it if they require runtime configuration
    options or if they need to run code before the pipeline deployment.
    This implementation does nothing.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    return None
prepare_step_run(self, info)

Prepares running a step.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that will be executed.

required
Source code in zenml/stack/stack_component.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Hook for preparing the execution of a step.

    This implementation does nothing.

    Args:
        info: Info about the step that will be executed.
    """
    return None
provision(self)

Provisions resources to run the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Provisions resources to run the component.

    This implementation only signals that provisioning is unsupported.

    Raises:
        NotImplementedError: Always raised by this implementation, i.e.
            when the component does not implement this method.
    """
    message = f"Provisioning resources not implemented for {self}."
    raise NotImplementedError(message)
resume(self)

Resumes the provisioned resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resumes the provisioned resources of the component.

    This implementation only signals that resuming is unsupported.

    Raises:
        NotImplementedError: Always raised by this implementation, i.e.
            when the component does not implement this method.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
suspend(self)

Suspends the provisioned resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspends the provisioned resources of the component.

    This implementation only signals that suspending is unsupported.

    Raises:
        NotImplementedError: Always raised by this implementation, i.e.
            when the component does not implement this method.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)

StackComponentConfig (BaseModel, ABC) pydantic-model

Base class for all ZenML stack component configs.

Source code in zenml/stack/stack_component.py
class StackComponentConfig(BaseModel, ABC):
    """Base class for all ZenML stack component configs.

    Attribute values may be given either directly or as secret references
    of the form `{{secret_name.key}}`. Secret references are resolved when
    the attribute is read (see `__custom_getattribute__`).
    """

    def __init__(
        self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
    ) -> None:
        """Ensures that secret references don't clash with pydantic validation.

        StackComponents allow the specification of all their string attributes
        using secret references of the form `{{secret_name.key}}`. This however
        is only possible when the stack component does not perform any explicit
        validation of this attribute using pydantic validators. If this were
        the case, the validation would run on the secret reference and would
        fail or in the worst case, modify the secret reference and lead to
        unexpected behavior. This method ensures that no attributes that require
        custom pydantic validation are set as secret references.

        Args:
            warn_about_plain_text_secrets: If true, then warns about using
                plain-text secrets.
            **kwargs: Arguments to initialize this stack component.

        Raises:
            ValueError: If an attribute that requires custom pydantic validation
                is passed as a secret reference.
        """
        for key, value in kwargs.items():
            try:
                field = self.__class__.__fields__[key]
            except KeyError:
                # Value for a private attribute or non-existing field, this
                # will fail during the upcoming pydantic validation
                continue

            if value is None:
                continue

            if not secret_utils.is_secret_reference(value):
                # Plain values never conflict with validators; at most warn
                # when a sensitive field is given in plain text.
                if (
                    secret_utils.is_secret_field(field)
                    and warn_about_plain_text_secrets
                ):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}` for a `{self.__class__.__name__}` "
                        "stack component. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in sensitive information as secrets. Check out the "
                        "documentation on how to configure your stack "
                        "components with secrets here: "
                        "https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/use-the-secret-store"
                    )
                continue

            # From here on the value is a secret reference, which must not
            # be run through custom validators.
            requires_validation = field.pre_validators or field.post_validators
            if requires_validation:
                raise ValueError(
                    f"Passing the stack component attribute `{key}` as a "
                    "secret reference is not allowed as additional validation "
                    "is required for this attribute."
                )

        super().__init__(**kwargs)

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this stack component.

        Returns:
            The required secrets of this stack component.
        """
        return {
            secret_utils.parse_secret_reference(v)
            for v in self.dict().values()
            if secret_utils.is_secret_reference(v)
        }

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        Concrete stack component configuration classes should override this
        method to return True if the stack component is running in a remote
        location, and it needs to access the ZenML database.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Examples:
          * Orchestrators that are running pipelines in the cloud or in a
          location other than the local host
          * Step Operators that are running steps in the cloud or in a location
          other than the local host

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return False

    @property
    def is_valid(self) -> bool:
        """Checks if the stack component configurations are valid.

        Concrete stack component configuration classes should override this
        method to return False if the stack component configurations are invalid.

        Returns:
            True if the stack component config is valid, False otherwise.
        """
        return True

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Concrete stack component configuration classes should override this
        method to return True if the stack component is relying on local
        resources or capabilities (e.g. local filesystem, local database or
        other services).

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Examples:
          * Artifact Stores that store artifacts in the local filesystem
          * Orchestrators that are connected to local orchestration runtime
          services (e.g. local Kubernetes clusters, Docker containers etc).

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead. Resolution is first
        attempted through the secret store and then through the secrets
        manager of the active stack.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            RuntimeError: If the stack component is not part of the active
                stack, or the active stack is missing a secrets manager.
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value.
        """
        value = super().__getattribute__(key)

        if not secret_utils.is_secret_reference(value):
            return value

        # This method runs on every attribute access, so the client is only
        # imported once a secret reference actually has to be resolved.
        from zenml.client import Client

        secret_ref = secret_utils.parse_secret_reference(value)

        # Try to resolve the secret using the secret store first
        try:
            store_secret = Client().get_secret_by_name_and_scope(
                name=secret_ref.name,
            )
        except (KeyError, NotImplementedError):
            # Deliberately ignored: fall back to the secrets manager of the
            # active stack below.
            pass
        else:
            if secret_ref.key in store_secret.values:
                return store_secret.secret_values[secret_ref.key]
            else:
                raise KeyError(
                    f"Failed to resolve secret reference for attribute {key} "
                    f"of stack component `{self}`. "
                    f"The secret {secret_ref.name} does not contain a value "
                    f"for key {secret_ref.key}. Available keys: "
                    f"{set(store_secret.values)}."
                )

        # A stack component can be part of many stacks, and currently a
        # secrets manager is associated with a stack. This means we're
        # not able to identify the 'correct' secrets manager that the user
        # wanted to resolve the secrets in a general way. We therefore
        # limit secret resolving to components of the active stack.
        if not self._is_part_of_active_stack():
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The stack component is not "
                "part of the active stack and therefore can't have its "
                "secret references resolved. If you want to access attributes "
                "of this stack component which reference secrets, set a stack "
                "which includes both this component and a secrets manager as "
                "your active stack: `zenml stack set <STACK_NAME>`."
            )

        secrets_manager = Client().active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The active stack does not "
                "have a secrets manager."
            )

        try:
            secret = secrets_manager.get_secret(secret_ref.name)
        except KeyError as err:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not exist."
            ) from err

        try:
            secret_value = secret.content[secret_ref.key]
        except KeyError as err:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not contain a value for key "
                f"{secret_ref.key}. Available keys: {set(secret.content)}."
            ) from err

        return str(secret_value)

    def _is_part_of_active_stack(self) -> bool:
        """Checks if this config belongs to a component in the active stack.

        Returns:
            True if this config belongs to a component in the active stack,
            False otherwise.
        """
        from zenml.client import Client

        return any(
            component.config == self
            for component in Client().active_stack.components.values()
        )

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
        # prevent extra attributes during model initialization
        extra = Extra.forbid
is_local: bool property readonly

Checks if this stack component is running locally.

Concrete stack component configuration classes should override this method to return True if the stack component is relying on local resources or capabilities (e.g. local filesystem, local database or other services).

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Examples:

  • Artifact Stores that store artifacts in the local filesystem
  • Orchestrators that are connected to local orchestration runtime services (e.g. local Kubernetes clusters, Docker containers etc).

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property readonly

Checks if this stack component is running remotely.

Concrete stack component configuration classes should override this method to return True if the stack component is running in a remote location, and it needs to access the ZenML database.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Examples:

  • Orchestrators that are running pipelines in the cloud or in a location other than the local host
  • Step Operators that are running steps in the cloud or in a location other than the local host

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_valid: bool property readonly

Checks if the stack component configurations are valid.

Concrete stack component configuration classes should override this method to return False if the stack component configurations are invalid.

Returns:

Type Description
bool

True if the stack component config is valid, False otherwise.

required_secrets: Set[zenml.utils.secret_utils.SecretReference] property readonly

All required secrets for this stack component.

Returns:

Type Description
Set[zenml.utils.secret_utils.SecretReference]

The required secrets of this stack component.

Config

Pydantic configuration class.

Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic configuration class for stack component configs."""

    # Public attributes are immutable.
    allow_mutation = False
    # All attributes with a leading underscore are private; they are
    # therefore mutable and not included in serialization.
    underscore_attrs_are_private = True
    # Prevent extra attributes during model initialization.
    extra = Extra.forbid
__custom_getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the stack component is not part of the active stack, or the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead. Resolution is first
    attempted through the secret store and then through the secrets
    manager of the active stack.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the stack component is not part of the active
            stack, or the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    # The client is only imported once a secret reference actually has to
    # be resolved, keeping the plain-value path above free of the import.
    from zenml.client import Client

    secret_ref = secret_utils.parse_secret_reference(value)

    # Try to resolve the secret using the secret store first
    try:
        store_secret = Client().get_secret_by_name_and_scope(
            name=secret_ref.name,
        )
    except (KeyError, NotImplementedError):
        # Deliberately ignored: fall back to the secrets manager of the
        # active stack below.
        pass
    else:
        if secret_ref.key in store_secret.values:
            return store_secret.secret_values[secret_ref.key]
        else:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`. "
                f"The secret {secret_ref.name} does not contain a value "
                f"for key {secret_ref.key}. Available keys: "
                f"{set(store_secret.values)}."
            )

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    if not self._is_part_of_active_stack():
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have its "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        ) from err

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        ) from err

    return str(secret_value)
__getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the stack component is not part of the active stack, or the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Resolution order:

    1. The secret is looked up by name through the client. If that lookup
       succeeds, the referenced key must be present in the secret,
       otherwise a `KeyError` is raised.
    2. If the lookup raises `KeyError` or `NotImplementedError`, the
       secrets manager of the active stack is used instead. In this case
       the resolved value is converted to a string before being returned.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the secret could not be fetched through the client
            and the stack component is not part of the active stack, or
            the active stack is missing a secrets manager.
        KeyError: If the secret or the secret key doesn't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    # Imported inside the function rather than at module level (presumably
    # to avoid a circular import with the client module -- TODO confirm).
    from zenml.client import Client

    value = super().__getattribute__(key)

    # Values that are not secret references are returned untouched.
    if not secret_utils.is_secret_reference(value):
        return value

    secret_ref = secret_utils.parse_secret_reference(value)

    # Try to resolve the secret using the secret store first
    try:
        store_secret = Client().get_secret_by_name_and_scope(
            name=secret_ref.name,
        )
    except (KeyError, NotImplementedError):
        # Deliberate fall-through: the secret could not be fetched this
        # way, so the secrets manager of the active stack is tried below.
        pass
    else:
        if secret_ref.key in store_secret.values:
            return store_secret.secret_values[secret_ref.key]

        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`. "
            f"The secret {secret_ref.name} does not contain a value "
            f"for key {secret_ref.key}. Available keys: "
            f"{set(store_secret.values)}."
        )

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    if not self._is_part_of_active_stack():
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have it's "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError as err:
        # Chain explicitly so the traceback marks the lookup failure as
        # the direct cause of this more descriptive error.
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        ) from err

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        ) from err

    return str(secret_value)
__init__(self, warn_about_plain_text_secrets=False, **kwargs) special

Ensures that secret references don't clash with pydantic validation.

StackComponents allow the specification of all their string attributes using secret references of the form {{secret_name.key}}. This, however, is only possible when the stack component does not perform any explicit validation of this attribute using pydantic validators. If this were the case, the validation would run on the secret reference and would fail or, in the worst case, modify the secret reference and lead to unexpected behavior. This method ensures that no attributes that require custom pydantic validation are set as secret references.

Parameters:

Name Type Description Default
warn_about_plain_text_secrets bool

If true, then warns about using plain-text secrets.

False
**kwargs Any

Arguments to initialize this stack component.

{}

Exceptions:

Type Description
ValueError

If an attribute that requires custom pydantic validation is passed as a secret reference, or if the name attribute was passed as a secret reference.

Source code in zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Checks the keyword arguments for secret references, then initializes.

    String attributes may be given as secret references of the form
    `{{secret_name.key}}`. A pydantic validator attached to such an
    attribute would run on the reference text instead of the real value,
    so a secret reference is rejected for any field that has pre- or
    post-validators. Keyword arguments that do not match a declared field,
    or whose value is `None`, are skipped here and handed on unchanged.

    Args:
        warn_about_plain_text_secrets: If true, logs a warning for every
            secret field that receives a plain-text value.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference.
    """
    for attr_name, attr_value in kwargs.items():
        try:
            field = self.__class__.__fields__[attr_name]
        except KeyError:
            # Not a declared field (private attribute or unknown name);
            # the pydantic validation triggered below deals with it.
            continue

        if attr_value is None:
            continue

        if secret_utils.is_secret_reference(attr_value):
            # A validator would operate on the reference itself rather
            # than on the secret value, so this combination is refused.
            if field.pre_validators or field.post_validators:
                raise ValueError(
                    f"Passing the stack component attribute `{attr_name}` as a "
                    "secret reference is not allowed as additional validation "
                    "is required for this attribute."
                )
            continue

        # Plain value: optionally warn when the field is a secret field.
        if (
            secret_utils.is_secret_field(field)
            and warn_about_plain_text_secrets
        ):
            logger.warning(
                "You specified a plain-text value for the sensitive "
                f"attribute `{attr_name}` for a `{self.__class__.__name__}` "
                "stack component. This is currently only a warning, "
                "but future versions of ZenML will require you to pass "
                "in sensitive information as secrets. Check out the "
                "documentation on how to configure your stack "
                "components with secrets here: "
                "https://docs.zenml.io/platform-guide/set-up-your-mlops-platform/use-the-secret-store"
            )

    super().__init__(**kwargs)

stack_validator

Implementation of the ZenML Stack Validator.

StackValidator

A StackValidator is used to validate a stack configuration.

Each StackComponent can provide a StackValidator to make sure it is compatible with all components of the stack. The KubeflowOrchestrator for example will always require the stack to have a container registry in order to push the docker images that are required to run a pipeline in Kubeflow Pipelines.

Source code in zenml/stack/stack_validator.py
class StackValidator:
    """Validates that a stack satisfies the requirements of a component.

    A `StackComponent` can supply a `StackValidator` to express what it
    needs from the rest of the stack: a set of component types that must be
    present and, optionally, a custom check. The `KubeflowOrchestrator`,
    for example, requires a container registry in the stack so that the
    docker images needed to run a pipeline in Kubeflow Pipelines can be
    pushed.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[
            Callable[["Stack"], Tuple[bool, str]]
        ] = None,
    ):
        """Initializes a `StackValidator` instance.

        Args:
            required_components: Optional set of stack components that must
                exist in the stack. A missing or empty value means that no
                component is required.
            custom_validation_function: Optional function that receives the
                stack and returns a `(is_valid, error_message)` tuple.
        """
        self._custom_validation_function = custom_validation_function
        self._required_components = (
            required_components if required_components else set()
        )

    def validate(self, stack: "Stack") -> None:
        """Validates the given stack.

        The required components are checked first; the custom validation
        function, if one was provided, only runs when none are missing.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If a required component is missing or
                the custom validation function reports the stack as
                invalid.
        """
        present = set(stack.components)
        absent = self._required_components - present
        if absent:
            raise StackValidationError(
                f"Missing stack components {absent} for "
                f"stack: {stack.name}"
            )

        custom_check = self._custom_validation_function
        if not custom_check:
            return

        is_valid, reason = custom_check(stack)
        if is_valid:
            return

        raise StackValidationError(
            "Custom validation function failed to validate "
            f"stack '{stack.name}': {reason}"
        )
__init__(self, required_components=None, custom_validation_function=None) special

Initializes a StackValidator instance.

Parameters:

Name Type Description Default
required_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Optional set of stack components that must exist in the stack.

None
custom_validation_function Optional[Callable[[Stack], Tuple[bool, str]]]

Optional function that returns whether a stack is valid and an error message to show if not valid.

None
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[
        Callable[["Stack"], Tuple[bool, str]]
    ] = None,
):
    """Initializes a `StackValidator` instance.

    Args:
        required_components: Optional set of stack components that must
            exist in the stack. A missing or empty value means that no
            component is required.
        custom_validation_function: Optional function that receives the
            stack and returns a `(is_valid, error_message)` tuple.
    """
    self._custom_validation_function = custom_validation_function
    self._required_components = (
        required_components if required_components else set()
    )
validate(self, stack)

Validates the given stack.

Checks if the stack contains all the required components and passes the custom validation function of the validator.

Parameters:

Name Type Description Default
stack Stack

The stack to validate.

required

Exceptions:

Type Description
StackValidationError

If the stack does not meet all the validation criteria.

Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Validates the given stack.

    The required components are checked first; the custom validation
    function, if one was provided, only runs when none are missing.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If a required component is missing or the
            custom validation function reports the stack as invalid.
    """
    present = set(stack.components)
    absent = self._required_components - present
    if absent:
        raise StackValidationError(
            f"Missing stack components {absent} for "
            f"stack: {stack.name}"
        )

    custom_check = self._custom_validation_function
    if not custom_check:
        return

    is_valid, reason = custom_check(stack)
    if is_valid:
        return

    raise StackValidationError(
        "Custom validation function failed to validate "
        f"stack '{stack.name}': {reason}"
    )