Pipelines
        zenml.pipelines
  
      special
  
    A ZenML pipeline consists of tasks that execute in order and yield artifacts.
The artifacts are automatically stored within the artifact store and metadata 
is tracked by ZenML. Each individual task within a pipeline is known as a
step. The standard pipelines within ZenML are designed to have easy interfaces
to add pre-decided steps, with the order also pre-decided. Other sorts of
pipelines can be created as well from scratch, building on the BasePipeline class.
Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run, a pipeline is compiled and passed directly to the orchestrator.
        base_pipeline
    Abstract base class for all ZenML pipelines.
        
BasePipeline        
    Abstract base class for all ZenML pipelines.
Attributes:
| Name | Type | Description | 
|---|---|---|
| name | The name of this pipeline. | |
| enable_cache | A boolean indicating if caching is enabled for this pipeline. | |
| enable_artifact_metadata | A boolean indicating if artifact metadata is enabled for this pipeline. | 
Source code in zenml/pipelines/base_pipeline.py
          class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.
    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        enable_artifact_metadata: A boolean indicating if artifact metadata
            is enabled for this pipeline.
    """
    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]
    INSTANCE_CONFIGURATION: Dict[str, Any] = {}
    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        """Initialize the BasePipeline.
        Args:
            *args: The steps to be executed by this pipeline.
            **kwargs: The configuration for this pipeline.
        """
        kwargs.update(self.INSTANCE_CONFIGURATION)
        self._configuration = PipelineConfiguration(
            name=self.__class__.__name__,
            enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, None),
            enable_artifact_metadata=kwargs.pop(
                PARAM_ENABLE_ARTIFACT_METADATA, None
            ),
        )
        self._apply_class_configuration(kwargs)
        self.__steps: Dict[str, BaseStep] = {}
        self._verify_steps(*args, **kwargs)
    @property
    def name(self) -> str:
        """The name of the pipeline.
        Returns:
            The name of the pipeline.
        """
        return self.configuration.name
    @property
    def enable_cache(self) -> Optional[bool]:
        """If caching is enabled for the pipeline.
        Returns:
            If caching is enabled for the pipeline.
        """
        return self.configuration.enable_cache
    @property
    def configuration(self) -> PipelineConfiguration:
        """The configuration of the pipeline.
        Returns:
            The configuration of the pipeline.
        """
        return self._configuration
    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps.
        Returns:
            A dictionary of pipeline steps.
        """
        return self.__steps
    @classmethod
    def from_model(cls: Type[T], model: "PipelineResponseModel") -> T:
        """Creates a pipeline instance from a model.
        Args:
            model: The model to load the pipeline instance from.
        Returns:
            The pipeline instance.
        Raises:
            ValueError: If the spec version of the given model is <0.2
        """
        if version.parse(model.spec.version) < version.parse("0.2"):
            raise ValueError(
                "Loading a pipeline is only possible for pipeline specs with "
                "version 0.2 or higher."
            )
        steps = cls._load_and_verify_steps(pipeline_spec=model.spec)
        connect_method = cls._generate_connect_method(model=model)
        pipeline_class: Type[T] = type(
            model.name,
            (cls,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(connect_method),
                "__doc__": model.docstring,
            },
        )
        pipeline_instance = pipeline_class(**steps)
        version_hash = pipeline_instance._compute_unique_identifier(
            pipeline_spec=model.spec
        )
        if version_hash != model.version_hash:
            logger.warning(
                "Trying to load pipeline version `%s`, but the local step code "
                "changed since this pipeline version was registered. Using "
                "this pipeline instance will result in a different pipeline "
                "version being registered or reused.",
                model.version,
            )
        return pipeline_instance
    def configure(
        self: T,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        merge: bool = True,
    ) -> T:
        """Configures the pipeline.
        Configuration merging example:
        * `merge==True`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=True)
            pipeline.configuration.extra # {"key1": 1, "key2": 2}
        * `merge==False`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=False)
            pipeline.configuration.extra # {"key2": 2}
        Args:
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            merge: If `True`, will merge the given dictionary configurations
                like `extra` and `settings` with existing
                configurations. If `False` the given configurations will
                overwrite all existing ones. See the general description of this
                method for an example.
        Returns:
            The pipeline instance that this method was called on.
        """
        values = dict_utils.remove_none_values(
            {
                "enable_cache": enable_cache,
                "enable_artifact_metadata": enable_artifact_metadata,
                "settings": settings,
                "extra": extra,
            }
        )
        config = PipelineConfigurationUpdate(**values)
        self._apply_configuration(config, merge=merge)
        return self
    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps.
        Args:
            *args: The positional arguments passed to the pipeline.
            **kwargs: The keyword arguments passed to the pipeline.
        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError
    def register(self) -> "PipelineResponseModel":
        """Register the pipeline in the server.
        Returns:
            The registered pipeline model.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry
        integration_registry.activate_integrations()
        custom_configurations = self.configuration.dict(
            exclude_defaults=True, exclude={"name"}
        )
        if custom_configurations:
            logger.warning(
                f"The pipeline `{self.name}` that you're registering has "
                "custom configurations applied to it. These will not be "
                "registered with the pipeline and won't be set when you build "
                "images or run the pipeline from the CLI. To provide these "
                "configurations, use the `--config` option of the `zenml "
                "pipeline build/run` commands."
            )
        pipeline_spec = Compiler().compile_spec(self)
        return self._register(pipeline_spec=pipeline_spec)
    def build(
        self,
        *,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Builds Docker images for the pipeline.
        Args:
            settings: Settings for the pipeline.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
        Returns:
            The build output.
        """
        deployment, pipeline_spec, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        pipeline_id = self._register(pipeline_spec=pipeline_spec).id
        return self._build(deployment=deployment, pipeline_id=pipeline_id)
    def run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
    ) -> None:
        """Runs the pipeline on the active stack of the current repository.
        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return
        with event_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
            deployment, pipeline_spec, schedule, build = self._compile(
                config_path=config_path,
                run_name=run_name,
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                steps=step_configurations,
                settings=settings,
                schedule=schedule,
                build=build,
                extra=extra,
            )
            skip_pipeline_registration = constants.handle_bool_env_var(
                constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION, default=False
            )
            register_pipeline = not (skip_pipeline_registration or unlisted)
            pipeline_id = None
            if register_pipeline:
                pipeline_id = self._register(pipeline_spec=pipeline_spec).id
            # TODO: check whether orchestrator even support scheduling before
            # registering the schedule
            schedule_id = None
            if schedule:
                if schedule.name:
                    schedule_name = schedule.name
                else:
                    date = datetime.utcnow().strftime("%Y_%m_%d")
                    time = datetime.utcnow().strftime("%H_%M_%S_%f")
                    schedule_name = deployment.run_name_template.format(
                        date=date, time=time
                    )
                components = Client().active_stack_model.components
                orchestrator = components[StackComponentType.ORCHESTRATOR][0]
                schedule_model = ScheduleRequestModel(
                    workspace=Client().active_workspace.id,
                    user=Client().active_user.id,
                    pipeline_id=pipeline_id,
                    orchestrator_id=orchestrator.id,
                    name=schedule_name,
                    active=True,
                    cron_expression=schedule.cron_expression,
                    start_time=schedule.start_time,
                    end_time=schedule.end_time,
                    interval_second=schedule.interval_second,
                    catchup=schedule.catchup,
                )
                schedule_id = (
                    Client().zen_store.create_schedule(schedule_model).id
                )
                logger.info(
                    f"Created schedule `{schedule_name}` for pipeline "
                    f"`{deployment.pipeline_configuration.name}`."
                )
            stack = Client().active_stack
            build_model = self._load_or_create_pipeline_build(
                deployment=deployment,
                pipeline_spec=pipeline_spec,
                pipeline_id=pipeline_id,
                build=build,
            )
            build_id = build_model.id if build_model else None
            deployment_request = PipelineDeploymentRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=stack.id,
                pipeline=pipeline_id,
                build=build_id,
                schedule=schedule_id,
                **deployment.dict(),
            )
            deployment_model = Client().zen_store.create_deployment(
                deployment=deployment_request
            )
            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
                deployment=deployment_model, stack=stack
            )
            caching_status = (
                "enabled"
                if deployment.pipeline_configuration.enable_cache is not False
                else "disabled"
            )
            logger.info(
                "%s %s on stack `%s` (caching %s)",
                "Scheduling" if deployment_model.schedule else "Running",
                f"pipeline `{deployment_model.pipeline_configuration.name}`"
                if register_pipeline
                else "unlisted pipeline",
                stack.name,
                caching_status,
            )
            stack.prepare_pipeline_deployment(deployment=deployment_model)
            # Prevent execution of nested pipelines which might lead to
            # unexpected behavior
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
            try:
                stack.deploy_pipeline(
                    deployment=deployment_model,
                )
            finally:
                constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
            # Log the dashboard URL
            dashboard_utils.print_run_url(
                run_name=deployment.run_name_template, pipeline_id=pipeline_id
            )
    @classmethod
    def get_runs(cls) -> Optional[List["PipelineRunView"]]:
        """Get all past runs from the associated PipelineView.
        Returns:
            A list of all past PipelineRunViews.
        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        from zenml.post_execution import get_pipeline
        pipeline_view = get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.runs  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )
    def write_run_configuration_template(
        self, path: str, stack: Optional["Stack"] = None
    ) -> None:
        """Writes a run configuration yaml template.
        Args:
            path: The path where the template will be written.
            stack: The stack for which the template should be generated. If
                not given, the active stack will be used.
        """
        from zenml.config.base_settings import ConfigurationLevel
        from zenml.config.step_configurations import (
            PartialArtifactConfiguration,
        )
        stack = stack or Client().active_stack
        setting_classes = stack.setting_classes
        setting_classes.update(settings_utils.get_general_settings())
        pipeline_settings = {}
        step_settings = {}
        for key, setting_class in setting_classes.items():
            fields = pydantic_utils.TemplateGenerator(setting_class).run()
            if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
                pipeline_settings[key] = fields
            if ConfigurationLevel.STEP in setting_class.LEVEL:
                step_settings[key] = fields
        steps = {}
        for step_name, step in self.steps.items():
            parameters = (
                pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
                if step.PARAMETERS_CLASS
                else {}
            )
            outputs = {
                name: PartialArtifactConfiguration()
                for name in step.OUTPUT_SIGNATURE
            }
            step_template = StepConfigurationUpdate(
                parameters=parameters,
                settings=step_settings,
                outputs=outputs,
            )
            steps[step_name] = step_template
        run_config = PipelineRunConfiguration(
            settings=pipeline_settings, steps=steps
        )
        template = pydantic_utils.TemplateGenerator(run_config).run()
        yaml_string = yaml.dump(template)
        yaml_string = yaml_utils.comment_out_yaml(yaml_string)
        with open(path, "w") as f:
            f.write(yaml_string)
    def _apply_class_configuration(self, options: Dict[str, Any]) -> None:
        """Applies the configurations specified on the pipeline class.
        Args:
            options: Class configurations.
        """
        settings = options.pop(PARAM_SETTINGS, None)
        extra = options.pop(PARAM_EXTRA_OPTIONS, None)
        self.configure(settings=settings, extra=extra)
    def _apply_configuration(
        self,
        config: PipelineConfigurationUpdate,
        merge: bool = True,
    ) -> None:
        """Applies an update to the pipeline configuration.
        Args:
            config: The configuration update.
            merge: Whether to merge the updates with the existing configuration
                or not. See the `BasePipeline.configure(...)` method for a
                detailed explanation.
        """
        self._validate_configuration(config)
        self._configuration = pydantic_utils.update_model(
            self._configuration, update=config, recursive=merge
        )
        logger.debug("Updated pipeline configuration:")
        logger.debug(self._configuration)
    @staticmethod
    def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
        """Validates a configuration update.
        Args:
            config: The configuration update to validate.
        """
        settings_utils.validate_setting_keys(list(config.settings))
    def _verify_steps(self, *steps: BaseStep, **kw_steps: Any) -> None:
        """Verifies the initialization args and kwargs of this pipeline.
        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.
        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.
        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )
        combined_steps = {}
        step_ids: Dict[int, str] = {}
        def _verify_step(key: str, step: BaseStep) -> None:
            """Verifies a single step of the pipeline.
            Args:
                key: The key of the step.
                step: The step to verify.
            Raises:
                PipelineInterfaceError: If the step is not of the correct type
                    or is of the same class as another step.
            """
            step_class = type(step)
            if isinstance(step, BaseStepMeta):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )
            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )
            if id(step) in step_ids:
                previous_key = step_ids[id(step)]
                raise PipelineInterfaceError(
                    f"Found the same step object for arguments "
                    f"'{previous_key}' and '{key}' in pipeline '{self.name}'. "
                    "Step object cannot be reused inside a ZenML pipeline. "
                    "A possible solution is to create two instances of the "
                    "same step class and assigning them different names: "
                    "`first_instance = step_class(name='s1')` and "
                    "`second_instance = step_class(name='s2')`."
                )
            step_ids[id(step)] = key
            combined_steps[key] = step
        # verify args
        for i, step in enumerate(steps):
            key = input_step_keys[i]
            _verify_step(key, step)
        # verify kwargs
        for key, step in kw_steps.items():
            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )
            _verify_step(key, step)
        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps
        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )
        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )
        self.__steps = combined_steps
    def _get_pipeline_analytics_metadata(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
    ) -> Dict[str, Any]:
        """Returns the pipeline deployment metadata.
        Args:
            deployment: The pipeline deployment to track.
            stack: The stack on which the pipeline will be deployed.
        Returns:
            the metadata about the pipeline deployment
        """
        custom_materializer = False
        for step in deployment.step_configurations.values():
            for output in step.config.outputs.values():
                if not output.materializer_source.startswith("zenml."):
                    custom_materializer = True
        stack_creator = Client().get_stack(stack.id).user
        active_user = Client().active_user
        own_stack = stack_creator and stack_creator.id == active_user.id
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        return {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "total_steps": len(self.steps),
            "schedule": bool(deployment.schedule),
            "custom_materializer": custom_materializer,
            "own_stack": own_stack,
        }
    @staticmethod
    def _load_and_verify_steps(
        pipeline_spec: "PipelineSpec",
    ) -> Dict[str, BaseStep]:
        """Loads steps and verifies their names and inputs/outputs names.
        Args:
            pipeline_spec: The pipeline spec from which to load the steps.
        Raises:
            RuntimeError: If the step names or input/output names of the
                loaded steps don't match with the names defined in the spec.
        Returns:
            The loaded steps.
        """
        steps = {}
        available_outputs: Dict[str, Set[str]] = {}
        for step_spec in pipeline_spec.steps:
            for upstream_step in step_spec.upstream_steps:
                if upstream_step not in available_outputs:
                    raise RuntimeError(
                        f"Unable to find upstream step `{upstream_step}`. "
                        "This is probably because the step was renamed in code."
                    )
            for input_spec in step_spec.inputs.values():
                if (
                    input_spec.output_name
                    not in available_outputs[input_spec.step_name]
                ):
                    raise RuntimeError(
                        f"Missing output `{input_spec.output_name}` for step "
                        f"`{input_spec.step_name}`. This is probably because "
                        "the output of the step was renamed."
                    )
            step = BaseStep.load_from_source(step_spec.source)
            input_names = set(step.INPUT_SIGNATURE)
            spec_input_names = set(step_spec.inputs)
            if input_names != spec_input_names:
                raise RuntimeError(
                    f"Input names of step {step_spec.source} and the spec "
                    f"from the database don't match. Step inputs: "
                    f"`{input_names}`, spec inputs: `{spec_input_names}`."
                )
            steps[step_spec.pipeline_parameter_name] = step
            available_outputs[step.name] = set(step.OUTPUT_SIGNATURE.keys())
        return steps
    @staticmethod
    def _generate_connect_method(
        model: "PipelineResponseModel",
    ) -> Callable[..., None]:
        """Dynamically generates a connect method for a pipeline model.
        Args:
            model: The model for which to generate the method.
        Returns:
            The generated connect method.
        """
        def connect(**steps: BaseStep) -> None:
            # Bind **steps to the connect signature assigned to this method
            # below. This ensures that the method inputs get verified and only
            # the arguments defined in the signature are allowed
            inspect.signature(connect).bind(**steps)
            step_outputs: Dict[str, Dict[str, BaseStep._OutputArtifact]] = {}
            for step_spec in model.spec.steps:
                step = steps[step_spec.pipeline_parameter_name]
                step_inputs = {}
                for input_name, input_ in step_spec.inputs.items():
                    try:
                        upstream_step = step_outputs[input_.step_name]
                        step_input = upstream_step[input_.output_name]
                        step_inputs[input_name] = step_input
                    except KeyError:
                        raise RuntimeError(
                            f"Unable to find upstream step "
                            f"`{input_.step_name}` in pipeline `{model.name}`. "
                            "This is probably due to configuring a new step "
                            "name after loading a pipeline using "
                            "`BasePipeline.from_model`."
                        )
                step_output = step(**step_inputs)
                output_keys = list(step.OUTPUT_SIGNATURE.keys())
                if isinstance(step_output, BaseStep._OutputArtifact):
                    step_output = [step_output]
                step_outputs[step.name] = {
                    key: step_output[i] for i, key in enumerate(output_keys)
                }
        # Create the connect method signature based on the expected steps
        parameters = [
            inspect.Parameter(
                name=step_spec.pipeline_parameter_name,
                kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,
            )
            for step_spec in model.spec.steps
        ]
        signature = inspect.Signature(parameters=parameters)
        connect.__signature__ = signature  # type: ignore[attr-defined]
        return connect
    def _compile(
        self, config_path: Optional[str] = None, **run_configuration_args: Any
    ) -> Tuple[
        "PipelineDeploymentBaseModel",
        "PipelineSpec",
        Optional["Schedule"],
        Union["PipelineBuildBaseModel", UUID, None],
    ]:
        """Compiles the pipeline.
        Args:
            config_path: Path to a config file.
            **run_configuration_args: Configurations for the pipeline run.
        Returns:
            A tuple containing the deployment, spec, schedule and build of
            the compiled pipeline.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry
        integration_registry.activate_integrations()
        if config_path:
            run_config = PipelineRunConfiguration.from_yaml(config_path)
        else:
            run_config = PipelineRunConfiguration()
        new_values = dict_utils.remove_none_values(run_configuration_args)
        update = PipelineRunConfiguration.parse_obj(new_values)
        # Update with the values in code so they take precedence
        run_config = pydantic_utils.update_model(run_config, update=update)
        deployment, pipeline_spec = Compiler().compile(
            pipeline=self,
            stack=Client().active_stack,
            run_configuration=run_config,
        )
        return deployment, pipeline_spec, run_config.schedule, run_config.build
    def _register(
        self, pipeline_spec: "PipelineSpec"
    ) -> "PipelineResponseModel":
        """Register the pipeline in the server.
        Args:
            pipeline_spec: The pipeline spec to register.
        Returns:
            The registered pipeline model.
        """
        version_hash = self._compute_unique_identifier(
            pipeline_spec=pipeline_spec
        )
        client = Client()
        matching_pipelines = client.list_pipelines(
            name=self.name,
            version_hash=version_hash,
            size=1,
            sort_by="desc:created",
        )
        if matching_pipelines.total:
            registered_pipeline = matching_pipelines.items[0]
            logger.info(
                "Reusing registered pipeline `%s` (version: %s).",
                registered_pipeline.name,
                registered_pipeline.version,
            )
            return registered_pipeline
        latest_version = self._get_latest_version() or 0
        version = str(latest_version + 1)
        request = PipelineRequestModel(
            workspace=client.active_workspace.id,
            user=client.active_user.id,
            name=self.name,
            version=version,
            version_hash=version_hash,
            spec=pipeline_spec,
            docstring=self.__doc__,
        )
        registered_pipeline = client.zen_store.create_pipeline(
            pipeline=request
        )
        logger.info(
            "Registered pipeline `%s` (version %s).",
            registered_pipeline.name,
            registered_pipeline.version,
        )
        return registered_pipeline
    def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
        """Computes a unique identifier from the pipeline spec and steps.
        Args:
            pipeline_spec: Compiled spec of the pipeline.
        Returns:
            The unique identifier of the pipeline.
        """
        hash_ = hashlib.md5()
        hash_.update(pipeline_spec.json(sort_keys=False).encode())
        for step_spec in pipeline_spec.steps:
            step_source = self.steps[
                step_spec.pipeline_parameter_name
            ].source_code
            hash_.update(step_source.encode())
        return hash_.hexdigest()
    def _get_latest_version(self) -> Optional[int]:
        """Gets the latest version of this pipeline.
        Returns:
            The latest version or `None` if no version exists.
        """
        all_pipelines = Client().list_pipelines(
            name=self.name, sort_by="desc:created", size=1
        )
        if all_pipelines.total:
            pipeline = all_pipelines.items[0]
            if pipeline.version == "UNVERSIONED":
                return None
            return int(all_pipelines.items[0].version)
        else:
            return None
    def _get_registered_model(self) -> Optional[PipelineResponseModel]:
        """Gets the registered pipeline model for this instance.
        Returns:
            The registered pipeline model or None if no model is registered yet.
        """
        pipeline_spec = Compiler().compile_spec(self)
        version_hash = self._compute_unique_identifier(
            pipeline_spec=pipeline_spec
        )
        pipelines = Client().list_pipelines(
            name=self.name, version_hash=version_hash
        )
        if len(pipelines) == 1:
            return pipelines.items[0]
        return None
    def _load_or_create_pipeline_build(
        self,
        deployment: "PipelineDeploymentBaseModel",
        pipeline_spec: "PipelineSpec",
        pipeline_id: Optional[UUID] = None,
        build: Union["UUID", "PipelineBuildBaseModel", None] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Loads or creates a pipeline build.
        Args:
            deployment: The pipeline deployment for which to load or create the
                build.
            pipeline_spec: Spec of the pipeline.
            pipeline_id: Optional ID of the pipeline to reference in the build.
            build: Optional existing build. If given, the build will be loaded
                (or registered) in the database. If not given, a new build will
                be created.
        Returns:
            The build response.
        """
        if not build:
            return self._build(deployment=deployment, pipeline_id=pipeline_id)
        logger.info(
            "Using an old build for a pipeline run can lead to "
            "unexpected behavior as the pipeline will run with the step "
            "code that was included in the Docker images which might "
            "differ from the code in your client environment."
        )
        build_model = None
        if isinstance(build, UUID):
            build_model = Client().zen_store.get_build(build_id=build)
            if build_model.pipeline:
                build_hash = build_model.pipeline.version_hash
                current_hash = self._compute_unique_identifier(
                    pipeline_spec=pipeline_spec
                )
                if build_hash != current_hash:
                    logger.warning(
                        "The pipeline associated with the build you "
                        "specified for this run has a different spec "
                        "or step code. This might lead to unexpected "
                        "behavior as this pipeline run will use the "
                        "code that was included in the Docker images which "
                        "might differ from the code in your client "
                        "environment."
                    )
        else:
            build_request = PipelineBuildRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=Client().active_stack_model.id,
                pipeline=pipeline_id,
                **build.dict(),
            )
            build_model = Client().zen_store.create_build(build=build_request)
        if build_model.is_local:
            logger.warning(
                "You're using a local build to run your pipeline. This "
                "might lead to errors if the images don't exist on "
                "your local machine or the image tags have been "
                "overwritten since the original build happened."
            )
        return build_model
    def _build(
        self,
        deployment: "PipelineDeploymentBaseModel",
        pipeline_id: Optional[UUID] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Builds images and registers the output in the server.
        Args:
            deployment: The compiled pipeline deployment.
            pipeline_id: The ID of the pipeline.
        Returns:
            The build output.
        Raises:
            RuntimeError: If multiple builds with the same key but different
                settings were specified.
        """
        client = Client()
        stack = client.active_stack
        required_builds = stack.get_docker_builds(deployment=deployment)
        if not required_builds:
            logger.debug("No docker builds required.")
            return None
        logger.info(
            "Building Docker image(s) for pipeline `%s`.",
            deployment.pipeline_configuration.name,
        )
        docker_image_builder = PipelineDockerImageBuilder()
        images: Dict[str, BuildItem] = {}
        image_names: Dict[str, str] = {}
        for build_config in required_builds:
            combined_key = PipelineBuildBaseModel.get_image_key(
                component_key=build_config.key, step=build_config.step_name
            )
            checksum = build_config.compute_settings_checksum(stack=stack)
            if combined_key in images:
                previous_checksum = images[combined_key].settings_checksum
                if previous_checksum != checksum:
                    raise RuntimeError(
                        f"Trying to build image for key `{combined_key}` but "
                        "an image for this key was already built with a "
                        "different configuration. This happens if multiple "
                        "stack components specified Docker builds for the same "
                        "key in the `StackComponent.get_docker_builds(...)` "
                        "method. If you're using custom components, make sure "
                        "to provide unique keys when returning your build "
                        "configurations to avoid this error."
                    )
                else:
                    continue
            if checksum in image_names:
                image_name_or_digest = image_names[checksum]
            else:
                tag = deployment.pipeline_configuration.name
                if build_config.step_name:
                    tag += f"-{build_config.step_name}"
                tag += f"-{build_config.key}"
                image_name_or_digest = docker_image_builder.build_docker_image(
                    docker_settings=build_config.settings,
                    tag=tag,
                    stack=stack,
                    entrypoint=build_config.entrypoint,
                    extra_files=build_config.extra_files,
                )
            images[combined_key] = BuildItem(
                image=image_name_or_digest, settings_checksum=checksum
            )
            image_names[checksum] = image_name_or_digest
        logger.info("Finished building Docker image(s).")
        is_local = stack.container_registry is None
        build_request = PipelineBuildRequestModel(
            user=client.active_user.id,
            workspace=client.active_workspace.id,
            stack=client.active_stack_model.id,
            pipeline=pipeline_id,
            is_local=is_local,
            images=images,
        )
        return client.zen_store.create_build(build_request)
configuration: PipelineConfiguration
  
      property
      readonly
  
    The configuration of the pipeline.
Returns:
| Type | Description | 
|---|---|
| PipelineConfiguration | The configuration of the pipeline. | 
enable_cache: Optional[bool]
  
      property
      readonly
  
    If caching is enabled for the pipeline.
Returns:
| Type | Description | 
|---|---|
| Optional[bool] | If caching is enabled for the pipeline. | 
name: str
  
      property
      readonly
  
    The name of the pipeline.
Returns:
| Type | Description | 
|---|---|
| str | The name of the pipeline. | 
steps: Dict[str, zenml.steps.base_step.BaseStep]
  
      property
      readonly
  
    Returns a dictionary of pipeline steps.
Returns:
| Type | Description | 
|---|---|
| Dict[str, zenml.steps.base_step.BaseStep] | A dictionary of pipeline steps. | 
__init__(self, *args, **kwargs)
  
      special
  
    Initialize the BasePipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | BaseStep | The steps to be executed by this pipeline. | () | 
| **kwargs | Any | The configuration for this pipeline. | {} | 
Source code in zenml/pipelines/base_pipeline.py
          def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
    """Initialize the BasePipeline.
    Args:
        *args: The steps to be executed by this pipeline.
        **kwargs: The configuration for this pipeline.
    """
    kwargs.update(self.INSTANCE_CONFIGURATION)
    self._configuration = PipelineConfiguration(
        name=self.__class__.__name__,
        enable_cache=kwargs.pop(PARAM_ENABLE_CACHE, None),
        enable_artifact_metadata=kwargs.pop(
            PARAM_ENABLE_ARTIFACT_METADATA, None
        ),
    )
    self._apply_class_configuration(kwargs)
    self.__steps: Dict[str, BaseStep] = {}
    self._verify_steps(*args, **kwargs)
build(self, *, settings=None, step_configurations=None, config_path=None)
    Builds Docker images for the pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| settings | Optional[Mapping[str, SettingsOrDict]] | Settings for the pipeline. | None | 
| step_configurations | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. | None | 
| config_path | Optional[str] | Path to a yaml configuration file. This file will
be parsed as a  | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineBuildResponseModel] | The build output. | 
Source code in zenml/pipelines/base_pipeline.py
          def build(
    self,
    *,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds Docker images for the pipeline.
    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
    Returns:
        The build output.
    """
    deployment, pipeline_spec, _, _ = self._compile(
        config_path=config_path,
        steps=step_configurations,
        settings=settings,
    )
    pipeline_id = self._register(pipeline_spec=pipeline_spec).id
    return self._build(deployment=deployment, pipeline_id=pipeline_id)
configure(self, enable_cache=None, enable_artifact_metadata=None, settings=None, extra=None, merge=True)
    Configures the pipeline.
Configuration merging example:
* merge==True:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=True)
    pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=False)
    pipeline.configuration.extra # {"key2": 2}
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| enable_cache | Optional[bool] | If caching should be enabled for this pipeline. | None | 
| enable_artifact_metadata | Optional[bool] | If artifact metadata should be enabled for this pipeline. | None | 
| settings | Optional[Mapping[str, SettingsOrDict]] | settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
| merge | bool | If  | True | 
Returns:
| Type | Description | 
|---|---|
| ~T | The pipeline instance that this method was called on. | 
Source code in zenml/pipelines/base_pipeline.py
          def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    merge: bool = True,
) -> T:
    """Configures the pipeline.
    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}
    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
    Returns:
        The pipeline instance that this method was called on.
    """
    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "settings": settings,
            "extra": extra,
        }
    )
    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
connect(self, *args, **kwargs)
    Function that connects inputs and outputs of the pipeline steps.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | BaseStep | The positional arguments passed to the pipeline. | () | 
| **kwargs | BaseStep | The keyword arguments passed to the pipeline. | {} | 
Exceptions:
| Type | Description | 
|---|---|
| NotImplementedError | Always. | 
Source code in zenml/pipelines/base_pipeline.py
          @abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Function that connects inputs and outputs of the pipeline steps.
    Args:
        *args: The positional arguments passed to the pipeline.
        **kwargs: The keyword arguments passed to the pipeline.
    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
from_model(model)
  
      classmethod
  
    Creates a pipeline instance from a model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model | PipelineResponseModel | The model to load the pipeline instance from. | required | 
Returns:
| Type | Description | 
|---|---|
| ~T | The pipeline instance. | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If the spec version of the given model is <0.2 | 
Source code in zenml/pipelines/base_pipeline.py
          @classmethod
def from_model(cls: Type[T], model: "PipelineResponseModel") -> T:
    """Creates a pipeline instance from a model.
    Args:
        model: The model to load the pipeline instance from.
    Returns:
        The pipeline instance.
    Raises:
        ValueError: If the spec version of the given model is <0.2
    """
    if version.parse(model.spec.version) < version.parse("0.2"):
        raise ValueError(
            "Loading a pipeline is only possible for pipeline specs with "
            "version 0.2 or higher."
        )
    steps = cls._load_and_verify_steps(pipeline_spec=model.spec)
    connect_method = cls._generate_connect_method(model=model)
    pipeline_class: Type[T] = type(
        model.name,
        (cls,),
        {
            PIPELINE_INNER_FUNC_NAME: staticmethod(connect_method),
            "__doc__": model.docstring,
        },
    )
    pipeline_instance = pipeline_class(**steps)
    version_hash = pipeline_instance._compute_unique_identifier(
        pipeline_spec=model.spec
    )
    if version_hash != model.version_hash:
        logger.warning(
            "Trying to load pipeline version `%s`, but the local step code "
            "changed since this pipeline version was registered. Using "
            "this pipeline instance will result in a different pipeline "
            "version being registered or reused.",
            model.version,
        )
    return pipeline_instance
get_runs()
  
      classmethod
  
    Get all past runs from the associated PipelineView.
Returns:
| Type | Description | 
|---|---|
| Optional[List[PipelineRunView]] | A list of all past PipelineRunViews. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | In case the repository does not contain the view of the current pipeline. | 
Source code in zenml/pipelines/base_pipeline.py
          @classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
    """Get all past runs from the associated PipelineView.
    Returns:
        A list of all past PipelineRunViews.
    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    from zenml.post_execution import get_pipeline
    pipeline_view = get_pipeline(cls)
    if pipeline_view:
        return pipeline_view.runs  # type: ignore[no-any-return]
    else:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )
register(self)
    Register the pipeline in the server.
Returns:
| Type | Description | 
|---|---|
| PipelineResponseModel | The registered pipeline model. | 
Source code in zenml/pipelines/base_pipeline.py
          def register(self) -> "PipelineResponseModel":
    """Register the pipeline in the server.
    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry
    integration_registry.activate_integrations()
    custom_configurations = self.configuration.dict(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configurations:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )
    pipeline_spec = Compiler().compile_spec(self)
    return self._register(pipeline_spec=pipeline_spec)
run(self, *, run_name=None, enable_cache=None, enable_artifact_metadata=None, schedule=None, build=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False)
    Runs the pipeline on the active stack of the current repository.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_name | Optional[str] | Name of the pipeline run. | None | 
| enable_cache | Optional[bool] | If caching should be enabled for this pipeline run. | None | 
| enable_artifact_metadata | Optional[bool] | If artifact metadata should be enabled for this pipeline run. | None | 
| schedule | Optional[zenml.config.schedule.Schedule] | Optional schedule to use for the run. | None | 
| build | Union[str, UUID, PipelineBuildBaseModel] | Optional build to use for the run. | None | 
| settings | Optional[Mapping[str, SettingsOrDict]] | Settings for this pipeline run. | None | 
| step_configurations | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline run. | None | 
| config_path | Optional[str] | Path to a yaml configuration file. This file will
be parsed as a  | None | 
| unlisted | bool | Whether the pipeline run should be unlisted (not assigned to any pipeline). | False | 
Source code in zenml/pipelines/base_pipeline.py
          def run(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
) -> None:
    """Runs the pipeline on the active stack of the current repository.
    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls inside docker containers which should only
        # run a single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return
    with event_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
        deployment, pipeline_spec, schedule, build = self._compile(
            config_path=config_path,
            run_name=run_name,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            steps=step_configurations,
            settings=settings,
            schedule=schedule,
            build=build,
            extra=extra,
        )
        skip_pipeline_registration = constants.handle_bool_env_var(
            constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION, default=False
        )
        register_pipeline = not (skip_pipeline_registration or unlisted)
        pipeline_id = None
        if register_pipeline:
            pipeline_id = self._register(pipeline_spec=pipeline_spec).id
        # TODO: check whether orchestrator even support scheduling before
        # registering the schedule
        schedule_id = None
        if schedule:
            if schedule.name:
                schedule_name = schedule.name
            else:
                date = datetime.utcnow().strftime("%Y_%m_%d")
                time = datetime.utcnow().strftime("%H_%M_%S_%f")
                schedule_name = deployment.run_name_template.format(
                    date=date, time=time
                )
            components = Client().active_stack_model.components
            orchestrator = components[StackComponentType.ORCHESTRATOR][0]
            schedule_model = ScheduleRequestModel(
                workspace=Client().active_workspace.id,
                user=Client().active_user.id,
                pipeline_id=pipeline_id,
                orchestrator_id=orchestrator.id,
                name=schedule_name,
                active=True,
                cron_expression=schedule.cron_expression,
                start_time=schedule.start_time,
                end_time=schedule.end_time,
                interval_second=schedule.interval_second,
                catchup=schedule.catchup,
            )
            schedule_id = (
                Client().zen_store.create_schedule(schedule_model).id
            )
            logger.info(
                f"Created schedule `{schedule_name}` for pipeline "
                f"`{deployment.pipeline_configuration.name}`."
            )
        stack = Client().active_stack
        build_model = self._load_or_create_pipeline_build(
            deployment=deployment,
            pipeline_spec=pipeline_spec,
            pipeline_id=pipeline_id,
            build=build,
        )
        build_id = build_model.id if build_model else None
        deployment_request = PipelineDeploymentRequestModel(
            user=Client().active_user.id,
            workspace=Client().active_workspace.id,
            stack=stack.id,
            pipeline=pipeline_id,
            build=build_id,
            schedule=schedule_id,
            **deployment.dict(),
        )
        deployment_model = Client().zen_store.create_deployment(
            deployment=deployment_request
        )
        analytics_handler.metadata = self._get_pipeline_analytics_metadata(
            deployment=deployment_model, stack=stack
        )
        caching_status = (
            "enabled"
            if deployment.pipeline_configuration.enable_cache is not False
            else "disabled"
        )
        logger.info(
            "%s %s on stack `%s` (caching %s)",
            "Scheduling" if deployment_model.schedule else "Running",
            f"pipeline `{deployment_model.pipeline_configuration.name}`"
            if register_pipeline
            else "unlisted pipeline",
            stack.name,
            caching_status,
        )
        stack.prepare_pipeline_deployment(deployment=deployment_model)
        # Prevent execution of nested pipelines which might lead to
        # unexpected behavior
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
        try:
            stack.deploy_pipeline(
                deployment=deployment_model,
            )
        finally:
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False
        # Log the dashboard URL
        dashboard_utils.print_run_url(
            run_name=deployment.run_name_template, pipeline_id=pipeline_id
        )
write_run_configuration_template(self, path, stack=None)
    Writes a run configuration yaml template.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | The path where the template will be written. | required | 
| stack | Optional[Stack] | The stack for which the template should be generated. If not given, the active stack will be used. | None | 
Source code in zenml/pipelines/base_pipeline.py
          def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.
    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )
    stack = stack or Client().active_stack
    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = fields
    steps = {}
    for step_name, step in self.steps.items():
        parameters = (
            pydantic_utils.TemplateGenerator(step.PARAMETERS_CLASS).run()
            if step.PARAMETERS_CLASS
            else {}
        )
        outputs = {
            name: PartialArtifactConfiguration()
            for name in step.OUTPUT_SIGNATURE
        }
        step_template = StepConfigurationUpdate(
            parameters=parameters,
            settings=step_settings,
            outputs=outputs,
        )
        steps[step_name] = step_template
    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    yaml_string = yaml.dump(template)
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)
    with open(path, "w") as f:
        f.write(yaml_string)
        
BasePipelineMeta            (type)
        
    Pipeline Metaclass responsible for validating the pipeline definition.
Source code in zenml/pipelines/base_pipeline.py
          class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""
    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Saves argument names for later verification purposes.
        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.
        Returns:
            The class.
        """
        dct.setdefault(INSTANCE_CONFIGURATION, {})
        cls = cast(
            Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
        )
        cls.STEP_SPEC = {}
        connect_spec = inspect.getfullargspec(
            inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
        )
        connect_args = connect_spec.args
        if connect_args and connect_args[0] == "self":
            connect_args.pop(0)
        for arg in connect_args:
            arg_type = connect_spec.annotations.get(arg, None)
            cls.STEP_SPEC.update({arg: arg_type})
        return cls
__new__(mcs, name, bases, dct)
  
      special
      staticmethod
  
    Saves argument names for later verification purposes.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| name | str | The name of the class. | required | 
| bases | Tuple[Type[Any], ...] | The base classes of the class. | required | 
| dct | Dict[str, Any] | The dictionary of the class. | required | 
Returns:
| Type | Description | 
|---|---|
| BasePipelineMeta | The class. | 
Source code in zenml/pipelines/base_pipeline.py
          def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Saves argument names for later verification purposes.
    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.
    Returns:
        The class.
    """
    dct.setdefault(INSTANCE_CONFIGURATION, {})
    cls = cast(
        Type["BasePipeline"], super().__new__(mcs, name, bases, dct)
    )
    cls.STEP_SPEC = {}
    connect_spec = inspect.getfullargspec(
        inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
    )
    connect_args = connect_spec.args
    if connect_args and connect_args[0] == "self":
        connect_args.pop(0)
    for arg in connect_args:
        arg_type = connect_spec.annotations.get(arg, None)
        cls.STEP_SPEC.update({arg: arg_type})
    return cls
        pipeline_decorator
    Decorator function for ZenML pipelines.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, settings=None, extra=None)
    Outer decorator function for the creation of a ZenML pipeline.
In order to be able to work with parameters such as "name", it features a nested decorator structure.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| _func | Optional[~F] | The decorated function. | None | 
| name | Optional[str] | The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. | None | 
| enable_cache | Optional[bool] | Whether to use caching or not. | None | 
| enable_artifact_metadata | Optional[bool] | Whether to enable artifact metadata or not. | None | 
| settings | Optional[Dict[str, SettingsOrDict]] | Settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
Returns:
| Type | Description | 
|---|---|
| Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]] | the inner decorator which creates the pipeline class based on the ZenML BasePipeline | 
Source code in zenml/pipelines/pipeline_decorator.py
          def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.
    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.
    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """
    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML pipeline.
        Args:
            func: types.FunctionType, this function will be used as the
                "connect" method of the generated Pipeline
        Returns:
            the class of a newly generated ZenML Pipeline
        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_ENABLE_ARTIFACT_METADATA: enable_artifact_metadata,
                    PARAM_SETTINGS: settings,
                    PARAM_EXTRA_OPTIONS: extra,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )
    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)