Stack
zenml.stack
special
Initialization of the ZenML Stack.
The stack is essentially all the configuration for the infrastructure of your MLOps platform.
A stack is made up of multiple components. Some examples are:
- An Artifact Store
- An Orchestrator
- A Step Operator (Optional)
- A Container Registry (Optional)
authentication_mixin
Stack component mixin for authentication.
AuthenticationConfigMixin (StackComponentConfig)
pydantic-model
Base config for authentication mixins.
Any stack component that implements `AuthenticationMixin` should have a config that inherits from this class.
Attributes:
Name | Type | Description |
---|---|---|
authentication_secret |
Optional[str] |
Name of the secret that stores the authentication credentials. |
Source code in zenml/stack/authentication_mixin.py
class AuthenticationConfigMixin(StackComponentConfig):
    """Configuration base class for components that authenticate via a secret.

    The config class of every stack component built on
    `AuthenticationMixin` is expected to derive from this class.

    Attributes:
        authentication_secret: Name of the secret that holds the
            authentication credentials. Defaults to `None`, meaning that
            no secret is configured.
    """

    authentication_secret: Optional[str] = None
AuthenticationMixin (StackComponent)
Stack component mixin for authentication.
Any stack component that implements this mixin should have a config that inherits from `AuthenticationConfigMixin`.
Source code in zenml/stack/authentication_mixin.py
class AuthenticationMixin(StackComponent):
    """Mixin that lets a stack component load credentials from a secret.

    Components using this mixin need a config class that derives from
    `AuthenticationConfigMixin`.
    """

    @property
    def config(self) -> AuthenticationConfigMixin:
        """Gives typed access to the component configuration.

        Returns:
            The component configuration, cast to
            `AuthenticationConfigMixin`.
        """
        typed_config = cast(AuthenticationConfigMixin, self._config)
        return typed_config

    def get_authentication_secret(
        self, expected_schema_type: Type[T]
    ) -> Optional[T]:
        """Loads the secret named by the `authentication_secret` option.

        Args:
            expected_schema_type: Secret schema class that the loaded secret
                has to be an instance of.

        Returns:
            `None` if no `authentication_secret` is configured, otherwise
            the secret fetched from the secrets manager of the active stack.

        Raises:
            RuntimeError: If the active stack has no secrets manager.
            TypeError: If the fetched secret is not an instance of
                `expected_schema_type`.
        """
        # Nothing configured: the component runs without a secret.
        if not self.config.authentication_secret:
            return None

        secrets_manager = Client().active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Unable to retrieve secret '{self.config.authentication_secret}' "
                "because the active stack does not have a secrets manager."
            )

        secret = secrets_manager.get_secret(self.config.authentication_secret)
        if isinstance(secret, expected_schema_type):
            return secret

        # The secret exists but uses a different schema; tell the user how
        # to register one with the expected schema.
        raise TypeError(
            f"Authentication secret has type {secret.TYPE} but a secret of "
            f"type {expected_schema_type.TYPE} was expected. To solve this "
            f"issue, register a secret with name "
            f"{self.config.authentication_secret} of type "
            f"{expected_schema_type.TYPE} using the following command: \n "
            f"`zenml secrets-manager secret register {self.config.authentication_secret} "
            f"--schema={expected_schema_type.TYPE} ...`"
        )
config: AuthenticationConfigMixin
property
readonly
Returns the `AuthenticationConfigMixin` config.
Returns:
Type | Description |
---|---|
AuthenticationConfigMixin |
The configuration. |
get_authentication_secret(self, expected_schema_type)
Gets the secret referred to by the authentication secret attribute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expected_schema_type |
Type[~T] |
The expected secret schema class. |
required |
Returns:
Type | Description |
---|---|
Optional[~T] |
The secret object if the `authentication_secret` attribute is set, `None` otherwise. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no secrets manager exists in the active stack. |
TypeError |
If the secret is not of the expected schema type. |
Source code in zenml/stack/authentication_mixin.py
def get_authentication_secret(
    self, expected_schema_type: Type[T]
) -> Optional[T]:
    """Loads the secret named by the `authentication_secret` option.

    Args:
        expected_schema_type: Secret schema class that the loaded secret
            has to be an instance of.

    Returns:
        `None` if no `authentication_secret` is configured, otherwise the
        secret fetched from the secrets manager of the active stack.

    Raises:
        RuntimeError: If the active stack has no secrets manager.
        TypeError: If the fetched secret is not an instance of
            `expected_schema_type`.
    """
    # Nothing configured: the component runs without a secret.
    if not self.config.authentication_secret:
        return None

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Unable to retrieve secret '{self.config.authentication_secret}' "
            "because the active stack does not have a secrets manager."
        )

    secret = secrets_manager.get_secret(self.config.authentication_secret)
    if isinstance(secret, expected_schema_type):
        return secret

    # The secret exists but uses a different schema; tell the user how to
    # register one with the expected schema.
    raise TypeError(
        f"Authentication secret has type {secret.TYPE} but a secret of "
        f"type {expected_schema_type.TYPE} was expected. To solve this "
        f"issue, register a secret with name "
        f"{self.config.authentication_secret} of type "
        f"{expected_schema_type.TYPE} using the following command: \n "
        f"`zenml secrets-manager secret register {self.config.authentication_secret} "
        f"--schema={expected_schema_type.TYPE} ...`"
    )
flavor
Base ZenML Flavor implementation.
Flavor
Class for ZenML Flavors.
Source code in zenml/stack/flavor.py
class Flavor:
    """Describes one flavor of a ZenML stack component.

    Subclasses supply the name, the component type, the implementation
    class and the config class; conversion from and to flavor models is
    provided here.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Name under which the flavor is known.

        Returns:
            The flavor name.
        """

    @property
    @abstractmethod
    def type(self) -> StackComponentType:
        """Type of stack component this flavor belongs to.

        Returns:
            The stack component type.
        """

    @property
    @abstractmethod
    def implementation_class(self) -> Type[StackComponent]:
        """Stack component class that implements this flavor.

        Returns:
            The implementation class for this flavor.
        """

    @property
    @abstractmethod
    def config_class(self) -> Type[StackComponentConfig]:
        """`StackComponentConfig` subclass used to configure this flavor.

        Returns:
            The config class.
        """

    @property
    def config_schema(self) -> str:
        """Schema of the flavor's config class.

        Returns:
            The result of calling `schema_json()` on the config class.
        """
        schema_source = self.config_class
        return schema_source.schema_json()

    @classmethod
    def from_model(cls, flavor_model: FlavorResponseModel) -> "Flavor":
        """Instantiates the flavor class referenced by a flavor model.

        Args:
            flavor_model: Model whose `source` points to the flavor class.

        Returns:
            An instance of the class loaded from the model's source.
        """
        flavor_class = load_source_path_class(flavor_model.source)
        return cast(Flavor, flavor_class())  # noqa

    def to_model(
        self, integration: Optional[str] = None
    ) -> FlavorRequestModel:
        """Builds a request model describing this flavor.

        The active user and active project of the client are recorded as
        the owner and project of the model.

        Args:
            integration: Name of the integration the flavor belongs to.

        Returns:
            The flavor request model.
        """
        from zenml.client import Client

        client = Client()
        return FlavorRequestModel(
            user=client.active_user.id,
            project=client.active_project.id,
            name=self.name,
            type=self.type,
            source=resolve_class(self.__class__),  # noqa
            config_schema=self.config_schema,
            integration=integration,
        )
config_class: Type[zenml.stack.stack_component.StackComponentConfig]
property
readonly
Returns the `StackComponentConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.stack.stack_component.StackComponentConfig] |
The config class. |
config_schema: str
property
readonly
The config schema for a flavor.
Returns:
Type | Description |
---|---|
str |
The config schema. |
implementation_class: Type[zenml.stack.stack_component.StackComponent]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[zenml.stack.stack_component.StackComponent] |
The implementation class for this flavor. |
name: str
property
readonly
The flavor name.
Returns:
Type | Description |
---|---|
str |
The flavor name. |
type: StackComponentType
property
readonly
The stack component type.
Returns:
Type | Description |
---|---|
StackComponentType |
The stack component type. |
from_model(flavor_model)
classmethod
Loads a flavor from a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_model |
FlavorResponseModel |
The model to load from. |
required |
Returns:
Type | Description |
---|---|
Flavor |
The loaded flavor. |
Source code in zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponseModel) -> "Flavor":
    """Instantiates the flavor class referenced by a flavor model.

    Args:
        flavor_model: Model whose `source` points to the flavor class.

    Returns:
        An instance of the class loaded from the model's source.
    """
    flavor_class = load_source_path_class(flavor_model.source)
    return cast(Flavor, flavor_class())  # noqa
to_model(self, integration=None)
Converts a flavor to a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
integration |
Optional[str] |
The integration to use for the model. |
None |
Returns:
Type | Description |
---|---|
FlavorRequestModel |
The model. |
Source code in zenml/stack/flavor.py
def to_model(
    self, integration: Optional[str] = None
) -> FlavorRequestModel:
    """Builds a request model describing this flavor.

    The active user and active project of the client are recorded as the
    owner and project of the model.

    Args:
        integration: Name of the integration the flavor belongs to.

    Returns:
        The flavor request model.
    """
    from zenml.client import Client

    client = Client()
    return FlavorRequestModel(
        user=client.active_user.id,
        project=client.active_project.id,
        name=self.name,
        type=self.type,
        source=resolve_class(self.__class__),  # noqa
        config_schema=self.config_schema,
        integration=integration,
    )
flavor_registry
Implementation of the ZenML flavor registry.
FlavorRegistry
Registry for stack component flavors.
The flavors defined by ZenML must be registered here.
Source code in zenml/stack/flavor_registry.py
class FlavorRegistry:
    """Registry for stack component flavors.

    The flavors defined by ZenML must be registered here. Registered flavors
    are kept in memory, grouped by stack component type and keyed by flavor
    name.
    """

    def __init__(self) -> None:
        """Creates the registry and fills it with all known flavors.

        Built-in flavors are registered first, followed by the flavors
        contributed by integrations.
        """
        self._flavors: DefaultDict[
            StackComponentType, Dict[str, FlavorResponseModel]
        ] = defaultdict(dict)
        self.register_default_flavors()
        self.register_integration_flavors()

    def register_default_flavors(self) -> None:
        """Registers the default built-in flavors."""
        from zenml.artifact_stores import LocalArtifactStoreFlavor
        from zenml.container_registries import (
            AzureContainerRegistryFlavor,
            DefaultContainerRegistryFlavor,
            DockerHubContainerRegistryFlavor,
            GCPContainerRegistryFlavor,
            GitHubContainerRegistryFlavor,
        )
        from zenml.orchestrators import (
            LocalDockerOrchestratorFlavor,
            LocalOrchestratorFlavor,
        )
        from zenml.secrets_managers import LocalSecretsManagerFlavor

        default_flavors = (
            LocalArtifactStoreFlavor,
            LocalOrchestratorFlavor,
            LocalDockerOrchestratorFlavor,
            DefaultContainerRegistryFlavor,
            AzureContainerRegistryFlavor,
            DockerHubContainerRegistryFlavor,
            GCPContainerRegistryFlavor,
            GitHubContainerRegistryFlavor,
            LocalSecretsManagerFlavor,
        )
        for flavor in default_flavors:
            flavor_instance = flavor()  # type: ignore[abstract]
            self._register_flavor(
                flavor_instance.to_model(integration="built-in")
            )

    def register_integration_flavors(self) -> None:
        """Registers the flavors implemented by integrations."""
        for name, integration in integration_registry.integrations.items():
            integrated_flavors = integration.flavors()
            if integrated_flavors:
                for flavor in integrated_flavors:
                    self._register_flavor(flavor().to_model(integration=name))

    def _register_flavor(
        self,
        flavor: FlavorRequestModel,
    ) -> None:
        """Registers a stack component flavor.

        Args:
            flavor: The flavor to register.

        Raises:
            KeyError: If a flavor with the same name is already registered
                for the same stack component type.
        """
        flavors = self._flavors[flavor.type]
        if flavor.name in flavors:
            raise KeyError(
                f"There is already a {flavor.type} with the flavor "
                f"`{flavor.name}`. Please select another name for the flavor."
            )

        client = Client()
        # Read the clock once so that `created` and `updated` of a freshly
        # registered flavor are guaranteed to be equal.
        registered_at = datetime.utcnow()
        flavor_response_model = FlavorResponseModel(
            name=flavor.name,
            type=flavor.type,
            config_schema=flavor.config_schema,
            source=flavor.source,
            integration=flavor.integration,
            # This is a small trick to convert the request to response
            id=UUID(int=0),
            user=client.active_user,
            project=client.active_project,
            created=registered_at,
            updated=registered_at,
        )
        flavors[flavor.name] = flavor_response_model
        logger.debug(
            f"Registered flavor for '{flavor.name}' and type '{flavor.type}'.",
        )

    @property
    def flavors(self) -> List[FlavorResponseModel]:
        """Returns all registered flavors.

        Returns:
            The list of all registered flavors, across all component types.
        """
        return [
            flavor
            for flavors_by_type in self._flavors.values()
            for flavor in flavors_by_type.values()
        ]

    def get_flavors_by_type(
        self, component_type: StackComponentType
    ) -> List[FlavorResponseModel]:
        """Return the list of flavors with given type.

        Args:
            component_type: The type of the stack component.

        Returns:
            The list of flavors with the given type. The list is empty if no
            flavor of that type is registered.
        """
        # `.get` instead of indexing: indexing the defaultdict would insert
        # an empty entry for every unknown type that is merely queried.
        return list(self._flavors.get(component_type, {}).values())

    def get_flavor_by_name_and_type(
        self, name: str, component_type: StackComponentType
    ) -> FlavorResponseModel:
        """Gets the flavor for a given name and type.

        Args:
            name: The name of the flavor.
            component_type: The type of the stack component.

        Returns:
            The flavor with the given name and type.

        Raises:
            KeyError: If no flavor with the given name is registered for the
                given component type.
        """
        # `.get` keeps this lookup read-only (no defaultdict insertion); a
        # missing name still surfaces as `KeyError(name)`.
        return self._flavors.get(component_type, {})[name]
flavors: List[zenml.models.flavor_models.FlavorResponseModel]
property
readonly
Returns all registered flavors.
Returns:
Type | Description |
---|---|
List[zenml.models.flavor_models.FlavorResponseModel] |
The list of all registered flavors. |
__init__(self)
special
Initialization of the flavors.
Source code in zenml/stack/flavor_registry.py
def __init__(self) -> None:
    """Creates the registry and fills it with all known flavors.

    Built-in flavors are registered first, followed by the flavors
    contributed by integrations.
    """
    self._flavors: DefaultDict[
        StackComponentType, Dict[str, FlavorResponseModel]
    ] = defaultdict(dict)
    self.register_default_flavors()
    self.register_integration_flavors()
get_flavor_by_name_and_type(self, name, component_type)
Gets the flavor for a given name and type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the flavor. |
required |
component_type |
StackComponentType |
The type of the stack component. |
required |
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The flavor with the given name and type. |
Source code in zenml/stack/flavor_registry.py
def get_flavor_by_name_and_type(
    self, name: str, component_type: StackComponentType
) -> FlavorResponseModel:
    """Gets the flavor for a given name and type.

    Args:
        name: The name of the flavor.
        component_type: The type of the stack component.

    Returns:
        The flavor with the given name and type.

    Raises:
        KeyError: If no flavor with the given name is registered for the
            given component type.
    """
    # `.get` keeps this lookup read-only: indexing the underlying
    # defaultdict would insert an empty entry for an unknown component
    # type. A missing name still surfaces as `KeyError(name)`.
    return self._flavors.get(component_type, {})[name]
get_flavors_by_type(self, component_type)
Return the list of flavors with given type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
The type of the stack component. |
required |
Returns:
Type | Description |
---|---|
List[zenml.models.flavor_models.FlavorResponseModel] |
The list of flavors with the given type. |
Source code in zenml/stack/flavor_registry.py
def get_flavors_by_type(
    self, component_type: StackComponentType
) -> List[FlavorResponseModel]:
    """Return the list of flavors with given type.

    Args:
        component_type: The type of the stack component.

    Returns:
        The list of flavors with the given type. The list is empty if no
        flavor of that type is registered.
    """
    # `.get` instead of indexing: indexing the underlying defaultdict would
    # insert an empty entry for every unknown type that is merely queried.
    return list(self._flavors.get(component_type, {}).values())
register_default_flavors(self)
Registers the default built-in flavors.
Source code in zenml/stack/flavor_registry.py
def register_default_flavors(self) -> None:
    """Adds every flavor that ships with ZenML itself to the registry.

    Each flavor class is instantiated and registered with the integration
    name `built-in`.
    """
    from zenml.artifact_stores import LocalArtifactStoreFlavor
    from zenml.container_registries import (
        AzureContainerRegistryFlavor,
        DefaultContainerRegistryFlavor,
        DockerHubContainerRegistryFlavor,
        GCPContainerRegistryFlavor,
        GitHubContainerRegistryFlavor,
    )
    from zenml.orchestrators import (
        LocalDockerOrchestratorFlavor,
        LocalOrchestratorFlavor,
    )
    from zenml.secrets_managers import LocalSecretsManagerFlavor

    builtin_flavor_classes = (
        LocalArtifactStoreFlavor,
        LocalOrchestratorFlavor,
        LocalDockerOrchestratorFlavor,
        DefaultContainerRegistryFlavor,
        AzureContainerRegistryFlavor,
        DockerHubContainerRegistryFlavor,
        GCPContainerRegistryFlavor,
        GitHubContainerRegistryFlavor,
        LocalSecretsManagerFlavor,
    )
    for flavor_class in builtin_flavor_classes:
        builtin_model = flavor_class().to_model(  # type: ignore[abstract]
            integration="built-in"
        )
        self._register_flavor(builtin_model)
register_integration_flavors(self)
Registers the flavors implemented by integrations.
Source code in zenml/stack/flavor_registry.py
def register_integration_flavors(self) -> None:
    """Adds the flavors contributed by every known integration.

    Integrations that report no flavors are skipped; all other flavors are
    registered under the name of their integration.
    """
    for name, integration in integration_registry.integrations.items():
        integrated_flavors = integration.flavors()
        if not integrated_flavors:
            continue
        for flavor_class in integrated_flavors:
            integration_model = flavor_class().to_model(integration=name)
            self._register_flavor(integration_model)
stack
Implementation of the ZenML Stack class.
Stack
ZenML stack class.
A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (the orchestrator and the artifact store) are required to run any kind of pipeline; other components, like the container registry, are only required if other stack components depend on them.
Source code in zenml/stack/stack.py
class Stack:
"""ZenML stack class.
A ZenML stack is a collection of multiple stack components that are
required to run ZenML pipelines. Some of these components (orchestrator,
and artifact store) are required to run any kind of
pipeline, other components like the container registry are only required
if other stack components depend on them.
"""
def __init__(
    self,
    id: UUID,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    secrets_manager: Optional["BaseSecretsManager"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
    feature_store: Optional["BaseFeatureStore"] = None,
    model_deployer: Optional["BaseModelDeployer"] = None,
    experiment_tracker: Optional["BaseExperimentTracker"] = None,
    alerter: Optional["BaseAlerter"] = None,
    annotator: Optional["BaseAnnotator"] = None,
    data_validator: Optional["BaseDataValidator"] = None,
):
    """Creates a stack from its ID, name and components.

    The constructor only stores the given values; it performs no checks
    on them.

    Args:
        id: Unique ID of the stack.
        name: Name of the stack.
        orchestrator: Orchestrator component of the stack.
        artifact_store: Artifact store component of the stack.
        container_registry: Container registry component of the stack.
        secrets_manager: Secrets manager component of the stack.
        step_operator: Step operator component of the stack.
        feature_store: Feature store component of the stack.
        model_deployer: Model deployer component of the stack.
        experiment_tracker: Experiment tracker component of the stack.
        alerter: Alerter component of the stack.
        annotator: Annotator component of the stack.
        data_validator: Data validator component of the stack.
    """
    # Identity of the stack.
    self._id = id
    self._name = name

    # Components that every stack has.
    self._orchestrator = orchestrator
    self._artifact_store = artifact_store

    # Optional components; `None` when the stack does not have them.
    self._container_registry = container_registry
    self._secrets_manager = secrets_manager
    self._step_operator = step_operator
    self._feature_store = feature_store
    self._model_deployer = model_deployer
    self._experiment_tracker = experiment_tracker
    self._alerter = alerter
    self._annotator = annotator
    self._data_validator = data_validator
@classmethod
def from_model(cls, stack_model: StackResponseModel) -> "Stack":
    """Builds a stack from a stack response model.

    Args:
        stack_model: The model describing the stack and its components.

    Returns:
        The stack built from the components of the model.
    """
    from zenml.stack import StackComponent

    # Only the first entry stored for each component type is used.
    stack_components = {}
    for component_type, component_models in stack_model.components.items():
        stack_components[component_type] = StackComponent.from_model(
            component_models[0]
        )

    return Stack.from_components(
        id=stack_model.id,
        name=stack_model.name,
        components=stack_components,
    )
@classmethod
def from_components(
    cls,
    id: UUID,
    name: str,
    components: Dict[StackComponentType, "StackComponent"],
) -> "Stack":
    """Builds a stack from a mapping of component type to component.

    Every component is checked against the base class expected for its
    type. The orchestrator and the artifact store must be present; all
    other components may be missing.

    # noqa: DAR402

    Args:
        id: Unique ID of the stack.
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.alerter import BaseAlerter
    from zenml.annotators import BaseAnnotator
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.data_validators import BaseDataValidator
    from zenml.experiment_trackers import BaseExperimentTracker
    from zenml.feature_stores import BaseFeatureStore
    from zenml.model_deployers import BaseModelDeployer
    from zenml.orchestrators import BaseOrchestrator
    from zenml.secrets_managers import BaseSecretsManager
    from zenml.step_operators import BaseStepOperator

    def _checked(
        component_type: StackComponentType,
        expected_class: Type[Any],
        required: bool = False,
    ) -> Any:
        """Looks up a component and verifies its base class.

        Args:
            component_type: Type of the component to look up.
            expected_class: Base class the component has to inherit from.
            required: Whether a missing component is an error.

        Returns:
            The component, or `None` if it is optional and missing.

        Raises:
            TypeError: If the component has an unexpected type.
        """
        component = components.get(component_type)
        if component is None and not required:
            return None
        if not isinstance(component, expected_class):
            # A missing required component ends up here as well and is
            # reported with the type name `NoneType`.
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )
        return component

    # The checks run in this fixed order, so the first offending component
    # in this order determines the error that is raised.
    orchestrator = _checked(
        StackComponentType.ORCHESTRATOR, BaseOrchestrator, required=True
    )
    artifact_store = _checked(
        StackComponentType.ARTIFACT_STORE, BaseArtifactStore, required=True
    )
    container_registry = _checked(
        StackComponentType.CONTAINER_REGISTRY, BaseContainerRegistry
    )
    secrets_manager = _checked(
        StackComponentType.SECRETS_MANAGER, BaseSecretsManager
    )
    step_operator = _checked(
        StackComponentType.STEP_OPERATOR, BaseStepOperator
    )
    feature_store = _checked(
        StackComponentType.FEATURE_STORE, BaseFeatureStore
    )
    model_deployer = _checked(
        StackComponentType.MODEL_DEPLOYER, BaseModelDeployer
    )
    experiment_tracker = _checked(
        StackComponentType.EXPERIMENT_TRACKER, BaseExperimentTracker
    )
    alerter = _checked(StackComponentType.ALERTER, BaseAlerter)
    annotator = _checked(StackComponentType.ANNOTATOR, BaseAnnotator)
    data_validator = _checked(
        StackComponentType.DATA_VALIDATOR, BaseDataValidator
    )

    return Stack(
        id=id,
        name=name,
        orchestrator=orchestrator,
        artifact_store=artifact_store,
        container_registry=container_registry,
        secrets_manager=secrets_manager,
        step_operator=step_operator,
        feature_store=feature_store,
        model_deployer=model_deployer,
        experiment_tracker=experiment_tracker,
        alerter=alerter,
        annotator=annotator,
        data_validator=data_validator,
    )
@property
def components(self) -> Dict[StackComponentType, "StackComponent"]:
    """Components that are present in this stack, keyed by their type.

    Returns:
        A dictionary mapping the type of each component that is set to the
        component itself; unset optional components are left out.
    """
    candidates = (
        self.orchestrator,
        self.artifact_store,
        self.container_registry,
        self.secrets_manager,
        self.step_operator,
        self.feature_store,
        self.model_deployer,
        self.experiment_tracker,
        self.alerter,
        self.annotator,
        self.data_validator,
    )
    present: Dict[StackComponentType, "StackComponent"] = {}
    for component in candidates:
        if component is None:
            continue
        present[component.type] = component
    return present
@property
def id(self) -> UUID:
    """Unique identifier of this stack.

    Returns:
        The ID that was passed to the constructor.
    """
    return self._id
@property
def name(self) -> str:
    """Name of this stack.

    Returns:
        The name that was passed to the constructor.
    """
    return self._name
@property
def orchestrator(self) -> "BaseOrchestrator":
    """Orchestrator component of this stack.

    Returns:
        The orchestrator that was passed to the constructor.
    """
    return self._orchestrator
@property
def artifact_store(self) -> "BaseArtifactStore":
    """Artifact store component of this stack.

    Returns:
        The artifact store that was passed to the constructor.
    """
    return self._artifact_store
@property
def container_registry(self) -> Optional["BaseContainerRegistry"]:
    """Container registry component of this stack.

    Returns:
        The container registry, or `None` if the stack has none.
    """
    return self._container_registry
@property
def secrets_manager(self) -> Optional["BaseSecretsManager"]:
    """Secrets manager component of this stack.

    Returns:
        The secrets manager, or `None` if the stack has none.
    """
    return self._secrets_manager
@property
def step_operator(self) -> Optional["BaseStepOperator"]:
    """Step operator component of this stack.

    Returns:
        The step operator, or `None` if the stack has none.
    """
    return self._step_operator
@property
def feature_store(self) -> Optional["BaseFeatureStore"]:
    """Feature store component of this stack.

    Returns:
        The feature store, or `None` if the stack has none.
    """
    return self._feature_store
@property
def model_deployer(self) -> Optional["BaseModelDeployer"]:
    """Model deployer component of this stack.

    Returns:
        The model deployer, or `None` if the stack has none.
    """
    return self._model_deployer
@property
def experiment_tracker(self) -> Optional["BaseExperimentTracker"]:
    """Experiment tracker component of this stack.

    Returns:
        The experiment tracker, or `None` if the stack has none.
    """
    return self._experiment_tracker
@property
def alerter(self) -> Optional["BaseAlerter"]:
    """Alerter component of this stack.

    Returns:
        The alerter, or `None` if the stack has none.
    """
    return self._alerter
@property
def annotator(self) -> Optional["BaseAnnotator"]:
    """Annotator component of this stack.

    Returns:
        The annotator, or `None` if the stack has none.
    """
    return self._annotator
@property
def data_validator(self) -> Optional["BaseDataValidator"]:
    """Data validator component of this stack.

    Returns:
        The data validator, or `None` if the stack has none.
    """
    return self._data_validator
def dict(self) -> Dict[str, str]:
    """Serializes the stack into a flat dictionary of strings.

    Returns:
        A dictionary that maps the value of each component type to the
        JSON form (with sorted keys) of that component's config, plus a
        `name` entry holding the stack name.
    """
    serialized: Dict[str, str] = {}
    for component_type, component in self.components.items():
        serialized[component_type.value] = component.config.json(
            sort_keys=True
        )
    # Written last, so the stack name wins over a component entry that
    # happens to use the same key.
    serialized["name"] = self.name
    return serialized
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Collects the PyPI requirements of the stack components.

    The requirements of all components are merged, leaving out components
    whose type is listed in `exclude_components`.

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        Set of PyPI requirements.
    """
    excluded = exclude_components if exclude_components else set()

    per_component = []
    for component in self.components.values():
        if component.type in excluded:
            continue
        per_component.append(component.requirements)

    # `set.union` needs at least one argument, so an empty selection is
    # answered directly.
    if not per_component:
        return set()
    return set.union(*per_component)
@property
def apt_packages(self) -> List[str]:
    """APT packages needed by the components of this stack.

    Returns:
        The APT packages of all components, concatenated in component
        order. Duplicates are not removed.
    """
    packages: List[str] = []
    for component in self.components.values():
        packages.extend(component.apt_packages)
    return packages
def check_local_paths(self) -> bool:
    """Reports whether any component of the stack uses a local path.

    Every local path that a component advertises has to start with the
    path of the local stores directory of the global configuration.

    Returns:
        True if the stack has local paths, False otherwise.

    Raises:
        ValueError: If the stack has local paths that do not conform to
            the convention that all local path must be relative to the
            local stores directory.
    """
    from zenml.config.global_config import GlobalConfiguration

    local_stores_path = GlobalConfiguration().local_stores_path

    has_local_paths = False
    for stack_comp in self.components.values():
        local_path = stack_comp.local_path
        if local_path:
            # Enforce the convention documented in
            # `StackComponent.local_path`, in case a component ignores it.
            if not local_path.startswith(local_stores_path):
                raise ValueError(
                    f"Local path {local_path} for component "
                    f"{stack_comp.name} is not in the local stores "
                    f"directory ({local_stores_path})."
                )
            has_local_paths = True

    return has_local_paths
@property
def required_secrets(self) -> Set["secret_utils.SecretReference"]:
    """Secrets that the components of this stack depend on.

    Returns:
        The union of the required secrets of all component configs; an
        empty set if the stack has no components.
    """
    per_component = []
    for component in self.components.values():
        per_component.append(component.config.required_secrets)

    # `set.union` needs at least one argument.
    if not per_component:
        return set()
    return set.union(*per_component)
@property
def setting_classes(self) -> Dict[str, Type["BaseSettings"]]:
    """Settings classes offered by the components of this stack.

    Components without a settings class are left out.

    Returns:
        All setting classes and their respective keys.
    """
    return {
        settings_utils.get_stack_component_setting_key(
            component
        ): component.settings_class
        for component in self.components.values()
        if component.settings_class
    }
@property
def requires_remote_server(self) -> bool:
    """Whether running on this stack needs a remote ZenServer.

    A remote server is needed when code gets executed remotely, which is
    the case if the orchestrator or the step operator is remote.

    Returns:
        If the stack requires a remote ZenServer to run.
    """
    orchestrator_is_remote = self.orchestrator.config.is_remote
    if orchestrator_is_remote:
        return orchestrator_is_remote

    # The step operator is optional and only counts when it is present.
    step_operator = self.step_operator
    return step_operator is not None and step_operator.config.is_remote
def _validate_secrets(self, raise_exception: bool) -> None:
    """Checks that the secrets required by the stack are available.

    How strictly secrets are checked is read from the environment
    variable named by `ENV_ZENML_SECRET_VALIDATION_LEVEL`; if it is not
    set, both the secret and the referenced key have to exist.

    Args:
        raise_exception: If `True`, raises an exception if the stack has
            no secrets manager or a secret is missing. Otherwise a
            warning is logged.

    # noqa: DAR402

    Raises:
        StackValidationError: If the stack has no secrets manager or a
            secret is missing.
    """
    secret_validation_level = SecretValidationLevel(
        os.getenv(
            ENV_ZENML_SECRET_VALIDATION_LEVEL,
            default=SecretValidationLevel.SECRET_AND_KEY_EXISTS.value,
        )
    )
    required_secrets = self.required_secrets

    # Nothing to do when validation is switched off or no component
    # references a secret.
    if (
        secret_validation_level == SecretValidationLevel.NONE
        or not required_secrets
    ):
        return

    def _handle_error(message: str) -> None:
        """Raises or logs the given problem, depending on the caller.

        Args:
            message: The error message.

        Raises:
            StackValidationError: If called and `raise_exception` of
                the outer method is `True`.
        """
        if raise_exception:
            raise StackValidationError(message)
        logger.warning(
            message
            + "\nYou need to solve this issue before running "
            "a pipeline on this stack."
        )

    secrets_manager = self.secrets_manager
    if not secrets_manager:
        _handle_error(
            f"Some component in stack `{self.name}` reference secret "
            "values, but there is no secrets manager in this stack."
        )
        return

    missing = []
    existing_secrets = set(secrets_manager.get_all_secret_keys())
    check_secret_and_key = (
        secret_validation_level
        == SecretValidationLevel.SECRET_AND_KEY_EXISTS
    )
    check_secret_only = (
        secret_validation_level == SecretValidationLevel.SECRET_EXISTS
    )

    for secret_ref in required_secrets:
        if check_secret_and_key:
            # Fetch the secret and index the referenced key; a `KeyError`
            # from either step marks the reference as missing.
            try:
                _ = secrets_manager.get_secret(secret_ref.name).content[
                    secret_ref.key
                ]
            except KeyError:
                missing.append(secret_ref)
        elif check_secret_only and secret_ref.name not in existing_secrets:
            missing.append(secret_ref)

    if missing:
        _handle_error(
            f"Missing secrets for stack: {missing}.\nTo register the "
            "missing secrets for this stack, run `zenml stack "
            f"register-secrets {self.name}`\nIf you want to "
            "adjust the degree to which ZenML validates the existence "
            "of secrets in your stack, you can do so by setting the "
            f"environment variable {ENV_ZENML_SECRET_VALIDATION_LEVEL} "
            "to one of the following values: "
            f"{SecretValidationLevel.values()}."
        )
def validate(
    self,
    fail_if_secrets_missing: bool = False,
) -> None:
    """Validates the configuration of this stack.

    Validation happens in two stages:
    - every stack component that exposes a `StackValidator` validates the
      stack, which ensures the components are compatible with each other
    - the secrets required by the components are checked

    Args:
        fail_if_secrets_missing: Forwarded to the secret check as
            `raise_exception`. If `True`, a missing secret results in an
            error, otherwise only a warning is logged.
    """
    for stack_component in self.components.values():
        if not stack_component.validator:
            # Components without a validator have nothing to check.
            continue
        stack_component.validator.validate(stack=self)

    self._validate_secrets(raise_exception=fail_if_secrets_missing)
def prepare_pipeline_deployment(
    self, deployment: "PipelineDeployment"
) -> None:
    """Gets the stack ready for deploying a pipeline.

    The stack is validated (missing secrets are treated as errors), all
    components are checked to be running and finally every component gets
    the chance to prepare itself for the deployment.

    Args:
        deployment: The pipeline deployment

    Raises:
        StackValidationError: If a stack component is not running.
        RuntimeError: If the stack requires a remote ZenML server but the
            client is connected to a local store.
    """
    self.validate(fail_if_secrets_missing=True)

    # Look for the first component that is not running, if there is one.
    stopped_component = next(
        (
            candidate
            for candidate in self.components.values()
            if not candidate.is_running
        ),
        None,
    )
    if stopped_component is not None:
        raise StackValidationError(
            f"The '{stopped_component.name}' {stopped_component.type} "
            f"stack component "
            f"is not currently running. Please run the following "
            f"command to provision and start the component:\n\n"
            f" `zenml stack up`\n"
        )

    if self.requires_remote_server:
        # The store is only inspected when a remote server is required.
        if Client().zen_store.is_local_store():
            raise RuntimeError(
                "Stacks with remote components such as remote orchestrators "
                "and step operators require a remote "
                "ZenML server. To run a pipeline with this stack you need to "
                "connect to a remote ZenML server first. Check out "
                "https://docs.zenml.io/getting-started/deploying-zenml for "
                "more information on how to deploy ZenML."
            )

    for stack_component in self.components.values():
        stack_component.prepare_pipeline_deployment(
            deployment=deployment, stack=self
        )
def deploy_pipeline(self, deployment: "PipelineDeployment") -> Any:
    """Runs a pipeline deployment with the orchestrator of this stack.

    Args:
        deployment: The pipeline deployment.

    Returns:
        The return value of the call to `self.orchestrator.run(...)`.
    """
    stack_orchestrator = self.orchestrator
    return stack_orchestrator.run(deployment=deployment, stack=self)
def _get_active_components_for_step(
    self, step_config: "StepConfiguration"
) -> Dict[StackComponentType, "StackComponent"]:
    """Selects the stack components that are active for a step.

    A step operator or experiment tracker is only active if the step
    configuration refers to it by name. All other components are always
    active.

    Args:
        step_config: Configuration of the step for which to get the active
            components.

    Returns:
        Dictionary of active stack components, keyed like `self.components`.
    """
    active_components = {}
    for component_type, component in self.components.items():
        if component.type == StackComponentType.STEP_OPERATOR:
            is_used = component.name == step_config.step_operator
        elif component.type == StackComponentType.EXPERIMENT_TRACKER:
            is_used = component.name == step_config.experiment_tracker
        else:
            is_used = True

        if is_used:
            active_components[component_type] = component

    return active_components
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Lets every active component prepare for a step run.

    Args:
        info: Info about the step that will be executed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.prepare_step_run(info=info)
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Lets every active component clean up after a finished step run.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.cleanup_step_run(info=info, step_failed=step_failed)
@property
def is_provisioned(self) -> bool:
    """Whether all components of the stack are provisioned.

    Returns:
        True if every stack component reports `is_provisioned`, False as
        soon as one of them does not.
    """
    for stack_component in self.components.values():
        if not stack_component.is_provisioned:
            return False
    return True
@property
def is_running(self) -> bool:
    """Whether all components of the stack are running.

    Returns:
        True if every stack component reports `is_running`, False as soon
        as one of them does not.
    """
    for stack_component in self.components.values():
        if not stack_component.is_running:
            return False
    return True
def provision(self) -> None:
    """Provisions resources to run the stack locally.

    The stack is validated first (missing secrets are treated as errors).
    Components that are already provisioned are left untouched.
    """
    self.validate(fail_if_secrets_missing=True)
    logger.info("Provisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_provisioned:
            continue
        stack_component.provision()
        logger.info("Provisioned resources for %s.", stack_component)
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack.

    Only provisioned components are deprovisioned. A component raising
    `NotImplementedError` is logged as a warning and does not stop the
    remaining components from being processed.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if not stack_component.is_provisioned:
            continue
        try:
            stack_component.deprovision()
            logger.info(
                "Deprovisioned resources for %s.", stack_component
            )
        except NotImplementedError as error:
            logger.warning(error)
def resume(self) -> None:
    """Resumes the provisioned local resources of the stack.

    Components that are already running are skipped.

    Raises:
        ProvisioningError: If any stack component is neither running nor
            provisioned.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_running:
            # Already running, there is nothing to resume.
            continue

        if not stack_component.is_provisioned:
            raise ProvisioningError(
                f"Unable to resume resources for {stack_component}: No "
                f"resources have been provisioned for this component."
            )

        stack_component.resume()
        logger.info("Resumed resources for %s.", stack_component)
def suspend(self) -> None:
    """Suspends the provisioned local resources of the stack.

    Components that are already suspended are skipped. A component raising
    `NotImplementedError` is logged as a warning and the remaining
    components are still processed.
    """
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )

    for stack_component in self.components.values():
        if stack_component.is_suspended:
            continue
        try:
            stack_component.suspend()
            logger.info("Suspended resources for %s.", stack_component)
        except NotImplementedError:
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                stack_component,
            )
alerter: Optional[BaseAlerter]
property
readonly
The alerter of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseAlerter] |
The alerter of the stack. |
annotator: Optional[BaseAnnotator]
property
readonly
The annotator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseAnnotator] |
The annotator of the stack. |
apt_packages: List[str]
property
readonly
List of APT package requirements for the stack.
Returns:
Type | Description |
---|---|
List[str] |
A list of APT package requirements for the stack. |
artifact_store: BaseArtifactStore
property
readonly
The artifact store of the stack.
Returns:
Type | Description |
---|---|
BaseArtifactStore |
The artifact store of the stack. |
components: Dict[zenml.enums.StackComponentType, StackComponent]
property
readonly
All components of the stack.
Returns:
Type | Description |
---|---|
Dict[zenml.enums.StackComponentType, StackComponent] |
A dictionary of all components of the stack. |
container_registry: Optional[BaseContainerRegistry]
property
readonly
The container registry of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseContainerRegistry] |
The container registry of the stack or None if the stack does not have a container registry. |
data_validator: Optional[BaseDataValidator]
property
readonly
The data validator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseDataValidator] |
The data validator of the stack. |
experiment_tracker: Optional[BaseExperimentTracker]
property
readonly
The experiment tracker of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseExperimentTracker] |
The experiment tracker of the stack. |
feature_store: Optional[BaseFeatureStore]
property
readonly
The feature store of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseFeatureStore] |
The feature store of the stack. |
id: UUID
property
readonly
The ID of the stack.
Returns:
Type | Description |
---|---|
UUID |
The ID of the stack. |
is_provisioned: bool
property
readonly
If the stack provisioned resources to run locally.
Returns:
Type | Description |
---|---|
bool |
True if the stack provisioned resources to run locally. |
is_running: bool
property
readonly
If the stack is running locally.
Returns:
Type | Description |
---|---|
bool |
True if the stack is running locally, False otherwise. |
model_deployer: Optional[BaseModelDeployer]
property
readonly
The model deployer of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseModelDeployer] |
The model deployer of the stack. |
name: str
property
readonly
The name of the stack.
Returns:
Type | Description |
---|---|
str |
The name of the stack. |
orchestrator: BaseOrchestrator
property
readonly
The orchestrator of the stack.
Returns:
Type | Description |
---|---|
BaseOrchestrator |
The orchestrator of the stack. |
required_secrets: Set[secret_utils.SecretReference]
property
readonly
All required secrets for this stack.
Returns:
Type | Description |
---|---|
Set[secret_utils.SecretReference] |
The required secrets of this stack. |
requires_remote_server: bool
property
readonly
If the stack requires a remote ZenServer to run.
This is the case if any code is getting executed remotely. This is the case for both remote orchestrators as well as remote step operators.
Returns:
Type | Description |
---|---|
bool |
If the stack requires a remote ZenServer to run. |
secrets_manager: Optional[BaseSecretsManager]
property
readonly
The secrets manager of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseSecretsManager] |
The secrets manager of the stack. |
setting_classes: Dict[str, Type[BaseSettings]]
property
readonly
Setting classes of all components of this stack.
Returns:
Type | Description |
---|---|
Dict[str, Type[BaseSettings]] |
All setting classes and their respective keys. |
step_operator: Optional[BaseStepOperator]
property
readonly
The step operator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseStepOperator] |
The step operator of the stack. |
__init__(self, id, name, *, orchestrator, artifact_store, container_registry=None, secrets_manager=None, step_operator=None, feature_store=None, model_deployer=None, experiment_tracker=None, alerter=None, annotator=None, data_validator=None)
special
Initializes and validates a stack instance.
noqa: DAR402
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
Unique ID of the stack. |
required |
name |
str |
Name of the stack. |
required |
orchestrator |
BaseOrchestrator |
Orchestrator component of the stack. |
required |
artifact_store |
BaseArtifactStore |
Artifact store component of the stack. |
required |
container_registry |
Optional[BaseContainerRegistry] |
Container registry component of the stack. |
None |
secrets_manager |
Optional[BaseSecretsManager] |
Secrets manager component of the stack. |
None |
step_operator |
Optional[BaseStepOperator] |
Step operator component of the stack. |
None |
feature_store |
Optional[BaseFeatureStore] |
Feature store component of the stack. |
None |
model_deployer |
Optional[BaseModelDeployer] |
Model deployer component of the stack. |
None |
experiment_tracker |
Optional[BaseExperimentTracker] |
Experiment tracker component of the stack. |
None |
alerter |
Optional[BaseAlerter] |
Alerter component of the stack. |
None |
annotator |
Optional[BaseAnnotator] |
Annotator component of the stack. |
None |
data_validator |
Optional[BaseDataValidator] |
Data validator component of the stack. |
None |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack configuration is not valid. |
Source code in zenml/stack/stack.py
def __init__(
    self,
    id: UUID,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    secrets_manager: Optional["BaseSecretsManager"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
    feature_store: Optional["BaseFeatureStore"] = None,
    model_deployer: Optional["BaseModelDeployer"] = None,
    experiment_tracker: Optional["BaseExperimentTracker"] = None,
    alerter: Optional["BaseAlerter"] = None,
    annotator: Optional["BaseAnnotator"] = None,
    data_validator: Optional["BaseDataValidator"] = None,
):
    """Initializes a stack instance.

    All arguments are stored on private attributes of the same name. This
    initializer itself performs no validation of the components.

    Args:
        id: Unique ID of the stack.
        name: Name of the stack.
        orchestrator: Orchestrator component of the stack.
        artifact_store: Artifact store component of the stack.
        container_registry: Container registry component of the stack.
        secrets_manager: Secrets manager component of the stack.
        step_operator: Step operator component of the stack.
        feature_store: Feature store component of the stack.
        model_deployer: Model deployer component of the stack.
        experiment_tracker: Experiment tracker component of the stack.
        alerter: Alerter component of the stack.
        annotator: Annotator component of the stack.
        data_validator: Data validator component of the stack.
    """
    # Identity of the stack.
    self._id = id
    self._name = name

    # Required components.
    self._orchestrator = orchestrator
    self._artifact_store = artifact_store

    # Optional components, `None` when the stack does not have them.
    self._container_registry = container_registry
    self._secrets_manager = secrets_manager
    self._step_operator = step_operator
    self._feature_store = feature_store
    self._model_deployer = model_deployer
    self._experiment_tracker = experiment_tracker
    self._alerter = alerter
    self._annotator = annotator
    self._data_validator = data_validator
check_local_paths(self)
Checks if the stack has local paths.
Returns:
Type | Description |
---|---|
bool |
True if the stack has local paths, False otherwise. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the stack has local paths that do not conform to the convention that all local paths must be located within the local stores directory. |
Source code in zenml/stack/stack.py
def check_local_paths(self) -> bool:
    """Checks if the stack has local paths.

    Every stack component that advertises a `local_path` must keep that
    path inside the local stores directory of the global configuration.
    Containment is checked on normalized absolute paths, so a sibling
    directory that merely shares a name prefix with the local stores
    directory (e.g. `<stores>_other`) or a path escaping through `..`
    segments is rejected.

    Returns:
        True if the stack has local paths, False otherwise.

    Raises:
        ValueError: If the stack has local paths that do not conform to
            the convention that all local paths must be located within
            the local stores directory.
    """
    import os

    from zenml.config.global_config import GlobalConfiguration

    def _normalize(path: str) -> str:
        """Returns the absolute, case-normalized form of a path."""
        return os.path.normcase(os.path.abspath(path))

    local_stores_path = GlobalConfiguration().local_stores_path
    stores_root = _normalize(local_stores_path)

    # go through all stack components and identify those that advertise
    # a local path where they persist information that they need to be
    # available when running pipelines.
    has_local_paths = False
    for stack_comp in self.components.values():
        local_path = stack_comp.local_path
        if not local_path:
            continue

        # double-check this convention, just in case it wasn't respected
        # as documented in `StackComponent.local_path`. A plain string
        # prefix test is not enough here because it also matches sibling
        # directories such as `<stores>_other`.
        try:
            common_root = os.path.commonpath(
                [stores_root, _normalize(local_path)]
            )
        except ValueError:
            # `commonpath` refuses paths that cannot share a root (e.g.
            # different drives on Windows): not inside the stores dir.
            common_root = None

        if common_root != stores_root:
            raise ValueError(
                f"Local path {local_path} for component "
                f"{stack_comp.name} is not in the local stores "
                f"directory ({local_stores_path})."
            )

        has_local_paths = True

    return has_local_paths
cleanup_step_run(self, info, step_failed)
Cleans up resources after the step run is finished.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Info about the step that was executed. |
required |
step_failed |
bool |
Whether the step failed. |
required |
Source code in zenml/stack/stack.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Lets every active component clean up after a finished step run.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.cleanup_step_run(info=info, step_failed=step_failed)
deploy_pipeline(self, deployment)
Deploys a pipeline on this stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment. |
required |
Returns:
Type | Description |
---|---|
Any |
The return value of the call to `orchestrator.run(...)`. |
Source code in zenml/stack/stack.py
def deploy_pipeline(self, deployment: "PipelineDeployment") -> Any:
    """Runs a pipeline deployment with the orchestrator of this stack.

    Args:
        deployment: The pipeline deployment.

    Returns:
        The return value of the call to `self.orchestrator.run(...)`.
    """
    stack_orchestrator = self.orchestrator
    return stack_orchestrator.run(deployment=deployment, stack=self)
deprovision(self)
Deprovisions all local resources of the stack.
Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack.

    Only provisioned components are deprovisioned. A component raising
    `NotImplementedError` is logged as a warning and does not stop the
    remaining components from being processed.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if not stack_component.is_provisioned:
            continue
        try:
            stack_component.deprovision()
            logger.info(
                "Deprovisioned resources for %s.", stack_component
            )
        except NotImplementedError as error:
            logger.warning(error)
dict(self)
Converts the stack into a dictionary.
Returns:
Type | Description |
---|---|
Dict[str, str] |
A dictionary containing the stack components. |
Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
    """Converts the stack into a dictionary.

    Each component contributes one entry, keyed by the value of its
    component type and holding the component config serialized to JSON
    with sorted keys. The stack name is stored under the `name` key.

    Returns:
        A dictionary containing the stack components.
    """
    stack_dict: Dict[str, str] = {}
    for component_type, component in self.components.items():
        stack_dict[component_type.value] = component.config.json(
            sort_keys=True
        )

    stack_dict["name"] = self.name
    return stack_dict
from_components(id, name, components)
classmethod
Creates a stack instance from a dict of stack components.
noqa: DAR402
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
Unique ID of the stack. |
required |
name |
str |
The name of the stack. |
required |
components |
Dict[zenml.enums.StackComponentType, StackComponent] |
The components of the stack. |
required |
Returns:
Type | Description |
---|---|
Stack |
A stack instance consisting of the given components. |
Exceptions:
Type | Description |
---|---|
TypeError |
If a required component is missing or a component doesn't inherit from the expected base class. |
Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls,
    id: UUID,
    name: str,
    components: Dict[StackComponentType, "StackComponent"],
) -> "Stack":
    """Builds a stack instance out of a dict of stack components.

    The orchestrator and the artifact store are mandatory, all other
    components are optional. Each component that is present has to be an
    instance of the base class expected for its component type.

    Args:
        id: Unique ID of the stack.
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.alerter import BaseAlerter
    from zenml.annotators import BaseAnnotator
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.data_validators import BaseDataValidator
    from zenml.experiment_trackers import BaseExperimentTracker
    from zenml.feature_stores import BaseFeatureStore
    from zenml.model_deployers import BaseModelDeployer
    from zenml.orchestrators import BaseOrchestrator
    from zenml.secrets_managers import BaseSecretsManager
    from zenml.step_operators import BaseStepOperator

    # One row per constructor argument:
    # (keyword, component type, expected base class, is required).
    # The rows are checked top to bottom, so the first offending component
    # in this order is the one reported.
    component_specs = (
        (
            "orchestrator",
            StackComponentType.ORCHESTRATOR,
            BaseOrchestrator,
            True,
        ),
        (
            "artifact_store",
            StackComponentType.ARTIFACT_STORE,
            BaseArtifactStore,
            True,
        ),
        (
            "container_registry",
            StackComponentType.CONTAINER_REGISTRY,
            BaseContainerRegistry,
            False,
        ),
        (
            "secrets_manager",
            StackComponentType.SECRETS_MANAGER,
            BaseSecretsManager,
            False,
        ),
        (
            "step_operator",
            StackComponentType.STEP_OPERATOR,
            BaseStepOperator,
            False,
        ),
        (
            "feature_store",
            StackComponentType.FEATURE_STORE,
            BaseFeatureStore,
            False,
        ),
        (
            "model_deployer",
            StackComponentType.MODEL_DEPLOYER,
            BaseModelDeployer,
            False,
        ),
        (
            "experiment_tracker",
            StackComponentType.EXPERIMENT_TRACKER,
            BaseExperimentTracker,
            False,
        ),
        ("alerter", StackComponentType.ALERTER, BaseAlerter, False),
        ("annotator", StackComponentType.ANNOTATOR, BaseAnnotator, False),
        (
            "data_validator",
            StackComponentType.DATA_VALIDATOR,
            BaseDataValidator,
            False,
        ),
    )

    stack_kwargs: Dict[str, Any] = {}
    for keyword, component_type, expected_class, is_required in (
        component_specs
    ):
        component = components.get(component_type)

        if component is None and not is_required:
            # Optional component that is simply not part of the stack.
            stack_kwargs[keyword] = None
            continue

        if not isinstance(component, expected_class):
            # Also reached for a missing required component, which is
            # reported with the type name `NoneType`.
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )

        stack_kwargs[keyword] = component

    return Stack(id=id, name=name, **stack_kwargs)
from_model(stack_model)
classmethod
Creates a Stack instance from a StackModel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_model |
StackResponseModel |
The StackModel to create the Stack from. |
required |
Returns:
Type | Description |
---|---|
Stack |
The created Stack instance. |
Source code in zenml/stack/stack.py
@classmethod
def from_model(cls, stack_model: StackResponseModel) -> "Stack":
    """Builds a Stack instance out of a StackModel.

    Args:
        stack_model: The StackModel to create the Stack from.

    Returns:
        The created Stack instance.
    """
    from zenml.stack import StackComponent

    stack_components = {}
    for component_type, component_models in stack_model.components.items():
        # Only the first model listed for a component type is used.
        stack_components[component_type] = StackComponent.from_model(
            component_models[0]
        )

    return Stack.from_components(
        id=stack_model.id,
        name=stack_model.name,
        components=stack_components,
    )
prepare_pipeline_deployment(self, deployment)
Prepares the stack for a pipeline deployment.
This method is called before a pipeline is deployed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment |
required |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack component is not running. |
RuntimeError |
If trying to deploy a pipeline that requires a remote ZenML server with a local one. |
Source code in zenml/stack/stack.py
def prepare_pipeline_deployment(
    self, deployment: "PipelineDeployment"
) -> None:
    """Gets the stack ready for deploying a pipeline.

    The stack is validated (missing secrets are treated as errors), all
    components are checked to be running and finally every component gets
    the chance to prepare itself for the deployment.

    Args:
        deployment: The pipeline deployment

    Raises:
        StackValidationError: If a stack component is not running.
        RuntimeError: If the stack requires a remote ZenML server but the
            client is connected to a local store.
    """
    self.validate(fail_if_secrets_missing=True)

    # Look for the first component that is not running, if there is one.
    stopped_component = next(
        (
            candidate
            for candidate in self.components.values()
            if not candidate.is_running
        ),
        None,
    )
    if stopped_component is not None:
        raise StackValidationError(
            f"The '{stopped_component.name}' {stopped_component.type} "
            f"stack component "
            f"is not currently running. Please run the following "
            f"command to provision and start the component:\n\n"
            f" `zenml stack up`\n"
        )

    if self.requires_remote_server:
        # The store is only inspected when a remote server is required.
        if Client().zen_store.is_local_store():
            raise RuntimeError(
                "Stacks with remote components such as remote orchestrators "
                "and step operators require a remote "
                "ZenML server. To run a pipeline with this stack you need to "
                "connect to a remote ZenML server first. Check out "
                "https://docs.zenml.io/getting-started/deploying-zenml for "
                "more information on how to deploy ZenML."
            )

    for stack_component in self.components.values():
        stack_component.prepare_pipeline_deployment(
            deployment=deployment, stack=self
        )
prepare_step_run(self, info)
Prepares running a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Info about the step that will be executed. |
required |
Source code in zenml/stack/stack.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Lets every active component prepare for a step run.

    Args:
        info: Info about the step that will be executed.
    """
    active_components = self._get_active_components_for_step(info.config)
    for active_component in active_components.values():
        active_component.prepare_step_run(info=info)
provision(self)
Provisions resources to run the stack locally.
Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provisions resources to run the stack locally.

    The stack is validated first (missing secrets are treated as errors).
    Components that are already provisioned are left untouched.
    """
    self.validate(fail_if_secrets_missing=True)
    logger.info("Provisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_provisioned:
            continue
        stack_component.provision()
        logger.info("Provisioned resources for %s.", stack_component)
requirements(self, exclude_components=None)
Set of PyPI requirements for the stack.
This method combines the requirements of all stack components (except the ones specified in `exclude_components`).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude_components |
Optional[AbstractSet[zenml.enums.StackComponentType]] |
Set of component types for which the requirements should not be included in the output. |
None |
Returns:
Type | Description |
---|---|
Set[str] |
Set of PyPI requirements. |
Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Collects the PyPI requirements of the stack.

    The result is the union of the requirements of all stack components
    whose type is not listed in `exclude_components`.

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        Set of PyPI requirements.
    """
    excluded_types = exclude_components or set()

    requirement_sets = []
    for stack_component in self.components.values():
        if stack_component.type in excluded_types:
            continue
        requirement_sets.append(stack_component.requirements)

    if not requirement_sets:
        # `set.union` needs at least one set to operate on.
        return set()
    return set.union(*requirement_sets)
resume(self)
Resumes the provisioned local resources of the stack.
Exceptions:
Type | Description |
---|---|
ProvisioningError |
If any stack component is missing provisioned resources. |
Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the stack.

    Components that are already running are skipped.

    Raises:
        ProvisioningError: If any stack component is neither running nor
            provisioned.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_running:
            # Already running, there is nothing to resume.
            continue

        if not stack_component.is_provisioned:
            raise ProvisioningError(
                f"Unable to resume resources for {stack_component}: No "
                f"resources have been provisioned for this component."
            )

        stack_component.resume()
        logger.info("Resumed resources for %s.", stack_component)
suspend(self)
Suspends the provisioned local resources of the stack.
Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspends the provisioned local resources of the stack.

    Components that are already suspended are skipped. A component raising
    `NotImplementedError` is logged as a warning and the remaining
    components are still processed.
    """
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )

    for stack_component in self.components.values():
        if stack_component.is_suspended:
            continue
        try:
            stack_component.suspend()
            logger.info("Suspended resources for %s.", stack_component)
        except NotImplementedError:
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                stack_component,
            )
validate(self, fail_if_secrets_missing=False)
Checks whether the stack configuration is valid.
To check if a stack configuration is valid, the following criteria must
be met:
- the StackValidator
of each stack component has to validate the
stack to make sure all the components are compatible with each other
- the required secrets of all components need to exist
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fail_if_secrets_missing |
bool |
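If this is `True`, an error will be raised if a secret for a component is missing. Otherwise, only a warning will be logged. |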
False |
Source code in zenml/stack/stack.py
def validate(
    self,
    fail_if_secrets_missing: bool = False,
) -> None:
    """Validates the configuration of this stack.

    Validation happens in two stages:
    - every stack component that exposes a `StackValidator` validates the
      stack, which ensures the components are compatible with each other
    - the secrets required by the components are checked

    Args:
        fail_if_secrets_missing: Forwarded to the secret check as
            `raise_exception`. If `True`, an error will be raised if a
            secret for a component is missing. Otherwise, only a warning
            will be logged.
    """
    for stack_component in self.components.values():
        if not stack_component.validator:
            # Components without a validator have nothing to check.
            continue
        stack_component.validator.validate(stack=self)

    self._validate_secrets(raise_exception=fail_if_secrets_missing)
stack_component
Implementation of the ZenML Stack Component class.
StackComponent
Abstract StackComponent class for all components of a ZenML stack.
Source code in zenml/stack/stack_component.py
class StackComponent:
"""Abstract StackComponent class for all components of a ZenML stack."""
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    project: UUID,
    created: datetime,
    updated: datetime,
    *args: Any,
    **kwargs: Any,
):
    """Sets up a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        project: The ID of the project the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        *args: Additional positional arguments, not used by this
            initializer.
        **kwargs: Additional keyword arguments, not used by this
            initializer.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    name_is_secret_reference = secret_utils.is_secret_reference(name)
    if name_is_secret_reference:
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    # Identity and classification of the component.
    self.id = id
    self.name = name
    self.flavor = flavor
    self.type = type

    # Exposed through the `config` property.
    self._config = config

    # Ownership and bookkeeping metadata.
    self.user = user
    self.project = project
    self.created = created
    self.updated = updated
@classmethod
def from_model(
    cls, component_model: "ComponentResponseModel"
) -> "StackComponent":
    """Creates a StackComponent from a ComponentModel.

    The flavor of the component is looked up through the client by name
    and type. Its config class is used to build the configuration and its
    implementation class is used to instantiate the component.

    Args:
        component_model: The ComponentModel to create the StackComponent
            from.

    Returns:
        The created StackComponent.

    Raises:
        ImportError: If the flavor can't be imported.
    """
    from zenml.client import Client

    flavor_model = Client().get_flavor_by_name_and_type(
        name=component_model.flavor,
        component_type=component_model.type,
    )

    try:
        from zenml.stack import Flavor

        flavor = Flavor.from_model(flavor_model)
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        # Chain explicitly so the traceback presents the original failure
        # as the direct cause of this error.
        raise ImportError(
            f"Couldn't import flavor {flavor_model.name}: {err}"
        ) from err

    configuration = flavor.config_class(**component_model.configuration)

    # The user of a component model may be unset.
    user_id = (
        component_model.user.id if component_model.user is not None else None
    )

    return flavor.implementation_class(
        user=user_id,
        project=component_model.project.id,
        name=component_model.name,
        id=component_model.id,
        config=configuration,
        flavor=component_model.flavor,
        type=component_model.type,
        created=component_model.created,
        updated=component_model.updated,
    )
@property
def config(self) -> StackComponentConfig:
    """Returns the configuration of the stack component.

    Subclasses that define a custom config class should overwrite this
    property so that the correct config type is returned.

    Returns:
        The configuration of the stack component.
    """
    component_config = self._config
    return component_config
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
    """Class specifying available settings for this component.

    The base implementation does not define a settings class.

    Returns:
        Optional settings class.
    """
    return None
def get_settings(
    self, container: Union["Step", "StepRunInfo", "PipelineDeployment"]
) -> "BaseSettings":
    """Gets settings for this stack component.

    If the container holds settings under this component's settings key,
    they are parsed with the settings class. Otherwise an instance of the
    settings class constructed without arguments is returned.

    Args:
        container: The `Step`, `StepRunInfo` or `PipelineDeployment` from
            which to get the settings.

    Returns:
        Settings for this stack component.

    Raises:
        RuntimeError: If the stack component does not specify a settings
            class.
    """
    if not self.settings_class:
        raise RuntimeError(
            f"Unable to get settings for component {self} because this "
            "component does not have an associated settings class. "
            "Return a settings class from the `@settings_class` property "
            "and try again."
        )

    settings_key = settings_utils.get_stack_component_setting_key(self)

    # Steps and step run infos carry their settings on their config,
    # everything else is read from the pipeline settings.
    if isinstance(container, (Step, StepRunInfo)):
        available_settings = container.config.settings
    else:
        available_settings = container.pipeline.settings

    if settings_key not in available_settings:
        return self.settings_class()
    return self.settings_class.parse_obj(available_settings[settings_key])
@property
def log_file(self) -> Optional[str]:
    """Optional path to a log file for the stack component.

    The base implementation does not provide a log file.

    Returns:
        Optional path to a log file for the stack component.
    """
    # TODO [ENG-136]: Add support for multiple log files for a stack
    #  component. E.g. let each component return a generator that yields
    #  logs instead of specifying a single file path.
    return None
@property
def requirements(self) -> Set[str]:
    """Set of PyPI requirements for the component.

    The requirements are looked up based on the module in which the
    component class is defined.

    Returns:
        A set of PyPI requirements for the component.
    """
    from zenml.integrations.utils import get_requirements_for_module

    module_requirements = get_requirements_for_module(self.__module__)
    return set(module_requirements)
@property
def apt_packages(self) -> List[str]:
    """List of APT package requirements for the component.

    The packages are taken from the integration found for the module in
    which the component class is defined, if there is one.

    Returns:
        A list of APT package requirements for the component.
    """
    from zenml.integrations.utils import get_integration_for_module

    integration = get_integration_for_module(self.__module__)
    if not integration:
        return []
    return integration.APT_PACKAGES
@property
def local_path(self) -> Optional[str]:
    """Path to a local directory to store persistent information.

    Only components that need to store persistent information in a
    directory on the local machine, and that also need this information
    to be available during pipeline runs, should implement this property.
    The base implementation returns no path.

    IMPORTANT: the path returned by this property must always be a path
    that is relative to the ZenML local store's directory. The local
    orchestrators rely on this convention to correctly mount the
    local folders in the containers. This is an example of a valid
    path:

    ```python
    from zenml.config.global_config import GlobalConfiguration

    ...

    @property
    def local_path(self) -> Optional[str]:

        return os.path.join(
            GlobalConfiguration().local_stores_path,
            str(self.id),
        )
    ```

    Returns:
        A path to a local directory used by the component to store
        persistent information.
    """
    return None
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Prepares deploying the pipeline.

    Called immediately before a pipeline is deployed. The base
    implementation does nothing; subclasses should override this method
    if they require runtime configuration options or need to run code
    before the pipeline deployment.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    return None
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Prepares running a step.

    The base implementation does nothing.

    Args:
        info: Info about the step that will be executed.
    """
    return None
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Cleans up resources after the step run is finished.

    The base implementation does nothing.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    return None
@property
def post_registration_message(self) -> Optional[str]:
    """Optional message printed after the stack component is registered.

    The base implementation does not provide a message.

    Returns:
        An optional message.
    """
    return None
@property
def validator(self) -> Optional["StackValidator"]:
    """The optional validator of the stack component.

    The validator is called each time a stack containing this component
    is initialized. Subclasses should override this property and return a
    `StackValidator` that makes sure they're not included in any stack
    that they're not compatible with. The base implementation returns no
    validator.

    Returns:
        An optional `StackValidator` instance.
    """
    return None
@property
def is_provisioned(self) -> bool:
    """If the component provisioned resources to run.

    The base implementation always reports the component as provisioned.

    Returns:
        True if the component provisioned resources to run.
    """
    return True
@property
def is_running(self) -> bool:
    """If the component is running.

    The base implementation always reports the component as running.

    Returns:
        True if the component is running.
    """
    return True
@property
def is_suspended(self) -> bool:
    """If the component is suspended.

    A component counts as suspended exactly when it is not running.

    Returns:
        True if the component is suspended.
    """
    running = self.is_running
    return not running
def provision(self) -> None:
    """Provisions resources to run the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Provisioning resources not implemented for {self}."
    raise NotImplementedError(message)
def deprovision(self) -> None:
    """Deprovisions all resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Deprovisioning resource not implemented for {self}."
    raise NotImplementedError(message)
def resume(self) -> None:
    """Resumes the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
def suspend(self) -> None:
    """Suspends the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
def __repr__(self) -> str:
"""String representation of the stack component.
Returns:
A string representation of the stack component.
"""
attribute_representation = ", ".join(
f"{key}={value}" for key, value in self.config.dict().items()
)
return (
f"{self.__class__.__qualname__}(type={self.type}, "
f"flavor={self.flavor}, {attribute_representation})"
)
def __str__(self) -> str:
"""String representation of the stack component.
Returns:
A string representation of the stack component.
"""
return self.__repr__()
apt_packages: List[str]
property
readonly
List of APT package requirements for the component.
Returns:
Type | Description |
---|---|
List[str] |
A list of APT package requirements for the component. |
config: StackComponentConfig
property
readonly
Returns the configuration of the stack component.
This should be overwritten by any subclasses that define custom configs to return the correct config class.
Returns:
Type | Description |
---|---|
StackComponentConfig |
The configuration of the stack component. |
is_provisioned: bool
property
readonly
If the component provisioned resources to run.
Returns:
Type | Description |
---|---|
bool |
True if the component provisioned resources to run. |
is_running: bool
property
readonly
If the component is running.
Returns:
Type | Description |
---|---|
bool |
True if the component is running. |
is_suspended: bool
property
readonly
If the component is suspended.
Returns:
Type | Description |
---|---|
bool |
True if the component is suspended. |
local_path: Optional[str]
property
readonly
Path to a local directory to store persistent information.
This property should only be implemented by components that need to store persistent information in a directory on the local machine and also need that information to be available during pipeline runs.
IMPORTANT: the path returned by this property must always be a path that is relative to the ZenML local store's directory. The local orchestrators rely on this convention to correctly mount the local folders in the containers. This is an example of a valid path:
from zenml.config.global_config import GlobalConfiguration
...
@property
def local_path(self) -> Optional[str]:
return os.path.join(
GlobalConfiguration().local_stores_path,
str(self.id),
)
Returns:
Type | Description |
---|---|
Optional[str] |
A path to a local directory used by the component to store persistent information. |
log_file: Optional[str]
property
readonly
Optional path to a log file for the stack component.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional path to a log file for the stack component. |
post_registration_message: Optional[str]
property
readonly
Optional message printed after the stack component is registered.
Returns:
Type | Description |
---|---|
Optional[str] |
An optional message. |
requirements: Set[str]
property
readonly
Set of PyPI requirements for the component.
Returns:
Type | Description |
---|---|
Set[str] |
A set of PyPI requirements for the component. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Class specifying available settings for this component.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
Optional settings class. |
validator: Optional[StackValidator]
property
readonly
The optional validator of the stack component.
This validator will be called each time a stack with the stack component is initialized. Subclasses should override this property and return a `StackValidator` that makes sure they're not included in any stack that they're not compatible with.
Returns:
Type | Description |
---|---|
Optional[StackValidator] |
An optional `StackValidator` instance. |
__init__(self, name, id, config, flavor, type, user, project, created, updated, *args, **kwargs)
special
Initializes a StackComponent.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the component. |
required |
id |
UUID |
The unique ID of the component. |
required |
config |
StackComponentConfig |
The config of the component. |
required |
flavor |
str |
The flavor of the component. |
required |
type |
StackComponentType |
The type of the component. |
required |
user |
Optional[uuid.UUID] |
The ID of the user who created the component. |
required |
project |
UUID |
The ID of the project the component belongs to. |
required |
created |
datetime |
The creation time of the component. |
required |
updated |
datetime |
The last update time of the component. |
required |
*args |
Any |
Additional positional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If a secret reference is passed as name. |
Source code in zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    project: UUID,
    created: datetime,
    updated: datetime,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    All named arguments are stored on the instance; the config is kept in
    the private `_config` attribute.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        project: The ID of the project the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    name_is_secret_reference = secret_utils.is_secret_reference(name)
    if name_is_secret_reference:
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    # Identity and configuration of the component.
    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type

    # Ownership.
    self.user = user
    self.project = project

    # Timestamps.
    self.created = created
    self.updated = updated
__repr__(self)
special
String representation of the stack component.
Returns:
Type | Description |
---|---|
str |
A string representation of the stack component. |
Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
"""String representation of the stack component.
Returns:
A string representation of the stack component.
"""
attribute_representation = ", ".join(
f"{key}={value}" for key, value in self.config.dict().items()
)
return (
f"{self.__class__.__qualname__}(type={self.type}, "
f"flavor={self.flavor}, {attribute_representation})"
)
__str__(self)
special
String representation of the stack component.
Returns:
Type | Description |
---|---|
str |
A string representation of the stack component. |
Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
"""String representation of the stack component.
Returns:
A string representation of the stack component.
"""
return self.__repr__()
cleanup_step_run(self, info, step_failed)
Cleans up resources after the step run is finished.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Info about the step that was executed. |
required |
step_failed |
bool |
Whether the step failed. |
required |
Source code in zenml/stack/stack_component.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Cleans up resources after the step run is finished.

    The base implementation does nothing.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed.
    """
    return None
deprovision(self)
Deprovisions all resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Deprovisions all resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Deprovisioning resource not implemented for {self}."
    raise NotImplementedError(message)
from_model(component_model)
classmethod
Creates a StackComponent from a ComponentModel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_model |
ComponentResponseModel |
The ComponentModel from which to create the StackComponent. |
required |
Returns:
Type | Description |
---|---|
StackComponent |
The created StackComponent. |
Exceptions:
Type | Description |
---|---|
ImportError |
If the flavor can't be imported. |
Source code in zenml/stack/stack_component.py
@classmethod
def from_model(
    cls, component_model: "ComponentResponseModel"
) -> "StackComponent":
    """Creates a StackComponent from a ComponentModel.

    The flavor matching the model's flavor name and component type is
    fetched through the client. The flavor's config class is instantiated
    from the model's configuration and passed to the flavor's
    implementation class.

    Args:
        component_model: The ComponentModel from which to create the
            StackComponent.

    Returns:
        The created StackComponent.

    Raises:
        ImportError: If the flavor can't be imported.
    """
    from zenml.client import Client

    flavor_model = Client().get_flavor_by_name_and_type(
        name=component_model.flavor,
        component_type=component_model.type,
    )

    try:
        from zenml.stack import Flavor

        flavor = Flavor.from_model(flavor_model)
    except (ImportError, NotImplementedError) as err:
        # `ModuleNotFoundError` is a subclass of `ImportError`, so it is
        # covered by this clause. Chain the original error explicitly so
        # the root cause stays attached to the re-raised exception.
        raise ImportError(
            f"Couldn't import flavor {flavor_model.name}: {err}"
        ) from err

    configuration = flavor.config_class(**component_model.configuration)

    # The user of a component model may be unset.
    user_id = (
        component_model.user.id if component_model.user is not None else None
    )

    return flavor.implementation_class(
        user=user_id,
        project=component_model.project.id,
        name=component_model.name,
        id=component_model.id,
        config=configuration,
        flavor=component_model.flavor,
        type=component_model.type,
        created=component_model.created,
        updated=component_model.updated,
    )
get_settings(self, container)
Gets settings for this stack component.
If the container doesn't contain runtime options for this component, an instance of the settings class constructed without arguments is returned. If the stack component doesn't specify a settings class, a `RuntimeError` is raised.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
container |
Union[Step, StepRunInfo, PipelineDeployment] |
The `Step`, `StepRunInfo` or `PipelineDeployment` from which to get the settings. |
required |
Returns:
Type | Description |
---|---|
BaseSettings |
Settings for this stack component. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the stack component does not specify a settings class. |
Source code in zenml/stack/stack_component.py
def get_settings(
    self, container: Union["Step", "StepRunInfo", "PipelineDeployment"]
) -> "BaseSettings":
    """Gets settings for this stack component.

    If the container holds settings under this component's settings key,
    they are parsed with the settings class. Otherwise an instance of the
    settings class constructed without arguments is returned.

    Args:
        container: The `Step`, `StepRunInfo` or `PipelineDeployment` from
            which to get the settings.

    Returns:
        Settings for this stack component.

    Raises:
        RuntimeError: If the stack component does not specify a settings
            class.
    """
    if not self.settings_class:
        raise RuntimeError(
            f"Unable to get settings for component {self} because this "
            "component does not have an associated settings class. "
            "Return a settings class from the `@settings_class` property "
            "and try again."
        )

    settings_key = settings_utils.get_stack_component_setting_key(self)

    # Steps and step run infos carry their settings on their config,
    # everything else is read from the pipeline settings.
    if isinstance(container, (Step, StepRunInfo)):
        available_settings = container.config.settings
    else:
        available_settings = container.pipeline.settings

    if settings_key not in available_settings:
        return self.settings_class()
    return self.settings_class.parse_obj(available_settings[settings_key])
prepare_pipeline_deployment(self, deployment, stack)
Prepares deploying the pipeline.
This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Prepares deploying the pipeline.

    Called immediately before a pipeline is deployed. The base
    implementation does nothing; subclasses should override this method
    if they require runtime configuration options or need to run code
    before the pipeline deployment.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    return None
prepare_step_run(self, info)
Prepares running a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Info about the step that will be executed. |
required |
Source code in zenml/stack/stack_component.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Prepares running a step.

    The base implementation does nothing.

    Args:
        info: Info about the step that will be executed.
    """
    return None
provision(self)
Provisions resources to run the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Provisions resources to run the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Provisioning resources not implemented for {self}."
    raise NotImplementedError(message)
resume(self)
Resumes the provisioned resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resumes the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
suspend(self)
Suspends the provisioned resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspends the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
StackComponentConfig (BaseModel, ABC)
pydantic-model
Base class for all ZenML stack component configs.
Source code in zenml/stack/stack_component.py
class StackComponentConfig(BaseModel, ABC):
    """Base class for all ZenML stack component configs."""

    def __init__(self, **kwargs: Any) -> None:
        """Ensures that secret references don't clash with pydantic validation.

        StackComponents allow the specification of all their string attributes
        using secret references of the form `{{secret_name.key}}`. This however
        is only possible when the stack component does not perform any explicit
        validation of this attribute using pydantic validators. If this were
        the case, the validation would run on the secret reference and would
        fail or in the worst case, modify the secret reference and lead to
        unexpected behavior. This method ensures that no attributes that require
        custom pydantic validation are set as secret references.

        Args:
            **kwargs: Arguments to initialize this stack component.

        Raises:
            ValueError: If an attribute that requires custom pydantic validation
                is passed as a secret reference.
        """
        for key, value in kwargs.items():
            try:
                field = self.__class__.__fields__[key]
            except KeyError:
                # Value for a private attribute or non-existing field, this
                # will fail during the upcoming pydantic validation
                continue

            if value is None:
                continue

            if not secret_utils.is_secret_reference(value):
                # Plain-text values are accepted; for fields marked as
                # secret this only emits a warning.
                if secret_utils.is_secret_field(field):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}` for a `{self.__class__.__name__}` "
                        "stack component. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in sensitive information as secrets. Check out the "
                        "documentation on how to configure your stack "
                        "components with secrets here: "
                        "https://docs.zenml.io/advanced-guide/practical/secrets-management"
                    )
                continue

            # The value is a secret reference: reject it if the field has
            # validators that would run on the reference string.
            requires_validation = field.pre_validators or field.post_validators
            if requires_validation:
                raise ValueError(
                    f"Passing the stack component attribute `{key}` as a "
                    "secret reference is not allowed as additional validation "
                    "is required for this attribute."
                )

        super().__init__(**kwargs)

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this stack component.

        Returns:
            The required secrets of this stack component.
        """
        return {
            secret_utils.parse_secret_reference(v)
            for v in self.dict().values()
            if secret_utils.is_secret_reference(v)
        }

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        Concrete stack component configuration classes should override this
        property to return True if the stack component is running in a remote
        location, and it needs to access the ZenML database.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Examples:
          * Orchestrators that are running pipelines in the cloud or in a
          location other than the local host
          * Step Operators that are running steps in the cloud or in a location
          other than the local host

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return False

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Concrete stack component configuration classes should override this
        property to return True if the stack component is relying on local
        resources or capabilities (e.g. local filesystem, local database or
        other services).

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Examples:
          * Artifact Stores that store artifacts in the local filesystem
          * Orchestrators that are connected to local orchestration runtime
          services (e.g. local Kubernetes clusters, Docker containers etc).

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            RuntimeError: If the stack component is not part of the active
                stack, or the active stack is missing a secrets manager.
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value.
        """
        value = super().__getattribute__(key)

        if not secret_utils.is_secret_reference(value):
            return value

        # A stack component can be part of many stacks, and currently a
        # secrets manager is associated with a stack. This means we're
        # not able to identify the 'correct' secrets manager that the user
        # wanted to resolve the secrets in a general way. We therefore
        # limit secret resolving to components of the active stack.
        if not self._is_part_of_active_stack():
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The stack component is not "
                "part of the active stack and therefore can't have it's "
                "secret references resolved. If you want to access attributes "
                "of this stack component which reference secrets, set a stack "
                "which includes both this component and a secrets manager as "
                "your active stack: `zenml stack set <STACK_NAME>`."
            )

        from zenml.client import Client

        secrets_manager = Client().active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The active stack does not "
                "have a secrets manager."
            )

        secret_ref = secret_utils.parse_secret_reference(value)

        # Both lookups re-raise with a more descriptive message while
        # explicitly chaining the original `KeyError` as the cause.
        try:
            secret = secrets_manager.get_secret(secret_ref.name)
        except KeyError as err:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not exist."
            ) from err

        try:
            secret_value = secret.content[secret_ref.key]
        except KeyError as err:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not contain a value for key "
                f"{secret_ref.key}. Available keys: {set(secret.content)}."
            ) from err

        return str(secret_value)

    def _is_part_of_active_stack(self) -> bool:
        """Checks if this config belongs to a component in the active stack.

        Returns:
            True if this config belongs to a component in the active stack,
            False otherwise.
        """
        from zenml.client import Client

        return any(
            component.config == self
            for component in Client().active_stack.components.values()
        )

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
        # prevent extra attributes during model initialization
        extra = Extra.forbid
is_local: bool
property
readonly
Checks if this stack component is running locally.
Concrete stack component configuration classes should override this method to return True if the stack component is relying on local resources or capabilities (e.g. local filesystem, local database or other services).
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Examples:
- Artifact Stores that store artifacts in the local filesystem
- Orchestrators that are connected to local orchestration runtime services (e.g. local Kubernetes clusters, Docker containers etc).
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
Concrete stack component configuration classes should override this method to return True if the stack component is running in a remote location, and it needs to access the ZenML database.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Examples:
- Orchestrators that are running pipelines in the cloud or in a location other than the local host
- Step Operators that are running steps in the cloud or in a location other than the local host
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
required_secrets: Set[zenml.utils.secret_utils.SecretReference]
property
readonly
All required secrets for this stack component.
Returns:
Type | Description |
---|---|
Set[zenml.utils.secret_utils.SecretReference] |
The required secrets of this stack component. |
Config
Pydantic configuration class.
Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic configuration class."""

    # Reject attributes that are not declared on the model when it is
    # initialized.
    extra = Extra.forbid
    # Attributes with a leading underscore are private: they stay mutable
    # and are not included in serialization.
    underscore_attrs_are_private = True
    # Public attributes can't be modified after initialization.
    allow_mutation = False
__custom_getattribute__(self, key)
special
Returns the (potentially resolved) attribute value for the given key.
An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key for which to get the attribute value. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the stack component is not part of the active stack, or the active stack is missing a secrets manager. |
KeyError |
If the secret or secret key don't exist. |
Returns:
Type | Description |
---|---|
Any |
The (potentially resolved) attribute value. |
Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the stack component is not part of the active
            stack, or the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    if not self._is_part_of_active_stack():
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have it's "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    from zenml.client import Client

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)

    # Both lookups re-raise with a more descriptive message while
    # explicitly chaining the original `KeyError` as the cause.
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        ) from err

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        ) from err

    return str(secret_value)
__getattribute__(self, key)
special
Returns the (potentially resolved) attribute value for the given key.
An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key for which to get the attribute value. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the stack component is not part of the active stack, or the active stack is missing a secrets manager. |
KeyError |
If the secret or the secret key doesn't exist. |
Returns:
Type | Description |
---|---|
Any |
The (potentially resolved) attribute value. |
Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the stack component is not part of the active
            stack, or the active stack is missing a secrets manager.
        KeyError: If the secret or the secret key doesn't exist. The
            underlying lookup error is attached as the cause.

    Returns:
        The (potentially resolved) attribute value. Resolved secret values
        are always returned as strings.
    """
    value = super().__getattribute__(key)

    # Plain (non-reference) values are returned untouched.
    if not secret_utils.is_secret_reference(value):
        return value

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    if not self._is_part_of_active_stack():
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have it's "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    # NOTE(review): imported inside the method, presumably to avoid a
    # circular import at module load time -- confirm.
    from zenml.client import Client

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)

    # Chain the original exception explicitly so the traceback shows the
    # lookup failure as the direct cause of the more descriptive error.
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        ) from err

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError as err:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        ) from err

    return str(secret_value)
__init__(self, **kwargs)
special
Ensures that secret references don't clash with pydantic validation.
StackComponents allow the specification of all their string attributes
using secret references of the form `{{secret_name.key}}`. This however
is only possible when the stack component does not perform any explicit
validation of this attribute using pydantic validators. If this were
the case, the validation would run on the secret reference and would
fail or in the worst case, modify the secret reference and lead to
unexpected behavior. This method ensures that no attributes that require
custom pydantic validation are set as secret references.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Arguments to initialize this stack component. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If an attribute that requires custom pydantic validation is passed as a secret reference, or if the `name` attribute was passed as a secret reference. |
Source code in zenml/stack/stack_component.py
def __init__(self, **kwargs: Any) -> None:
    """Guards secret references against custom pydantic validation.

    Every keyword argument that maps to a declared field is inspected
    before the regular initialization runs:

    * `None` values and arguments that do not correspond to a declared
      field are skipped.
    * A plain-text value for a field marked as secret only triggers a
      warning.
    * A secret reference (`{{secret_name.key}}`) is rejected when the field
      has pre- or post-validators, because those validators would run on
      the reference string instead of the actual secret value.

    Args:
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic
            validation is passed as a secret reference.
    """
    for key, value in kwargs.items():
        # Private attributes and unknown names have no field entry; they
        # are left for the upcoming pydantic validation to handle.
        field = self.__class__.__fields__.get(key)
        if field is None or value is None:
            continue

        if secret_utils.is_secret_reference(value):
            if field.pre_validators or field.post_validators:
                raise ValueError(
                    f"Passing the stack component attribute `{key}` as a "
                    "secret reference is not allowed as additional validation "
                    "is required for this attribute."
                )
        elif secret_utils.is_secret_field(field):
            logger.warning(
                "You specified a plain-text value for the sensitive "
                f"attribute `{key}` for a `{self.__class__.__name__}` "
                "stack component. This is currently only a warning, "
                "but future versions of ZenML will require you to pass "
                "in sensitive information as secrets. Check out the "
                "documentation on how to configure your stack "
                "components with secrets here: "
                "https://docs.zenml.io/advanced-guide/practical/secrets-management"
            )

    super().__init__(**kwargs)
stack_validator
Implementation of the ZenML Stack Validator.
StackValidator
A `StackValidator` is used to validate a stack configuration.
Each `StackComponent` can provide a `StackValidator` to make sure it is
compatible with all components of the stack. The `KubeflowOrchestrator`
for example will always require the stack to have a container registry
in order to push the docker images that are required to run a pipeline
in Kubeflow Pipelines.
Source code in zenml/stack/stack_validator.py
class StackValidator:
    """Checks whether a stack satisfies the requirements of a component.

    A `StackComponent` can hand out a `StackValidator` describing what it
    needs from the rest of the stack. The `KubeflowOrchestrator`, for
    instance, always needs a container registry in the stack so that the
    docker images used to run a pipeline in Kubeflow Pipelines can be pushed.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[
            Callable[["Stack"], Tuple[bool, str]]
        ] = None,
    ):
        """Creates a new `StackValidator`.

        Args:
            required_components: Optional set of stack components that must
                exist in the stack.
            custom_validation_function: Optional function that returns whether
                a stack is valid and an error message to show if not valid.
        """
        self._custom_validation_function = custom_validation_function
        # A missing (or empty) requirement set means nothing is required.
        if required_components:
            self._required_components = required_components
        else:
            self._required_components = set()

    def validate(self, stack: "Stack") -> None:
        """Validates the given stack.

        The stack must contain every required component and, if a custom
        validation function was configured, must be accepted by it.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the stack does not meet all the
                validation criteria.
        """
        # Required components that the stack does not provide.
        missing_components = self._required_components - set(stack.components)
        if missing_components:
            raise StackValidationError(
                f"Missing stack components {missing_components} for "
                f"stack: {stack.name}"
            )

        custom_check = self._custom_validation_function
        if not custom_check:
            return

        valid, err_msg = custom_check(stack)
        if valid:
            return

        raise StackValidationError(
            f"Custom validation function failed to validate "
            f"stack '{stack.name}': {err_msg}"
        )
__init__(self, required_components=None, custom_validation_function=None)
special
Initializes a `StackValidator` instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
required_components |
Optional[AbstractSet[zenml.enums.StackComponentType]] |
Optional set of stack components that must exist in the stack. |
None |
custom_validation_function |
Optional[Callable[[Stack], Tuple[bool, str]]] |
Optional function that returns whether a stack is valid and an error message to show if not valid. |
None |
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[
        Callable[["Stack"], Tuple[bool, str]]
    ] = None,
):
    """Creates a new `StackValidator`.

    Args:
        required_components: Optional set of stack components that must
            exist in the stack.
        custom_validation_function: Optional function that returns whether
            a stack is valid and an error message to show if not valid.
    """
    self._custom_validation_function = custom_validation_function
    # A missing (or empty) requirement set means nothing is required.
    if required_components:
        self._required_components = required_components
    else:
        self._required_components = set()
validate(self, stack)
Validates the given stack.
Checks if the stack contains all the required components and passes the custom validation function of the validator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
Stack |
The stack to validate. |
required |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack does not meet all the validation criteria. |
Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Validates the given stack.

    The stack must contain every required component and, if a custom
    validation function was configured, must be accepted by it.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the stack does not meet all the
            validation criteria.
    """
    # Required components that the stack does not provide.
    missing_components = self._required_components - set(stack.components)
    if missing_components:
        raise StackValidationError(
            f"Missing stack components {missing_components} for "
            f"stack: {stack.name}"
        )

    custom_check = self._custom_validation_function
    if not custom_check:
        return

    valid, err_msg = custom_check(stack)
    if valid:
        return

    raise StackValidationError(
        f"Custom validation function failed to validate "
        f"stack '{stack.name}': {err_msg}"
    )