Stack
zenml.stack
special
stack
Stack
ZenML stack class.
A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (orchestrator, metadata store and artifact store) are required to run any kind of pipeline, other components like the container registry are only required if other stack components depend on them.
Source code in zenml/stack/stack.py
class Stack:
    """ZenML stack class.

    A ZenML stack is a collection of multiple stack components that are
    required to run ZenML pipelines. Some of these components (orchestrator,
    metadata store and artifact store) are required to run any kind of
    pipeline; other components like the container registry are only required
    if other stack components depend on them.
    """

    def __init__(
        self,
        name: str,
        *,
        orchestrator: "BaseOrchestrator",
        metadata_store: "BaseMetadataStore",
        artifact_store: "BaseArtifactStore",
        container_registry: Optional["BaseContainerRegistry"] = None,
        step_operator: Optional["BaseStepOperator"] = None,
    ):
        """Initializes and validates a stack instance.

        Args:
            name: The name of the stack.
            orchestrator: The orchestrator of the stack.
            metadata_store: The metadata store of the stack.
            artifact_store: The artifact store of the stack.
            container_registry: Optional container registry of the stack.
            step_operator: Optional step operator of the stack.

        Raises:
            StackValidationError: If the stack configuration is not valid
                (propagated from the component validators run by
                `validate()`).
        """
        self._name = name
        self._orchestrator = orchestrator
        self._metadata_store = metadata_store
        self._artifact_store = artifact_store
        self._container_registry = container_registry
        self._step_operator = step_operator

        # Validation runs last because validators receive the fully
        # populated stack.
        self.validate()

    @classmethod
    def from_components(
        cls, name: str, components: Dict[StackComponentType, "StackComponent"]
    ) -> "Stack":
        """Creates a stack instance from a dict of stack components.

        Args:
            name: The name of the stack.
            components: The components of the stack.

        Returns:
            A stack instance consisting of the given components.

        Raises:
            TypeError: If a required component is missing or a component
                doesn't inherit from the expected base class.
        """
        from zenml.artifact_stores import BaseArtifactStore
        from zenml.container_registries import BaseContainerRegistry
        from zenml.metadata_stores import BaseMetadataStore
        from zenml.orchestrators import BaseOrchestrator
        from zenml.step_operators import BaseStepOperator

        def _raise_type_error(
            component: Optional["StackComponent"], expected_class: Type[Any]
        ) -> NoReturn:
            """Raises a TypeError that the component has an unexpected type."""
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )

        # Required components: a missing entry yields `None`, which fails
        # the isinstance check just like a wrongly typed component does.
        orchestrator = components.get(StackComponentType.ORCHESTRATOR)
        if not isinstance(orchestrator, BaseOrchestrator):
            _raise_type_error(orchestrator, BaseOrchestrator)

        metadata_store = components.get(StackComponentType.METADATA_STORE)
        if not isinstance(metadata_store, BaseMetadataStore):
            _raise_type_error(metadata_store, BaseMetadataStore)

        artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
        if not isinstance(artifact_store, BaseArtifactStore):
            _raise_type_error(artifact_store, BaseArtifactStore)

        # Optional components: only type-checked when present.
        container_registry = components.get(
            StackComponentType.CONTAINER_REGISTRY
        )
        if container_registry is not None and not isinstance(
            container_registry, BaseContainerRegistry
        ):
            _raise_type_error(container_registry, BaseContainerRegistry)

        step_operator = components.get(StackComponentType.STEP_OPERATOR)
        if step_operator is not None and not isinstance(
            step_operator, BaseStepOperator
        ):
            _raise_type_error(step_operator, BaseStepOperator)

        # Instantiate through `cls` (as `default_local_stack` does) so that
        # calling this constructor on a subclass returns that subclass.
        return cls(
            name=name,
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
            container_registry=container_registry,
            step_operator=step_operator,
        )

    @classmethod
    def default_local_stack(cls) -> "Stack":
        """Creates a stack instance which is configured to run locally."""
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.metadata_stores import SQLiteMetadataStore
        from zenml.orchestrators import LocalOrchestrator

        orchestrator = LocalOrchestrator(name="local_orchestrator")

        # The artifact store lives in a directory named after its own UUID
        # inside the global config directory.
        artifact_store_uuid = uuid.uuid4()
        artifact_store_path = os.path.join(
            GlobalConfig.config_directory(),
            "local_stores",
            str(artifact_store_uuid),
        )
        artifact_store = LocalArtifactStore(
            name="local_artifact_store",
            uuid=artifact_store_uuid,
            path=artifact_store_path,
        )

        # The SQLite metadata database is placed inside the artifact store
        # directory.
        metadata_store_path = os.path.join(artifact_store_path, "metadata.db")
        metadata_store = SQLiteMetadataStore(
            name="local_metadata_store", uri=metadata_store_path
        )

        return cls(
            name="local_stack",
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
        )

    @property
    def components(self) -> Dict[StackComponentType, "StackComponent"]:
        """All components of the stack, keyed by component type.

        Optional components that are not set are omitted.
        """
        return {
            component.type: component
            for component in [
                self.orchestrator,
                self.metadata_store,
                self.artifact_store,
                self.container_registry,
                self.step_operator,
            ]
            if component is not None
        }

    @property
    def name(self) -> str:
        """The name of the stack."""
        return self._name

    @property
    def orchestrator(self) -> "BaseOrchestrator":
        """The orchestrator of the stack."""
        return self._orchestrator

    @property
    def metadata_store(self) -> "BaseMetadataStore":
        """The metadata store of the stack."""
        return self._metadata_store

    @property
    def artifact_store(self) -> "BaseArtifactStore":
        """The artifact store of the stack."""
        return self._artifact_store

    @property
    def container_registry(self) -> Optional["BaseContainerRegistry"]:
        """The container registry of the stack."""
        return self._container_registry

    @property
    def step_operator(self) -> Optional["BaseStepOperator"]:
        """The step operator of the stack."""
        return self._step_operator

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this stack.

        This method combines the available runtime options for all components
        of this stack. See `StackComponent.runtime_options()` for
        more information. If several components define the same option name,
        a warning is logged and the component visited last wins.
        """
        runtime_options: Dict[str, Any] = {}
        for component in self.components.values():
            duplicate_runtime_options = (
                runtime_options.keys() & component.runtime_options.keys()
            )
            if duplicate_runtime_options:
                logger.warning(
                    "Found duplicate runtime options %s.",
                    duplicate_runtime_options,
                )

            runtime_options.update(component.runtime_options)

        return runtime_options

    def dict(self) -> Dict[str, str]:
        """Converts the stack into a dictionary.

        Returns:
            A dict mapping each component type value to the JSON
            representation of that component, plus a `"name"` entry holding
            the stack name.
        """
        component_dict = {
            component_type.value: component.json(sort_keys=True)
            for component_type, component in self.components.items()
        }
        component_dict.update({"name": self.name})
        return component_dict

    def requirements(
        self,
        exclude_components: Optional[AbstractSet[StackComponentType]] = None,
    ) -> Set[str]:
        """Set of PyPI requirements for the stack.

        This method combines the requirements of all stack components (except
        the ones specified in `exclude_components`).

        Args:
            exclude_components: Set of component types for which the
                requirements should not be included in the output.

        Returns:
            The union of the requirements of all included components.
        """
        exclude_components = exclude_components or set()
        requirements = [
            component.requirements
            for component in self.components.values()
            if component.type not in exclude_components
        ]
        # `set.union` needs at least one argument, hence the fallback.
        return set.union(*requirements) if requirements else set()

    def validate(self) -> None:
        """Checks whether the stack configuration is valid.

        Every stack component that exposes a `StackValidator` through its
        `validator` property gets to validate this stack, which makes sure
        all the components are compatible with each other. Components
        without a validator are skipped.

        Raises:
            StackValidationError: If the stack configuration is not valid
                (propagated from the component validators).
        """
        for component in self.components.values():
            if component.validator:
                component.validator.validate(stack=self)

    def deploy_pipeline(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: RuntimeConfiguration,
    ) -> Any:
        """Deploys a pipeline on this stack.

        Args:
            pipeline: The pipeline to deploy.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.

        Returns:
            The return value of the call to `orchestrator.run_pipeline(...)`.
        """
        # Every component prepares the deployment before any component
        # prepares the run.
        for component in self.components.values():
            component.prepare_pipeline_deployment(
                pipeline=pipeline,
                stack=self,
                runtime_configuration=runtime_configuration,
            )

        for component in self.components.values():
            component.prepare_pipeline_run()

        # Fall back to a timestamped run name if none was configured.
        runtime_configuration[
            RUN_NAME_OPTION_KEY
        ] = runtime_configuration.run_name or (
            f"{pipeline.name}-"
            f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
        )

        logger.info(
            "Using stack `%s` to run pipeline `%s`...",
            self.name,
            pipeline.name,
        )
        start_time = time.time()

        return_value = self.orchestrator.run_pipeline(
            pipeline, stack=self, runtime_configuration=runtime_configuration
        )

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            runtime_configuration.run_name,
            string_utils.get_human_readable_time(run_duration),
        )

        for component in self.components.values():
            component.cleanup_pipeline_run()

        return return_value

    @property
    def is_provisioned(self) -> bool:
        """If the stack provisioned resources to run locally."""
        return all(
            component.is_provisioned for component in self.components.values()
        )

    @property
    def is_running(self) -> bool:
        """If the stack is running locally."""
        return all(
            component.is_running for component in self.components.values()
        )

    def provision(self) -> None:
        """Provisions resources to run the stack locally.

        Raises:
            NotImplementedError: If any unprovisioned component does not
                implement provisioning.
        """
        logger.info("Provisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if not component.is_provisioned:
                component.provision()
                logger.info("Provisioned resources for %s.", component)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the stack.

        A provisioned component that does not implement deprovisioning is
        skipped: its `NotImplementedError` is logged as a warning and the
        remaining components are still processed.
        """
        logger.info("Deprovisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if component.is_provisioned:
                try:
                    component.deprovision()
                    logger.info("Deprovisioned resources for %s.", component)
                except NotImplementedError as e:
                    logger.warning(e)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the stack.

        Raises:
            ProvisioningError: If any stack component is missing provisioned
                resources.
        """
        logger.info("Resuming provisioned resources for stack %s.", self.name)
        for component in self.components.values():
            if component.is_running:
                # the component is already running, no need to resume anything
                pass
            elif component.is_provisioned:
                component.resume()
                logger.info("Resumed resources for %s.", component)
            else:
                raise ProvisioningError(
                    f"Unable to resume resources for {component}: No "
                    f"resources have been provisioned for this component."
                )

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the stack.

        Running components that do not implement suspending are skipped
        with a warning.
        """
        logger.info(
            "Suspending provisioned resources for stack '%s'.", self.name
        )
        for component in self.components.values():
            if component.is_running:
                try:
                    component.suspend()
                    logger.info("Suspended resources for %s.", component)
                except NotImplementedError:
                    logger.warning(
                        "Suspending provisioned resources not implemented "
                        "for %s. Continuing without suspending resources...",
                        component,
                    )
artifact_store: BaseArtifactStore
property
readonly
The artifact store of the stack.
components: Dict[zenml.enums.StackComponentType, StackComponent]
property
readonly
All components of the stack.
container_registry: Optional[BaseContainerRegistry]
property
readonly
The container registry of the stack.
is_provisioned: bool
property
readonly
If the stack provisioned resources to run locally.
is_running: bool
property
readonly
If the stack is running locally.
metadata_store: BaseMetadataStore
property
readonly
The metadata store of the stack.
name: str
property
readonly
The name of the stack.
orchestrator: BaseOrchestrator
property
readonly
The orchestrator of the stack.
runtime_options: Dict[str, Any]
property
readonly
Runtime options that are available to configure this stack.
This method combines the available runtime options for all components
of this stack. See `StackComponent.runtime_options()` for more information.
step_operator: Optional[BaseStepOperator]
property
readonly
The step operator of the stack.
__init__(self, name, *, orchestrator, metadata_store, artifact_store, container_registry=None, step_operator=None)
special
Initializes and validates a stack instance.
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack configuration is not valid. |
Source code in zenml/stack/stack.py
def __init__(
    self,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    metadata_store: "BaseMetadataStore",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
):
    """Stores the stack name and components, then validates the stack.

    Args:
        name: The name of the stack.
        orchestrator: The orchestrator of the stack.
        metadata_store: The metadata store of the stack.
        artifact_store: The artifact store of the stack.
        container_registry: Optional container registry of the stack.
        step_operator: Optional step operator of the stack.

    Raises:
        StackValidationError: If the stack configuration is not valid.
    """
    # Optional components.
    self._step_operator = step_operator
    self._container_registry = container_registry

    # Required components.
    self._artifact_store = artifact_store
    self._metadata_store = metadata_store
    self._orchestrator = orchestrator

    self._name = name

    # Validate only once every attribute has been assigned.
    self.validate()
default_local_stack()
classmethod
Creates a stack instance which is configured to run locally.
Source code in zenml/stack/stack.py
@classmethod
def default_local_stack(cls) -> "Stack":
    """Builds a stack made of local components only.

    The stack combines a local orchestrator, a local artifact store located
    in a UUID-named directory below the global config directory, and a
    SQLite metadata store whose database file lives inside that directory.

    Returns:
        A stack named `local_stack`.
    """
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.metadata_stores import SQLiteMetadataStore
    from zenml.orchestrators import LocalOrchestrator

    local_orchestrator = LocalOrchestrator(name="local_orchestrator")

    # The store's UUID doubles as the name of its directory.
    store_uuid = uuid.uuid4()
    config_directory = GlobalConfig.config_directory()
    store_path = os.path.join(config_directory, "local_stores", str(store_uuid))
    local_artifact_store = LocalArtifactStore(
        name="local_artifact_store", uuid=store_uuid, path=store_path
    )

    database_path = os.path.join(store_path, "metadata.db")
    local_metadata_store = SQLiteMetadataStore(
        name="local_metadata_store", uri=database_path
    )

    return cls(
        name="local_stack",
        orchestrator=local_orchestrator,
        metadata_store=local_metadata_store,
        artifact_store=local_artifact_store,
    )
deploy_pipeline(self, pipeline, runtime_configuration)
Deploys a pipeline on this stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
BasePipeline |
The pipeline to deploy. |
required |
runtime_configuration |
RuntimeConfiguration |
Contains all the runtime configuration options specified for the pipeline run. |
required |
Returns:
Type | Description |
---|---|
Any |
The return value of the call to `orchestrator.run_pipeline(...)`. |
Source code in zenml/stack/stack.py
def deploy_pipeline(
    self,
    pipeline: "BasePipeline",
    runtime_configuration: RuntimeConfiguration,
) -> Any:
    """Runs a pipeline on this stack.

    All components first prepare the deployment, then prepare the run. The
    orchestrator then executes the pipeline, and finally all components
    clean up.

    Args:
        pipeline: The pipeline to deploy.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.

    Returns:
        The return value of the call to `orchestrator.run_pipeline(...)`.
    """
    for stack_component in self.components.values():
        stack_component.prepare_pipeline_deployment(
            pipeline=pipeline,
            stack=self,
            runtime_configuration=runtime_configuration,
        )

    for stack_component in self.components.values():
        stack_component.prepare_pipeline_run()

    # Use the configured run name, or derive one from the pipeline name and
    # the current timestamp.
    run_name = runtime_configuration.run_name
    if not run_name:
        timestamp_format = "%d_%h_%y-%H_%M_%S_%f"
        run_name = (
            f"{pipeline.name}-{datetime.now().strftime(timestamp_format)}"
        )
    runtime_configuration[RUN_NAME_OPTION_KEY] = run_name

    logger.info(
        "Using stack `%s` to run pipeline `%s`...", self.name, pipeline.name
    )

    started_at = time.time()
    result = self.orchestrator.run_pipeline(
        pipeline, stack=self, runtime_configuration=runtime_configuration
    )
    elapsed_seconds = time.time() - started_at

    logger.info(
        "Pipeline run `%s` has finished in %s.",
        runtime_configuration.run_name,
        string_utils.get_human_readable_time(elapsed_seconds),
    )

    for stack_component in self.components.values():
        stack_component.cleanup_pipeline_run()

    return result
deprovision(self)
Deprovisions all local resources of the stack.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If any provisioned component does not implement deprovisioning. |
Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack.

    Components without provisioned resources are skipped. If a provisioned
    component does not implement deprovisioning, its `NotImplementedError`
    is logged as a warning and the loop continues with the next component.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if not stack_component.is_provisioned:
            continue

        try:
            stack_component.deprovision()
            logger.info("Deprovisioned resources for %s.", stack_component)
        except NotImplementedError as error:
            logger.warning(error)
dict(self)
Converts the stack into a dictionary.
Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
"""Converts the stack into a dictionary."""
component_dict = {
component_type.value: component.json(sort_keys=True)
for component_type, component in self.components.items()
}
component_dict.update({"name": self.name})
return component_dict
from_components(name, components)
classmethod
Creates a stack instance from a dict of stack components.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the stack. |
required |
components |
Dict[zenml.enums.StackComponentType, StackComponent] |
The components of the stack. |
required |
Returns:
Type | Description |
---|---|
Stack |
A stack instance consisting of the given components. |
Exceptions:
Type | Description |
---|---|
TypeError |
If a required component is missing or a component doesn't inherit from the expected base class. |
Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls, name: str, components: Dict[StackComponentType, "StackComponent"]
) -> "Stack":
    """Creates a stack instance from a dict of stack components.

    Args:
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance (of the class this method is called on)
        consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.metadata_stores import BaseMetadataStore
    from zenml.orchestrators import BaseOrchestrator
    from zenml.step_operators import BaseStepOperator

    def _raise_type_error(
        component: Optional["StackComponent"], expected_class: Type[Any]
    ) -> NoReturn:
        """Raises a TypeError that the component has an unexpected type."""
        raise TypeError(
            f"Unable to create stack: Wrong stack component type "
            f"`{component.__class__.__name__}` (expected: subclass "
            f"of `{expected_class.__name__}`)"
        )

    # Required components: a missing entry yields `None`, which fails the
    # isinstance check just like a wrongly typed component does.
    orchestrator = components.get(StackComponentType.ORCHESTRATOR)
    if not isinstance(orchestrator, BaseOrchestrator):
        _raise_type_error(orchestrator, BaseOrchestrator)

    metadata_store = components.get(StackComponentType.METADATA_STORE)
    if not isinstance(metadata_store, BaseMetadataStore):
        _raise_type_error(metadata_store, BaseMetadataStore)

    artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
    if not isinstance(artifact_store, BaseArtifactStore):
        _raise_type_error(artifact_store, BaseArtifactStore)

    # Optional components: only type-checked when present.
    container_registry = components.get(
        StackComponentType.CONTAINER_REGISTRY
    )
    if container_registry is not None and not isinstance(
        container_registry, BaseContainerRegistry
    ):
        _raise_type_error(container_registry, BaseContainerRegistry)

    step_operator = components.get(StackComponentType.STEP_OPERATOR)
    if step_operator is not None and not isinstance(
        step_operator, BaseStepOperator
    ):
        _raise_type_error(step_operator, BaseStepOperator)

    # Instantiate through `cls` rather than a hard-coded `Stack` so that a
    # subclass calling this alternate constructor gets an instance of
    # itself.
    return cls(
        name=name,
        orchestrator=orchestrator,
        metadata_store=metadata_store,
        artifact_store=artifact_store,
        container_registry=container_registry,
        step_operator=step_operator,
    )
provision(self)
Provisions resources to run the stack locally.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If any unprovisioned component does not implement provisioning. |
Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provisions local resources for every unprovisioned component.

    Raises:
        NotImplementedError: If any unprovisioned component does not
            implement provisioning.
    """
    logger.info("Provisioning resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_provisioned:
            # Nothing to do for components that already have resources.
            continue

        stack_component.provision()
        logger.info("Provisioned resources for %s.", stack_component)
requirements(self, exclude_components=None)
Set of PyPI requirements for the stack.
This method combines the requirements of all stack components (except
the ones specified in `exclude_components`).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exclude_components |
Optional[AbstractSet[zenml.enums.StackComponentType]] |
Set of component types for which the requirements should not be included in the output. |
None |
Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Collects the PyPI requirements of the stack.

    The result is the union of the requirements of all stack components
    whose type is not listed in `exclude_components`.

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        The combined requirements, or an empty set if no component
        contributes any.
    """
    excluded_types = exclude_components or set()

    requirement_sets = []
    for stack_component in self.components.values():
        if stack_component.type in excluded_types:
            continue
        requirement_sets.append(stack_component.requirements)

    # `set.union` cannot be called without arguments.
    if not requirement_sets:
        return set()
    return set.union(*requirement_sets)
resume(self)
Resumes the provisioned local resources of the stack.
Exceptions:
Type | Description |
---|---|
ProvisioningError |
If any stack component is missing provisioned resources. |
Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the stack.

    Components that are already running are left untouched.

    Raises:
        ProvisioningError: If any stack component is missing provisioned
            resources.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)

    for stack_component in self.components.values():
        if stack_component.is_running:
            # Already running, so there is nothing to resume.
            continue

        if not stack_component.is_provisioned:
            raise ProvisioningError(
                f"Unable to resume resources for {stack_component}: No "
                f"resources have been provisioned for this component."
            )

        stack_component.resume()
        logger.info("Resumed resources for %s.", stack_component)
suspend(self)
Suspends the provisioned local resources of the stack.
Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspends the local resources of every running component.

    A running component that does not implement suspending is skipped with
    a warning.
    """
    logger.info("Suspending provisioned resources for stack '%s'.", self.name)

    for stack_component in self.components.values():
        if not stack_component.is_running:
            continue

        try:
            stack_component.suspend()
            logger.info("Suspended resources for %s.", stack_component)
        except NotImplementedError:
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                stack_component,
            )
validate(self)
Checks whether the stack configuration is valid.
To check if a stack configuration is valid, the following criteria must
be met:
- all components must support the execution mode (either local or
remote execution) specified by the orchestrator of the stack
- the `StackValidator` of each stack component has to validate the
stack to make sure all the components are compatible with each other
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack configuration is not valid. |
Source code in zenml/stack/stack.py
def validate(self) -> None:
    """Checks whether the stack configuration is valid.

    Each stack component that provides a `StackValidator` through its
    `validator` property validates this stack, which makes sure all the
    components are compatible with each other. Components without a
    validator are skipped.

    Raises:
        StackValidationError: If the stack configuration is not valid
            (propagated from the component validators).
    """
    for stack_component in self.components.values():
        if not stack_component.validator:
            continue
        stack_component.validator.validate(stack=self)
stack_component
StackComponent (BaseModel, ABC)
pydantic-model
Abstract StackComponent class for all components of a ZenML stack.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
The name of the component. |
uuid |
UUID |
Unique identifier of the component. |
supports_local_execution |
bool |
If the component supports running locally. |
supports_remote_execution |
bool |
If the component supports running remotely. |
Source code in zenml/stack/stack_component.py
class StackComponent(BaseModel, ABC):
    """Abstract base class shared by all components of a ZenML stack.

    Attributes:
        name: The name of the component.
        uuid: Unique identifier of the component.
        supports_local_execution: If the component supports running locally.
        supports_remote_execution: If the component supports running remotely.
    """

    name: str
    uuid: UUID = Field(default_factory=uuid4)
    supports_local_execution: bool
    supports_remote_execution: bool

    @property
    @abstractmethod
    def type(self) -> StackComponentType:
        """The component type."""

    @property
    @abstractmethod
    def flavor(self) -> StackComponentFlavor:
        """The component flavor."""

    @property
    def log_file(self) -> Optional[str]:
        """Optional path to a log file for the stack component.

        The base implementation returns `None`.
        """
        # TODO [ENG-136]: Add support for multiple log files for a stack
        #  component. E.g. let each component return a generator that yields
        #  logs instead of specifying a single file path.
        return None

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this component.

        The items of the dictionary should map option names (which can be used
        to configure the option in the `RuntimeConfiguration`) to default
        values for the option (or `None` if there is no default value).
        The base implementation offers no options.
        """
        return {}

    @property
    def requirements(self) -> Set[str]:
        """Set of PyPI requirements for the component.

        The requirements are looked up for the module that defines the
        component class.
        """
        module_requirements = get_requirements_for_module(self.__module__)
        return set(module_requirements)

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Prepares deploying the pipeline.

        This method gets called immediately before a pipeline is deployed.
        Subclasses should override it if they require runtime configuration
        options or if they need to run code before the pipeline deployment.
        The base implementation does nothing.

        Args:
            pipeline: The pipeline that will be deployed.
            stack: The stack on which the pipeline will be deployed.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.
        """

    def prepare_pipeline_run(self) -> None:
        """Prepares running the pipeline (no-op in the base class)."""

    def cleanup_pipeline_run(self) -> None:
        """Cleans up resources after the pipeline run is finished.

        The base implementation does nothing.
        """

    @property
    def validator(self) -> Optional["StackValidator"]:
        """The optional validator of the stack component.

        This validator will be called each time a stack with the stack
        component is initialized. Subclasses should override this property
        and return a `StackValidator` that makes sure they're not included in
        any stack that they're not compatible with. The base implementation
        returns `None`.
        """
        return None

    @property
    def is_provisioned(self) -> bool:
        """If the component provisioned resources to run locally.

        The base implementation always reports `True`.
        """
        return True

    @property
    def is_running(self) -> bool:
        """If the component is running locally.

        The base implementation always reports `True`.
        """
        return True

    def provision(self) -> None:
        """Provisions resources to run the component locally.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = f"Provisioning local resources not implemented for {self}."
        raise NotImplementedError(message)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the component.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = f"Deprovisioning local resource not implemented for {self}."
        raise NotImplementedError(message)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the component.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = f"Resuming provisioned resources not implemented for {self}."
        raise NotImplementedError(message)

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the component.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        message = (
            f"Suspending provisioned resources not implemented for {self}."
        )
        raise NotImplementedError(message)

    def __repr__(self) -> str:
        """String representation of the stack component.

        Lists the component type and flavor followed by every entry of
        `self.dict()` as `key=value`.
        """
        attribute_pairs = [
            f"{key}={value}" for key, value in self.dict().items()
        ]
        attribute_text = ", ".join(attribute_pairs)
        class_name = self.__class__.__qualname__
        return (
            f"{class_name}(type={self.type}, "
            f"flavor={self.flavor}, {attribute_text})"
        )

    def __str__(self) -> str:
        """String representation of the stack component (same as repr)."""
        representation = self.__repr__()
        return representation

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False

        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True

        # exclude these two fields from being serialized
        fields = {
            "supports_local_execution": {"exclude": True},
            "supports_remote_execution": {"exclude": True},
        }
flavor: StackComponentFlavor
property
readonly
The component flavor.
is_provisioned: bool
property
readonly
If the component provisioned resources to run locally.
is_running: bool
property
readonly
If the component is running locally.
log_file: Optional[str]
property
readonly
Optional path to a log file for the stack component.
requirements: Set[str]
property
readonly
Set of PyPI requirements for the component.
runtime_options: Dict[str, Any]
property
readonly
Runtime options that are available to configure this component.
The items of the dictionary should map option names (which can be used
to configure the option in the `RuntimeConfiguration`) to default
values for the option (or `None` if there is no default value).
type: StackComponentType
property
readonly
The component type.
validator: Optional[StackValidator]
property
readonly
The optional validator of the stack component.
This validator will be called each time a stack with the stack
component is initialized. Subclasses should override this property
and return a `StackValidator` that makes sure they're not included in
any stack that they're not compatible with.
Config
Pydantic configuration class.
Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic configuration class."""

    # Keep the two capability flags out of the serialized output.
    fields = {
        "supports_local_execution": {"exclude": True},
        "supports_remote_execution": {"exclude": True},
    }

    # Attributes with a leading underscore are private: they stay mutable
    # and are not included in serialization.
    underscore_attrs_are_private = True

    # Public attributes cannot be reassigned after creation.
    allow_mutation = False
__repr__(self)
special
String representation of the stack component.
Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
    """String representation of the stack component.

    Lists the component type and flavor followed by every entry of
    `self.dict()` rendered as `key=value`.
    """
    attribute_pairs = [f"{key}={value}" for key, value in self.dict().items()]
    attribute_text = ", ".join(attribute_pairs)
    class_name = self.__class__.__qualname__
    return (
        f"{class_name}(type={self.type}, "
        f"flavor={self.flavor}, {attribute_text})"
    )
__str__(self)
special
String representation of the stack component.
Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
    """String representation of the stack component (same as its repr)."""
    representation = self.__repr__()
    return representation
cleanup_pipeline_run(self)
Cleans up resources after the pipeline run is finished.
Source code in zenml/stack/stack_component.py
def cleanup_pipeline_run(self) -> None:
    """Cleans up resources after the pipeline run is finished.

    This implementation does nothing.
    """
deprovision(self)
Deprovisions all local resources of the component.
Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the component.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    message = f"Deprovisioning local resource not implemented for {self}."
    raise NotImplementedError(message)
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)
Prepares deploying the pipeline.
This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
BasePipeline |
The pipeline that will be deployed. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
runtime_configuration |
RuntimeConfiguration |
Contains all the runtime configuration options specified for the pipeline run. |
required |
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Hook that runs right before a pipeline gets deployed.

    Subclasses should override this method when they need runtime
    configuration options or have to execute code ahead of the pipeline
    deployment. This implementation is intentionally empty.

    Args:
        pipeline: The pipeline that will be deployed.
        stack: The stack on which the pipeline will be deployed.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.
    """
    return None
prepare_pipeline_run(self)
Prepares running the pipeline.
Source code in zenml/stack/stack_component.py
def prepare_pipeline_run(self) -> None:
    """Prepares running the pipeline.

    This implementation is intentionally empty and simply returns `None`.
    """
    return None
provision(self)
Provisions resources to run the component locally.
Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Provisions resources to run the component locally.

    Raises:
        NotImplementedError: Always; this implementation provides no
            provisioning logic.
    """
    message = f"Provisioning local resources not implemented for {self}."
    raise NotImplementedError(message)
resume(self)
Resumes the provisioned local resources of the component.
Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the component.

    Raises:
        NotImplementedError: Always; this implementation provides no
            resume logic.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
suspend(self)
Suspends the provisioned local resources of the component.
Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspends the provisioned local resources of the component.

    Raises:
        NotImplementedError: Always; this implementation provides no
            suspend logic.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
stack_component_class_registry
StackComponentClassRegistry
Registry for stack component classes.
All stack component classes must be registered here so they can be instantiated from the component type and flavor specified inside the ZenML repository configuration.
Source code in zenml/stack/stack_component_class_registry.py
class StackComponentClassRegistry:
    """Registry for stack component classes.

    All stack component classes must be registered here so they can be
    instantiated from the component type and flavor specified inside the
    ZenML repository configuration.
    """

    # Maps component type -> flavor value (string) -> registered class.
    component_classes: ClassVar[
        DefaultDict[StackComponentType, Dict[str, Type[StackComponent]]]
    ] = defaultdict(dict)

    @classmethod
    def register_class(
        cls,
        component_type: StackComponentType,
        component_flavor: StackComponentFlavor,
        component_class: Type[StackComponent],
    ) -> None:
        """Registers a stack component class.

        If a class is already registered for the same type and flavor it is
        overwritten and a warning naming the replaced class is logged.

        Args:
            component_type: The type of the component class to register.
            component_flavor: The flavor of the component class to register.
            component_class: The component class to register.
        """
        # Classes are keyed by the flavor's string value, which is also what
        # `get_class` uses for its lookup.
        flavor_name = component_flavor.value
        flavors = cls.component_classes[component_type]
        if flavor_name in flavors:
            logger.warning(
                "Overwriting previously registered stack component class `%s` "
                "for type '%s' and flavor '%s'.",
                # The registry stores classes, so the class name is
                # `__name__`; `__class__.__name__` would report the metaclass.
                flavors[flavor_name].__name__,
                component_type.value,
                flavor_name,
            )

        flavors[flavor_name] = component_class
        logger.debug(
            "Registered stack component class for type '%s' and flavor '%s'.",
            component_type.value,
            flavor_name,
        )

    @classmethod
    def get_class(
        cls,
        component_type: StackComponentType,
        component_flavor: Union[StackComponentFlavor, str],
    ) -> Type[StackComponent]:
        """Returns the stack component class for the given type and flavor.

        Args:
            component_type: The type of the component class to return.
            component_flavor: The flavor of the component class to return.

        Returns:
            The class registered for the given type and flavor.

        Raises:
            KeyError: If no component class is registered for the given type
                and flavor.
        """
        if isinstance(component_flavor, StackComponentFlavor):
            component_flavor = component_flavor.value

        available_flavors = cls.component_classes[component_type]
        try:
            return available_flavors[component_flavor]
        except KeyError:
            # The stack component might be part of an integration
            # -> Activate the integrations and try again
            from zenml.integrations.registry import integration_registry

            integration_registry.activate_integrations()

            try:
                return available_flavors[component_flavor]
            except KeyError:
                # `from None` hides the chained lookup errors so only this
                # message is displayed.
                raise KeyError(
                    f"No stack component class found for type {component_type} "
                    f"and flavor {component_flavor}. Registered flavors for "
                    f"this type: {set(available_flavors)}. If your stack "
                    f"component class is part of a ZenML integration, make "
                    f"sure the corresponding integration is installed by "
                    f"running `zenml integration install INTEGRATION_NAME`."
                ) from None
get_class(component_type, component_flavor)
classmethod
Returns the stack component class for the given type and flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
The type of the component class to return. |
required |
component_flavor |
Union[zenml.enums.StackComponentFlavor, str] |
The flavor of the component class to return. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If no component class is registered for the given type and flavor. |
Source code in zenml/stack/stack_component_class_registry.py
@classmethod
def get_class(
    cls,
    component_type: StackComponentType,
    component_flavor: Union[StackComponentFlavor, str],
) -> Type[StackComponent]:
    """Looks up the stack component class for a type and flavor.

    Args:
        component_type: The type of the component class to return.
        component_flavor: The flavor of the component class to return,
            either as enum member or as its string value.

    Returns:
        The class registered for the given type and flavor.

    Raises:
        KeyError: If no component class is registered for the given type
            and flavor.
    """
    # Registered classes are keyed by the flavor's string value.
    flavor_key = (
        component_flavor.value
        if isinstance(component_flavor, StackComponentFlavor)
        else component_flavor
    )
    registered = cls.component_classes[component_type]

    try:
        component_class = registered[flavor_key]
    except KeyError:
        # The class may belong to an integration: activate the integrations
        # and retry the lookup once.
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        try:
            component_class = registered[flavor_key]
        except KeyError:
            raise KeyError(
                f"No stack component class found for type {component_type} "
                f"and flavor {flavor_key}. Registered flavors for "
                f"this type: {set(registered)}. If your stack "
                f"component class is part of a ZenML integration, make "
                f"sure the corresponding integration is installed by "
                f"running `zenml integration install INTEGRATION_NAME`."
            ) from None

    return component_class
register_class(component_type, component_flavor, component_class)
classmethod
Registers a stack component class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
The type of the component class to register. |
required |
component_flavor |
StackComponentFlavor |
The flavor of the component class to register. |
required |
component_class |
Type[zenml.stack.stack_component.StackComponent] |
The component class to register. |
required |
Source code in zenml/stack/stack_component_class_registry.py
@classmethod
def register_class(
    cls,
    component_type: StackComponentType,
    component_flavor: StackComponentFlavor,
    component_class: Type[StackComponent],
) -> None:
    """Registers a stack component class.

    If a class is already registered for the same type and flavor it is
    overwritten and a warning naming the replaced class is logged.

    Args:
        component_type: The type of the component class to register.
        component_flavor: The flavor of the component class to register.
        component_class: The component class to register.
    """
    # Classes are stored under the flavor's string value.
    flavor_name = component_flavor.value
    flavors = cls.component_classes[component_type]
    if flavor_name in flavors:
        logger.warning(
            "Overwriting previously registered stack component class `%s` "
            "for type '%s' and flavor '%s'.",
            # The registry stores classes, so the class name is `__name__`;
            # `__class__.__name__` would report the metaclass instead.
            flavors[flavor_name].__name__,
            component_type.value,
            flavor_name,
        )

    flavors[flavor_name] = component_class
    logger.debug(
        "Registered stack component class for type '%s' and flavor '%s'.",
        component_type.value,
        flavor_name,
    )
register_stack_component_class(component_type, component_flavor)
Parametrized decorator function to register stack component classes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
StackComponentType |
The type of the component class to register. |
required |
component_flavor |
StackComponentFlavor |
The flavor of the component class to register. |
required |
Returns:
Type | Description |
---|---|
Callable[[Type[~C]], Type[~C]] |
A decorator function that registers and returns the decorated stack component class. |
Source code in zenml/stack/stack_component_class_registry.py
def register_stack_component_class(
    component_type: StackComponentType, component_flavor: StackComponentFlavor
) -> Callable[[Type[C]], Type[C]]:
    """Creates a class decorator that registers stack component classes.

    Args:
        component_type: The type of the component class to register.
        component_flavor: The flavor of the component class to register.

    Returns:
        A decorator that passes the decorated class to
        `StackComponentClassRegistry.register_class` and returns that class.
    """

    def decorator_function(cls: Type[C]) -> Type[C]:
        """Adds `cls` to the registry and hands it back untouched."""
        register = StackComponentClassRegistry.register_class
        register(
            component_type=component_type,
            component_flavor=component_flavor,
            component_class=cls,
        )
        return cls

    return decorator_function
stack_validator
StackValidator
A `StackValidator` is used to validate a stack configuration.
Each `StackComponent` can provide a `StackValidator` to make sure it is compatible with all components of the stack. The `KubeflowOrchestrator`, for example, will always require the stack to have a container registry in order to push the Docker images that are required to run a pipeline in Kubeflow Pipelines.
Source code in zenml/stack/stack_validator.py
class StackValidator:
    """Validates that a stack configuration satisfies a set of requirements.

    Each `StackComponent` can provide a `StackValidator` to make sure it is
    compatible with all components of the stack. The `KubeflowOrchestrator`
    for example will always require the stack to have a container registry
    in order to push the docker images that are required to run a pipeline
    in Kubeflow Pipelines.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[Callable[["Stack"], bool]] = None,
    ):
        """Stores the validation criteria.

        Args:
            required_components: Optional set of stack components that must
                exist in the stack. A falsy value is replaced by an empty
                set.
            custom_validation_function: Optional function that returns whether
                a stack is valid.
        """
        self._custom_validation_function = custom_validation_function
        self._required_components = required_components or set()

    def validate(self, stack: "Stack") -> None:
        """Checks the given stack against the stored criteria.

        The required components are checked first; the custom validation
        function is only called if none of them is missing.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the stack does not meet all the
                validation criteria.
        """
        available = set(stack.components)
        missing = self._required_components - available
        if missing:
            raise StackValidationError(
                f"Missing stack components {missing} for " f"stack: {stack}"
            )

        custom_check = self._custom_validation_function
        if not custom_check:
            # No custom check configured, nothing left to verify.
            return

        if custom_check(stack):
            return

        raise StackValidationError(
            f"Custom validation function failed to validate "
            f"stack: {stack}"
        )
__init__(self, required_components=None, custom_validation_function=None)
special
Initializes a `StackValidator` instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
required_components |
Optional[AbstractSet[zenml.enums.StackComponentType]] |
Optional set of stack components that must exist in the stack. |
None |
custom_validation_function |
Optional[Callable[[Stack], bool]] |
Optional function that returns whether a stack is valid. |
None |
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[Callable[["Stack"], bool]] = None,
):
    """Stores the validation criteria of the validator.

    Args:
        required_components: Optional set of stack components that must
            exist in the stack. A falsy value is replaced by an empty set.
        custom_validation_function: Optional function that returns whether
            a stack is valid.
    """
    self._custom_validation_function = custom_validation_function
    self._required_components = required_components or set()
validate(self, stack)
Validates the given stack.
Checks if the stack contains all the required components and passes the custom validation function of the validator.
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the stack does not meet all the validation criteria. |
Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Checks the given stack against the validator's criteria.

    The required components are checked first; the custom validation
    function is only called if none of them is missing.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the stack does not meet all the
            validation criteria.
    """
    available = set(stack.components)
    missing = self._required_components - available
    if missing:
        raise StackValidationError(
            f"Missing stack components {missing} for " f"stack: {stack}"
        )

    custom_check = self._custom_validation_function
    if not custom_check:
        # No custom check configured, nothing left to verify.
        return

    if custom_check(stack):
        return

    raise StackValidationError(
        f"Custom validation function failed to validate "
        f"stack: {stack}"
    )