Pipelines
zenml.pipelines
special
A ZenML pipeline consists of tasks that execute in order and yield artifacts.
The artifacts are automatically stored within the artifact store and metadata
is tracked by ZenML. Each individual task within a pipeline is known as a
step. The standard pipelines within ZenML are designed to have easy interfaces
to add pre-decided steps, with the order also pre-decided. Other sorts of
pipelines can be created as well from scratch, building on the BasePipeline
class.
Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run
, a pipeline is compiled and passed directly to the orchestrator.
base_pipeline
Legacy ZenML pipeline class definition.
BasePipeline (Pipeline, ABC)
Legacy pipeline class.
Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(Pipeline, ABC):
"""Legacy pipeline class."""
_CLASS_CONFIGURATION: ClassVar[Optional[Dict[str, Any]]] = None
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initializes a pipeline.
Args:
*args: Initialization arguments.
**kwargs: Initialization keyword arguments.
"""
config = self._CLASS_CONFIGURATION or {}
pipeline_name = (
config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
)
self._steps = self._verify_steps(
*args, __name__=pipeline_name, **kwargs
)
def entrypoint() -> None:
self.connect(**self._steps)
super().__init__(
name=pipeline_name,
entrypoint=entrypoint,
**config,
)
@property
def steps(self) -> Dict[str, BaseStep]:
"""Returns the steps of the pipeline.
Returns:
The steps of the pipeline.
"""
return self._steps
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
"""Abstract method that connects the pipeline steps.
Args:
*args: Connect method arguments.
**kwargs: Connect method keyword arguments.
"""
raise NotImplementedError
def resolve(self) -> "Source":
"""Resolves the pipeline.
Returns:
The pipeline source.
"""
return source_utils.resolve(self.__class__)
@property
def source_object(self) -> Any:
"""The source object of this pipeline.
Returns:
The source object of this pipeline.
"""
return self.connect
def run(
self,
*,
run_name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
schedule: Optional[Schedule] = None,
build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
extra: Optional[Dict[str, Any]] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
) -> None:
"""Runs the pipeline on the active stack.
Args:
run_name: Name of the pipeline run.
enable_cache: If caching should be enabled for this pipeline run.
enable_artifact_metadata: If artifact metadata should be enabled
for this pipeline run.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline run.
schedule: Optional schedule to use for the run.
build: Optional build to use for the run.
settings: Settings for this pipeline run.
step_configurations: Configurations for steps of the pipeline.
extra: Extra configurations for this pipeline run.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
prevent_build_reuse: Whether to prevent the reuse of a build.
"""
pipeline_copy = self.with_options(
run_name=run_name,
schedule=schedule,
build=build,
step_configurations=step_configurations,
config_path=config_path,
unlisted=unlisted,
prevent_build_reuse=prevent_build_reuse,
)
new_run_args = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"enable_artifact_metadata": enable_artifact_metadata,
"enable_artifact_visualization": enable_artifact_visualization,
"settings": settings,
"extra": extra,
}
)
pipeline_copy._run_args.update(new_run_args)
pipeline_copy()
def _compute_invocation_id(
self,
step: "BaseStep",
custom_id: Optional[str] = None,
allow_suffix: bool = True,
) -> str:
"""Compute the invocation ID.
Args:
step: The step for which to compute the ID.
custom_id: Custom ID to use for the invocation.
allow_suffix: Whether a suffix can be appended to the invocation
ID.
Returns:
The invocation ID.
"""
custom_id = getattr(step, TEMPLATE_NAME_ATTRIBUTE, None)
return super()._compute_invocation_id(
step=step, custom_id=custom_id, allow_suffix=False
)
def _verify_steps(
self, *args: Any, __name__: str, **kwargs: Any
) -> Dict[str, "BaseStep"]:
"""Verifies the initialization args and kwargs of this pipeline.
This method makes sure that no missing/unexpected arguments or
arguments of a wrong type are passed when creating a pipeline.
Args:
*args: The args passed to the init method of this pipeline.
__name__: The pipeline name. The naming of this argument is to avoid
conflicts with other arguments.
**kwargs: The kwargs passed to the init method of this pipeline.
Raises:
PipelineInterfaceError: If there are too many/few arguments or
arguments with a wrong name/type.
Returns:
The verified steps.
"""
signature = inspect.signature(self.connect, follow_wrapped=True)
try:
bound_args = signature.bind(*args, **kwargs)
except TypeError as e:
raise PipelineInterfaceError(
f"Wrong arguments when initializing pipeline '{__name__}': {e}"
) from e
steps = {}
for key, potential_step in bound_args.arguments.items():
step_class = type(potential_step)
if inspect.isclass(potential_step) and issubclass(
potential_step, BaseStep
):
raise PipelineInterfaceError(
f"Wrong argument type (`{step_class}`) for argument "
f"'{key}' of pipeline '{__name__}'. "
f"A `BaseStep` subclass was provided instead of an "
f"instance. "
f"This might have been caused due to missing brackets of "
f"your steps when creating a pipeline with `@step` "
f"decorated functions, "
f"for which the correct syntax is `pipeline(step=step())`."
)
if not isinstance(potential_step, BaseStep):
raise PipelineInterfaceError(
f"Wrong argument type (`{step_class}`) for argument "
f"'{key}' of pipeline '{__name__}'. Only "
f"`@step` decorated functions or instances of `BaseStep` "
f"subclasses can be used as arguments when creating "
f"a pipeline."
)
steps[key] = potential_step
setattr(potential_step, TEMPLATE_NAME_ATTRIBUTE, key)
return steps
source_object: Any
property
readonly
The source object of this pipeline.
Returns:
Type | Description |
---|---|
Any |
The source object of this pipeline. |
steps: Dict[str, zenml.steps.base_step.BaseStep]
property
readonly
Returns the steps of the pipeline.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.base_step.BaseStep] |
The steps of the pipeline. |
__init__(self, *args, **kwargs)
special
Initializes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Initialization arguments. |
() |
**kwargs |
Any |
Initialization keyword arguments. |
{} |
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initializes a pipeline.
Args:
*args: Initialization arguments.
**kwargs: Initialization keyword arguments.
"""
config = self._CLASS_CONFIGURATION or {}
pipeline_name = (
config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
)
self._steps = self._verify_steps(
*args, __name__=pipeline_name, **kwargs
)
def entrypoint() -> None:
self.connect(**self._steps)
super().__init__(
name=pipeline_name,
entrypoint=entrypoint,
**config,
)
connect(self, *args, **kwargs)
Abstract method that connects the pipeline steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
BaseStep |
Connect method arguments. |
() |
**kwargs |
BaseStep |
Connect method keyword arguments. |
{} |
Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
"""Abstract method that connects the pipeline steps.
Args:
*args: Connect method arguments.
**kwargs: Connect method keyword arguments.
"""
raise NotImplementedError
resolve(self)
Resolves the pipeline.
Returns:
Type | Description |
---|---|
Source |
The pipeline source. |
Source code in zenml/pipelines/base_pipeline.py
def resolve(self) -> "Source":
"""Resolves the pipeline.
Returns:
The pipeline source.
"""
return source_utils.resolve(self.__class__)
run(self, *, run_name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, schedule=None, build=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False, prevent_build_reuse=False)
Runs the pipeline on the active stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline run. |
None |
enable_artifact_metadata |
Optional[bool] |
If artifact metadata should be enabled for this pipeline run. |
None |
enable_artifact_visualization |
Optional[bool] |
If artifact visualization should be enabled for this pipeline run. |
None |
schedule |
Optional[zenml.config.schedule.Schedule] |
Optional schedule to use for the run. |
None |
build |
Union[str, UUID, PipelineBuildBaseModel] |
Optional build to use for the run. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
Settings for this pipeline run. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline run. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a
|
None |
unlisted |
bool |
Whether the pipeline run should be unlisted (not assigned to any pipeline). |
False |
prevent_build_reuse |
bool |
Whether to prevent the reuse of a build. |
False |
Source code in zenml/pipelines/base_pipeline.py
def run(
self,
*,
run_name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
schedule: Optional[Schedule] = None,
build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
extra: Optional[Dict[str, Any]] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
prevent_build_reuse: bool = False,
) -> None:
"""Runs the pipeline on the active stack.
Args:
run_name: Name of the pipeline run.
enable_cache: If caching should be enabled for this pipeline run.
enable_artifact_metadata: If artifact metadata should be enabled
for this pipeline run.
enable_artifact_visualization: If artifact visualization should be
enabled for this pipeline run.
schedule: Optional schedule to use for the run.
build: Optional build to use for the run.
settings: Settings for this pipeline run.
step_configurations: Configurations for steps of the pipeline.
extra: Extra configurations for this pipeline run.
config_path: Path to a yaml configuration file. This file will
be parsed as a
`zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
prevent_build_reuse: Whether to prevent the reuse of a build.
"""
pipeline_copy = self.with_options(
run_name=run_name,
schedule=schedule,
build=build,
step_configurations=step_configurations,
config_path=config_path,
unlisted=unlisted,
prevent_build_reuse=prevent_build_reuse,
)
new_run_args = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"enable_artifact_metadata": enable_artifact_metadata,
"enable_artifact_visualization": enable_artifact_visualization,
"settings": settings,
"extra": extra,
}
)
pipeline_copy._run_args.update(new_run_args)
pipeline_copy()
pipeline_decorator
Legacy ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, settings=None, extra=None, on_failure=None, on_success=None)
Outer decorator function for the creation of a ZenML pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[~F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
Optional[bool] |
Whether to use caching or not. |
None |
enable_artifact_metadata |
Optional[bool] |
Whether to enable artifact metadata or not. |
None |
enable_artifact_visualization |
Optional[bool] |
Whether to enable artifact visualization. |
None |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
on_failure |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be
a function with three possible parameters,
|
None |
on_success |
Optional[HookSpecification] |
Callback function in event of failure of the step. Can be
a function with two possible parameters, |
None |
Returns:
Type | Description |
---|---|
Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] |
the inner decorator which creates the pipeline class based on the ZenML BasePipeline |
Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
_func: Optional[F] = None,
*,
name: Optional[str] = None,
enable_cache: Optional[bool] = None,
enable_artifact_metadata: Optional[bool] = None,
enable_artifact_visualization: Optional[bool] = None,
settings: Optional[Dict[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
on_failure: Optional["HookSpecification"] = None,
on_success: Optional["HookSpecification"] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
"""Outer decorator function for the creation of a ZenML pipeline.
Args:
_func: The decorated function.
name: The name of the pipeline. If left empty, the name of the
decorated function will be used as a fallback.
enable_cache: Whether to use caching or not.
enable_artifact_metadata: Whether to enable artifact metadata or not.
enable_artifact_visualization: Whether to enable artifact visualization.
settings: Settings for this pipeline.
extra: Extra configurations for this pipeline.
on_failure: Callback function in event of failure of the step. Can be
a function with three possible parameters,
`StepContext`, `BaseParameters`, and `BaseException`,
or a source path to a function of the same specifications
(e.g. `module.my_function`).
on_success: Callback function in event of failure of the step. Can be
a function with two possible parameters, `StepContext` and
`BaseParameters, or a source path to a function of the same specifications
(e.g. `module.my_function`).
Returns:
the inner decorator which creates the pipeline class based on the
ZenML BasePipeline
"""
logger.warning(
"The `@pipeline` decorator that you use to define your pipeline is "
"deprecated. Check out our docs https://docs.zenml.io for "
"information on how to define pipelines in a more intuitive and "
"flexible way!"
)
def inner_decorator(func: F) -> Type[BasePipeline]:
return type(
name or func.__name__,
(BasePipeline,),
{
PIPELINE_INNER_FUNC_NAME: staticmethod(func), # type: ignore[arg-type]
CLASS_CONFIGURATION: {
PARAM_PIPELINE_NAME: name,
PARAM_ENABLE_CACHE: enable_cache,
PARAM_ENABLE_ARTIFACT_METADATA: enable_artifact_metadata,
PARAM_ENABLE_ARTIFACT_VISUALIZATION: enable_artifact_visualization,
PARAM_SETTINGS: settings,
PARAM_EXTRA_OPTIONS: extra,
PARAM_ON_FAILURE: on_failure,
PARAM_ON_SUCCESS: on_success,
},
"__module__": func.__module__,
"__doc__": func.__doc__,
},
)
if _func is None:
return inner_decorator
else:
return inner_decorator(_func)