Pipelines
zenml.pipelines
special
A ZenML pipeline consists of tasks that execute in order and yield artifacts.
The artifacts are automatically stored within the artifact store and metadata
is tracked by ZenML. Each individual task within a pipeline is known as a
step. The standard pipelines within ZenML are designed to have easy interfaces
to add pre-decided steps, with the order also pre-decided. Other sorts of
pipelines can be created as well from scratch, building on the BasePipeline
class.
Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run
, a pipeline is compiled and passed directly to the orchestrator.
base_pipeline
Abstract base class for all ZenML pipelines.
BasePipeline
Abstract base class for all ZenML pipelines.
Attributes:
Name | Type | Description |
---|---|---|
name |
The name of this pipeline. |
|
enable_cache |
A boolean indicating if caching is enabled for this pipeline. |
Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
"""Abstract base class for all ZenML pipelines.
Attributes:
name: The name of this pipeline.
enable_cache: A boolean indicating if caching is enabled for this
pipeline.
"""
STEP_SPEC: ClassVar[Dict[str, Any]] = None # type: ignore[assignment]
INSTANCE_CONFIGURATION: Dict[str, Any] = {}
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
"""Initialize the BasePipeline.
Args:
*args: The steps to be executed by this pipeline.
**kwargs: The configuration for this pipeline.
"""
kwargs.update(self.INSTANCE_CONFIGURATION)
self._configuration = PipelineConfiguration(
name=self.__class__.__name__,
enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, True),
)
self._apply_class_configuration(kwargs)
self.__steps: Dict[str, BaseStep] = {}
self._verify_steps(*args, **kwargs)
@property
def name(self) -> str:
"""The name of the pipeline.
Returns:
The name of the pipeline.
"""
return self.configuration.name
@property
def enable_cache(self) -> bool:
"""If caching is enabled for the pipeline.
Returns:
If caching is enabled for the pipeline.
"""
return self.configuration.enable_cache
@property
def configuration(self) -> PipelineConfiguration:
"""The configuration of the pipeline.
Returns:
The configuration of the pipeline.
"""
return self._configuration
@property
def steps(self) -> Dict[str, BaseStep]:
"""Returns a dictionary of pipeline steps.
Returns:
A dictionary of pipeline steps.
"""
return self.__steps
def configure(
self: T,
enable_cache: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
merge: bool = True,
) -> T:
"""Configures the pipeline.
Configuration merging example:
* `merge==True`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* `merge==False`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Args:
enable_cache: If caching should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
merge: If `True`, will merge the given dictionary configurations
like `extra` and `settings` with existing
configurations. If `False` the given configurations will
overwrite all existing ones. See the general description of this
method for an example.
Returns:
The pipeline instance that this method was called on.
"""
values = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"settings": settings,
"extra": extra,
}
)
config = PipelineConfigurationUpdate(**values)
self._apply_configuration(config, merge=merge)
return self
def _apply_class_configuration(self, options: Dict[str, Any]) -> None:
"""Applies the configurations specified on the pipeline class.
Args:
options: Class configurations.
"""
settings = options.pop(PARAM_SETTINGS, None)
extra = options.pop(PARAM_EXTRA_OPTIONS, None)
self.configure(settings=settings, extra=extra)
def _verify_steps(self, *steps: BaseStep, **kw_steps: Any) -> None:
"""Verifies the initialization args and kwargs of this pipeline.
This method makes sure that no missing/unexpected arguments or
arguments of a wrong type are passed when creating a pipeline. If
all arguments are correct, saves the steps to `self.__steps`.
Args:
*steps: The args passed to the init method of this pipeline.
**kw_steps: The kwargs passed to the init method of this pipeline.
Raises:
PipelineInterfaceError: If there are too many/few arguments or
arguments with a wrong name/type.
"""
input_step_keys = list(self.STEP_SPEC.keys())
if len(steps) > len(input_step_keys):
raise PipelineInterfaceError(
f"Too many input steps for pipeline '{self.name}'. "
f"This pipeline expects {len(input_step_keys)} step(s) "
f"but got {len(steps) + len(kw_steps)}."
)
combined_steps = {}
step_ids: Dict[int, str] = {}
def _verify_step(key: str, step: BaseStep) -> None:
"""Verifies a single step of the pipeline.
Args:
key: The key of the step.
step: The step to verify.
Raises:
PipelineInterfaceError: If the step is not of the correct type
or is of the same class as another step.
"""
step_class = type(step)
if isinstance(step, BaseStepMeta):
raise PipelineInterfaceError(
f"Wrong argument type (`{step_class}`) for argument "
f"'{key}' of pipeline '{self.name}'. "
f"A `BaseStep` subclass was provided instead of an "
f"instance. "
f"This might have been caused due to missing brackets of "
f"your steps when creating a pipeline with `@step` "
f"decorated functions, "
f"for which the correct syntax is `pipeline(step=step())`."
)
if not isinstance(step, BaseStep):
raise PipelineInterfaceError(
f"Wrong argument type (`{step_class}`) for argument "
f"'{key}' of pipeline '{self.name}'. Only "
f"`@step` decorated functions or instances of `BaseStep` "
f"subclasses can be used as arguments when creating "
f"a pipeline."
)
if id(step) in step_ids:
previous_key = step_ids[id(step)]
raise PipelineInterfaceError(
f"Found the same step object for arguments "
f"'{previous_key}' and '{key}' in pipeline '{self.name}'. "
"Step object cannot be reused inside a ZenML pipeline. "
"A possible solution is to create two instances of the "
"same step class and assigning them different names: "
"`first_instance = step_class(name='s1')` and "
"`second_instance = step_class(name='s2')`."
)
step.pipeline_parameter_name = key
step_ids[id(step)] = key
combined_steps[key] = step
# verify args
for i, step in enumerate(steps):
key = input_step_keys[i]
_verify_step(key, step)
# verify kwargs
for key, step in kw_steps.items():
if key in combined_steps:
# a step for this key was already set by
# the positional input steps
raise PipelineInterfaceError(
f"Unexpected keyword argument '{key}' for pipeline "
f"'{self.name}'. A step for this key was "
f"already passed as a positional argument."
)
_verify_step(key, step)
# check if there are any missing or unexpected steps
expected_steps = set(self.STEP_SPEC.keys())
actual_steps = set(combined_steps.keys())
missing_steps = expected_steps - actual_steps
unexpected_steps = actual_steps - expected_steps
if missing_steps:
raise PipelineInterfaceError(
f"Missing input step(s) for pipeline "
f"'{self.name}': {missing_steps}."
)
if unexpected_steps:
raise PipelineInterfaceError(
f"Unexpected input step(s) for pipeline "
f"'{self.name}': {unexpected_steps}. This pipeline "
f"only requires the following steps: {expected_steps}."
)
self.__steps = combined_steps
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
"""Function that connects inputs and outputs of the pipeline steps.
Args:
*args: The positional arguments passed to the pipeline.
**kwargs: The keyword arguments passed to the pipeline.
Raises:
NotImplementedError: Always.
"""
raise NotImplementedError
def _get_pipeline_analytics_metadata(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Dict[str, Any]:
"""Returns the pipeline deployment metadata.
Args:
deployment: The pipeline deployment to track.
stack: The stack on which the pipeline will be deployed.
Returns:
the metadata about the pipeline deployment
"""
custom_materializer = False
custom_artifact = False
for step in deployment.steps.values():
for output in step.config.outputs.values():
if not output.materializer_source.startswith("zenml."):
custom_materializer = True
if not output.artifact_source.startswith("zenml."):
custom_artifact = True
stack_metadata = {
component_type.value: component.flavor
for component_type, component in stack.components.items()
}
return {
"store_type": Client().zen_store.type.value,
**stack_metadata,
"total_steps": len(self.steps),
"schedule": bool(deployment.schedule),
"custom_materializer": custom_materializer,
"custom_artifact": custom_artifact,
}
def run(
self,
*,
run_name: Optional[str] = None,
enable_cache: Optional[bool] = None,
schedule: Optional[Schedule] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
extra: Optional[Dict[str, Any]] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
) -> Any:
"""Runs the pipeline on the active stack of the current repository.
Args:
run_name: Name of the pipeline run.
enable_cache: If caching should be enabled for this pipeline run.
schedule: Optional schedule of the pipeline.
settings: settings for this pipeline run.
step_configurations: Configurations for steps of the pipeline.
extra: Extra configurations for this pipeline run.
config_path: Path to a yaml configuration file. This file will
be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
Returns:
The result of the pipeline.
"""
if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
# An environment variable was set to stop the execution of
# pipelines. This is done to prevent execution of module-level
# pipeline.run() calls inside docker containers which should only
# run a single step.
logger.info(
"Preventing execution of pipeline '%s'. If this is not "
"intended behavior, make sure to unset the environment "
"variable '%s'.",
self.name,
constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
)
return
with event_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
stack = Client().active_stack
# Activating the built-in integrations through lazy loading
from zenml.integrations.registry import integration_registry
integration_registry.activate_integrations()
if config_path:
config_dict = yaml_utils.read_yaml(config_path)
run_config = PipelineRunConfiguration.parse_obj(config_dict)
else:
run_config = PipelineRunConfiguration()
new_values = dict_utils.remove_none_values(
{
"run_name": run_name,
"enable_cache": enable_cache,
"steps": step_configurations,
"settings": settings,
"schedule": schedule,
"extra": extra,
}
)
# Update with the values in code so they take precedence
run_config = pydantic_utils.update_model(
run_config, update=new_values
)
from zenml.config.compiler import Compiler
pipeline_deployment = Compiler().compile(
pipeline=self, stack=stack, run_configuration=run_config
)
skip_pipeline_registration = constants.handle_bool_env_var(
constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION, default=False
)
register_pipeline = not (skip_pipeline_registration or unlisted)
pipeline_id = None
if register_pipeline:
step_specs = [
step.spec for step in pipeline_deployment.steps.values()
]
pipeline_spec = PipelineSpec(steps=step_specs)
pipeline_id = Client().create_pipeline(
pipeline_name=pipeline_deployment.pipeline.name,
pipeline_spec=pipeline_spec,
pipeline_docstring=self.__doc__,
)
pipeline_deployment = pipeline_deployment.copy(
update={"pipeline_id": pipeline_id}
)
analytics_handler.metadata = self._get_pipeline_analytics_metadata(
deployment=pipeline_deployment, stack=stack
)
caching_status = (
"enabled"
if pipeline_deployment.pipeline.enable_cache
else "disabled"
)
logger.info(
"%s %s on stack `%s` (caching %s)",
"Scheduling" if pipeline_deployment.schedule else "Running",
f"pipeline `{pipeline_deployment.pipeline.name}`"
if register_pipeline
else "unlisted pipeline",
stack.name,
caching_status,
)
stack.prepare_pipeline_deployment(deployment=pipeline_deployment)
# Prevent execution of nested pipelines which might lead to
# unexpected behavior
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
try:
return_value = stack.deploy_pipeline(pipeline_deployment)
finally:
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
# Log the dashboard URL
dashboard_utils.print_run_url(
run_name=pipeline_deployment.run_name, pipeline_id=pipeline_id
)
return return_value
def _apply_configuration(
self,
config: PipelineConfigurationUpdate,
merge: bool = True,
) -> None:
"""Applies an update to the pipeline configuration.
Args:
config: The configuration update.
merge: Whether to merge the updates with the existing configuration
or not. See the `BasePipeline.configure(...)` method for a
detailed explanation.
"""
self._validate_configuration(config)
self._configuration = pydantic_utils.update_model(
self._configuration, update=config, recursive=merge
)
logger.debug("Updated pipeline configuration:")
logger.debug(self._configuration)
@staticmethod
def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
"""Validates a configuration update.
Args:
config: The configuration update to validate.
"""
settings_utils.validate_setting_keys(list(config.settings))
def with_config(
self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
"""DEPRECATED: Configures this pipeline using a yaml file.
Args:
config_file: Path to a yaml file which contains configuration
options for running this pipeline. See
https://docs.zenml.io/advanced-guide/pipelines/settings
for details regarding the specification of this file.
overwrite_step_parameters: If set to `True`, values from the
configuration file will overwrite configuration parameters
passed in code.
Returns:
The pipeline object that this method was called on.
"""
logger.warning(
"The `with_config(...)` method is deprecated. Use "
"`pipeline.configure(...)` or `pipeline.run(config_path=...)` "
"instead."
)
config_yaml = yaml_utils.read_yaml(config_file)
if PipelineConfigurationKeys.STEPS in config_yaml:
self._read_config_steps(
config_yaml[PipelineConfigurationKeys.STEPS],
overwrite=overwrite_step_parameters,
)
return self
def _read_config_steps(
self, steps: Dict[str, Dict[str, Any]], overwrite: bool = False
) -> None:
"""Reads and sets step parameters from a config file.
Args:
steps: Maps step names to dicts of parameter names and values.
overwrite: If `True`, overwrite previously set step parameters.
Raises:
PipelineConfigurationError: If the configuration file contains
invalid data.
"""
for step_name, step_dict in steps.items():
StepConfigurationKeys.key_check(step_dict)
if step_name not in self.__steps:
raise PipelineConfigurationError(
f"Found '{step_name}' step in configuration yaml but it "
f"doesn't exist in the pipeline steps "
f"{list(self.__steps.keys())}."
)
step = self.__steps[step_name]
parameters = step_dict.get(StepConfigurationKeys.PARAMETERS_, {})
enable_cache = parameters.pop(PARAM_ENABLE_CACHE, None)
if not overwrite:
parameters.update(step.configuration.parameters)
step.configure(
enable_cache=enable_cache,
parameters=parameters,
)
@classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
"""Get all past runs from the associated PipelineView.
Returns:
A list of all past PipelineRunViews.
Raises:
RuntimeError: In case the repository does not contain the view
of the current pipeline.
"""
from zenml.post_execution import get_pipeline
pipeline_view = get_pipeline(cls)
if pipeline_view:
return pipeline_view.runs # type: ignore[no-any-return]
else:
raise RuntimeError(
f"The PipelineView for `{cls.__name__}` could "
f"not be found. Are you sure this pipeline has "
f"been run already?"
)
def write_run_configuration_template(
self, path: str, stack: Optional["Stack"] = None
) -> None:
"""Writes a run configuration yaml template.
Args:
path: The path where the template will be written.
stack: The stack for which the template should be generated. If
not given, the active stack will be used.
"""
from zenml.config.base_settings import ConfigurationLevel
from zenml.config.step_configurations import (
PartialArtifactConfiguration,
)
stack = stack or Client().active_stack
setting_classes = stack.setting_classes
setting_classes.update(settings_utils.get_general_settings())
pipeline_settings = {}
step_settings = {}
for key, setting_class in setting_classes.items():
fields = pydantic_utils.TemplateGenerator(setting_class).run()
if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
pipeline_settings[key] = fields
if ConfigurationLevel.STEP in setting_class.LEVEL:
step_settings[key] = fields
steps = {}
for step_name, step in self.steps.items():
parameters = (
pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
if step.PARAMETERS_CLASS
else {}
)
outputs = {
name: PartialArtifactConfiguration()
for name in step.OUTPUT_SIGNATURE
}
step_template = StepConfigurationUpdate(
parameters=parameters,
settings=step_settings,
outputs=outputs,
)
steps[step_name] = step_template
run_config = PipelineRunConfiguration(
settings=pipeline_settings, steps=steps
)
template = pydantic_utils.TemplateGenerator(run_config).run()
yaml_string = yaml.dump(template)
yaml_string = yaml_utils.comment_out_yaml(yaml_string)
with open(path, "w") as f:
f.write(yaml_string)
configuration: PipelineConfiguration
property
readonly
The configuration of the pipeline.
Returns:
Type | Description |
---|---|
PipelineConfiguration |
The configuration of the pipeline. |
enable_cache: bool
property
readonly
If caching is enabled for the pipeline.
Returns:
Type | Description |
---|---|
bool |
If caching is enabled for the pipeline. |
name: str
property
readonly
The name of the pipeline.
Returns:
Type | Description |
---|---|
str |
The name of the pipeline. |
steps: Dict[str, zenml.steps.base_step.BaseStep]
property
readonly
Returns a dictionary of pipeline steps.
Returns:
Type | Description |
---|---|
Dict[str, zenml.steps.base_step.BaseStep] |
A dictionary of pipeline steps. |
__init__(self, *args, **kwargs)
special
Initialize the BasePipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
BaseStep |
The steps to be executed by this pipeline. |
() |
**kwargs |
Any |
The configuration for this pipeline. |
{} |
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
"""Initialize the BasePipeline.
Args:
*args: The steps to be executed by this pipeline.
**kwargs: The configuration for this pipeline.
"""
kwargs.update(self.INSTANCE_CONFIGURATION)
self._configuration = PipelineConfiguration(
name=self.__class__.__name__,
enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, True),
)
self._apply_class_configuration(kwargs)
self.__steps: Dict[str, BaseStep] = {}
self._verify_steps(*args, **kwargs)
configure(self, enable_cache=None, settings=None, extra=None, merge=True)
Configures the pipeline.
Configuration merging example:
* merge==True
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False
:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
merge |
bool |
If |
True |
Returns:
Type | Description |
---|---|
~T |
The pipeline instance that this method was called on. |
Source code in zenml/pipelines/base_pipeline.py
def configure(
self: T,
enable_cache: Optional[bool] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
merge: bool = True,
) -> T:
"""Configures the pipeline.
Configuration merging example:
* `merge==True`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=True)
pipeline.configuration.extra # {"key1": 1, "key2": 2}
* `merge==False`:
pipeline.configure(extra={"key1": 1})
pipeline.configure(extra={"key2": 2}, merge=False)
pipeline.configuration.extra # {"key2": 2}
Args:
enable_cache: If caching should be enabled for this pipeline.
settings: settings for this pipeline.
extra: Extra configurations for this pipeline.
merge: If `True`, will merge the given dictionary configurations
like `extra` and `settings` with existing
configurations. If `False` the given configurations will
overwrite all existing ones. See the general description of this
method for an example.
Returns:
The pipeline instance that this method was called on.
"""
values = dict_utils.remove_none_values(
{
"enable_cache": enable_cache,
"settings": settings,
"extra": extra,
}
)
config = PipelineConfigurationUpdate(**values)
self._apply_configuration(config, merge=merge)
return self
connect(self, *args, **kwargs)
Function that connects inputs and outputs of the pipeline steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
BaseStep |
The positional arguments passed to the pipeline. |
() |
**kwargs |
BaseStep |
The keyword arguments passed to the pipeline. |
{} |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Always. |
Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
"""Function that connects inputs and outputs of the pipeline steps.
Args:
*args: The positional arguments passed to the pipeline.
**kwargs: The keyword arguments passed to the pipeline.
Raises:
NotImplementedError: Always.
"""
raise NotImplementedError
get_runs()
classmethod
Get all past runs from the associated PipelineView.
Returns:
Type | Description |
---|---|
Optional[List[PipelineRunView]] |
A list of all past PipelineRunViews. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
In case the repository does not contain the view of the current pipeline. |
Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
"""Get all past runs from the associated PipelineView.
Returns:
A list of all past PipelineRunViews.
Raises:
RuntimeError: In case the repository does not contain the view
of the current pipeline.
"""
from zenml.post_execution import get_pipeline
pipeline_view = get_pipeline(cls)
if pipeline_view:
return pipeline_view.runs # type: ignore[no-any-return]
else:
raise RuntimeError(
f"The PipelineView for `{cls.__name__}` could "
f"not be found. Are you sure this pipeline has "
f"been run already?"
)
run(self, *, run_name=None, enable_cache=None, schedule=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False)
Runs the pipeline on the active stack of the current repository.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
Optional[str] |
Name of the pipeline run. |
None |
enable_cache |
Optional[bool] |
If caching should be enabled for this pipeline run. |
None |
schedule |
Optional[zenml.config.schedule.Schedule] |
Optional schedule of the pipeline. |
None |
settings |
Optional[Mapping[str, SettingsOrDict]] |
settings for this pipeline run. |
None |
step_configurations |
Optional[Mapping[str, StepConfigurationUpdateOrDict]] |
Configurations for steps of the pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline run. |
None |
config_path |
Optional[str] |
Path to a yaml configuration file. This file will
be parsed as a |
None |
unlisted |
bool |
Whether the pipeline run should be unlisted (not assigned to any pipeline). |
False |
Returns:
Type | Description |
---|---|
Any |
The result of the pipeline. |
Source code in zenml/pipelines/base_pipeline.py
def run(
self,
*,
run_name: Optional[str] = None,
enable_cache: Optional[bool] = None,
schedule: Optional[Schedule] = None,
settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
step_configurations: Optional[
Mapping[str, "StepConfigurationUpdateOrDict"]
] = None,
extra: Optional[Dict[str, Any]] = None,
config_path: Optional[str] = None,
unlisted: bool = False,
) -> Any:
"""Runs the pipeline on the active stack of the current repository.
Args:
run_name: Name of the pipeline run.
enable_cache: If caching should be enabled for this pipeline run.
schedule: Optional schedule of the pipeline.
settings: settings for this pipeline run.
step_configurations: Configurations for steps of the pipeline.
extra: Extra configurations for this pipeline run.
config_path: Path to a yaml configuration file. This file will
be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
object. Options provided in this file will be overwritten by
options provided in code using the other arguments of this
method.
unlisted: Whether the pipeline run should be unlisted (not assigned
to any pipeline).
Returns:
The result of the pipeline.
"""
if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
# An environment variable was set to stop the execution of
# pipelines. This is done to prevent execution of module-level
# pipeline.run() calls inside docker containers which should only
# run a single step.
logger.info(
"Preventing execution of pipeline '%s'. If this is not "
"intended behavior, make sure to unset the environment "
"variable '%s'.",
self.name,
constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
)
return
with event_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
stack = Client().active_stack
# Activating the built-in integrations through lazy loading
from zenml.integrations.registry import integration_registry
integration_registry.activate_integrations()
if config_path:
config_dict = yaml_utils.read_yaml(config_path)
run_config = PipelineRunConfiguration.parse_obj(config_dict)
else:
run_config = PipelineRunConfiguration()
new_values = dict_utils.remove_none_values(
{
"run_name": run_name,
"enable_cache": enable_cache,
"steps": step_configurations,
"settings": settings,
"schedule": schedule,
"extra": extra,
}
)
# Update with the values in code so they take precedence
run_config = pydantic_utils.update_model(
run_config, update=new_values
)
from zenml.config.compiler import Compiler
pipeline_deployment = Compiler().compile(
pipeline=self, stack=stack, run_configuration=run_config
)
skip_pipeline_registration = constants.handle_bool_env_var(
constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION, default=False
)
register_pipeline = not (skip_pipeline_registration or unlisted)
pipeline_id = None
if register_pipeline:
step_specs = [
step.spec for step in pipeline_deployment.steps.values()
]
pipeline_spec = PipelineSpec(steps=step_specs)
pipeline_id = Client().create_pipeline(
pipeline_name=pipeline_deployment.pipeline.name,
pipeline_spec=pipeline_spec,
pipeline_docstring=self.__doc__,
)
pipeline_deployment = pipeline_deployment.copy(
update={"pipeline_id": pipeline_id}
)
analytics_handler.metadata = self._get_pipeline_analytics_metadata(
deployment=pipeline_deployment, stack=stack
)
caching_status = (
"enabled"
if pipeline_deployment.pipeline.enable_cache
else "disabled"
)
logger.info(
"%s %s on stack `%s` (caching %s)",
"Scheduling" if pipeline_deployment.schedule else "Running",
f"pipeline `{pipeline_deployment.pipeline.name}`"
if register_pipeline
else "unlisted pipeline",
stack.name,
caching_status,
)
stack.prepare_pipeline_deployment(deployment=pipeline_deployment)
# Prevent execution of nested pipelines which might lead to
# unexpected behavior
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
try:
return_value = stack.deploy_pipeline(pipeline_deployment)
finally:
constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
# Log the dashboard URL
dashboard_utils.print_run_url(
run_name=pipeline_deployment.run_name, pipeline_id=pipeline_id
)
return return_value
with_config(self, config_file, overwrite_step_parameters=False)
DEPRECATED: Configures this pipeline using a yaml file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_file |
str |
Path to a yaml file which contains configuration options for running this pipeline. See https://docs.zenml.io/advanced-guide/pipelines/settings for details regarding the specification of this file. |
required |
overwrite_step_parameters |
bool |
If set to |
False |
Returns:
Type | Description |
---|---|
~T |
The pipeline object that this method was called on. |
Source code in zenml/pipelines/base_pipeline.py
def with_config(
self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
"""DEPRECATED: Configures this pipeline using a yaml file.
Args:
config_file: Path to a yaml file which contains configuration
options for running this pipeline. See
https://docs.zenml.io/advanced-guide/pipelines/settings
for details regarding the specification of this file.
overwrite_step_parameters: If set to `True`, values from the
configuration file will overwrite configuration parameters
passed in code.
Returns:
The pipeline object that this method was called on.
"""
logger.warning(
"The `with_config(...)` method is deprecated. Use "
"`pipeline.configure(...)` or `pipeline.run(config_path=...)` "
"instead."
)
config_yaml = yaml_utils.read_yaml(config_file)
if PipelineConfigurationKeys.STEPS in config_yaml:
self._read_config_steps(
config_yaml[PipelineConfigurationKeys.STEPS],
overwrite=overwrite_step_parameters,
)
return self
write_run_configuration_template(self, path, stack=None)
Writes a run configuration yaml template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path where the template will be written. |
required |
stack |
Optional[Stack] |
The stack for which the template should be generated. If not given, the active stack will be used. |
None |
Source code in zenml/pipelines/base_pipeline.py
def write_run_configuration_template(
self, path: str, stack: Optional["Stack"] = None
) -> None:
"""Writes a run configuration yaml template.
Args:
path: The path where the template will be written.
stack: The stack for which the template should be generated. If
not given, the active stack will be used.
"""
from zenml.config.base_settings import ConfigurationLevel
from zenml.config.step_configurations import (
PartialArtifactConfiguration,
)
stack = stack or Client().active_stack
setting_classes = stack.setting_classes
setting_classes.update(settings_utils.get_general_settings())
pipeline_settings = {}
step_settings = {}
for key, setting_class in setting_classes.items():
fields = pydantic_utils.TemplateGenerator(setting_class).run()
if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
pipeline_settings[key] = fields
if ConfigurationLevel.STEP in setting_class.LEVEL:
step_settings[key] = fields
steps = {}
for step_name, step in self.steps.items():
parameters = (
pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
if step.PARAMETERS_CLASS
else {}
)
outputs = {
name: PartialArtifactConfiguration()
for name in step.OUTPUT_SIGNATURE
}
step_template = StepConfigurationUpdate(
parameters=parameters,
settings=step_settings,
outputs=outputs,
)
steps[step_name] = step_template
run_config = PipelineRunConfiguration(
settings=pipeline_settings, steps=steps
)
template = pydantic_utils.TemplateGenerator(run_config).run()
yaml_string = yaml.dump(template)
yaml_string = yaml_utils.comment_out_yaml(yaml_string)
with open(path, "w") as f:
f.write(yaml_string)
BasePipelineMeta (type)
Pipeline Metaclass responsible for validating the pipeline definition.
Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
"""Pipeline Metaclass responsible for validating the pipeline definition."""
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
"""Saves argument names for later verification purposes.
Args:
name: The name of the class.
bases: The base classes of the class.
dct: The dictionary of the class.
Returns:
The class.
"""
dct.setdefault(INSTANCE_CONFIGURATION, {})
cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))
cls.STEP_SPEC = {}
connect_spec = inspect.getfullargspec(
inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
)
connect_args = connect_spec.args
if connect_args and connect_args[0] == "self":
connect_args.pop(0)
for arg in connect_args:
arg_type = connect_spec.annotations.get(arg, None)
cls.STEP_SPEC.update({arg: arg_type})
return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Saves argument names for later verification purposes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the class. |
required |
bases |
Tuple[Type[Any], ...] |
The base classes of the class. |
required |
dct |
Dict[str, Any] |
The dictionary of the class. |
required |
Returns:
Type | Description |
---|---|
BasePipelineMeta |
The class. |
Source code in zenml/pipelines/base_pipeline.py
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
"""Saves argument names for later verification purposes.
Args:
name: The name of the class.
bases: The base classes of the class.
dct: The dictionary of the class.
Returns:
The class.
"""
dct.setdefault(INSTANCE_CONFIGURATION, {})
cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))
cls.STEP_SPEC = {}
connect_spec = inspect.getfullargspec(
inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
)
connect_args = connect_spec.args
if connect_args and connect_args[0] == "self":
connect_args.pop(0)
for arg in connect_args:
arg_type = connect_spec.annotations.get(arg, None)
cls.STEP_SPEC.update({arg: arg_type})
return cls
pipeline_decorator
Decorator function for ZenML pipelines.
pipeline(_func=None, *, name=None, enable_cache=True, settings=None, extra=None)
Outer decorator function for the creation of a ZenML pipeline.
In order to be able to work with parameters such as "name", it features a nested decorator structure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_func |
Optional[~F] |
The decorated function. |
None |
name |
Optional[str] |
The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. |
None |
enable_cache |
bool |
Whether to use caching or not. |
True |
settings |
Optional[Dict[str, SettingsOrDict]] |
Settings for this pipeline. |
None |
extra |
Optional[Dict[str, Any]] |
Extra configurations for this pipeline. |
None |
Returns:
Type | Description |
---|---|
Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] |
the inner decorator which creates the pipeline class based on the ZenML BasePipeline |
Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
_func: Optional[F] = None,
*,
name: Optional[str] = None,
enable_cache: bool = True,
settings: Optional[Dict[str, "SettingsOrDict"]] = None,
extra: Optional[Dict[str, Any]] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
"""Outer decorator function for the creation of a ZenML pipeline.
In order to be able to work with parameters such as "name", it features a
nested decorator structure.
Args:
_func: The decorated function.
name: The name of the pipeline. If left empty, the name of the
decorated function will be used as a fallback.
enable_cache: Whether to use caching or not.
settings: Settings for this pipeline.
extra: Extra configurations for this pipeline.
Returns:
the inner decorator which creates the pipeline class based on the
ZenML BasePipeline
"""
def inner_decorator(func: F) -> Type[BasePipeline]:
"""Inner decorator function for the creation of a ZenML pipeline.
Args:
func: types.FunctionType, this function will be used as the
"connect" method of the generated Pipeline
Returns:
the class of a newly generated ZenML Pipeline
"""
return type( # noqa
name if name else func.__name__,
(BasePipeline,),
{
PIPELINE_INNER_FUNC_NAME: staticmethod(func), # type: ignore[arg-type] # noqa
INSTANCE_CONFIGURATION: {
PARAM_ENABLE_CACHE: enable_cache,
PARAM_SETTINGS: settings,
PARAM_EXTRA_OPTIONS: extra,
},
"__module__": func.__module__,
"__doc__": func.__doc__,
},
)
if _func is None:
return inner_decorator
else:
return inner_decorator(_func)
run_pipeline
Running ZenML Pipelines from Code.
run_pipeline(python_file, config_path)
Runs pipeline specified by the given config YAML object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
python_file |
str |
Path to the python file that defines the pipeline. |
required |
config_path |
str |
Path to configuration YAML file. |
required |
Exceptions:
Type | Description |
---|---|
PipelineConfigurationError |
Error when pipeline configuration is faulty. |
RuntimeError |
Error when zenml repository is not found. |
Source code in zenml/pipelines/run_pipeline.py
def run_pipeline(python_file: str, config_path: str) -> None:
"""Runs pipeline specified by the given config YAML object.
Args:
python_file: Path to the python file that defines the pipeline.
config_path: Path to configuration YAML file.
Raises:
PipelineConfigurationError: Error when pipeline configuration is faulty.
RuntimeError: Error when zenml repository is not found.
"""
# If the file was run with `python run.py, this would happen automatically.
# In order to allow seamless switching between running directly and through
# zenml, this is done at this point
zenml_root = Client().root
if not zenml_root:
raise RuntimeError(
"The `run_pipeline` function can only be called "
"within a zenml repo. Run `zenml init` before "
"running a pipeline using `run_pipeline`."
)
module = source_utils.import_python_file(python_file, str(zenml_root))
config = yaml_utils.read_yaml(config_path)
PipelineConfigurationKeys.key_check(config)
pipeline_name = config[PipelineConfigurationKeys.NAME]
pipeline_class = _get_module_attribute(module, pipeline_name)
# For docker-based orchestrators it is important for the supplied python
# module to be set as the main module instead of the calling process
constants.USER_MAIN_MODULE = source_utils.get_module_source_from_module(
module=module
)
steps = {}
for step_name, step_config in config[
PipelineConfigurationKeys.STEPS
].items():
StepConfigurationKeys.key_check(step_config)
source = step_config[StepConfigurationKeys.SOURCE_]
step_class = _load_class_from_module(module, source, str(zenml_root))
# It is necessary to support passing step instances for standard
# step implementations (e.g WhylogsProfilerStep) in order to
# support using the same step multiple times, once this problem is
# solved, this portion of the code can be simplified to only
# support classes.
if not isinstance(step_class, BaseStep):
step_instance = step_class()
else:
step_instance = step_class
materializers_config = step_config.get(
StepConfigurationKeys.MATERIALIZERS_, None
)
if materializers_config:
# We need to differentiate whether it's a single materializer
# or a dictionary mapping output names to materializers
if isinstance(materializers_config, str):
correct_input = textwrap.dedent(
f"""
{SourceConfigurationKeys.NAME_}: {materializers_config}
{SourceConfigurationKeys.FILE_}: optional/filepath.py
"""
)
raise PipelineConfigurationError(
"As of ZenML version 0.8.0 `str` entries are no "
"longer supported "
"to define steps or materializers. Instead you will "
"now need to "
"pass a dictionary. This dictionary **has to** "
"contain a "
f"`{SourceConfigurationKeys.NAME_}` which refers to "
f"the function/"
"class name. If this entity is defined outside the "
"main module,"
"you will need to additionally supply a "
f"{SourceConfigurationKeys.FILE_} with the relative "
f"forward-slash-"
"separated path to the file. \n"
f"You tried to pass in `{materializers_config}` "
f"- however you should have specified the name "
f"(and file) like this: \n "
f"{correct_input}"
)
elif isinstance(materializers_config, dict):
materializers = {
output_name: _load_class_from_module(
module, source, str(zenml_root)
)
for output_name, source in materializers_config.items()
}
else:
raise PipelineConfigurationError(
f"Only `dict` values are allowed for "
f"'materializers' attribute of a step configuration. "
f"You tried to pass in `{materializers_config}` (type: "
f"`{type(materializers_config).__name__}`)."
)
step_instance = step_instance.with_return_materializers(
materializers
)
steps[step_name] = step_instance
pipeline_instance = pipeline_class(**steps).with_config(
config_path, overwrite_step_parameters=True
)
logger.debug("Finished setting up pipeline '%s' from CLI", pipeline_name)
pipeline_instance.run()