Pipelines
zenml.pipelines
special
A ZenML pipeline consists of tasks that execute in order and yield artifacts.
The artifacts are stored within the artifact store and indexed
via the metadata store. Each individual task within a pipeline is known as a
step. The standard pipelines within ZenML are designed to have easy interfaces
to add pre-decided steps, with the order also pre-decided. Other sorts of
pipelines can be created as well from scratch, building on the BasePipeline
class.
Pipelines can be written as simple functions. They are created by using the decorator appropriate to your specific use case.
The moment it is run, a pipeline is compiled and passed directly to the orchestrator.
base_pipeline
Abstract base class for all ZenML pipelines.
BasePipeline
Abstract base class for all ZenML pipelines.
Attributes:
Name | Type | Description |
---|---|---|
name |
The name of this pipeline. |
|
enable_cache |
A boolean indicating if caching is enabled for this pipeline. |
|
requirements_file |
DEPRECATED: Optional path to a pip requirements file
that contains all requirements to run the pipeline. (Use
`requirements` instead.) |
|
requirements |
Optional list of (string) pip requirements to run the pipeline, or a string path to a requirements file. |
|
required_integrations |
Optional set of integrations that need to be installed for this pipeline to run. |
Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.

    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        requirements_file: DEPRECATED: Optional path to a pip requirements file
            that contains all requirements to run the pipeline. (Use
            `requirements` instead.)
        requirements: Optional list of (string) pip requirements to run the
            pipeline, or a string path to a requirements file.
        required_integrations: Optional set of integrations that need to be
            installed for this pipeline to run.
    """

    # Maps each step argument name of the pipeline to its annotation. It is
    # filled in by the metaclass when the pipeline class is created.
    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]

    # Class-level configuration that is merged into the keyword arguments of
    # every instance in `__init__`.
    INSTANCE_CONFIGURATION: Dict[Text, Any] = {}

    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        """Initialize the BasePipeline.

        The class-level instance configuration is merged into `kwargs`
        (replacing same-named entries), the known configuration options are
        popped, and everything that remains is verified as pipeline steps.

        Args:
            *args: The steps to be executed by this pipeline.
            **kwargs: The configuration for this pipeline.
        """
        kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
        self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)

        # The configured list lives on the class (and may even be the default
        # argument of the decorator). Copy it so that mutating the secrets of
        # one pipeline instance can never leak into other instances. A missing
        # or `None` value is normalized to an empty list.
        configured_secrets = kwargs.pop(PARAM_SECRETS, None)
        self.secrets = list(configured_secrets) if configured_secrets else []

        self.docker_configuration = (
            kwargs.pop(PARAM_DOCKER_CONFIGURATION, None)
            or DockerConfiguration()
        )
        self._migrate_to_docker_config(kwargs)

        self.name = self.__class__.__name__
        self.__steps: Dict[str, BaseStep] = {}
        self._verify_arguments(*args, **kwargs)

    def _migrate_to_docker_config(self, kwargs: Dict[str, Any]) -> None:
        """Migrates legacy requirements to the new Docker configuration.

        The legacy keys are always removed from `kwargs`. A legacy value is
        only copied over if the Docker configuration does not already define
        the corresponding attribute.

        Args:
            kwargs: Keyword arguments passed during pipeline initialization.
        """
        # (legacy keyword argument, attribute on the docker configuration)
        attributes = [
            (PARAM_REQUIREMENTS, "requirements"),
            (PARAM_REQUIRED_INTEGRATIONS, "required_integrations"),
            (PARAM_DOCKERIGNORE_FILE, "dockerignore"),
        ]
        updates = {}

        for kwarg_name, docker_config_attribute in attributes:
            value = kwargs.pop(kwarg_name, None)

            if value:
                logger.warning(
                    f"The `{kwarg_name}` parameter has been deprecated. Please "
                    "use the `docker_configuration` parameter instead."
                )

                if getattr(self.docker_configuration, docker_config_attribute):
                    logger.warning(
                        f"Parameter {kwarg_name} was defined on the pipeline "
                        "as well as in the docker configuration. Ignoring the "
                        "value defined on the pipeline."
                    )
                else:
                    # No value set on the docker config, migrate the old one
                    updates[docker_config_attribute] = value

        if updates:
            self.docker_configuration = self.docker_configuration.copy(
                update=updates
            )

    def _verify_arguments(self, *steps: BaseStep, **kw_steps: BaseStep) -> None:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.

        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )

        combined_steps = {}
        # Remembers which argument each step class was passed for, so that a
        # second instance of the same class can be reported.
        step_classes: Dict[Type[BaseStep], str] = {}

        def _verify_step(key: str, step: BaseStep) -> None:
            """Verifies a single step of the pipeline.

            Args:
                key: The key of the step.
                step: The step to verify.

            Raises:
                PipelineInterfaceError: If the step is not of the correct type
                    or is of the same class as another step.
            """
            step_class = type(step)

            # A step *class* was passed instead of a step instance.
            if isinstance(step, BaseStepMeta):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if step_class in step_classes:
                previous_key = step_classes[step_class]
                raise PipelineInterfaceError(
                    f"Found multiple step objects of the same class "
                    f"(`{step_class}`) for arguments '{previous_key}' and "
                    f"'{key}' in pipeline '{self.name}'. Only one step object "
                    f"per class is allowed inside a ZenML pipeline. A possible "
                    f"solution is to use the "
                    f"{clone_step.__module__}.{clone_step.__name__} utility to "
                    f"create multiple copies of the same step."
                )

            step.pipeline_parameter_name = key
            combined_steps[key] = step
            step_classes[step_class] = key

        # verify args
        for i, step in enumerate(steps):
            key = input_step_keys[i]
            _verify_step(key, step)

        # verify kwargs
        for key, step in kw_steps.items():
            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )
            _verify_step(key, step)

        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps

        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )

        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )

        self.__steps = combined_steps

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps.

        Args:
            *args: The positional arguments passed to the pipeline.
            **kwargs: The keyword arguments passed to the pipeline.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps.

        Returns:
            A dictionary of pipeline steps.
        """
        return self.__steps

    @steps.setter
    def steps(self, steps: Dict[str, BaseStep]) -> NoReturn:
        """Setting the steps property is not allowed.

        Args:
            steps: The steps to set.

        Raises:
            PipelineInterfaceError: Always.
        """
        raise PipelineInterfaceError("Cannot set steps manually!")

    def validate_stack(self, stack: "Stack") -> None:
        """Validates if a stack is able to run this pipeline.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the step operator is not configured in the
                active stack.
        """
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )

        for step in self.steps.values():
            if (
                step.custom_step_operator
                and step.custom_step_operator not in available_step_operators
            ):
                raise StackValidationError(
                    f"Step '{step.name}' requires custom step operator "
                    f"'{step.custom_step_operator}' which is not configured in "
                    f"the active stack. Available step operators: "
                    f"{available_step_operators}."
                )

    def _reset_step_flags(self) -> None:
        """Reset the `_has_been_called` flag at the beginning of a pipeline run.

        This ensures a pipeline instance can be called more than once.
        """
        for step in self.steps.values():
            step._has_been_called = False

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        **additional_parameters: Any,
    ) -> Any:
        """Runs the pipeline on the active stack of the current repository.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule of the pipeline.
            additional_parameters: Additional parameters to pass to the
                pipeline.

        Returns:
            The result of the pipeline, or `None` if pipeline execution is
            currently prevented.
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        logger.info("Creating run for pipeline: `%s`", self.name)
        logger.info(
            f'Cache {"enabled" if self.enable_cache else "disabled"} for '
            f"pipeline `{self.name}`"
        )

        # Activating the built-in integrations through lazy loading
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        if not Environment.in_notebook():
            # Path of the file where pipeline.run() was called. This is needed by
            # the airflow orchestrator so it knows which file to copy into the DAG
            # directory. The frame lookup has to stay inside this method so
            # that `f_back` refers to the caller of `run()`.
            dag_filepath = io_utils.resolve_relative_path(
                inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
            )
            additional_parameters.setdefault("dag_filepath", dag_filepath)

        runtime_configuration = RuntimeConfiguration(
            run_name=run_name,
            schedule=schedule,
            **additional_parameters,
        )
        stack = Repository().active_stack

        stack_metadata = {
            component_type.value: component.FLAVOR
            for component_type, component in stack.components.items()
        }
        track_event(
            event=AnalyticsEvent.RUN_PIPELINE,
            metadata={
                "store_type": Repository().active_profile.store_type.value,
                **stack_metadata,
                "total_steps": len(self.steps),
                "schedule": bool(schedule),
            },
        )

        self._reset_step_flags()
        self.validate_stack(stack)

        # Prevent execution of nested pipelines which might lead to unexpected
        # behavior
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
        try:
            return_value = stack.deploy_pipeline(
                self, runtime_configuration=runtime_configuration
            )
        finally:
            # Always lift the guard again, even if the deployment failed.
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

        return return_value

    def with_config(
        self: T, config_file: str, overwrite_step_parameters: bool = False
    ) -> T:
        """Configures this pipeline using a yaml file.

        Args:
            config_file: Path to a yaml file which contains configuration
                options for running this pipeline. See
                https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files
                for details regarding the specification of this file.
            overwrite_step_parameters: If set to `True`, values from the
                configuration file will overwrite configuration parameters
                passed in code.

        Returns:
            The pipeline object that this method was called on.
        """
        config_yaml = yaml_utils.read_yaml(config_file)

        if PipelineConfigurationKeys.STEPS in config_yaml:
            self._read_config_steps(
                config_yaml[PipelineConfigurationKeys.STEPS],
                overwrite=overwrite_step_parameters,
            )

        return self

    def _read_config_steps(
        self, steps: Dict[str, Dict[str, Any]], overwrite: bool = False
    ) -> None:
        """Reads and sets step parameters from a config file.

        Args:
            steps: Maps step names to dicts of parameter names and values.
            overwrite: If `True`, overwrite previously set step parameters.

        Raises:
            PipelineConfigurationError: If the configuration file contains
                invalid data.
            DuplicatedConfigurationError: If a parameter from the
                configuration file was already set on the step and
                `overwrite` is `False`.
        """
        for step_name, step_dict in steps.items():
            StepConfigurationKeys.key_check(step_dict)

            if step_name not in self.__steps:
                raise PipelineConfigurationError(
                    f"Found '{step_name}' step in configuration yaml but it "
                    f"doesn't exist in the pipeline steps "
                    f"{list(self.__steps.keys())}."
                )

            step = self.__steps[step_name]
            step_parameters = (
                step.CONFIG_CLASS.__fields__.keys() if step.CONFIG_CLASS else {}
            )
            parameters = step_dict.get(StepConfigurationKeys.PARAMETERS_, {})

            # `enable_cache` is a step option, not a config class parameter:
            # pop it so it is not validated against the config class below.
            if PARAM_ENABLE_CACHE in parameters:
                enable_cache = parameters.pop(PARAM_ENABLE_CACHE)
                self.steps[step_name].enable_cache = enable_cache

            for parameter, value in parameters.items():
                if parameter not in step_parameters:
                    raise PipelineConfigurationError(
                        f"Found parameter '{parameter}' for '{step_name}' step "
                        f"in configuration yaml but it doesn't exist in the "
                        f"configuration class `{step.CONFIG_CLASS}`. Available "
                        f"parameters for this step: "
                        f"{list(step_parameters)}."
                    )

                previous_value = step.PARAM_SPEC.get(parameter, None)

                if overwrite:
                    step.PARAM_SPEC[parameter] = value
                else:
                    step.PARAM_SPEC.setdefault(parameter, value)

                # NOTE(review): the checks below use the truthiness of the
                # previous value. A falsy value that is already present (0,
                # False, "") is neither replaced by `setdefault` nor reported
                # as a duplicate, although the debug message is logged.
                # Confirm whether this is intended.
                if overwrite or not previous_value:
                    logger.debug(
                        "Setting parameter %s=%s for step '%s'.",
                        parameter,
                        value,
                        step_name,
                    )
                if previous_value and not overwrite:
                    raise DuplicatedConfigurationError(
                        "The value for parameter '{}' is set twice for step "
                        "'{}' ({} vs. {}). This can happen when you "
                        "instantiate your step with a step configuration that "
                        "sets the parameter, while also setting the same "
                        "parameter within a config file that is added to the "
                        "pipeline instance using the `.with_config()` method. "
                        "Make sure each parameter is only defined **once**. \n"
                        "While it is not recommended, you can overwrite the "
                        "step configuration using the configuration file: \n"
                        "`.with_config('config.yaml', "
                        "overwrite_step_parameters=True)".format(
                            parameter, step_name, previous_value, value
                        )
                    )

    @classmethod
    def get_runs(cls) -> Optional[List["PipelineRunView"]]:
        """Get all past runs from the associated PipelineView.

        Returns:
            A list of all past PipelineRunViews.

        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        pipeline_view = Repository().get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.runs  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )

    @classmethod
    def get_run(cls, run_name: str) -> Optional["PipelineRunView"]:
        """Get a specific past run from the associated PipelineView.

        Args:
            run_name: Name of the run

        Returns:
            The PipelineRunView of the specific pipeline run.

        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        pipeline_view = Repository().get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.get_run(run_name)  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )
steps: Dict[str, zenml.steps.base_step.BaseStep]
property
writable
Returns a dictionary of pipeline steps.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.base_step.BaseStep] |
A dictionary of pipeline steps. |
__init__(self, *args, **kwargs)
special
Initialize the BasePipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
BaseStep |
The steps to be executed by this pipeline. |
() |
**kwargs |
Any |
The configuration for this pipeline. |
{} |
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
    """Initialize the BasePipeline.

    The class-level instance configuration is merged into `kwargs`
    (replacing same-named entries), the known configuration options are
    popped, and everything that remains is verified as pipeline steps.

    Args:
        *args: The steps to be executed by this pipeline.
        **kwargs: The configuration for this pipeline.
    """
    kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
    self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)

    # The configured list lives on the class (and may even be the default
    # argument of the decorator). Copy it so that mutating the secrets of one
    # pipeline instance can never leak into other instances. A missing or
    # `None` value is normalized to an empty list.
    configured_secrets = kwargs.pop(PARAM_SECRETS, None)
    self.secrets = list(configured_secrets) if configured_secrets else []

    self.docker_configuration = (
        kwargs.pop(PARAM_DOCKER_CONFIGURATION, None)
        or DockerConfiguration()
    )
    self._migrate_to_docker_config(kwargs)

    self.name = self.__class__.__name__
    self.__steps: Dict[str, BaseStep] = {}
    self._verify_arguments(*args, **kwargs)
connect(self, *args, **kwargs)
Function that connects inputs and outputs of the pipeline steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
BaseStep |
The positional arguments passed to the pipeline. |
() |
**kwargs |
BaseStep |
The keyword arguments passed to the pipeline. |
{} |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Always. |
Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Wire the inputs and outputs of the pipeline steps together.

    This base implementation only raises; the decorator is
    `@abstractmethod`.

    Args:
        *args: The positional arguments passed to the pipeline.
        **kwargs: The keyword arguments passed to the pipeline.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
get_run(run_name)
classmethod
Get a specific past run from the associated PipelineView.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
str |
Name of the run |
required |
Returns:
Type | Description |
---|---|
Optional[PipelineRunView] |
The PipelineRunView of the specific pipeline run. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
In case the repository does not contain the view of the current pipeline. |
Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_run(cls, run_name: str) -> Optional["PipelineRunView"]:
    """Look up one past run of this pipeline by its name.

    Args:
        run_name: Name of the run

    Returns:
        The PipelineRunView of the specific pipeline run.

    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    view = Repository().get_pipeline(cls)

    # Guard clause: without a view there is nothing to query.
    if not view:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )

    return view.get_run(run_name)  # type: ignore[no-any-return]
get_runs()
classmethod
Get all past runs from the associated PipelineView.
Returns:
Type | Description |
---|---|
Optional[List[PipelineRunView]] |
A list of all past PipelineRunViews. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
In case the repository does not contain the view of the current pipeline. |
Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
    """List every past run recorded for this pipeline.

    Returns:
        A list of all past PipelineRunViews.

    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    view = Repository().get_pipeline(cls)

    # Guard clause: without a view there are no runs to return.
    if not view:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )

    return view.runs  # type: ignore[no-any-return]
run(self, *, run_name=None, schedule=None, **additional_parameters)
Runs the pipeline on the active stack of the current repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
schedule |
Optional[zenml.pipelines.schedule.Schedule] |
Optional schedule of the pipeline. |
None |
additional_parameters |
Any |
Additional parameters to pass to the pipeline. |
{} |
Returns:
Type | Description |
---|---|
Any |
The result of the pipeline. |
Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    **additional_parameters: Any,
) -> Any:
    """Execute this pipeline on the repository's currently active stack.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule of the pipeline.
        additional_parameters: Additional parameters to pass to the
            pipeline.

    Returns:
        The result of the pipeline, or `None` if pipeline execution is
        currently prevented.
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # Execution was disabled, e.g. so that module-level `pipeline.run()`
        # calls do not fire inside docker containers that should only run a
        # single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return None

    cache_state = "enabled" if self.enable_cache else "disabled"
    logger.info("Creating run for pipeline: `%s`", self.name)
    logger.info(f"Cache {cache_state} for pipeline `{self.name}`")

    # The registry is imported lazily, right before the built-in integrations
    # are activated.
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    if not Environment.in_notebook():
        # File from which `pipeline.run()` was called; the airflow
        # orchestrator needs it to know which file to copy into the DAG
        # directory. The frame lookup must happen directly in this function
        # so that `f_back` is the caller of `run()`.
        caller_file = inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
        additional_parameters.setdefault(
            "dag_filepath", io_utils.resolve_relative_path(caller_file)
        )

    runtime_config = RuntimeConfiguration(
        run_name=run_name,
        schedule=schedule,
        **additional_parameters,
    )
    active_stack = Repository().active_stack

    # Analytics payload: store type first, then one entry per stack
    # component, then the run details.
    event_metadata = {
        "store_type": Repository().active_profile.store_type.value,
    }
    for component_type, component in active_stack.components.items():
        event_metadata[component_type.value] = component.FLAVOR
    event_metadata["total_steps"] = len(self.steps)
    event_metadata["schedule"] = bool(schedule)
    track_event(event=AnalyticsEvent.RUN_PIPELINE, metadata=event_metadata)

    self._reset_step_flags()
    self.validate_stack(active_stack)

    # Block nested pipeline runs while this one is being deployed; the guard
    # is lifted again even if the deployment raises.
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
    try:
        return active_stack.deploy_pipeline(
            self, runtime_configuration=runtime_config
        )
    finally:
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
validate_stack(self, stack)
Validates if a stack is able to run this pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
Stack |
The stack to validate. |
required |
Exceptions:
Type | Description |
---|---|
StackValidationError |
If the step operator is not configured in the active stack. |
Source code in zenml/pipelines/base_pipeline.py
def validate_stack(self, stack: "Stack") -> None:
    """Check that the given stack can run every step of this pipeline.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the step operator is not configured in the
            active stack.
    """
    stack_operator = stack.step_operator
    available_step_operators = (
        {stack_operator.name} if stack_operator else set()
    )

    for step in self.steps.values():
        required_operator = step.custom_step_operator

        # Steps without a custom operator, or with one the stack offers,
        # need no further checks.
        if not required_operator:
            continue
        if required_operator in available_step_operators:
            continue

        raise StackValidationError(
            f"Step '{step.name}' requires custom step operator "
            f"'{step.custom_step_operator}' which is not configured in "
            f"the active stack. Available step operators: "
            f"{available_step_operators}."
        )
with_config(self, config_file, overwrite_step_parameters=False)
Configures this pipeline using a yaml file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_file |
str |
Path to a yaml file which contains configuration options for running this pipeline. See https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files for details regarding the specification of this file. |
required |
overwrite_step_parameters |
bool |
If set to `True`, values from the configuration file will overwrite configuration parameters passed in code. |
False |
Returns:
Type | Description |
---|---|
~T |
The pipeline object that this method was called on. |
Source code in zenml/pipelines/base_pipeline.py
def with_config(
    self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
    """Apply the settings from a yaml configuration file to this pipeline.

    Only the steps section of the file is read here; it is handed to
    `_read_config_steps`.

    Args:
        config_file: Path to a yaml file which contains configuration
            options for running this pipeline. See
            https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files
            for details regarding the specification of this file.
        overwrite_step_parameters: If set to `True`, values from the
            configuration file will overwrite configuration parameters
            passed in code.

    Returns:
        The pipeline object that this method was called on.
    """
    loaded_config = yaml_utils.read_yaml(config_file)
    steps_key = PipelineConfigurationKeys.STEPS

    if steps_key in loaded_config:
        step_settings = loaded_config[steps_key]
        self._read_config_steps(
            step_settings, overwrite=overwrite_step_parameters
        )

    return self
BasePipelineMeta (type)
Pipeline Metaclass responsible for validating the pipeline definition.
Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Create the pipeline class and record its step arguments.

        The argument names of the pipeline's inner function (without a
        leading `self`) are stored together with their annotations in
        `STEP_SPEC` for later verification purposes.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The class.
        """
        pipeline_cls = cast(
            Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
        )

        inner_func = inspect.unwrap(
            getattr(pipeline_cls, PIPELINE_INNER_FUNC_NAME)
        )
        spec = inspect.getfullargspec(inner_func)

        # Drop a leading `self`; every other argument names a step.
        if spec.args[:1] == ["self"]:
            step_names = spec.args[1:]
        else:
            step_names = spec.args

        # Arguments without an annotation are recorded as `None`.
        pipeline_cls.STEP_SPEC = {
            step_name: spec.annotations.get(step_name)
            for step_name in step_names
        }
        return pipeline_cls
__new__(mcs, name, bases, dct)
special
staticmethod
Saves argument names for later verification purposes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the class. |
required |
bases |
Tuple[Type[Any], ...] |
The base classes of the class. |
required |
dct |
Dict[str, Any] |
The dictionary of the class. |
required |
Returns:
Type | Description |
---|---|
BasePipelineMeta |
The class. |
Source code in zenml/pipelines/base_pipeline.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Create the pipeline class and record its step arguments.

    The argument names of the pipeline's inner function (without a leading
    `self`) are stored together with their annotations in `STEP_SPEC` for
    later verification purposes.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The class.
    """
    pipeline_cls = cast(
        Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
    )

    inner_func = inspect.unwrap(getattr(pipeline_cls, PIPELINE_INNER_FUNC_NAME))
    spec = inspect.getfullargspec(inner_func)

    # Drop a leading `self`; every other argument names a step.
    if spec.args[:1] == ["self"]:
        step_names = spec.args[1:]
    else:
        step_names = spec.args

    # Arguments without an annotation are recorded as `None`.
    pipeline_cls.STEP_SPEC = {
        step_name: spec.annotations.get(step_name) for step_name in step_names
    }
    return pipeline_cls
pipeline_decorator
Decorator function for ZenML pipelines.
pipeline(_func=None, *, name=None, enable_cache=True, required_integrations=(), requirements=None, dockerignore_file=None, docker_configuration=None, secrets=[])
Outer decorator function for the creation of a ZenML pipeline.
In order to be able to work with parameters such as "name", it features a nested decorator structure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[~F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
bool |
Whether to use caching or not. |
True |
required_integrations |
Sequence[str] |
DEPRECATED: Optional list of ZenML integrations
that are required to run this pipeline. Please use
`docker_configuration` instead. |
() |
requirements |
Union[str, List[str]] |
DEPRECATED: Optional path to a requirements file or a
list of requirements. Please use `docker_configuration` instead. |
None |
dockerignore_file |
Optional[str] |
DEPRECATED: Optional path to a dockerignore file to
use when building docker images for running this pipeline. Please
use `docker_configuration` instead. |
None |
docker_configuration |
Optional[zenml.config.docker_configuration.DockerConfiguration] |
Configuration of all Docker options. |
None |
secrets |
Optional[List[str]] |
Optional list of secrets that are required to run this pipeline. |
[] |
Returns:
Type | Description |
---|---|
Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] |
the inner decorator which creates the pipeline class based on the ZenML BasePipeline |
Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
    required_integrations: Sequence[str] = (),
    requirements: Optional[Union[str, List[str]]] = None,
    dockerignore_file: Optional[str] = None,
    docker_configuration: Optional[DockerConfiguration] = None,
    secrets: Optional[List[str]] = [],
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.

    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        required_integrations: DEPRECATED: Optional list of ZenML integrations
            that are required to run this pipeline. Please use
            `docker_configuration` instead.
        requirements: DEPRECATED: Optional path to a requirements file or a
            list of requirements. Please use `docker_configuration` instead.
        dockerignore_file: DEPRECATED: Optional path to a dockerignore file to
            use when building docker images for running this pipeline. Please
            use `docker_configuration` instead.
        docker_configuration: Configuration of all Docker options.
        secrets: Optional list of secrets that are required to run this
            pipeline. The list is copied for every generated pipeline class,
            so neither the caller's list nor the shared default is stored.

    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """

    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML pipeline.

        Args:
            func: types.FunctionType, this function will be used as the
                "connect" method of the generated Pipeline

        Returns:
            the class of a newly generated ZenML Pipeline
        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_REQUIRED_INTEGRATIONS: required_integrations,
                    PARAM_REQUIREMENTS: requirements,
                    PARAM_DOCKERIGNORE_FILE: dockerignore_file,
                    # `secrets` defaults to a single list object created at
                    # definition time. Store a fresh copy per pipeline class
                    # so that classes never share (and cannot corrupt) that
                    # default; `None` is normalized to an empty list.
                    PARAM_SECRETS: list(secrets) if secrets else [],
                    PARAM_DOCKER_CONFIGURATION: docker_configuration,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )

    # Support both the bare `@pipeline` and the parametrized `@pipeline(...)`
    # usage.
    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)
run_pipeline
Running ZenML Pipelines from Code.
run_pipeline(python_file, config_path)
Runs pipeline specified by the given config YAML object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
python_file |
str |
Path to the python file that defines the pipeline. |
required |
config_path |
str |
Path to configuration YAML file. |
required |
Exceptions:
Type | Description |
---|---|
PipelineConfigurationError |
Error when pipeline configuration is faulty. |
RuntimeError |
Error when zenml repository is not found. |
Source code in zenml/pipelines/run_pipeline.py
def run_pipeline(python_file: str, config_path: str) -> None:
    """Runs the pipeline specified by the given YAML configuration file.

    The python file is imported relative to the ZenML repository root, the
    pipeline class named in the configuration is looked up in that module,
    every configured step is loaded (and, if configured, given its return
    materializers), and the assembled pipeline is run.

    Args:
        python_file: Path to the python file that defines the pipeline.
        config_path: Path to configuration YAML file.

    Raises:
        PipelineConfigurationError: Error when pipeline configuration is
            faulty.
        RuntimeError: Error when zenml repository is not found.
    """
    # If the file was run with `python run.py`, this would happen
    # automatically. In order to allow seamless switching between running
    # directly and through zenml, this is done at this point
    zenml_root = Repository().root
    if not zenml_root:
        raise RuntimeError(
            "The `run_pipeline` function can only be called "
            "within a zenml repo. Run `zenml init` before "
            "running a pipeline using `run_pipeline`."
        )
    # The root is needed as a string by every loader call below.
    root_path = str(zenml_root)

    module = source_utils.import_python_file(python_file, root_path)
    config = yaml_utils.read_yaml(config_path)
    PipelineConfigurationKeys.key_check(config)

    pipeline_name = config[PipelineConfigurationKeys.NAME]
    pipeline_class = _get_module_attribute(module, pipeline_name)

    # For docker-based orchestrators it is important for the supplied python
    # module to be set as the main module instead of the calling process
    constants.USER_MAIN_MODULE = source_utils.get_module_source_from_module(
        module=module
    )

    steps = {}
    for step_name, step_config in config[
        PipelineConfigurationKeys.STEPS
    ].items():
        StepConfigurationKeys.key_check(step_config)
        source = step_config[StepConfigurationKeys.SOURCE_]
        step_class = _load_class_from_module(module, source, root_path)

        # It is necessary to support passing step instances for standard
        # step implementations (e.g WhylogsProfilerStep) in order to
        # support using the same step multiple times, once this problem is
        # solved, this portion of the code can be simplified to only
        # support classes.
        if not isinstance(step_class, BaseStep):
            step_instance = step_class()
        else:
            step_instance = step_class

        materializers_config = step_config.get(
            StepConfigurationKeys.MATERIALIZERS_, None
        )
        if materializers_config:
            # Only a dictionary mapping output names to materializer
            # sources is accepted; a plain string gets a dedicated error
            # that shows the expected dictionary layout.
            if isinstance(materializers_config, str):
                correct_input = textwrap.dedent(
                    f"""
                {SourceConfigurationKeys.NAME_}: {materializers_config}
                {SourceConfigurationKeys.FILE_}: optional/filepath.py
                """
                )
                raise PipelineConfigurationError(
                    "As of ZenML version 0.8.0 `str` entries are no "
                    "longer supported "
                    "to define steps or materializers. Instead you will "
                    "now need to "
                    "pass a dictionary. This dictionary **has to** "
                    "contain a "
                    f"`{SourceConfigurationKeys.NAME_}` which refers to "
                    f"the function/"
                    "class name. If this entity is defined outside the "
                    "main module, "
                    "you will need to additionally supply a "
                    f"{SourceConfigurationKeys.FILE_} with the relative "
                    f"forward-slash-"
                    "separated path to the file. \n"
                    f"You tried to pass in `{materializers_config}` "
                    f"- however you should have specified the name "
                    f"(and file) like this: \n "
                    f"{correct_input}"
                )
            elif isinstance(materializers_config, dict):
                materializers = {
                    output_name: _load_class_from_module(
                        module, materializer_source, root_path
                    )
                    for output_name, materializer_source in (
                        materializers_config.items()
                    )
                }
            else:
                raise PipelineConfigurationError(
                    f"Only `dict` values are allowed for "
                    f"'materializers' attribute of a step configuration. "
                    f"You tried to pass in `{materializers_config}` (type: "
                    f"`{type(materializers_config).__name__}`)."
                )
            step_instance = step_instance.with_return_materializers(
                materializers
            )

        steps[step_name] = step_instance

    pipeline_instance = pipeline_class(**steps).with_config(
        config_path, overwrite_step_parameters=True
    )
    logger.debug("Finished setting up pipeline '%s' from CLI", pipeline_name)
    pipeline_instance.run()
schedule
Class for defining a pipeline schedule.
Schedule (BaseModel)
pydantic-model
Class for defining a pipeline schedule.
Attributes:
Name | Type | Description |
---|---|---|
cron_expression |
Optional[str] |
Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval. |
start_time |
Optional[datetime.datetime] |
datetime object to indicate when to start the schedule. |
end_time |
Optional[datetime.datetime] |
datetime object to indicate when to end the schedule. |
interval_second |
Optional[datetime.timedelta] |
datetime timedelta indicating the seconds between two recurring runs for a periodic schedule. |
catchup |
bool |
Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. |
Source code in zenml/pipelines/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    A schedule is valid when it has a cron expression, or both a start time
    and an interval; this is enforced by a root validator.

    Attributes:
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        # A periodic schedule needs both a start time and an interval.
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )

        if not (cron_expression or periodic_schedule):
            raise ValueError(
                "Either a cron expression or start time and interval seconds "
                "need to be set for a valid schedule."
            )

        if cron_expression and periodic_schedule:
            # Both styles were configured: allowed, but worth a warning.
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behaviour depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )

        return values

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            Optional ISO-formatted string of the UTC start time.
        """
        if not self.start_time:
            return None

        # `astimezone` treats a naive datetime as system local time.
        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            Optional ISO-formatted string of the UTC end time.
        """
        if not self.end_time:
            return None

        # `astimezone` treats a naive datetime as system local time.
        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str]
property
readonly
Optional ISO-formatted string of the UTC end time.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional ISO-formatted string of the UTC end time. |
utc_start_time: Optional[str]
property
readonly
Optional ISO-formatted string of the UTC start time.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional ISO-formatted string of the UTC start time. |