Stack
zenml.stack
special
Initialization of the ZenML Stack.
The stack is essentially all the configuration for the infrastructure of your MLOps platform.
A stack is made up of multiple components. Some examples are:
- An Artifact Store
- A Metadata Store
- An Orchestrator
- A Step Operator (Optional)
- A Container Registry (Optional)
authentication_mixin
Stack component mixin for authentication.
AuthenticationMixin (BaseModel)
pydantic-model
Stack component mixin for authentication.
Attributes:
Name | Type | Description |
---|---|---|
authentication_secret |
Optional[str] |
Name of the secret that stores the authentication credentials. |
Source code in zenml/stack/authentication_mixin.py
class AuthenticationMixin(BaseModel):
    """Mixin that lets a stack component reference an authentication secret.

    Attributes:
        authentication_secret: Name of the secret that holds the
            authentication credentials, or `None` if no secret is configured.
    """

    authentication_secret: Optional[str] = None

    def get_authentication_secret(
        self, expected_schema_type: Type[T]
    ) -> Optional[T]:
        """Looks up the secret named by `authentication_secret`.

        The secret is fetched from the secrets manager of the active stack
        and checked against the expected schema class.

        Args:
            expected_schema_type: Secret schema class the fetched secret must
                be an instance of.

        Returns:
            The secret if `authentication_secret` is set, otherwise `None`.

        Raises:
            RuntimeError: If the active stack has no secrets manager.
            TypeError: If the fetched secret is not an instance of
                `expected_schema_type`.
        """
        secret_name = self.authentication_secret
        if not secret_name:
            return None

        repository = Repository(skip_repository_check=True)  # type: ignore[call-arg]
        manager = repository.active_stack.secrets_manager
        if not manager:
            raise RuntimeError(
                f"Unable to retrieve secret '{secret_name}' "
                "because the active stack does not have a secrets manager."
            )

        secret = manager.get_secret(secret_name)
        if isinstance(secret, expected_schema_type):
            return secret

        # Wrong schema: tell the user how to register a correctly typed one.
        actual_type = secret.TYPE
        expected_type = expected_schema_type.TYPE
        raise TypeError(
            f"Authentication secret has type {actual_type} but a secret of "
            f"type {expected_type} was expected. To solve this "
            f"issue, register a secret with name "
            f"{secret_name} of type "
            f"{expected_type} using the following command: \n "
            f"`zenml secret register {secret_name} "
            f"--schema={expected_type} ...`"
        )
get_authentication_secret(self, expected_schema_type)
Gets the secret referred to by the authentication secret attribute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expected_schema_type |
Type[~T] |
The expected secret schema class. |
required |
Returns:
Type | Description |
---|---|
Optional[~T] |
The secret object if the `authentication_secret` attribute is set, `None` otherwise. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no secrets manager exists in the active stack. |
TypeError |
If the secret is not of the expected schema type. |
Source code in zenml/stack/authentication_mixin.py
def get_authentication_secret(
    self, expected_schema_type: Type[T]
) -> Optional[T]:
    """Looks up the secret named by `authentication_secret`.

    The secret is fetched from the secrets manager of the active stack and
    checked against the expected schema class.

    Args:
        expected_schema_type: Secret schema class the fetched secret must be
            an instance of.

    Returns:
        The secret if `authentication_secret` is set, otherwise `None`.

    Raises:
        RuntimeError: If the active stack has no secrets manager.
        TypeError: If the fetched secret is not an instance of
            `expected_schema_type`.
    """
    secret_name = self.authentication_secret
    if not secret_name:
        return None

    repository = Repository(skip_repository_check=True)  # type: ignore[call-arg]
    manager = repository.active_stack.secrets_manager
    if not manager:
        raise RuntimeError(
            f"Unable to retrieve secret '{secret_name}' "
            "because the active stack does not have a secrets manager."
        )

    secret = manager.get_secret(secret_name)
    if isinstance(secret, expected_schema_type):
        return secret

    # Wrong schema: tell the user how to register a correctly typed one.
    actual_type = secret.TYPE
    expected_type = expected_schema_type.TYPE
    raise TypeError(
        f"Authentication secret has type {actual_type} but a secret of "
        f"type {expected_type} was expected. To solve this "
        f"issue, register a secret with name "
        f"{secret_name} of type "
        f"{expected_type} using the following command: \n "
        f"`zenml secret register {secret_name} "
        f"--schema={expected_type} ...`"
    )
flavor_registry
Implementation of the ZenML flavor registry.
FlavorRegistry
Registry for stack component flavors.
The flavors defined by ZenML must be registered here.
Source code in zenml/stack/flavor_registry.py
class FlavorRegistry:
    """Registry for stack component flavors.

    The flavors defined by ZenML must be registered here. On construction the
    registry is filled with the built-in flavors and then with the flavors
    provided by every integration in the integration registry.
    """

    def __init__(self) -> None:
        """Initializes the registry and registers all known flavors."""
        # component type -> {flavor name -> flavor wrapper}
        self._flavors: DefaultDict[
            StackComponentType, Dict[str, FlavorWrapper]
        ] = defaultdict(dict)
        self.register_default_flavors()
        self.register_integration_flavors()

    def register_default_flavors(self) -> None:
        """Registers the default built-in flavors.

        Each built-in flavor class is wrapped in a `FlavorWrapper` whose
        source is the class's fully qualified import path and whose
        integration is `"built-in"`.
        """
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.container_registries import (
            AzureContainerRegistry,
            DefaultContainerRegistry,
            DockerHubContainerRegistry,
            GCPContainerRegistry,
            GitHubContainerRegistry,
        )
        from zenml.metadata_stores import (
            MySQLMetadataStore,
            SQLiteMetadataStore,
        )
        from zenml.orchestrators import LocalOrchestrator
        from zenml.secrets_managers import LocalSecretsManager

        default_flavors = [
            LocalOrchestrator,
            SQLiteMetadataStore,
            MySQLMetadataStore,
            LocalArtifactStore,
            DefaultContainerRegistry,
            AzureContainerRegistry,
            DockerHubContainerRegistry,
            GCPContainerRegistry,
            GitHubContainerRegistry,
            LocalSecretsManager,
        ]
        for flavor in default_flavors:
            self._register_flavor(
                FlavorWrapper(
                    name=flavor.FLAVOR,  # type: ignore[attr-defined]
                    type=flavor.TYPE,  # type: ignore[attr-defined]
                    source=flavor.__module__ + "." + flavor.__name__,
                    integration="built-in",
                )
            )

    def register_integration_flavors(self) -> None:
        """Registers the flavors implemented by integrations."""
        from zenml.integrations.registry import integration_registry

        for integration in integration_registry.integrations.values():
            integrated_flavors = integration.flavors()
            # A falsy result means the integration has nothing to register.
            if integrated_flavors:
                for flavor in integrated_flavors:
                    self._register_flavor(flavor)

    def _register_flavor(
        self,
        flavor: FlavorWrapper,
    ) -> None:
        """Registers a stack component flavor.

        Args:
            flavor: The flavor to register.

        Raises:
            KeyError: If a flavor with the same name is already registered
                for the same component type.
        """
        flavors = self._flavors[flavor.type]
        if flavor.name in flavors:
            raise KeyError(
                f"There is already a {flavor.type} with the flavor "
                f"`{flavor.name}`. Please select another name for the flavor."
            )
        flavors[flavor.name] = flavor
        logger.debug(
            f"Registered flavor for '{flavor.name}' and type '{flavor.type}'.",
        )

    def get_flavors_by_type(
        self, component_type: StackComponentType
    ) -> Dict[str, FlavorWrapper]:
        """Returns the registered flavors of the given component type.

        Args:
            component_type: The type of the stack component.

        Returns:
            A new dictionary mapping flavor names to the flavors registered
            for `component_type`. The dictionary is a copy, so modifying it
            does not affect the registry. It is empty if no flavor of that
            type is registered.
        """
        # Use `.get` rather than indexing: indexing the defaultdict would
        # insert an empty entry for unknown types as a side effect of a read.
        return dict(self._flavors.get(component_type, {}))
__init__(self)
special
Initialization of the flavors.
Source code in zenml/stack/flavor_registry.py
def __init__(self) -> None:
    """Sets up flavor storage and registers every known flavor.

    Built-in flavors are registered first, then the flavors contributed by
    integrations.
    """
    # component type -> {flavor name -> flavor wrapper}
    flavors_by_type: DefaultDict[
        StackComponentType, Dict[str, FlavorWrapper]
    ] = defaultdict(dict)
    self._flavors = flavors_by_type
    self.register_default_flavors()
    self.register_integration_flavors()
get_flavors_by_type(self, component_type)
Return the flavors of the given type, keyed by flavor name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
The type of the stack component. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] |
A dictionary mapping flavor names to the flavors of the given type. |
Source code in zenml/stack/flavor_registry.py
def get_flavors_by_type(
    self, component_type: StackComponentType
) -> Dict[str, FlavorWrapper]:
    """Returns the registered flavors of the given component type.

    Args:
        component_type: The type of the stack component.

    Returns:
        A new dictionary mapping flavor names to the flavors registered for
        `component_type`. The dictionary is a copy, so modifying it does not
        affect the registry. It is empty if no flavor of that type is
        registered.
    """
    # Use `.get` rather than indexing: indexing the defaultdict would insert
    # an empty entry for unknown types as a side effect of a read.
    return dict(self._flavors.get(component_type, {}))
register_default_flavors(self)
Registers the default built-in flavors.
Source code in zenml/stack/flavor_registry.py
def register_default_flavors(self) -> None:
    """Registers every flavor that ships with ZenML itself.

    Each built-in flavor class is wrapped in a `FlavorWrapper` whose source is
    the class's fully qualified import path and whose integration is marked
    as `"built-in"`.
    """
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.container_registries import (
        AzureContainerRegistry,
        DefaultContainerRegistry,
        DockerHubContainerRegistry,
        GCPContainerRegistry,
        GitHubContainerRegistry,
    )
    from zenml.metadata_stores import (
        MySQLMetadataStore,
        SQLiteMetadataStore,
    )
    from zenml.orchestrators import LocalOrchestrator
    from zenml.secrets_managers import LocalSecretsManager

    built_in_classes = (
        LocalOrchestrator,
        SQLiteMetadataStore,
        MySQLMetadataStore,
        LocalArtifactStore,
        DefaultContainerRegistry,
        AzureContainerRegistry,
        DockerHubContainerRegistry,
        GCPContainerRegistry,
        GitHubContainerRegistry,
        LocalSecretsManager,
    )
    for flavor_class in built_in_classes:
        wrapper = FlavorWrapper(
            name=flavor_class.FLAVOR,  # type: ignore[attr-defined]
            type=flavor_class.TYPE,  # type: ignore[attr-defined]
            source=f"{flavor_class.__module__}.{flavor_class.__name__}",
            integration="built-in",
        )
        self._register_flavor(wrapper)
register_integration_flavors(self)
Registers the flavors implemented by integrations.
Source code in zenml/stack/flavor_registry.py
def register_integration_flavors(self) -> None:
    """Registers the flavors contributed by all registered integrations."""
    from zenml.integrations.registry import integration_registry

    for integration in integration_registry.integrations.values():
        # A falsy result from `flavors()` means there is nothing to register.
        for flavor in integration.flavors() or ():
            self._register_flavor(flavor)
stack
Implementation of the ZenML Stack class.
Stack
ZenML stack class.
A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (orchestrator, metadata store and artifact store) are required to run any kind of pipeline, other components like the container registry are only required if other stack components depend on them.
Source code in zenml/stack/stack.py
class Stack:
    """ZenML stack class.

    A ZenML stack is a collection of multiple stack components that are
    required to run ZenML pipelines. Some of these components (orchestrator,
    metadata store and artifact store) are required to run any kind of
    pipeline, other components like the container registry are only required
    if other stack components depend on them.
    """

    def __init__(
        self,
        name: str,
        *,
        orchestrator: "BaseOrchestrator",
        metadata_store: "BaseMetadataStore",
        artifact_store: "BaseArtifactStore",
        container_registry: Optional["BaseContainerRegistry"] = None,
        secrets_manager: Optional["BaseSecretsManager"] = None,
        step_operator: Optional["BaseStepOperator"] = None,
        feature_store: Optional["BaseFeatureStore"] = None,
        model_deployer: Optional["BaseModelDeployer"] = None,
        experiment_tracker: Optional["BaseExperimentTracker"] = None,
        alerter: Optional["BaseAlerter"] = None,
        annotator: Optional["BaseAnnotator"] = None,
        data_validator: Optional["BaseDataValidator"] = None,
    ):
        """Initializes a stack instance.

        The given components are stored as-is; the constructor performs no
        validation. Call `validate()` to check the stack configuration.

        Args:
            name: Name of the stack.
            orchestrator: Orchestrator component of the stack.
            metadata_store: Metadata store component of the stack.
            artifact_store: Artifact store component of the stack.
            container_registry: Container registry component of the stack.
            secrets_manager: Secrets manager component of the stack.
            step_operator: Step operator component of the stack.
            feature_store: Feature store component of the stack.
            model_deployer: Model deployer component of the stack.
            experiment_tracker: Experiment tracker component of the stack.
            alerter: Alerter component of the stack.
            annotator: Annotator component of the stack.
            data_validator: Data validator component of the stack.
        """
        self._name = name
        self._orchestrator = orchestrator
        self._metadata_store = metadata_store
        self._artifact_store = artifact_store
        self._container_registry = container_registry
        self._step_operator = step_operator
        self._secrets_manager = secrets_manager
        self._feature_store = feature_store
        self._model_deployer = model_deployer
        self._experiment_tracker = experiment_tracker
        self._alerter = alerter
        self._annotator = annotator
        self._data_validator = data_validator

    @classmethod
    def from_components(
        cls, name: str, components: Dict[StackComponentType, "StackComponent"]
    ) -> "Stack":
        """Creates a stack instance from a dict of stack components.

        # noqa: DAR402

        Args:
            name: The name of the stack.
            components: The components of the stack, keyed by component type.

        Returns:
            An instance of the class this method is called on, consisting of
            the given components.

        Raises:
            TypeError: If a required component is missing or a component
                doesn't inherit from the expected base class.
        """
        from zenml.alerter import BaseAlerter
        from zenml.annotators import BaseAnnotator
        from zenml.artifact_stores import BaseArtifactStore
        from zenml.container_registries import BaseContainerRegistry
        from zenml.data_validators import BaseDataValidator
        from zenml.experiment_trackers import BaseExperimentTracker
        from zenml.feature_stores import BaseFeatureStore
        from zenml.metadata_stores import BaseMetadataStore
        from zenml.model_deployers import BaseModelDeployer
        from zenml.orchestrators import BaseOrchestrator
        from zenml.secrets_managers import BaseSecretsManager
        from zenml.step_operators import BaseStepOperator

        def _raise_type_error(
            component: Optional["StackComponent"], expected_class: Type[Any]
        ) -> NoReturn:
            """Raises a TypeError for a missing or wrongly typed component.

            Args:
                component: The offending component, or `None` if a required
                    component is missing.
                expected_class: The expected base class of the component.

            Raises:
                TypeError: Always.
            """
            if component is None:
                # Reporting a "wrong type `NoneType`" would be misleading
                # when the component is simply absent.
                raise TypeError(
                    f"Unable to create stack: Missing required stack "
                    f"component (expected: subclass of "
                    f"`{expected_class.__name__}`)"
                )
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )

        # Required components: must be present and of the right type.
        orchestrator = components.get(StackComponentType.ORCHESTRATOR)
        if not isinstance(orchestrator, BaseOrchestrator):
            _raise_type_error(orchestrator, BaseOrchestrator)

        metadata_store = components.get(StackComponentType.METADATA_STORE)
        if not isinstance(metadata_store, BaseMetadataStore):
            _raise_type_error(metadata_store, BaseMetadataStore)

        artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
        if not isinstance(artifact_store, BaseArtifactStore):
            _raise_type_error(artifact_store, BaseArtifactStore)

        # Optional components: may be absent, but must have the right type
        # when present.
        container_registry = components.get(
            StackComponentType.CONTAINER_REGISTRY
        )
        if container_registry is not None and not isinstance(
            container_registry, BaseContainerRegistry
        ):
            _raise_type_error(container_registry, BaseContainerRegistry)

        secrets_manager = components.get(StackComponentType.SECRETS_MANAGER)
        if secrets_manager is not None and not isinstance(
            secrets_manager, BaseSecretsManager
        ):
            _raise_type_error(secrets_manager, BaseSecretsManager)

        step_operator = components.get(StackComponentType.STEP_OPERATOR)
        if step_operator is not None and not isinstance(
            step_operator, BaseStepOperator
        ):
            _raise_type_error(step_operator, BaseStepOperator)

        feature_store = components.get(StackComponentType.FEATURE_STORE)
        if feature_store is not None and not isinstance(
            feature_store, BaseFeatureStore
        ):
            _raise_type_error(feature_store, BaseFeatureStore)

        model_deployer = components.get(StackComponentType.MODEL_DEPLOYER)
        if model_deployer is not None and not isinstance(
            model_deployer, BaseModelDeployer
        ):
            _raise_type_error(model_deployer, BaseModelDeployer)

        experiment_tracker = components.get(
            StackComponentType.EXPERIMENT_TRACKER
        )
        if experiment_tracker is not None and not isinstance(
            experiment_tracker, BaseExperimentTracker
        ):
            _raise_type_error(experiment_tracker, BaseExperimentTracker)

        alerter = components.get(StackComponentType.ALERTER)
        if alerter is not None and not isinstance(alerter, BaseAlerter):
            _raise_type_error(alerter, BaseAlerter)

        annotator = components.get(StackComponentType.ANNOTATOR)
        if annotator is not None and not isinstance(annotator, BaseAnnotator):
            _raise_type_error(annotator, BaseAnnotator)

        data_validator = components.get(StackComponentType.DATA_VALIDATOR)
        if data_validator is not None and not isinstance(
            data_validator, BaseDataValidator
        ):
            _raise_type_error(data_validator, BaseDataValidator)

        # `cls` rather than `Stack` so subclasses get instances of themselves.
        return cls(
            name=name,
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
            container_registry=container_registry,
            secrets_manager=secrets_manager,
            step_operator=step_operator,
            feature_store=feature_store,
            model_deployer=model_deployer,
            experiment_tracker=experiment_tracker,
            alerter=alerter,
            annotator=annotator,
            data_validator=data_validator,
        )

    @classmethod
    def default_local_stack(cls) -> "Stack":
        """Creates a stack instance which is configured to run locally.

        The artifact store lives in a freshly created, UUID-named directory
        under `<global config directory>/local_stores`. The SQLite metadata
        database is placed inside that same directory.

        Returns:
            A stack instance configured to run locally.
        """
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.metadata_stores import SQLiteMetadataStore
        from zenml.orchestrators import LocalOrchestrator

        orchestrator = LocalOrchestrator(name="default")

        artifact_store_uuid = uuid.uuid4()
        artifact_store_path = os.path.join(
            GlobalConfiguration().config_directory,
            "local_stores",
            str(artifact_store_uuid),
        )
        io_utils.create_dir_recursive_if_not_exists(artifact_store_path)
        artifact_store = LocalArtifactStore(
            name="default",
            uuid=artifact_store_uuid,
            path=artifact_store_path,
        )

        metadata_store_path = os.path.join(artifact_store_path, "metadata.db")
        metadata_store = SQLiteMetadataStore(
            name="default", uri=metadata_store_path
        )

        return cls(
            name="default",
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
        )

    @property
    def components(self) -> Dict[StackComponentType, "StackComponent"]:
        """All components of the stack.

        Returns:
            A dictionary of all components of the stack that are set, keyed
            by their component type.
        """
        return {
            component.TYPE: component
            for component in [
                self.orchestrator,
                self.metadata_store,
                self.artifact_store,
                self.container_registry,
                self.secrets_manager,
                self.step_operator,
                self.feature_store,
                self.model_deployer,
                self.experiment_tracker,
                self.alerter,
                self.annotator,
                self.data_validator,
            ]
            if component is not None
        }

    @property
    def name(self) -> str:
        """The name of the stack.

        Returns:
            str: The name of the stack.
        """
        return self._name

    @property
    def orchestrator(self) -> "BaseOrchestrator":
        """The orchestrator of the stack.

        Returns:
            The orchestrator of the stack.
        """
        return self._orchestrator

    @property
    def metadata_store(self) -> "BaseMetadataStore":
        """The metadata store of the stack.

        Returns:
            The metadata store of the stack.
        """
        return self._metadata_store

    @property
    def artifact_store(self) -> "BaseArtifactStore":
        """The artifact store of the stack.

        Returns:
            The artifact store of the stack.
        """
        return self._artifact_store

    @property
    def container_registry(self) -> Optional["BaseContainerRegistry"]:
        """The container registry of the stack.

        Returns:
            The container registry of the stack or None if the stack does not
            have a container registry.
        """
        return self._container_registry

    @property
    def secrets_manager(self) -> Optional["BaseSecretsManager"]:
        """The secrets manager of the stack.

        Returns:
            The secrets manager of the stack, or None if it has none.
        """
        return self._secrets_manager

    @property
    def step_operator(self) -> Optional["BaseStepOperator"]:
        """The step operator of the stack.

        Returns:
            The step operator of the stack, or None if it has none.
        """
        return self._step_operator

    @property
    def feature_store(self) -> Optional["BaseFeatureStore"]:
        """The feature store of the stack.

        Returns:
            The feature store of the stack, or None if it has none.
        """
        return self._feature_store

    @property
    def model_deployer(self) -> Optional["BaseModelDeployer"]:
        """The model deployer of the stack.

        Returns:
            The model deployer of the stack, or None if it has none.
        """
        return self._model_deployer

    @property
    def experiment_tracker(self) -> Optional["BaseExperimentTracker"]:
        """The experiment tracker of the stack.

        Returns:
            The experiment tracker of the stack, or None if it has none.
        """
        return self._experiment_tracker

    @property
    def alerter(self) -> Optional["BaseAlerter"]:
        """The alerter of the stack.

        Returns:
            The alerter of the stack, or None if it has none.
        """
        return self._alerter

    @property
    def annotator(self) -> Optional["BaseAnnotator"]:
        """The annotator of the stack.

        Returns:
            The annotator of the stack, or None if it has none.
        """
        return self._annotator

    @property
    def data_validator(self) -> Optional["BaseDataValidator"]:
        """The data validator of the stack.

        Returns:
            The data validator of the stack, or None if it has none.
        """
        return self._data_validator

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this stack.

        This method combines the available runtime options for all components
        of this stack. See `StackComponent.runtime_options()` for
        more information. If several components define the same option, a
        warning is logged and the component iterated last wins.

        Returns:
            A dictionary of runtime options.
        """
        runtime_options: Dict[str, Any] = {}
        for component in self.components.values():
            duplicate_runtime_options = (
                runtime_options.keys() & component.runtime_options.keys()
            )
            if duplicate_runtime_options:
                logger.warning(
                    "Found duplicate runtime options %s.",
                    duplicate_runtime_options,
                )

            runtime_options.update(component.runtime_options)

        return runtime_options

    def dict(self) -> Dict[str, str]:
        """Converts the stack into a dictionary.

        Returns:
            A dictionary mapping each component type value to the JSON
            representation (with sorted keys) of that component, plus a
            `"name"` entry holding the stack name.
        """
        component_dict = {
            component_type.value: component.json(sort_keys=True)
            for component_type, component in self.components.items()
        }
        component_dict.update({"name": self.name})
        return component_dict

    def requirements(
        self,
        exclude_components: Optional[AbstractSet[StackComponentType]] = None,
    ) -> Set[str]:
        """Set of PyPI requirements for the stack.

        This method combines the requirements of all stack components (except
        the ones specified in `exclude_components`).

        Args:
            exclude_components: Set of component types for which the
                requirements should not be included in the output.

        Returns:
            Set of PyPI requirements.
        """
        exclude_components = exclude_components or set()
        requirements = [
            component.requirements
            for component in self.components.values()
            if component.TYPE not in exclude_components
        ]
        # `set.union()` needs at least one argument.
        return set.union(*requirements) if requirements else set()

    def validate(self) -> None:
        """Checks whether the stack configuration is valid.

        Every stack component that defines a `StackValidator` validates the
        stack, to make sure all the components are compatible with each
        other. Components without a validator are skipped. Any exception
        raised by a validator propagates to the caller.
        """
        for component in self.components.values():
            if component.validator:
                component.validator.validate(stack=self)

    def _register_pipeline_run(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Registers a pipeline run in the ZenStore.

        Args:
            pipeline: The pipeline that is being run.
            runtime_configuration: The runtime configuration of the pipeline.
        """
        from zenml.repository import Repository
        from zenml.zen_stores.models import StackWrapper
        from zenml.zen_stores.models.pipeline_models import (
            PipelineRunWrapper,
            PipelineWrapper,
        )

        repo = Repository()
        active_project = repo.active_project

        pipeline_run_wrapper = PipelineRunWrapper(
            name=runtime_configuration.run_name,
            pipeline=PipelineWrapper.from_pipeline(pipeline),
            stack=StackWrapper.from_stack(self),
            runtime_configuration=runtime_configuration,
            user_id=repo.active_user.id,
            project_name=active_project.name if active_project else None,
        )

        # Reuse the repository created above instead of constructing another.
        repo.zen_store.register_pipeline_run(pipeline_run_wrapper)

    def deploy_pipeline(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: RuntimeConfiguration,
    ) -> Any:
        """Deploys a pipeline on this stack.

        Args:
            pipeline: The pipeline to deploy.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.

        Returns:
            The return value of the call to `orchestrator.run(...)`.

        Raises:
            StackValidationError: If a stack component is not running.
        """
        self.validate()
        for component in self.components.values():
            if not component.is_running:
                raise StackValidationError(
                    f"The '{component.name}' {component.TYPE} stack component "
                    f"is not currently running. Please run the following "
                    f"command to provision and start the component:\n\n"
                    f"    `zenml stack up`\n"
                )

        for component in self.components.values():
            component.prepare_pipeline_deployment(
                pipeline=pipeline,
                stack=self,
                runtime_configuration=runtime_configuration,
            )

        for component in self.components.values():
            component.prepare_pipeline_run()

        # Fall back to a timestamped run name if none was configured.
        runtime_configuration[
            RUN_NAME_OPTION_KEY
        ] = runtime_configuration.run_name or (
            f"{pipeline.name}-"
            f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
        )

        logger.info(
            "Using stack `%s` to run pipeline `%s`...",
            self.name,
            pipeline.name,
        )
        start_time = time.time()

        original_cache_boolean = pipeline.enable_cache
        if "enable_cache" in runtime_configuration:
            logger.info(
                "Runtime configuration overwriting the pipeline cache settings"
                " to enable_cache=`%s` for this pipeline run. The default "
                "caching strategy is retained for future pipeline runs.",
                runtime_configuration["enable_cache"],
            )
            pipeline.enable_cache = runtime_configuration.get("enable_cache")

        try:
            self._register_pipeline_run(
                pipeline=pipeline, runtime_configuration=runtime_configuration
            )
            return_value = self.orchestrator.run(
                pipeline, stack=self, runtime_configuration=runtime_configuration
            )
        finally:
            # Put the pipeline-level cache policy back, also when the run
            # fails. Otherwise the runtime override would leak into later runs
            # that do not set the option explicitly.
            pipeline.enable_cache = original_cache_boolean

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            runtime_configuration.run_name,
            string_utils.get_human_readable_time(run_duration),
        )

        for component in self.components.values():
            component.cleanup_pipeline_run()

        return return_value

    def prepare_step_run(self) -> None:
        """Prepares running a step by preparing every stack component."""
        for component in self.components.values():
            component.prepare_step_run()

    def cleanup_step_run(self) -> None:
        """Cleans up resources after the step run is finished."""
        for component in self.components.values():
            component.cleanup_step_run()

    @property
    def is_provisioned(self) -> bool:
        """If the stack provisioned resources to run locally.

        Returns:
            True if every component of the stack is provisioned.
        """
        return all(
            component.is_provisioned for component in self.components.values()
        )

    @property
    def is_running(self) -> bool:
        """If the stack is running locally.

        Returns:
            True if every component of the stack is running, False otherwise.
        """
        return all(
            component.is_running for component in self.components.values()
        )

    def provision(self) -> None:
        """Provisions resources for every component that is not provisioned."""
        logger.info("Provisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if not component.is_provisioned:
                component.provision()
                logger.info("Provisioned resources for %s.", component)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the stack.

        Components that do not implement deprovisioning are logged with a
        warning and skipped.
        """
        logger.info("Deprovisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if component.is_provisioned:
                try:
                    component.deprovision()
                    logger.info("Deprovisioned resources for %s.", component)
                except NotImplementedError as e:
                    logger.warning(e)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the stack.

        Raises:
            ProvisioningError: If any stack component is missing provisioned
                resources.
        """
        logger.info("Resuming provisioned resources for stack %s.", self.name)
        for component in self.components.values():
            if component.is_running:
                # The component is already running, nothing to resume.
                continue
            if component.is_provisioned:
                component.resume()
                logger.info("Resumed resources for %s.", component)
            else:
                raise ProvisioningError(
                    f"Unable to resume resources for {component}: No "
                    f"resources have been provisioned for this component."
                )

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the stack.

        Components that do not implement suspending are logged with a
        warning and skipped.
        """
        logger.info(
            "Suspending provisioned resources for stack '%s'.", self.name
        )
        for component in self.components.values():
            if not component.is_suspended:
                try:
                    component.suspend()
                    logger.info("Suspended resources for %s.", component)
                except NotImplementedError:
                    logger.warning(
                        "Suspending provisioned resources not implemented "
                        "for %s. Continuing without suspending resources...",
                        component,
                    )
alerter: Optional[BaseAlerter]
property
readonly
The alerter of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseAlerter] |
The alerter of the stack. |
annotator: Optional[BaseAnnotator]
property
readonly
The annotator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseAnnotator] |
The annotator of the stack. |
artifact_store: BaseArtifactStore
property
readonly
The artifact store of the stack.
Returns:
Type | Description |
---|---|
BaseArtifactStore |
The artifact store of the stack. |
components: Dict[zenml.enums.StackComponentType, StackComponent]
property
readonly
All components of the stack.
Returns:
Type | Description |
---|---|
Dict[zenml.enums.StackComponentType, StackComponent] |
A dictionary of all components of the stack. |
container_registry: Optional[BaseContainerRegistry]
property
readonly
The container registry of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseContainerRegistry] |
The container registry of the stack or None if the stack does not have a container registry. |
data_validator: Optional[BaseDataValidator]
property
readonly
The data validator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseDataValidator] |
The data validator of the stack. |
experiment_tracker: Optional[BaseExperimentTracker]
property
readonly
The experiment tracker of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseExperimentTracker] |
The experiment tracker of the stack. |
feature_store: Optional[BaseFeatureStore]
property
readonly
The feature store of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseFeatureStore] |
The feature store of the stack. |
is_provisioned: bool
property
readonly
If the stack provisioned resources to run locally.
Returns:
Type | Description |
---|---|
bool |
True if the stack provisioned resources to run locally. |
is_running: bool
property
readonly
If the stack is running locally.
Returns:
Type | Description |
---|---|
bool |
True if the stack is running locally, False otherwise. |
metadata_store: BaseMetadataStore
property
readonly
The metadata store of the stack.
Returns:
Type | Description |
---|---|
BaseMetadataStore |
The metadata store of the stack. |
model_deployer: Optional[BaseModelDeployer]
property
readonly
The model deployer of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseModelDeployer] |
The model deployer of the stack. |
name: str
property
readonly
The name of the stack.
Returns:
Type | Description |
---|---|
str |
The name of the stack. |
orchestrator: BaseOrchestrator
property
readonly
The orchestrator of the stack.
Returns:
Type | Description |
---|---|
BaseOrchestrator |
The orchestrator of the stack. |
runtime_options: Dict[str, Any]
property
readonly
Runtime options that are available to configure this stack.
This method combines the available runtime options for all components of this stack. See `StackComponent.runtime_options()` for more information.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary of runtime options. |
secrets_manager: Optional[BaseSecretsManager]
property
readonly
The secrets manager of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseSecretsManager] |
The secrets manager of the stack. |
step_operator: Optional[BaseStepOperator]
property
readonly
The step operator of the stack.
Returns:
Type | Description |
---|---|
Optional[BaseStepOperator] |
The step operator of the stack. |
__init__(self, name, *, orchestrator, metadata_store, artifact_store, container_registry=None, secrets_manager=None, step_operator=None, feature_store=None, model_deployer=None, experiment_tracker=None, alerter=None, annotator=None, data_validator=None)
special
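Initializes a stack instance.
The given components are stored as-is; the constructor performs no validation (call `validate()` to check the stack configuration).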
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the stack. |
required |
orchestrator |
BaseOrchestrator |
Orchestrator component of the stack. |
required |
metadata_store |
BaseMetadataStore |
Metadata store component of the stack. |
required |
artifact_store |
BaseArtifactStore |
Artifact store component of the stack. |
required |
container_registry |
Optional[BaseContainerRegistry] |
Container registry component of the stack. |
None |
secrets_manager |
Optional[BaseSecretsManager] |
Secrets manager component of the stack. |
None |
step_operator |
Optional[BaseStepOperator] |
Step operator component of the stack. |
None |
feature_store |
Optional[BaseFeatureStore] |
Feature store component of the stack. |
None |
model_deployer |
Optional[BaseModelDeployer] |
Model deployer component of the stack. |
None |
experiment_tracker |
Optional[BaseExperimentTracker] |
Experiment tracker component of the stack. |
None |
alerter |
Optional[BaseAlerter] |
Alerter component of the stack. |
None |
annotator |
Optional[BaseAnnotator] |
Annotator component of the stack. |
None |
data_validator |
Optional[BaseDataValidator] |
Data validator component of the stack. |
None |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack configuration is not valid. (Listed in the docstring, but the constructor body never raises it.) |
Source code in zenml/stack/stack.py
def __init__(
    self,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    metadata_store: "BaseMetadataStore",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    secrets_manager: Optional["BaseSecretsManager"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
    feature_store: Optional["BaseFeatureStore"] = None,
    model_deployer: Optional["BaseModelDeployer"] = None,
    experiment_tracker: Optional["BaseExperimentTracker"] = None,
    alerter: Optional["BaseAlerter"] = None,
    annotator: Optional["BaseAnnotator"] = None,
    data_validator: Optional["BaseDataValidator"] = None,
):
    """Initializes a stack instance.

    The components are stored exactly as given. No validation happens
    here: call `validate()` (which `deploy_pipeline()` does before
    running anything) to check that the components are compatible.

    Args:
        name: Name of the stack.
        orchestrator: Orchestrator component of the stack.
        metadata_store: Metadata store component of the stack.
        artifact_store: Artifact store component of the stack.
        container_registry: Container registry component of the stack.
        secrets_manager: Secrets manager component of the stack.
        step_operator: Step operator component of the stack.
        feature_store: Feature store component of the stack.
        model_deployer: Model deployer component of the stack.
        experiment_tracker: Experiment tracker component of the stack.
        alerter: Alerter component of the stack.
        annotator: Annotator component of the stack.
        data_validator: Data validator component of the stack.
    """
    self._name = name
    # Mandatory components.
    self._orchestrator = orchestrator
    self._metadata_store = metadata_store
    self._artifact_store = artifact_store
    # Optional components; `None` means the stack has no such component.
    self._container_registry = container_registry
    self._step_operator = step_operator
    self._secrets_manager = secrets_manager
    self._feature_store = feature_store
    self._model_deployer = model_deployer
    self._experiment_tracker = experiment_tracker
    self._alerter = alerter
    self._annotator = annotator
    self._data_validator = data_validator
cleanup_step_run(self)
Cleans up resources after the step run is finished.
Source code in zenml/stack/stack.py
def cleanup_step_run(self) -> None:
    """Run every component's post-step cleanup hook.

    Each stack component's `cleanup_step_run()` is invoked in the
    iteration order of `self.components`.
    """
    stack_components = self.components
    for each_component in stack_components.values():
        each_component.cleanup_step_run()
default_local_stack()
classmethod
Creates a stack instance which is configured to run locally.
Returns:
Type | Description |
---|---|
Stack |
A stack instance configured to run locally. |
Source code in zenml/stack/stack.py
@classmethod
def default_local_stack(cls) -> "Stack":
    """Build the default stack that runs everything on the local machine.

    The stack consists of:
    - a local orchestrator,
    - a local artifact store rooted at a freshly created, UUID-named
      directory under `<global config directory>/local_stores`,
    - an SQLite metadata store whose database file (`metadata.db`) lives
      inside that artifact store directory.

    All three components and the stack itself are named `default`.

    Returns:
        A stack instance configured to run locally.
    """
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.metadata_stores import SQLiteMetadataStore
    from zenml.orchestrators import LocalOrchestrator

    default_name = "default"
    local_orchestrator = LocalOrchestrator(name=default_name)

    store_id = uuid.uuid4()
    store_root = os.path.join(
        GlobalConfiguration().config_directory, "local_stores", str(store_id)
    )
    # The directory has to exist before the artifact store points at it.
    io_utils.create_dir_recursive_if_not_exists(store_root)

    local_artifact_store = LocalArtifactStore(
        name=default_name, uuid=store_id, path=store_root
    )
    sqlite_metadata_store = SQLiteMetadataStore(
        name=default_name, uri=os.path.join(store_root, "metadata.db")
    )

    return cls(
        name=default_name,
        orchestrator=local_orchestrator,
        metadata_store=sqlite_metadata_store,
        artifact_store=local_artifact_store,
    )
deploy_pipeline(self, pipeline, runtime_configuration)
Deploys a pipeline on this stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
BasePipeline |
The pipeline to deploy. |
required |
runtime_configuration |
RuntimeConfiguration |
Contains all the runtime configuration options specified for the pipeline run. |
required |
Returns:
Type | Description |
---|---|
Any |
The return value of the call to `orchestrator.run(...)`. |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack configuration is not valid. |
Source code in zenml/stack/stack.py
def deploy_pipeline(
    self,
    pipeline: "BasePipeline",
    runtime_configuration: RuntimeConfiguration,
) -> Any:
    """Deploys a pipeline on this stack.

    The stack is validated and every component must be running. Each
    component then gets to prepare the deployment and the run before the
    orchestrator executes the pipeline.

    Args:
        pipeline: The pipeline to deploy.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.

    Returns:
        The return value of the call to `orchestrator.run(...)`.

    Raises:
        StackValidationError: If the stack configuration is not valid or
            if any stack component is not currently running.
    """
    self.validate()

    for component in self.components.values():
        if not component.is_running:
            raise StackValidationError(
                f"The '{component.name}' {component.TYPE} stack component "
                f"is not currently running. Please run the following "
                f"command to provision and start the component:\n\n"
                f" `zenml stack up`\n"
            )

    for component in self.components.values():
        component.prepare_pipeline_deployment(
            pipeline=pipeline,
            stack=self,
            runtime_configuration=runtime_configuration,
        )

    for component in self.components.values():
        component.prepare_pipeline_run()

    # Fall back to a timestamp-based run name if none was configured.
    runtime_configuration[
        RUN_NAME_OPTION_KEY
    ] = runtime_configuration.run_name or (
        f"{pipeline.name}-"
        f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
    )

    logger.info(
        "Using stack `%s` to run pipeline `%s`...",
        self.name,
        pipeline.name,
    )
    start_time = time.time()

    original_cache_boolean = pipeline.enable_cache
    try:
        if "enable_cache" in runtime_configuration:
            logger.info(
                "Runtime configuration overwriting the pipeline cache settings"
                " to enable_cache=`%s` for this pipeline run. The default "
                "caching strategy is retained for future pipeline runs.",
                runtime_configuration["enable_cache"],
            )
            pipeline.enable_cache = runtime_configuration.get("enable_cache")

        self._register_pipeline_run(
            pipeline=pipeline, runtime_configuration=runtime_configuration
        )
        return_value = self.orchestrator.run(
            pipeline, stack=self, runtime_configuration=runtime_configuration
        )
    finally:
        # Put the pipeline-level cache policy back so that the next runs
        # default to it again when the runtime configuration does not set
        # it explicitly. This must also happen when the run fails, or the
        # one-off override would leak into later runs.
        pipeline.enable_cache = original_cache_boolean

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run `%s` has finished in %s.",
        runtime_configuration.run_name,
        string_utils.get_human_readable_time(run_duration),
    )

    # NOTE(review): the component cleanup hooks only run after a successful
    # run; confirm whether they should also run when the run raises.
    for component in self.components.values():
        component.cleanup_pipeline_run()

    return return_value
deprovision(self)
Deprovisions all local resources of the stack.
Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Tear down the local resources of every provisioned component.

    Components that do not report provisioned resources are skipped. If a
    component's `deprovision()` raises `NotImplementedError`, the error is
    logged as a warning and the remaining components are still processed.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)
    for component in self.components.values():
        if not component.is_provisioned:
            continue
        try:
            component.deprovision()
        except NotImplementedError as error:
            logger.warning(error)
        else:
            logger.info("Deprovisioned resources for %s.", component)
dict(self)
Converts the stack into a dictionary.
Returns:
Type | Description |
---|---|
Dict[str, str] |
A dictionary containing the stack components. |
Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
    """Serialize the stack into a flat mapping.

    Returns:
        A mapping from each component type's string value to that
        component's JSON representation (with sorted keys), followed by a
        `name` entry holding the stack's name.
    """
    serialized: Dict[str, str] = {}
    for component_type, component in self.components.items():
        key = component_type.value
        serialized[key] = component.json(sort_keys=True)
    serialized["name"] = self.name
    return serialized
from_components(name, components)
classmethod
Creates a stack instance from a dict of stack components.
noqa: DAR402
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the stack. |
required |
components |
Dict[zenml.enums.StackComponentType, StackComponent] |
The components of the stack. |
required |
Returns:
Type | Description |
---|---|
Stack |
A stack instance consisting of the given components. |
Exceptions:
Type | Description |
---|---|
TypeError |
If a required component is missing or a component doesn't inherit from the expected base class. |
Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls, name: str, components: Dict[StackComponentType, "StackComponent"]
) -> "Stack":
    """Creates a stack instance from a dict of stack components.

    The orchestrator, metadata store and artifact store are required. All
    other component types are optional, but each component that is present
    must be an instance of the matching base class.

    Args:
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance (of the class this method is called on)
        consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.alerter import BaseAlerter
    from zenml.annotators import BaseAnnotator
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.data_validators import BaseDataValidator
    from zenml.experiment_trackers import BaseExperimentTracker
    from zenml.feature_stores import BaseFeatureStore
    from zenml.metadata_stores import BaseMetadataStore
    from zenml.model_deployers import BaseModelDeployer
    from zenml.orchestrators import BaseOrchestrator
    from zenml.secrets_managers import BaseSecretsManager
    from zenml.step_operators import BaseStepOperator

    def _raise_type_error(
        component: Optional["StackComponent"], expected_class: Type[Any]
    ) -> NoReturn:
        """Raises a TypeError that the component is missing or has a wrong type.

        Args:
            component: The offending component, or `None` if a required
                component is missing.
            expected_class: The expected type of the component.

        Raises:
            TypeError: Always.
        """
        if component is None:
            # Only required components reach this helper as `None`; say so
            # instead of reporting a confusing `NoneType` type mismatch.
            raise TypeError(
                f"Unable to create stack: Missing required stack component "
                f"(expected: subclass of `{expected_class.__name__}`)"
            )
        raise TypeError(
            f"Unable to create stack: Wrong stack component type "
            f"`{component.__class__.__name__}` (expected: subclass "
            f"of `{expected_class.__name__}`)"
        )

    # Required components.
    orchestrator = components.get(StackComponentType.ORCHESTRATOR)
    if not isinstance(orchestrator, BaseOrchestrator):
        _raise_type_error(orchestrator, BaseOrchestrator)

    metadata_store = components.get(StackComponentType.METADATA_STORE)
    if not isinstance(metadata_store, BaseMetadataStore):
        _raise_type_error(metadata_store, BaseMetadataStore)

    artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
    if not isinstance(artifact_store, BaseArtifactStore):
        _raise_type_error(artifact_store, BaseArtifactStore)

    # Optional components: only type-checked when present.
    container_registry = components.get(
        StackComponentType.CONTAINER_REGISTRY
    )
    if container_registry is not None and not isinstance(
        container_registry, BaseContainerRegistry
    ):
        _raise_type_error(container_registry, BaseContainerRegistry)

    secrets_manager = components.get(StackComponentType.SECRETS_MANAGER)
    if secrets_manager is not None and not isinstance(
        secrets_manager, BaseSecretsManager
    ):
        _raise_type_error(secrets_manager, BaseSecretsManager)

    step_operator = components.get(StackComponentType.STEP_OPERATOR)
    if step_operator is not None and not isinstance(
        step_operator, BaseStepOperator
    ):
        _raise_type_error(step_operator, BaseStepOperator)

    feature_store = components.get(StackComponentType.FEATURE_STORE)
    if feature_store is not None and not isinstance(
        feature_store, BaseFeatureStore
    ):
        _raise_type_error(feature_store, BaseFeatureStore)

    model_deployer = components.get(StackComponentType.MODEL_DEPLOYER)
    if model_deployer is not None and not isinstance(
        model_deployer, BaseModelDeployer
    ):
        _raise_type_error(model_deployer, BaseModelDeployer)

    experiment_tracker = components.get(
        StackComponentType.EXPERIMENT_TRACKER
    )
    if experiment_tracker is not None and not isinstance(
        experiment_tracker, BaseExperimentTracker
    ):
        _raise_type_error(experiment_tracker, BaseExperimentTracker)

    alerter = components.get(StackComponentType.ALERTER)
    if alerter is not None and not isinstance(alerter, BaseAlerter):
        _raise_type_error(alerter, BaseAlerter)

    annotator = components.get(StackComponentType.ANNOTATOR)
    if annotator is not None and not isinstance(annotator, BaseAnnotator):
        _raise_type_error(annotator, BaseAnnotator)

    data_validator = components.get(StackComponentType.DATA_VALIDATOR)
    if data_validator is not None and not isinstance(
        data_validator, BaseDataValidator
    ):
        _raise_type_error(data_validator, BaseDataValidator)

    # Use `cls` (like `default_local_stack`) so subclasses get instances of
    # their own type.
    return cls(
        name=name,
        orchestrator=orchestrator,
        metadata_store=metadata_store,
        artifact_store=artifact_store,
        container_registry=container_registry,
        secrets_manager=secrets_manager,
        step_operator=step_operator,
        feature_store=feature_store,
        model_deployer=model_deployer,
        experiment_tracker=experiment_tracker,
        alerter=alerter,
        annotator=annotator,
        data_validator=data_validator,
    )
prepare_step_run(self)
Prepares running a step.
Source code in zenml/stack/stack.py
def prepare_step_run(self) -> None:
    """Let every stack component prepare itself before a step runs.

    Each component's `prepare_step_run()` hook is called in the iteration
    order of `self.components`.
    """
    members = self.components.values()
    for member in members:
        member.prepare_step_run()
provision(self)
Provisions resources to run the stack locally.
Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provision local resources for every component that still lacks them.

    Components that already report provisioned resources are left alone.
    Errors raised by a component's `provision()` (including
    `NotImplementedError`) are not caught here.
    """
    logger.info("Provisioning resources for stack '%s'.", self.name)
    unprovisioned = (
        component
        for component in self.components.values()
        if not component.is_provisioned
    )
    for component in unprovisioned:
        component.provision()
        logger.info("Provisioned resources for %s.", component)
requirements(self, exclude_components=None)
Set of PyPI requirements for the stack.
This method combines the requirements of all stack components (except
the ones specified in `exclude_components`).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude_components |
Optional[AbstractSet[zenml.enums.StackComponentType]] |
Set of component types for which the requirements should not be included in the output. |
None |
Returns:
Type | Description |
---|---|
Set[str] |
Set of PyPI requirements. |
Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Collect the PyPI requirements of the stack's components.

    Args:
        exclude_components: Component types whose requirements should be
            left out of the result. Defaults to excluding nothing.

    Returns:
        The union of the requirements of all non-excluded components (an
        empty set if there are none).
    """
    skipped_types = exclude_components or set()
    collected: Set[str] = set()
    for component in self.components.values():
        if component.TYPE in skipped_types:
            continue
        collected |= component.requirements
    return collected
resume(self)
Resumes the provisioned local resources of the stack.
Exceptions:
Type | Description |
---|---|
ProvisioningError |
If any stack component is missing provisioned resources. |
Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resume the provisioned local resources of every stack component.

    Components that are already running are left untouched. Components
    that are provisioned but not running are resumed.

    Raises:
        ProvisioningError: If a component is neither running nor
            provisioned, i.e. there is nothing to resume.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)
    for component in self.components.values():
        if component.is_running:
            # Already up, so there is nothing to resume.
            continue
        if not component.is_provisioned:
            raise ProvisioningError(
                f"Unable to resume resources for {component}: No "
                f"resources have been provisioned for this component."
            )
        component.resume()
        logger.info("Resumed resources for %s.", component)
suspend(self)
Suspends the provisioned local resources of the stack.
Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspend the provisioned local resources of every stack component.

    Components that are already suspended are skipped. If a component does
    not support suspending (its `suspend()` raises `NotImplementedError`),
    a warning is logged and the remaining components are still processed.
    """
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )
    for component in self.components.values():
        if component.is_suspended:
            continue
        try:
            component.suspend()
        except NotImplementedError:
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                component,
            )
        else:
            logger.info("Suspended resources for %s.", component)
validate(self)
Checks whether the stack configuration is valid.
To check if a stack configuration is valid, the following criteria must
be met:
- all components must support the execution mode (either local or
remote execution) specified by the orchestrator of the stack
- the `StackValidator` of each stack component has to validate the
stack to make sure all the components are compatible with each other
Source code in zenml/stack/stack.py
def validate(self) -> None:
    """Checks whether the stack configuration is valid.

    Every stack component may expose a `StackValidator` through its
    `validator` property. Each component that has one asks it to validate
    this stack, so components can verify that they are compatible with the
    rest of the stack. Components without a validator are skipped.

    This method performs no other checks itself. Any exception raised by a
    component's validator propagates to the caller.
    """
    for component in self.components.values():
        if component.validator:
            component.validator.validate(stack=self)
stack_component
Implementation of the ZenML Stack Component class.
StackComponent (BaseModel, ABC)
pydantic-model
Abstract StackComponent class for all components of a ZenML stack.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the component. |
uuid |
UUID |
Unique identifier of the component. |
Source code in zenml/stack/stack_component.py
class StackComponent(BaseModel, ABC):
    """Abstract StackComponent class for all components of a ZenML stack.

    Attributes:
        name: The name of the component.
        uuid: Unique identifier of the component.
    """

    name: str
    uuid: UUID = Field(default_factory=uuid4)

    # Class Configuration
    TYPE: ClassVar[StackComponentType]
    FLAVOR: ClassVar[str]

    @property
    def log_file(self) -> Optional[str]:
        """Optional path to a log file for the stack component.

        Returns:
            Optional path to a log file for the stack component.
        """
        # TODO [ENG-136]: Add support for multiple log files for a stack
        # component. E.g. let each component return a generator that yields
        # logs instead of specifying a single file path.
        return None

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this component.

        The items of the dictionary should map option names (which can be used
        to configure the option in the `RuntimeConfiguration`) to default
        values for the option (or `None` if there is no default value).

        Returns:
            A dictionary of runtime options.
        """
        return {}

    @property
    def requirements(self) -> Set[str]:
        """Set of PyPI requirements for the component.

        Returns:
            A set of PyPI requirements for the component.
        """
        from zenml.integrations.utils import get_requirements_for_module

        return set(get_requirements_for_module(self.__module__))

    @property
    def local_path(self) -> Optional[str]:
        """Path to a local directory used by the component to store persistent information.

        This property should only be implemented by components that need to
        store persistent information in a directory on the local machine and
        also need that information to be available during pipeline runs.

        IMPORTANT: the path returned by this property must always point to a
        location inside the ZenML global config directory. The local
        Kubeflow orchestrator relies on this convention to correctly mount the
        local folders in the Kubeflow containers. This is an example of a valid
        path:

        ```python
        from zenml.utils.io_utils import get_global_config_directory
        from zenml.constants import LOCAL_STORES_DIRECTORY_NAME

        ...

        @property
        def local_path(self) -> Optional[str]:
            return os.path.join(
                get_global_config_directory(),
                LOCAL_STORES_DIRECTORY_NAME,
                str(self.uuid),
            )
        ```

        Returns:
            A path to a local directory used by the component to store
            persistent information.
        """
        return None

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Prepares deploying the pipeline.

        This method gets called immediately before a pipeline is deployed.
        Subclasses should override it if they require runtime configuration
        options or if they need to run code before the pipeline deployment.

        Args:
            pipeline: The pipeline that will be deployed.
            stack: The stack on which the pipeline will be deployed.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.
        """

    def prepare_pipeline_run(self) -> None:
        """Prepares running the pipeline."""

    def cleanup_pipeline_run(self) -> None:
        """Cleans up resources after the pipeline run is finished."""

    def prepare_step_run(self) -> None:
        """Prepares running a step."""

    def cleanup_step_run(self) -> None:
        """Cleans up resources after the step run is finished."""

    @property
    def post_registration_message(self) -> Optional[str]:
        """Optional message that will be printed after the stack component is registered.

        Returns:
            An optional message.
        """
        return None

    @property
    def validator(self) -> Optional["StackValidator"]:
        """The optional validator of the stack component.

        This validator is consulted each time a stack containing this
        component is validated (see `Stack.validate()`). Subclasses should
        override this property and return a `StackValidator` that makes sure
        they're not included in any stack that they're not compatible with.

        Returns:
            An optional `StackValidator` instance.
        """
        return None

    @property
    def is_provisioned(self) -> bool:
        """If the component provisioned resources to run.

        Returns:
            True if the component provisioned resources to run.
        """
        return True

    @property
    def is_running(self) -> bool:
        """If the component is running.

        Returns:
            True if the component is running.
        """
        return True

    @property
    def is_suspended(self) -> bool:
        """If the component is suspended.

        The base implementation treats every component that is not running
        as suspended.

        Returns:
            True if the component is suspended.
        """
        return not self.is_running

    def provision(self) -> None:
        """Provisions resources to run the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )

    def deprovision(self) -> None:
        """Deprovisions all resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Deprovisioning resource not implemented for {self}."
        )

    def resume(self) -> None:
        """Resumes the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Resuming provisioned resources not implemented for {self}."
        )

    def suspend(self) -> None:
        """Suspends the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Suspending provisioned resources not implemented for {self}."
        )

    def __repr__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        attribute_representation = ", ".join(
            f"{key}={value}" for key, value in self.dict().items()
        )
        return (
            f"{self.__class__.__qualname__}(type={self.TYPE}, "
            f"flavor={self.FLAVOR}, {attribute_representation})"
        )

    def __str__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        return self.__repr__()

    @root_validator(skip_on_failure=True)
    def _ensure_stack_component_complete(cls, values: Dict[str, Any]) -> Any:
        """Ensures that the stack component is complete.

        Args:
            values: The values of the stack component.

        Returns:
            The values of the stack component.

        Raises:
            StackComponentInterfaceError: If the stack component is not
                implemented correctly.
        """
        # Use explicit checks instead of `assert`: assertions are stripped
        # when Python runs with `-O`, which would silently disable this
        # validation.
        stack_component_type = getattr(cls, "TYPE", None)
        try:
            has_valid_type = stack_component_type in StackComponentType
        except TypeError:
            # Depending on the Python version, an Enum containment check
            # with a non-member value can raise `TypeError` instead of
            # returning `False`.
            has_valid_type = False
        if not has_valid_type:
            raise StackComponentInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    `zenml.stack.StackComponent` please make sure that your
                    class has a ClassVar named `TYPE` and its value is set to a
                    `StackComponentType` from `from zenml.enums import
                    StackComponentType`.
                    In most of the cases, this is already done for you within
                    the implementation of the base concept.
                    Example:
                    class BaseArtifactStore(StackComponent):
                    # Instance Variables
                    path: str
                    # Class Variables
                    TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
                    """
                )
            )

        if not hasattr(cls, "FLAVOR"):
            raise StackComponentInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    `zenml.stack.StackComponent` please make sure that your
                    class has a defined ClassVar `FLAVOR`.
                    Example:
                    class LocalArtifactStore(BaseArtifactStore):
                    ...
                    # Define flavor as a ClassVar
                    FLAVOR: ClassVar[str] = "local"
                    ...
                    """
                )
            )

        return values

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
        # prevent extra attributes during model initialization
        extra = Extra.forbid
is_provisioned: bool
property
readonly
If the component provisioned resources to run.
Returns:
Type | Description |
---|---|
bool |
True if the component provisioned resources to run. |
is_running: bool
property
readonly
If the component is running.
Returns:
Type | Description |
---|---|
bool |
True if the component is running. |
is_suspended: bool
property
readonly
If the component is suspended.
Returns:
Type | Description |
---|---|
bool |
True if the component is suspended. |
local_path: Optional[str]
property
readonly
Path to a local directory used by the component to store persistent information.
This property should only be implemented by components that need to store persistent information in a directory on the local machine and also need that information to be available during pipeline runs.
IMPORTANT: the path returned by this property must always point to a location inside the ZenML global config directory. The local Kubeflow orchestrator relies on this convention to correctly mount the local folders in the Kubeflow containers. This is an example of a valid path:
from zenml.utils.io_utils import get_global_config_directory
from zenml.constants import LOCAL_STORES_DIRECTORY_NAME
...
@property
def local_path(self) -> Optional[str]:
    return os.path.join(
        get_global_config_directory(),
        LOCAL_STORES_DIRECTORY_NAME,
        str(self.uuid),
    )
Returns:
Type | Description |
---|---|
Optional[str] |
A path to a local directory used by the component to store persistent information. |
log_file: Optional[str]
property
readonly
Optional path to a log file for the stack component.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional path to a log file for the stack component. |
post_registration_message: Optional[str]
property
readonly
Optional message that will be printed after the stack component is registered.
Returns:
Type | Description |
---|---|
Optional[str] |
An optional message. |
requirements: Set[str]
property
readonly
Set of PyPI requirements for the component.
Returns:
Type | Description |
---|---|
Set[str] |
A set of PyPI requirements for the component. |
runtime_options: Dict[str, Any]
property
readonly
Runtime options that are available to configure this component.
The items of the dictionary should map option names (which can be used
to configure the option in the `RuntimeConfiguration`) to default
values for the option (or `None` if there is no default value).
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary of runtime options. |
validator: Optional[StackValidator]
property
readonly
The optional validator of the stack component.
This validator is consulted each time a stack containing this stack
component is validated. Subclasses should override this property
and return a `StackValidator` that makes sure they're not included in
any stack that they're not compatible with.
Returns:
Type | Description |
---|---|
Optional[StackValidator] |
An optional `StackValidator` instance. |
Config
Pydantic configuration class.
Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic model settings shared by every stack component."""

    # Reject unknown fields when a component is constructed.
    extra = Extra.forbid
    # Public fields cannot be reassigned once the component exists.
    allow_mutation = False
    # Underscore-prefixed attributes are private: they stay mutable and are
    # left out of serialization.
    underscore_attrs_are_private = True
__repr__(self)
special
String representation of the stack component.
Returns:
Type | Description |
---|---|
str |
A string representation of the stack component. |
Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
    """Render the component as `ClassName(type=..., flavor=..., fields...)`.

    Returns:
        The qualified class name, followed by the component's type, its
        flavor and every `key=value` pair of `self.dict()`.
    """
    field_pairs = [f"{key}={value}" for key, value in self.dict().items()]
    class_name = self.__class__.__qualname__
    return "{}(type={}, flavor={}, {})".format(
        class_name, self.TYPE, self.FLAVOR, ", ".join(field_pairs)
    )
__str__(self)
special
String representation of the stack component.
Returns:
Type | Description |
---|---|
str |
A string representation of the stack component. |
Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
    """Human-readable form of the component; identical to its `repr`.

    Returns:
        Whatever the component's `__repr__` produces.
    """
    representation = self.__repr__()
    return representation
cleanup_pipeline_run(self)
Cleans up resources after the pipeline run is finished.
Source code in zenml/stack/stack_component.py
def cleanup_pipeline_run(self) -> None:
    """Hook invoked once a pipeline run has finished.

    The base implementation does nothing; subclasses can override it to
    release resources they acquired for the run.
    """
    return None
cleanup_step_run(self)
Cleans up resources after the step run is finished.
Source code in zenml/stack/stack_component.py
def cleanup_step_run(self) -> None:
    """Hook invoked once a step run has finished.

    The base implementation does nothing; subclasses can override it to
    release resources they acquired for the step.
    """
    return None
deprovision(self)
Deprovisions all resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Remove every resource the component provisioned.

    The base class has no resources to remove, so it always refuses.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this
            method.
    """
    message = f"Deprovisioning resource not implemented for {self}."
    raise NotImplementedError(message)
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)
Prepares deploying the pipeline.
This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
BasePipeline |
The pipeline that will be deployed. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
runtime_configuration |
RuntimeConfiguration |
Contains all the runtime configuration options specified for the pipeline run. |
required |
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Hook invoked right before a pipeline is deployed.

    The base implementation does nothing. Subclasses override it when they
    need runtime configuration options or have to run code ahead of the
    deployment.

    Args:
        pipeline: The pipeline about to be deployed.
        stack: The stack the pipeline will be deployed on.
        runtime_configuration: All runtime configuration options that were
            specified for the pipeline run.
    """
    return None
prepare_pipeline_run(self)
Prepares running the pipeline.
Source code in zenml/stack/stack_component.py
def prepare_pipeline_run(self) -> None:
    """Hook invoked before the pipeline starts running.

    The base implementation does nothing; subclasses can override it.
    """
    return None
prepare_step_run(self)
Prepares running a step.
Source code in zenml/stack/stack_component.py
def prepare_step_run(self) -> None:
    """Hook invoked before a step starts running.

    The base implementation does nothing; subclasses can override it.
    """
    return None
provision(self)
Provisions resources to run the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Create the resources this component needs in order to run.

    The base class does not know how to provision anything, so it always
    refuses.

    Raises:
        NotImplementedError: Always, unless a subclass overrides this
            method.
    """
    message = f"Provisioning resources not implemented for {self}."
    raise NotImplementedError(message)
resume(self)
Resumes the provisioned resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resume resources of this component that were previously provisioned.

    The base implementation offers no resume support. Components that can
    resume their resources must override this method.

    Raises:
        NotImplementedError: Always, unless overridden by a subclass.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
suspend(self)
Suspends the provisioned resources of the component.
Exceptions:
Type | Description |
---|---|
NotImplementedError | If the component does not implement this method. |
Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspend resources of this component that were previously provisioned.

    The base implementation offers no suspend support. Components that can
    suspend their resources must override this method.

    Raises:
        NotImplementedError: Always, unless overridden by a subclass.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
stack_validator
Implementation of the ZenML Stack Validator.
StackValidator
A `StackValidator` is used to validate a stack configuration.
Each `StackComponent` can provide a `StackValidator` to make sure it is
compatible with all components of the stack. The `KubeflowOrchestrator`,
for example, will always require the stack to have a container registry
in order to push the Docker images that are required to run a pipeline
in Kubeflow Pipelines.
Source code in zenml/stack/stack_validator.py
class StackValidator:
    """Checks that a stack configuration satisfies a set of requirements.

    Each `StackComponent` can provide a `StackValidator` to make sure it is
    compatible with all components of the stack. The `KubeflowOrchestrator`,
    for example, always requires the stack to have a container registry so
    that it can push the docker images needed to run a pipeline in Kubeflow
    Pipelines.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[
            Callable[["Stack"], Tuple[bool, str]]
        ] = None,
    ):
        """Create a validator.

        Args:
            required_components: Component types the stack must contain.
                A missing or empty value is replaced by an empty set.
            custom_validation_function: Optional callable that receives the
                stack and returns an ``(is_valid, error_message)`` tuple.
        """
        if required_components:
            self._required_components = required_components
        else:
            self._required_components = set()
        self._custom_validation_function = custom_validation_function

    def validate(self, stack: "Stack") -> None:
        """Check ``stack`` against this validator's requirements.

        The stack must contain every required component type. If a custom
        validation function was supplied, that function must also report the
        stack as valid.

        Args:
            stack: Stack to check.

        Raises:
            StackValidationError: If a required component is missing or the
                custom validation function rejects the stack.
        """
        present_components = set(stack.components)
        missing = self._required_components - present_components
        if missing:
            raise StackValidationError(
                f"Missing stack components {missing} for "
                f"stack: {stack.name}"
            )

        validation_fn = self._custom_validation_function
        if not validation_fn:
            return

        is_valid, error_message = validation_fn(stack)
        if is_valid:
            return
        raise StackValidationError(
            "Custom validation function failed to validate "
            f"stack '{stack.name}': {error_message}"
        )
__init__(self, required_components=None, custom_validation_function=None)
special
Initializes a `StackValidator` instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
required_components | Optional[AbstractSet[zenml.enums.StackComponentType]] | Optional set of stack components that must exist in the stack. | None |
custom_validation_function | Optional[Callable[[Stack], Tuple[bool, str]]] | Optional function that returns whether a stack is valid and an error message to show if not valid. | None |
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[
        Callable[["Stack"], Tuple[bool, str]]
    ] = None,
):
    """Set up a `StackValidator`.

    Args:
        required_components: Component types that must be present in the
            stack. A missing or empty value is replaced by an empty set.
        custom_validation_function: Optional callable that takes the stack
            and returns an ``(is_valid, error_message)`` tuple.
    """
    self._required_components = (
        required_components if required_components else set()
    )
    self._custom_validation_function = custom_validation_function
validate(self, stack)
Validates the given stack.
Checks if the stack contains all the required components and passes the custom validation function of the validator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack | Stack | The stack to validate. | required |
Exceptions:
Type | Description |
---|---|
StackValidationError | If the stack does not meet all the validation criteria. |
Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Check ``stack`` against the validator's requirements.

    The stack must contain every required component type. If a custom
    validation function was configured, it must also accept the stack.

    Args:
        stack: Stack to check.

    Raises:
        StackValidationError: If a required component is missing or the
            custom validation function rejects the stack.
    """
    present_components = set(stack.components)
    missing = self._required_components - present_components
    if missing:
        raise StackValidationError(
            f"Missing stack components {missing} for "
            f"stack: {stack.name}"
        )

    validation_fn = self._custom_validation_function
    if not validation_fn:
        return

    is_valid, error_message = validation_fn(stack)
    if is_valid:
        return
    raise StackValidationError(
        "Custom validation function failed to validate "
        f"stack '{stack.name}': {error_message}"
    )