Skip to content

Pipelines

zenml.pipelines special

A ZenML pipeline consists of tasks that execute in order and yield artifacts.

The artifacts are automatically stored within the artifact store and metadata is tracked by ZenML. Each individual task within a pipeline is known as a step. The standard pipelines within ZenML are designed to have easy interfaces to add pre-decided steps, with the order also pre-decided. Other sorts of pipelines can be created as well from scratch, building on the BasePipeline class.

Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run, a pipeline is compiled and passed directly to the orchestrator.

base_pipeline

Abstract base class for all ZenML pipelines.

BasePipeline

Abstract base class for all ZenML pipelines.

Attributes:

Name Type Description
name

The name of this pipeline.

enable_cache

A boolean indicating if caching is enabled for this pipeline.

enable_artifact_metadata

A boolean indicating if artifact metadata is enabled for this pipeline.

Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.

    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        enable_artifact_metadata: A boolean indicating if artifact metadata
            is enabled for this pipeline.
    """

    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]

    INSTANCE_CONFIGURATION: Dict[str, Any] = {}

    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        """Initialize the BasePipeline.

        Args:
            *args: The steps to be executed by this pipeline.
            **kwargs: The configuration for this pipeline.
        """
        kwargs.update(self.INSTANCE_CONFIGURATION)

        self._configuration = PipelineConfiguration(
            name=self.__class__.__name__,
            enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, None),
            enable_artifact_metadata=kwargs.pop(
                PARAM_ENABLE_ARTIFACT_METADATA, None
            ),
        )
        self._apply_class_configuration(kwargs)

        self.__steps: Dict[str, BaseStep] = {}
        self._verify_steps(*args, **kwargs)

    @property
    def name(self) -> str:
        """The name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self.configuration.name

    @property
    def enable_cache(self) -> Optional[bool]:
        """If caching is enabled for the pipeline.

        Returns:
            If caching is enabled for the pipeline.
        """
        return self.configuration.enable_cache

    @property
    def configuration(self) -> PipelineConfiguration:
        """The configuration of the pipeline.

        Returns:
            The configuration of the pipeline.
        """
        return self._configuration

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps.

        Returns:
            A dictionary of pipeline steps.
        """
        return self.__steps

    @classmethod
    def from_model(cls: Type[T], model: "PipelineResponseModel") -> T:
        """Creates a pipeline instance from a model.

        Args:
            model: The model to load the pipeline instance from.

        Returns:
            The pipeline instance.

        Raises:
            ValueError: If the spec version of the given model is <0.2
        """
        if version.parse(model.spec.version) < version.parse("0.2"):
            raise ValueError(
                "Loading a pipeline is only possible for pipeline specs with "
                "version 0.2 or higher."
            )

        steps = cls._load_and_verify_steps(pipeline_spec=model.spec)
        connect_method = cls._generate_connect_method(model=model)

        pipeline_class: Type[T] = type(
            model.name,
            (cls,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(connect_method),
                "__doc__": model.docstring,
            },
        )

        pipeline_instance = pipeline_class(**steps)

        version_hash = pipeline_instance._compute_unique_identifier(
            pipeline_spec=model.spec
        )
        if version_hash != model.version_hash:
            logger.warning(
                "Trying to load pipeline version `%s`, but the local step code "
                "changed since this pipeline version was registered. Using "
                "this pipeline instance will result in a different pipeline "
                "version being registered or reused.",
                model.version,
            )

        return pipeline_instance

    def configure(
        self: T,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        merge: bool = True,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
    ) -> T:
        """Configures the pipeline.

        Configuration merging example:
        * `merge==True`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=True)
            pipeline.configuration.extra # {"key1": 1, "key2": 2}
        * `merge==False`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=False)
            pipeline.configuration.extra # {"key2": 2}

        Args:
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            merge: If `True`, will merge the given dictionary configurations
                like `extra` and `settings` with existing
                configurations. If `False` the given configurations will
                overwrite all existing ones. See the general description of this
                method for an example.
            on_failure: Callback function in event of failure of the step. Can be
                a function with three possible parameters, `StepContext`, `BaseParameters`,
                and `BaseException`, or a source path to a function of the same specifications
                (e.g. `module.my_function`)
            on_success: Callback function in event of failure of the step. Can be
                a function with two possible parameters, `StepContext` and `BaseParameters, or
                a source path to a function of the same specifications
                (e.g. `module.my_function`).

        Returns:
            The pipeline instance that this method was called on.
        """
        failure_hook_source = None
        if on_failure:
            # string of on_failure hook function to be used for this pipeline
            failure_hook_source = resolve_and_validate_hook(on_failure)

        success_hook_source = None
        if on_success:
            # string of on_success hook function to be used for this pipeline
            success_hook_source = resolve_and_validate_hook(on_success)

        values = dict_utils.remove_none_values(
            {
                "enable_cache": enable_cache,
                "enable_artifact_metadata": enable_artifact_metadata,
                "settings": settings,
                "extra": extra,
                "failure_hook_source": failure_hook_source,
                "success_hook_source": success_hook_source,
            }
        )
        config = PipelineConfigurationUpdate(**values)
        self._apply_configuration(config, merge=merge)
        return self

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps.

        Args:
            *args: The positional arguments passed to the pipeline.
            **kwargs: The keyword arguments passed to the pipeline.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def register(self) -> "PipelineResponseModel":
        """Register the pipeline in the server.

        Returns:
            The registered pipeline model.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        custom_configurations = self.configuration.dict(
            exclude_defaults=True, exclude={"name"}
        )
        if custom_configurations:
            logger.warning(
                f"The pipeline `{self.name}` that you're registering has "
                "custom configurations applied to it. These will not be "
                "registered with the pipeline and won't be set when you build "
                "images or run the pipeline from the CLI. To provide these "
                "configurations, use the `--config` option of the `zenml "
                "pipeline build/run` commands."
            )

        pipeline_spec = Compiler().compile_spec(self)
        return self._register(pipeline_spec=pipeline_spec)

    def build(
        self,
        *,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Builds Docker images for the pipeline.

        Args:
            settings: Settings for the pipeline.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.

        Returns:
            The build output.
        """
        with event_handler(event=AnalyticsEvent.BUILD_PIPELINE, v2=True):
            deployment, pipeline_spec, _, _ = self._compile(
                config_path=config_path,
                steps=step_configurations,
                settings=settings,
            )
            pipeline_id = self._register(pipeline_spec=pipeline_spec).id

            return self._build(deployment=deployment, pipeline_id=pipeline_id)

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
    ) -> None:
        """Runs the pipeline on the active stack of the current repository.

        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        with event_handler(
            event=AnalyticsEvent.RUN_PIPELINE, v2=True
        ) as analytics_handler:
            deployment, pipeline_spec, schedule, build = self._compile(
                config_path=config_path,
                run_name=run_name,
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                steps=step_configurations,
                settings=settings,
                schedule=schedule,
                build=build,
                extra=extra,
            )

            skip_pipeline_registration = constants.handle_bool_env_var(
                constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
                default=False,
            )

            register_pipeline = not (skip_pipeline_registration or unlisted)

            pipeline_id = None
            if register_pipeline:
                pipeline_id = self._register(pipeline_spec=pipeline_spec).id

            # TODO: check whether orchestrator even support scheduling before
            # registering the schedule
            schedule_id = None
            if schedule:
                if schedule.name:
                    schedule_name = schedule.name
                else:
                    date = datetime.utcnow().strftime("%Y_%m_%d")
                    time = datetime.utcnow().strftime("%H_%M_%S_%f")
                    schedule_name = deployment.run_name_template.format(
                        date=date, time=time
                    )
                components = Client().active_stack_model.components
                orchestrator = components[StackComponentType.ORCHESTRATOR][0]
                schedule_model = ScheduleRequestModel(
                    workspace=Client().active_workspace.id,
                    user=Client().active_user.id,
                    pipeline_id=pipeline_id,
                    orchestrator_id=orchestrator.id,
                    name=schedule_name,
                    active=True,
                    cron_expression=schedule.cron_expression,
                    start_time=schedule.start_time,
                    end_time=schedule.end_time,
                    interval_second=schedule.interval_second,
                    catchup=schedule.catchup,
                )
                schedule_id = (
                    Client().zen_store.create_schedule(schedule_model).id
                )
                logger.info(
                    f"Created schedule `{schedule_name}` for pipeline "
                    f"`{deployment.pipeline_configuration.name}`."
                )

            stack = Client().active_stack

            build_model = self._load_or_create_pipeline_build(
                deployment=deployment,
                pipeline_spec=pipeline_spec,
                pipeline_id=pipeline_id,
                build=build,
            )
            build_id = build_model.id if build_model else None

            deployment_request = PipelineDeploymentRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=stack.id,
                pipeline=pipeline_id,
                build=build_id,
                schedule=schedule_id,
                **deployment.dict(),
            )
            deployment_model = Client().zen_store.create_deployment(
                deployment=deployment_request
            )

            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
                deployment=deployment_model, stack=stack
            )
            caching_status = (
                "enabled"
                if deployment.pipeline_configuration.enable_cache is not False
                else "disabled"
            )
            logger.info(
                "%s %s on stack `%s` (caching %s)",
                "Scheduling" if deployment_model.schedule else "Running",
                f"pipeline `{deployment_model.pipeline_configuration.name}`"
                if register_pipeline
                else "unlisted pipeline",
                stack.name,
                caching_status,
            )

            stack.prepare_pipeline_deployment(deployment=deployment_model)

            # Prevent execution of nested pipelines which might lead to
            # unexpected behavior
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
            try:
                stack.deploy_pipeline(
                    deployment=deployment_model,
                )
            finally:
                constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

            # Log the dashboard URL
            dashboard_utils.print_run_url(
                run_name=deployment.run_name_template,
                pipeline_id=pipeline_id,
            )

    @classmethod
    def get_runs(cls) -> Optional[List["PipelineRunView"]]:
        """Get all past runs from the associated PipelineView.

        Returns:
            A list of all past PipelineRunViews.

        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        from zenml.post_execution import get_pipeline

        pipeline_view = get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.runs  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )

    def write_run_configuration_template(
        self, path: str, stack: Optional["Stack"] = None
    ) -> None:
        """Writes a run configuration yaml template.

        Args:
            path: The path where the template will be written.
            stack: The stack for which the template should be generated. If
                not given, the active stack will be used.
        """
        from zenml.config.base_settings import ConfigurationLevel
        from zenml.config.step_configurations import (
            PartialArtifactConfiguration,
        )

        stack = stack or Client().active_stack

        setting_classes = stack.setting_classes
        setting_classes.update(settings_utils.get_general_settings())

        pipeline_settings = {}
        step_settings = {}
        for key, setting_class in setting_classes.items():
            fields = pydantic_utils.TemplateGenerator(setting_class).run()
            if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
                pipeline_settings[key] = fields
            if ConfigurationLevel.STEP in setting_class.LEVEL:
                step_settings[key] = fields

        steps = {}
        for step_name, step in self.steps.items():
            parameters = (
                pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
                if step.PARAMETERS_CLASS
                else {}
            )
            outputs = {
                name: PartialArtifactConfiguration()
                for name in step.OUTPUT_SIGNATURE
            }
            step_template = StepConfigurationUpdate(
                parameters=parameters,
                settings=step_settings,
                outputs=outputs,
            )
            steps[step_name] = step_template

        run_config = PipelineRunConfiguration(
            settings=pipeline_settings, steps=steps
        )
        template = pydantic_utils.TemplateGenerator(run_config).run()
        yaml_string = yaml.dump(template)
        yaml_string = yaml_utils.comment_out_yaml(yaml_string)

        with open(path, "w") as f:
            f.write(yaml_string)

    def _apply_class_configuration(self, options: Dict[str, Any]) -> None:
        """Applies the configurations specified on the pipeline class.

        Args:
            options: Class configurations.
        """
        settings = options.pop(PARAM_SETTINGS, None)
        extra = options.pop(PARAM_EXTRA_OPTIONS, None)
        on_failure = options.pop(PARAM_ON_FAILURE, None)
        on_success = options.pop(PARAM_ON_SUCCESS, None)
        self.configure(
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
        )

    def _apply_configuration(
        self,
        config: PipelineConfigurationUpdate,
        merge: bool = True,
    ) -> None:
        """Applies an update to the pipeline configuration.

        Args:
            config: The configuration update.
            merge: Whether to merge the updates with the existing configuration
                or not. See the `BasePipeline.configure(...)` method for a
                detailed explanation.
        """
        self._validate_configuration(config)
        self._configuration = pydantic_utils.update_model(
            self._configuration, update=config, recursive=merge
        )
        logger.debug("Updated pipeline configuration:")
        logger.debug(self._configuration)

    @staticmethod
    def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
        """Validates a configuration update.

        Args:
            config: The configuration update to validate.
        """
        settings_utils.validate_setting_keys(list(config.settings))

    def _verify_steps(self, *steps: BaseStep, **kw_steps: Any) -> None:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.

        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )

        combined_steps = {}
        step_ids: Dict[int, str] = {}

        def _verify_step(key: str, step: BaseStep) -> None:
            """Verifies a single step of the pipeline.

            Args:
                key: The key of the step.
                step: The step to verify.

            Raises:
                PipelineInterfaceError: If the step is not of the correct type
                    or is of the same class as another step.
            """
            step_class = type(step)

            if isinstance(step, BaseStepMeta):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if id(step) in step_ids:
                previous_key = step_ids[id(step)]
                raise PipelineInterfaceError(
                    f"Found the same step object for arguments "
                    f"'{previous_key}' and '{key}' in pipeline '{self.name}'. "
                    "Step object cannot be reused inside a ZenML pipeline. "
                    "A possible solution is to create two instances of the "
                    "same step class and assigning them different names: "
                    "`first_instance = step_class(name='s1')` and "
                    "`second_instance = step_class(name='s2')`."
                )

            step_ids[id(step)] = key
            combined_steps[key] = step

        # verify args
        for i, step in enumerate(steps):
            key = input_step_keys[i]
            _verify_step(key, step)

        # verify kwargs
        for key, step in kw_steps.items():
            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )
            _verify_step(key, step)

        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps

        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )

        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )

        self.__steps = combined_steps

    def _get_pipeline_analytics_metadata(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
    ) -> Dict[str, Any]:
        """Returns the pipeline deployment metadata.

        Args:
            deployment: The pipeline deployment to track.
            stack: The stack on which the pipeline will be deployed.

        Returns:
            the metadata about the pipeline deployment
        """
        custom_materializer = False
        for step in deployment.step_configurations.values():
            for output in step.config.outputs.values():
                if not output.materializer_source.startswith("zenml."):
                    custom_materializer = True

        stack_creator = Client().get_stack(stack.id).user
        active_user = Client().active_user
        own_stack = stack_creator and stack_creator.id == active_user.id

        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        return {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "total_steps": len(self.steps),
            "schedule": bool(deployment.schedule),
            "custom_materializer": custom_materializer,
            "own_stack": own_stack,
        }

    @staticmethod
    def _load_and_verify_steps(
        pipeline_spec: "PipelineSpec",
    ) -> Dict[str, BaseStep]:
        """Loads steps and verifies their names and inputs/outputs names.

        Args:
            pipeline_spec: The pipeline spec from which to load the steps.

        Raises:
            RuntimeError: If the step names or input/output names of the
                loaded steps don't match with the names defined in the spec.

        Returns:
            The loaded steps.
        """
        steps = {}
        available_outputs: Dict[str, Set[str]] = {}

        for step_spec in pipeline_spec.steps:
            for upstream_step in step_spec.upstream_steps:
                if upstream_step not in available_outputs:
                    raise RuntimeError(
                        f"Unable to find upstream step `{upstream_step}`. "
                        "This is probably because the step was renamed in code."
                    )

            for input_spec in step_spec.inputs.values():
                if (
                    input_spec.output_name
                    not in available_outputs[input_spec.step_name]
                ):
                    raise RuntimeError(
                        f"Missing output `{input_spec.output_name}` for step "
                        f"`{input_spec.step_name}`. This is probably because "
                        "the output of the step was renamed."
                    )

            step = BaseStep.load_from_source(step_spec.source)
            input_names = set(step.INPUT_SIGNATURE)
            spec_input_names = set(step_spec.inputs)

            if input_names != spec_input_names:
                raise RuntimeError(
                    f"Input names of step {step_spec.source} and the spec "
                    f"from the database don't match. Step inputs: "
                    f"`{input_names}`, spec inputs: `{spec_input_names}`."
                )

            steps[step_spec.pipeline_parameter_name] = step
            available_outputs[step.name] = set(step.OUTPUT_SIGNATURE.keys())

        return steps

    @staticmethod
    def _generate_connect_method(
        model: "PipelineResponseModel",
    ) -> Callable[..., None]:
        """Dynamically generates a connect method for a pipeline model.

        Args:
            model: The model for which to generate the method.

        Returns:
            The generated connect method.
        """

        def connect(**steps: BaseStep) -> None:
            # Bind **steps to the connect signature assigned to this method
            # below. This ensures that the method inputs get verified and only
            # the arguments defined in the signature are allowed
            inspect.signature(connect).bind(**steps)

            step_outputs: Dict[str, Dict[str, BaseStep._OutputArtifact]] = {}
            for step_spec in model.spec.steps:
                step = steps[step_spec.pipeline_parameter_name]

                step_inputs = {}
                for input_name, input_ in step_spec.inputs.items():
                    try:
                        upstream_step = step_outputs[input_.step_name]
                        step_input = upstream_step[input_.output_name]
                        step_inputs[input_name] = step_input
                    except KeyError:
                        raise RuntimeError(
                            f"Unable to find upstream step "
                            f"`{input_.step_name}` in pipeline `{model.name}`. "
                            "This is probably due to configuring a new step "
                            "name after loading a pipeline using "
                            "`BasePipeline.from_model`."
                        )

                step_output = step(**step_inputs)
                output_keys = list(step.OUTPUT_SIGNATURE.keys())

                if isinstance(step_output, BaseStep._OutputArtifact):
                    step_output = [step_output]

                step_outputs[step.name] = {
                    key: step_output[i] for i, key in enumerate(output_keys)
                }

        # Create the connect method signature based on the expected steps
        parameters = [
            inspect.Parameter(
                name=step_spec.pipeline_parameter_name,
                kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,
            )
            for step_spec in model.spec.steps
        ]
        signature = inspect.Signature(parameters=parameters)
        connect.__signature__ = signature  # type: ignore[attr-defined]

        return connect

    def _compile(
        self, config_path: Optional[str] = None, **run_configuration_args: Any
    ) -> Tuple[
        "PipelineDeploymentBaseModel",
        "PipelineSpec",
        Optional["Schedule"],
        Union["PipelineBuildBaseModel", UUID, None],
    ]:
        """Compiles the pipeline.

        Args:
            config_path: Path to a config file.
            **run_configuration_args: Configurations for the pipeline run.

        Returns:
            A tuple containing the deployment, spec, schedule and build of
            the compiled pipeline.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        if config_path:
            run_config = PipelineRunConfiguration.from_yaml(config_path)
        else:
            run_config = PipelineRunConfiguration()

        new_values = dict_utils.remove_none_values(run_configuration_args)
        update = PipelineRunConfiguration.parse_obj(new_values)

        # Update with the values in code so they take precedence
        run_config = pydantic_utils.update_model(run_config, update=update)

        deployment, pipeline_spec = Compiler().compile(
            pipeline=self,
            stack=Client().active_stack,
            run_configuration=run_config,
        )

        return deployment, pipeline_spec, run_config.schedule, run_config.build

    def _register(
        self, pipeline_spec: "PipelineSpec"
    ) -> "PipelineResponseModel":
        """Register the pipeline in the server.

        Args:
            pipeline_spec: The pipeline spec to register.

        Returns:
            The registered pipeline model.
        """
        version_hash = self._compute_unique_identifier(
            pipeline_spec=pipeline_spec
        )

        client = Client()
        matching_pipelines = client.list_pipelines(
            name=self.name,
            version_hash=version_hash,
            size=1,
            sort_by="desc:created",
        )
        if matching_pipelines.total:
            registered_pipeline = matching_pipelines.items[0]
            logger.info(
                "Reusing registered pipeline `%s` (version: %s).",
                registered_pipeline.name,
                registered_pipeline.version,
            )
            return registered_pipeline

        latest_version = self._get_latest_version() or 0
        version = str(latest_version + 1)

        request = PipelineRequestModel(
            workspace=client.active_workspace.id,
            user=client.active_user.id,
            name=self.name,
            version=version,
            version_hash=version_hash,
            spec=pipeline_spec,
            docstring=self.__doc__,
        )

        registered_pipeline = client.zen_store.create_pipeline(
            pipeline=request
        )
        logger.info(
            "Registered pipeline `%s` (version %s).",
            registered_pipeline.name,
            registered_pipeline.version,
        )
        return registered_pipeline

    def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
        """Computes a unique identifier from the pipeline spec and steps.

        Args:
            pipeline_spec: Compiled spec of the pipeline.

        Returns:
            The unique identifier of the pipeline.
        """
        hash_ = hashlib.md5()
        hash_.update(pipeline_spec.json(sort_keys=False).encode())

        for step_spec in pipeline_spec.steps:
            step_source = self.steps[
                step_spec.pipeline_parameter_name
            ].source_code
            hash_.update(step_source.encode())

        return hash_.hexdigest()

    def _get_latest_version(self) -> Optional[int]:
        """Gets the latest version of this pipeline.

        Returns:
            The latest version or `None` if no version exists.
        """
        all_pipelines = Client().list_pipelines(
            name=self.name, sort_by="desc:created", size=1
        )
        if all_pipelines.total:
            pipeline = all_pipelines.items[0]
            if pipeline.version == "UNVERSIONED":
                return None
            return int(all_pipelines.items[0].version)
        else:
            return None

    def _get_registered_model(self) -> Optional[PipelineResponseModel]:
        """Gets the registered pipeline model for this instance.

        Returns:
            The registered pipeline model or None if no model is registered yet.
        """
        pipeline_spec = Compiler().compile_spec(self)
        version_hash = self._compute_unique_identifier(
            pipeline_spec=pipeline_spec
        )

        pipelines = Client().list_pipelines(
            name=self.name, version_hash=version_hash
        )
        if len(pipelines) == 1:
            return pipelines.items[0]

        return None

    def _load_or_create_pipeline_build(
        self,
        deployment: "PipelineDeploymentBaseModel",
        pipeline_spec: "PipelineSpec",
        pipeline_id: Optional[UUID] = None,
        build: Union["UUID", "PipelineBuildBaseModel", None] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Loads or creates a pipeline build.

        Args:
            deployment: The pipeline deployment for which to load or create the
                build.
            pipeline_spec: Spec of the pipeline.
            pipeline_id: Optional ID of the pipeline to reference in the build.
            build: Optional existing build. If given, the build will be loaded
                (or registered) in the database. If not given, a new build will
                be created.

        Returns:
            The build response.
        """
        if not build:
            return self._build(deployment=deployment, pipeline_id=pipeline_id)

        logger.info(
            "Using an old build for a pipeline run can lead to "
            "unexpected behavior as the pipeline will run with the step "
            "code that was included in the Docker images which might "
            "differ from the code in your client environment."
        )

        build_model = None

        if isinstance(build, UUID):
            build_model = Client().zen_store.get_build(build_id=build)

            if build_model.pipeline:
                build_hash = build_model.pipeline.version_hash
                current_hash = self._compute_unique_identifier(
                    pipeline_spec=pipeline_spec
                )

                if build_hash != current_hash:
                    logger.warning(
                        "The pipeline associated with the build you "
                        "specified for this run has a different spec "
                        "or step code. This might lead to unexpected "
                        "behavior as this pipeline run will use the "
                        "code that was included in the Docker images which "
                        "might differ from the code in your client "
                        "environment."
                    )
        else:
            build_request = PipelineBuildRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=Client().active_stack_model.id,
                pipeline=pipeline_id,
                **build.dict(),
            )
            build_model = Client().zen_store.create_build(build=build_request)

        if build_model.is_local:
            logger.warning(
                "You're using a local build to run your pipeline. This "
                "might lead to errors if the images don't exist on "
                "your local machine or the image tags have been "
                "overwritten since the original build happened."
            )

        return build_model

    def _build(
        self,
        deployment: "PipelineDeploymentBaseModel",
        pipeline_id: Optional[UUID] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Builds images and registers the output in the server.

        Args:
            deployment: The compiled pipeline deployment.
            pipeline_id: The ID of the pipeline.

        Returns:
            The build output.

        Raises:
            RuntimeError: If multiple builds with the same key but different
                settings were specified.
        """
        client = Client()
        stack = client.active_stack
        required_builds = stack.get_docker_builds(deployment=deployment)

        if not required_builds:
            logger.debug("No docker builds required.")
            return None

        logger.info(
            "Building Docker image(s) for pipeline `%s`.",
            deployment.pipeline_configuration.name,
        )

        docker_image_builder = PipelineDockerImageBuilder()
        images: Dict[str, BuildItem] = {}
        image_names: Dict[str, str] = {}

        for build_config in required_builds:
            combined_key = PipelineBuildBaseModel.get_image_key(
                component_key=build_config.key, step=build_config.step_name
            )
            checksum = build_config.compute_settings_checksum(stack=stack)

            if combined_key in images:
                previous_checksum = images[combined_key].settings_checksum

                if previous_checksum != checksum:
                    raise RuntimeError(
                        f"Trying to build image for key `{combined_key}` but "
                        "an image for this key was already built with a "
                        "different configuration. This happens if multiple "
                        "stack components specified Docker builds for the same "
                        "key in the `StackComponent.get_docker_builds(...)` "
                        "method. If you're using custom components, make sure "
                        "to provide unique keys when returning your build "
                        "configurations to avoid this error."
                    )
                else:
                    continue

            if checksum in image_names:
                image_name_or_digest = image_names[checksum]
            else:
                tag = deployment.pipeline_configuration.name
                if build_config.step_name:
                    tag += f"-{build_config.step_name}"
                tag += f"-{build_config.key}"

                image_name_or_digest = docker_image_builder.build_docker_image(
                    docker_settings=build_config.settings,
                    tag=tag,
                    stack=stack,
                    entrypoint=build_config.entrypoint,
                    extra_files=build_config.extra_files,
                )

            images[combined_key] = BuildItem(
                image=image_name_or_digest, settings_checksum=checksum
            )
            image_names[checksum] = image_name_or_digest

        logger.info("Finished building Docker image(s).")

        is_local = stack.container_registry is None
        build_request = PipelineBuildRequestModel(
            user=client.active_user.id,
            workspace=client.active_workspace.id,
            stack=client.active_stack_model.id,
            pipeline=pipeline_id,
            is_local=is_local,
            images=images,
        )
        return client.zen_store.create_build(build_request)
configuration: PipelineConfiguration property readonly

The configuration of the pipeline.

Returns:

Type Description
PipelineConfiguration

The configuration of the pipeline.

enable_cache: Optional[bool] property readonly

If caching is enabled for the pipeline.

Returns:

Type Description
Optional[bool]

If caching is enabled for the pipeline.

name: str property readonly

The name of the pipeline.

Returns:

Type Description
str

The name of the pipeline.

steps: Dict[str, zenml.steps.base_step.BaseStep] property readonly

Returns a dictionary of pipeline steps.

Returns:

Type Description
Dict[str, zenml.steps.base_step.BaseStep]

A dictionary of pipeline steps.

__init__(self, *args, **kwargs) special

Initialize the BasePipeline.

Parameters:

Name Type Description Default
*args BaseStep

The steps to be executed by this pipeline.

()
**kwargs Any

The configuration for this pipeline.

{}
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
    """Initialize the BasePipeline.

    Args:
        *args: The steps to be executed by this pipeline.
        **kwargs: The configuration for this pipeline.
    """
    kwargs.update(self.INSTANCE_CONFIGURATION)

    self._configuration = PipelineConfiguration(
        name=self.__class__.__name__,
        enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, None),
        enable_artifact_metadata=kwargs.pop(
            PARAM_ENABLE_ARTIFACT_METADATA, None
        ),
    )
    self._apply_class_configuration(kwargs)

    self.__steps: Dict[str, BaseStep] = {}
    self._verify_steps(*args, **kwargs)
build(self, *, settings=None, step_configurations=None, config_path=None)

Builds Docker images for the pipeline.

Parameters:

Name Type Description Default
settings Optional[Mapping[str, SettingsOrDict]]

Settings for the pipeline.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None

Returns:

Type Description
Optional[PipelineBuildResponseModel]

The build output.

Source code in zenml/pipelines/base_pipeline.py
def build(
    self,
    *,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds Docker images for the pipeline.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with event_handler(event=AnalyticsEvent.BUILD_PIPELINE, v2=True):
        deployment, pipeline_spec, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        pipeline_id = self._register(pipeline_spec=pipeline_spec).id

        return self._build(deployment=deployment, pipeline_id=pipeline_id)
configure(self, enable_cache=None, enable_artifact_metadata=None, settings=None, extra=None, merge=True, on_failure=None, on_success=None)

Configures the pipeline.

Configuration merging example: * merge==True: pipeline.configure(extra={"key1": 1}) pipeline.configure(extra={"key2": 2}, merge=True) pipeline.configuration.extra # {"key1": 1, "key2": 2} * merge==False: pipeline.configure(extra={"key1": 1}) pipeline.configure(extra={"key2": 2}, merge=False) pipeline.configuration.extra # {"key2": 2}

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
merge bool

If True, will merge the given dictionary configurations like extra and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function)

None
on_success Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g.module.my_function`).

None

Returns:

Type Description
~T

The pipeline instance that this method was called on.

Source code in zenml/pipelines/base_pipeline.py
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    merge: bool = True,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> T:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        on_failure: Callback function in event of failure of the step. Can be
            a function with three possible parameters, `StepContext`, `BaseParameters`,
            and `BaseException`, or a source path to a function of the same specifications
            (e.g. `module.my_function`)
        on_success: Callback function in event of failure of the step. Can be
            a function with two possible parameters, `StepContext` and `BaseParameters, or
            a source path to a function of the same specifications
            (e.g. `module.my_function`).

    Returns:
        The pipeline instance that this method was called on.
    """
    failure_hook_source = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        failure_hook_source = resolve_and_validate_hook(on_failure)

    success_hook_source = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        success_hook_source = resolve_and_validate_hook(on_success)

    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
        }
    )
    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
connect(self, *args, **kwargs)

Function that connects inputs and outputs of the pipeline steps.

Parameters:

Name Type Description Default
*args BaseStep

The positional arguments passed to the pipeline.

()
**kwargs BaseStep

The keyword arguments passed to the pipeline.

{}

Exceptions:

Type Description
NotImplementedError

Always.

Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Function that connects inputs and outputs of the pipeline steps.

    Args:
        *args: The positional arguments passed to the pipeline.
        **kwargs: The keyword arguments passed to the pipeline.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
from_model(model) classmethod

Creates a pipeline instance from a model.

Parameters:

Name Type Description Default
model PipelineResponseModel

The model to load the pipeline instance from.

required

Returns:

Type Description
~T

The pipeline instance.

Exceptions:

Type Description
ValueError

If the spec version of the given model is <0.2

Source code in zenml/pipelines/base_pipeline.py
@classmethod
def from_model(cls: Type[T], model: "PipelineResponseModel") -> T:
    """Creates a pipeline instance from a model.

    Args:
        model: The model to load the pipeline instance from.

    Returns:
        The pipeline instance.

    Raises:
        ValueError: If the spec version of the given model is <0.2
    """
    if version.parse(model.spec.version) < version.parse("0.2"):
        raise ValueError(
            "Loading a pipeline is only possible for pipeline specs with "
            "version 0.2 or higher."
        )

    steps = cls._load_and_verify_steps(pipeline_spec=model.spec)
    connect_method = cls._generate_connect_method(model=model)

    pipeline_class: Type[T] = type(
        model.name,
        (cls,),
        {
            PIPELINE_INNER_FUNC_NAME: staticmethod(connect_method),
            "__doc__": model.docstring,
        },
    )

    pipeline_instance = pipeline_class(**steps)

    version_hash = pipeline_instance._compute_unique_identifier(
        pipeline_spec=model.spec
    )
    if version_hash != model.version_hash:
        logger.warning(
            "Trying to load pipeline version `%s`, but the local step code "
            "changed since this pipeline version was registered. Using "
            "this pipeline instance will result in a different pipeline "
            "version being registered or reused.",
            model.version,
        )

    return pipeline_instance
get_runs() classmethod

Get all past runs from the associated PipelineView.

Returns:

Type Description
Optional[List[PipelineRunView]]

A list of all past PipelineRunViews.

Exceptions:

Type Description
RuntimeError

In case the repository does not contain the view of the current pipeline.

Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
    """Get all past runs from the associated PipelineView.

    Returns:
        A list of all past PipelineRunViews.

    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    from zenml.post_execution import get_pipeline

    pipeline_view = get_pipeline(cls)
    if pipeline_view:
        return pipeline_view.runs  # type: ignore[no-any-return]
    else:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )
register(self)

Register the pipeline in the server.

Returns:

Type Description
PipelineResponseModel

The registered pipeline model.

Source code in zenml/pipelines/base_pipeline.py
def register(self) -> "PipelineResponseModel":
    """Register the pipeline in the server.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    custom_configurations = self.configuration.dict(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configurations:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    pipeline_spec = Compiler().compile_spec(self)
    return self._register(pipeline_spec=pipeline_spec)
run(self, *, run_name=None, enable_cache=None, enable_artifact_metadata=None, schedule=None, build=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False)

Runs the pipeline on the active stack of the current repository.

Parameters:

Name Type Description Default
run_name Optional[str]

Name of the pipeline run.

None
enable_cache Optional[bool]

If caching should be enabled for this pipeline run.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline run.

None
schedule Optional[zenml.config.schedule.Schedule]

Optional schedule to use for the run.

None
build Union[str, UUID, PipelineBuildBaseModel]

Optional build to use for the run.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this pipeline run.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline run.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None
unlisted bool

Whether the pipeline run should be unlisted (not assigned to any pipeline).

False
Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
) -> None:
    """Runs the pipeline on the active stack of the current repository.

    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls inside docker containers which should only
        # run a single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return

    with event_handler(
        event=AnalyticsEvent.RUN_PIPELINE, v2=True
    ) as analytics_handler:
        deployment, pipeline_spec, schedule, build = self._compile(
            config_path=config_path,
            run_name=run_name,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            steps=step_configurations,
            settings=settings,
            schedule=schedule,
            build=build,
            extra=extra,
        )

        skip_pipeline_registration = constants.handle_bool_env_var(
            constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
            default=False,
        )

        register_pipeline = not (skip_pipeline_registration or unlisted)

        pipeline_id = None
        if register_pipeline:
            pipeline_id = self._register(pipeline_spec=pipeline_spec).id

        # TODO: check whether orchestrator even support scheduling before
        # registering the schedule
        schedule_id = None
        if schedule:
            if schedule.name:
                schedule_name = schedule.name
            else:
                date = datetime.utcnow().strftime("%Y_%m_%d")
                time = datetime.utcnow().strftime("%H_%M_%S_%f")
                schedule_name = deployment.run_name_template.format(
                    date=date, time=time
                )
            components = Client().active_stack_model.components
            orchestrator = components[StackComponentType.ORCHESTRATOR][0]
            schedule_model = ScheduleRequestModel(
                workspace=Client().active_workspace.id,
                user=Client().active_user.id,
                pipeline_id=pipeline_id,
                orchestrator_id=orchestrator.id,
                name=schedule_name,
                active=True,
                cron_expression=schedule.cron_expression,
                start_time=schedule.start_time,
                end_time=schedule.end_time,
                interval_second=schedule.interval_second,
                catchup=schedule.catchup,
            )
            schedule_id = (
                Client().zen_store.create_schedule(schedule_model).id
            )
            logger.info(
                f"Created schedule `{schedule_name}` for pipeline "
                f"`{deployment.pipeline_configuration.name}`."
            )

        stack = Client().active_stack

        build_model = self._load_or_create_pipeline_build(
            deployment=deployment,
            pipeline_spec=pipeline_spec,
            pipeline_id=pipeline_id,
            build=build,
        )
        build_id = build_model.id if build_model else None

        deployment_request = PipelineDeploymentRequestModel(
            user=Client().active_user.id,
            workspace=Client().active_workspace.id,
            stack=stack.id,
            pipeline=pipeline_id,
            build=build_id,
            schedule=schedule_id,
            **deployment.dict(),
        )
        deployment_model = Client().zen_store.create_deployment(
            deployment=deployment_request
        )

        analytics_handler.metadata = self._get_pipeline_analytics_metadata(
            deployment=deployment_model, stack=stack
        )
        caching_status = (
            "enabled"
            if deployment.pipeline_configuration.enable_cache is not False
            else "disabled"
        )
        logger.info(
            "%s %s on stack `%s` (caching %s)",
            "Scheduling" if deployment_model.schedule else "Running",
            f"pipeline `{deployment_model.pipeline_configuration.name}`"
            if register_pipeline
            else "unlisted pipeline",
            stack.name,
            caching_status,
        )

        stack.prepare_pipeline_deployment(deployment=deployment_model)

        # Prevent execution of nested pipelines which might lead to
        # unexpected behavior
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
        try:
            stack.deploy_pipeline(
                deployment=deployment_model,
            )
        finally:
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

        # Log the dashboard URL
        dashboard_utils.print_run_url(
            run_name=deployment.run_name_template,
            pipeline_id=pipeline_id,
        )
write_run_configuration_template(self, path, stack=None)

Writes a run configuration yaml template.

Parameters:

Name Type Description Default
path str

The path where the template will be written.

required
stack Optional[Stack]

The stack for which the template should be generated. If not given, the active stack will be used.

None
Source code in zenml/pipelines/base_pipeline.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    stack = stack or Client().active_stack

    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())

    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = fields

    steps = {}
    for step_name, step in self.steps.items():
        parameters = (
            pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
            if step.PARAMETERS_CLASS
            else {}
        )
        outputs = {
            name: PartialArtifactConfiguration()
            for name in step.OUTPUT_SIGNATURE
        }
        step_template = StepConfigurationUpdate(
            parameters=parameters,
            settings=step_settings,
            outputs=outputs,
        )
        steps[step_name] = step_template

    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    yaml_string = yaml.dump(template)
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)

    with open(path, "w") as f:
        f.write(yaml_string)

BasePipelineMeta (type)

Pipeline Metaclass responsible for validating the pipeline definition.

Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Saves argument names for later verification purposes.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The class.
        """
        dct.setdefault(INSTANCE_CONFIGURATION, {})
        cls = cast(
            Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
        )

        cls.STEP_SPEC = {}

        connect_spec = inspect.getfullargspec(
            inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
        )
        connect_args = connect_spec.args

        if connect_args and connect_args[0] == "self":
            connect_args.pop(0)

        for arg in connect_args:
            arg_type = connect_spec.annotations.get(arg, None)
            cls.STEP_SPEC.update({arg: arg_type})
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Saves argument names for later verification purposes.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The dictionary of the class.

required

Returns:

Type Description
BasePipelineMeta

The class.

Source code in zenml/pipelines/base_pipeline.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Saves argument names for later verification purposes.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The class.
    """
    dct.setdefault(INSTANCE_CONFIGURATION, {})
    cls = cast(
        Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
    )

    cls.STEP_SPEC = {}

    connect_spec = inspect.getfullargspec(
        inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
    )
    connect_args = connect_spec.args

    if connect_args and connect_args[0] == "self":
        connect_args.pop(0)

    for arg in connect_args:
        arg_type = connect_spec.annotations.get(arg, None)
        cls.STEP_SPEC.update({arg: arg_type})
    return cls

pipeline_decorator

Decorator function for ZenML pipelines.

pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, settings=None, extra=None, on_failure=None, on_success=None)

Outer decorator function for the creation of a ZenML pipeline.

In order to be able to work with parameters such as "name", it features a nested decorator structure.

Parameters:

Name Type Description Default
_func Optional[~F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g.module.my_function`).

None

Returns:

Type Description
Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]]

the inner decorator which creates the pipeline class based on the ZenML BasePipeline

Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.

    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be
            a function with three possible parameters,
            `StepContext`, `BaseParameters`, and `BaseException`,
            or a source path to a function of the same specifications
            (e.g. `module.my_function`).
        on_success: Callback function in event of failure of the step. Can be
            a function with two possible parameters, `StepContext` and
            `BaseParameters, or a source path to a function of the same specifications
            (e.g. `module.my_function`).

    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """

    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML pipeline.

        Args:
            func: types.FunctionType, this function will be used as the
                "connect" method of the generated Pipeline

        Returns:
            the class of a newly generated ZenML Pipeline
        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_ENABLE_ARTIFACT_METADATA: enable_artifact_metadata,
                    PARAM_SETTINGS: settings,
                    PARAM_EXTRA_OPTIONS: extra,
                    PARAM_ON_FAILURE: on_failure,
                    PARAM_ON_SUCCESS: on_success,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )

    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)