Pipelines
        zenml.pipelines
  
      special
  
    A ZenML pipeline consists of tasks that execute in order and yield artifacts.
The artifacts are automatically stored within the artifact store and metadata 
is tracked by ZenML. Each individual task within a pipeline is known as a
step. The standard pipelines within ZenML are designed to have easy interfaces
to add pre-decided steps, with the order also pre-decided. Other sorts of
pipelines can be created as well from scratch, building on the BasePipeline class.
Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run, a pipeline is compiled and passed directly to the orchestrator.
        base_pipeline
    Legacy ZenML pipeline class definition.
        
BasePipeline            (Pipeline, ABC)
        
    Legacy pipeline class.
Source code in zenml/pipelines/base_pipeline.py
          class BasePipeline(Pipeline, ABC):
    """Legacy pipeline class."""
    _CLASS_CONFIGURATION: ClassVar[Optional[Dict[str, Any]]] = None
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes a pipeline.
        Args:
            *args: Initialization arguments.
            **kwargs: Initialization keyword arguments.
        """
        config = self._CLASS_CONFIGURATION or {}
        pipeline_name = (
            config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
        )
        self._steps = self._verify_steps(
            *args, __name__=pipeline_name, **kwargs
        )
        def entrypoint() -> None:
            self.connect(**self._steps)
        super().__init__(
            name=pipeline_name,
            entrypoint=entrypoint,
            **config,
        )
    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns the steps of the pipeline.
        Returns:
            The steps of the pipeline.
        """
        return self._steps
    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Abstract method that connects the pipeline steps.
        Args:
            *args: Connect method arguments.
            **kwargs: Connect method keyword arguments.
        """
        raise NotImplementedError
    def resolve(self) -> "Source":
        """Resolves the pipeline.
        Returns:
            The pipeline source.
        """
        return source_utils.resolve(self.__class__)
    @property
    def source_object(self) -> Any:
        """The source object of this pipeline.
        Returns:
            The source object of this pipeline.
        """
        return self.connect
    def run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBase", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
    ) -> None:
        """Runs the pipeline on the active stack.
        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline run.
            enable_step_logs: If step logs should be enabled for this pipeline
                run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: Whether to prevent the reuse of a build.
        """
        pipeline_copy = self.with_options(
            run_name=run_name,
            schedule=schedule,
            build=build,
            step_configurations=step_configurations,
            config_path=config_path,
            unlisted=unlisted,
            prevent_build_reuse=prevent_build_reuse,
        )
        new_run_args = dict_utils.remove_none_values(
            {
                "enable_cache": enable_cache,
                "enable_artifact_metadata": enable_artifact_metadata,
                "enable_artifact_visualization": enable_artifact_visualization,
                "enable_step_logs": enable_step_logs,
                "settings": settings,
                "extra": extra,
            }
        )
        pipeline_copy._run_args.update(new_run_args)
        pipeline_copy()
    def _compute_invocation_id(
        self,
        step: "BaseStep",
        custom_id: Optional[str] = None,
        allow_suffix: bool = True,
    ) -> str:
        """Compute the invocation ID.
        Args:
            step: The step for which to compute the ID.
            custom_id: Custom ID to use for the invocation.
            allow_suffix: Whether a suffix can be appended to the invocation
                ID.
        Returns:
            The invocation ID.
        """
        custom_id = getattr(step, TEMPLATE_NAME_ATTRIBUTE, None)
        return super()._compute_invocation_id(
            step=step, custom_id=custom_id, allow_suffix=False
        )
    def _verify_steps(
        self, *args: Any, __name__: str, **kwargs: Any
    ) -> Dict[str, "BaseStep"]:
        """Verifies the initialization args and kwargs of this pipeline.
        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline.
        Args:
            *args: The args passed to the init method of this pipeline.
            __name__: The pipeline name. The naming of this argument is to avoid
                conflicts with other arguments.
            **kwargs: The kwargs passed to the init method of this pipeline.
        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        Returns:
            The verified steps.
        """
        signature = inspect.signature(self.connect, follow_wrapped=True)
        try:
            bound_args = signature.bind(*args, **kwargs)
        except TypeError as e:
            raise PipelineInterfaceError(
                f"Wrong arguments when initializing pipeline '{__name__}': {e}"
            ) from e
        steps = {}
        for key, potential_step in bound_args.arguments.items():
            step_class = type(potential_step)
            if inspect.isclass(potential_step) and issubclass(
                potential_step, BaseStep
            ):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{__name__}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )
            if not isinstance(potential_step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{__name__}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )
            steps[key] = potential_step
            setattr(potential_step, TEMPLATE_NAME_ATTRIBUTE, key)
        return steps
source_object: Any
  
      property
      readonly
  
    The source object of this pipeline.
Returns:
| Type | Description | 
|---|---|
| Any | The source object of this pipeline. | 
steps: Dict[str, zenml.steps.base_step.BaseStep]
  
      property
      readonly
  
    Returns the steps of the pipeline.
Returns:
| Type | Description | 
|---|---|
| Dict[str, zenml.steps.base_step.BaseStep] | The steps of the pipeline. | 
__init__(self, *args, **kwargs)
  
      special
  
    Initializes a pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Initialization arguments. | () | 
| **kwargs | Any | Initialization keyword arguments. | {} | 
Source code in zenml/pipelines/base_pipeline.py
          def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initializes a pipeline.
    Args:
        *args: Initialization arguments.
        **kwargs: Initialization keyword arguments.
    """
    config = self._CLASS_CONFIGURATION or {}
    pipeline_name = (
        config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
    )
    self._steps = self._verify_steps(
        *args, __name__=pipeline_name, **kwargs
    )
    def entrypoint() -> None:
        self.connect(**self._steps)
    super().__init__(
        name=pipeline_name,
        entrypoint=entrypoint,
        **config,
    )
connect(self, *args, **kwargs)
    Abstract method that connects the pipeline steps.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | BaseStep | Connect method arguments. | () | 
| **kwargs | BaseStep | Connect method keyword arguments. | {} | 
Source code in zenml/pipelines/base_pipeline.py
          @abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Abstract method that connects the pipeline steps.
    Args:
        *args: Connect method arguments.
        **kwargs: Connect method keyword arguments.
    """
    raise NotImplementedError
resolve(self)
    Resolves the pipeline.
Returns:
| Type | Description | 
|---|---|
| Source | The pipeline source. | 
Source code in zenml/pipelines/base_pipeline.py
          def resolve(self) -> "Source":
    """Resolves the pipeline.
    Returns:
        The pipeline source.
    """
    return source_utils.resolve(self.__class__)
run(self, *, run_name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, schedule=None, build=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False, prevent_build_reuse=False)
    Runs the pipeline on the active stack.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_name | Optional[str] | Name of the pipeline run. | None | 
| enable_cache | Optional[bool] | If caching should be enabled for this pipeline run. | None | 
| enable_artifact_metadata | Optional[bool] | If artifact metadata should be enabled for this pipeline run. | None | 
| enable_artifact_visualization | Optional[bool] | If artifact visualization should be enabled for this pipeline run. | None | 
| enable_step_logs | Optional[bool] | If step logs should be enabled for this pipeline run. | None | 
| schedule | Optional[zenml.config.schedule.Schedule] | Optional schedule to use for the run. | None | 
| build | Union[str, UUID, PipelineBuildBase] | Optional build to use for the run. | None | 
| settings | Optional[Mapping[str, SettingsOrDict]] | Settings for this pipeline run. | None | 
| step_configurations | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline run. | None | 
| config_path | Optional[str] | Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method. | None | 
| unlisted | bool | Whether the pipeline run should be unlisted (not assigned to any pipeline). | False | 
| prevent_build_reuse | bool | Whether to prevent the reuse of a build. | False | 
Source code in zenml/pipelines/base_pipeline.py
          def run(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
) -> None:
    """Runs the pipeline on the active stack.
    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline run.
        enable_step_logs: If step logs should be enabled for this pipeline
            run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.
    """
    pipeline_copy = self.with_options(
        run_name=run_name,
        schedule=schedule,
        build=build,
        step_configurations=step_configurations,
        config_path=config_path,
        unlisted=unlisted,
        prevent_build_reuse=prevent_build_reuse,
    )
    new_run_args = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
        }
    )
    pipeline_copy._run_args.update(new_run_args)
    pipeline_copy()
        pipeline_decorator
    Legacy ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None)
    Outer decorator function for the creation of a ZenML pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| _func | Optional[~F] | The decorated function. | None | 
| name | Optional[str] | The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. | None | 
| enable_cache | Optional[bool] | Whether to use caching or not. | None | 
| enable_artifact_metadata | Optional[bool] | Whether to enable artifact metadata or not. | None | 
| enable_artifact_visualization | Optional[bool] | Whether to enable artifact visualization. | None | 
| enable_step_logs | Optional[bool] | Whether to enable step logs. | None | 
| settings | Optional[Dict[str, SettingsOrDict]] | Settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
| on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function). | None | 
| on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function). | None | 
| model | Optional[Model] | configuration of the model in the Model Control Plane. | None | 
Returns:
| Type | Description | 
|---|---|
| Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] | the inner decorator which creates the pipeline class based on the ZenML BasePipeline | 
Source code in zenml/pipelines/pipeline_decorator.py
          def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.
    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_artifact_visualization: Whether to enable artifact visualization.
        enable_step_logs: Whether to enable step logs.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """
    def inner_decorator(func: F) -> Type[BasePipeline]:
        pipeline_name = name or func.__name__
        logger.warning(
            "The `@pipeline` decorator that you used to define your "
            f"{pipeline_name} pipeline is deprecated. Check out the 0.40.0 "
            "migration guide for more information on how to migrate your "
            "pipelines to the new syntax: "
            "https://docs.zenml.io/reference/migration-guide/migration-zero-forty.html"
        )
        return type(
            name or func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),
                CLASS_CONFIGURATION: {
                    PARAM_PIPELINE_NAME: name,
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_ENABLE_ARTIFACT_METADATA: enable_artifact_metadata,
                    PARAM_ENABLE_ARTIFACT_VISUALIZATION: enable_artifact_visualization,
                    PARAM_ENABLE_STEP_LOGS: enable_step_logs,
                    PARAM_SETTINGS: settings,
                    PARAM_EXTRA_OPTIONS: extra,
                    PARAM_ON_FAILURE: on_failure,
                    PARAM_ON_SUCCESS: on_success,
                    PARAM_MODEL: model,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )
    return inner_decorator if _func is None else inner_decorator(_func)