New
        zenml.new
  
      special
  
    
        pipelines
  
      special
  
    
        build_utils
    Pipeline build utilities.
build_required(deployment)
    Checks whether a build is required for the deployment and active stack.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment for which to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | If a build is required. | 
Source code in zenml/new/pipelines/build_utils.py
          def build_required(deployment: "PipelineDeploymentBase") -> bool:
              """Report whether the active stack needs Docker builds for a deployment.

              Args:
                  deployment: The deployment for which to check.

              Returns:
                  True if the active stack returns at least one Docker build
                  configuration for the deployment, False otherwise.
              """
              docker_builds = Client().active_stack.get_docker_builds(
                  deployment=deployment
              )
              return True if docker_builds else False
code_download_possible(deployment, code_repository=None)
    Checks whether code download is possible for the deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment. | required | 
| code_repository | Optional[BaseCodeRepository] | If provided, this code repository can be used to download the code inside the container images. | None | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether code download is possible for the deployment. | 
Source code in zenml/new/pipelines/build_utils.py
          def code_download_possible(
              deployment: "PipelineDeploymentBase",
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> bool:
              """Tell whether every step of the deployment can download its code.

              A step can download code if its Docker settings allow downloading
              from the artifact store, or if they allow downloading from a code
              repository and a code repository was actually provided.

              Args:
                  deployment: The deployment.
                  code_repository: If provided, this code repository can be used to
                      download the code inside the container images.

              Returns:
                  True if all steps have at least one usable download option
                  (also True when the deployment has no steps), False otherwise.
              """
              all_settings = (
                  step.config.docker_settings
                  for step in deployment.step_configurations.values()
              )
              # Short-circuit order matters: the artifact store option is checked
              # first, the code repository option only when it is not allowed.
              return all(
                  settings.allow_download_from_artifact_store
                  or (
                      settings.allow_download_from_code_repository
                      and code_repository
                  )
                  for settings in all_settings
              )
compute_build_checksum(items, stack, code_repository=None)
    Compute an overall checksum for a pipeline build.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| items | List[BuildConfiguration] | Items of the build. | required | 
| stack | Stack | The stack associated with the build. Will be used to gather its requirements. | required | 
| code_repository | Optional[BaseCodeRepository] | The code repository that will be used to download files inside the build. Will be used for its dependency specification. | None | 
Returns:
| Type | Description | 
|---|---|
| str | The build checksum. | 
Source code in zenml/new/pipelines/build_utils.py
          def compute_build_checksum(
              items: List["BuildConfiguration"],
              stack: "Stack",
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> str:
              """Derive a single checksum covering all items of a pipeline build.

              For every item, the image key and the item's settings checksum are
              fed (in that order) into one running MD5 digest, so the result
              depends on the order of ``items``.

              Args:
                  items: Items of the build.
                  stack: The stack associated with the build. Will be used to gather
                      its requirements.
                  code_repository: The code repository that will be used to download
                      files inside the build. Will be used for its dependency
                      specification.

              Returns:
                  The build checksum as a hexadecimal string.
              """
              digest = hashlib.md5()  # nosec

              for build_item in items:
                  image_key = PipelineBuildBase.get_image_key(
                      component_key=build_item.key, step=build_item.step_name
                  )
                  item_checksum = build_item.compute_settings_checksum(
                      stack=stack,
                      code_repository=code_repository,
                  )
                  for part in (image_key, item_checksum):
                      digest.update(part.encode())

              return digest.hexdigest()
compute_stack_checksum(stack)
    Compute a stack checksum.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack | StackResponse | The stack for which to compute the checksum. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The checksum. | 
Source code in zenml/new/pipelines/build_utils.py
          def compute_stack_checksum(stack: StackResponse) -> str:
              """Derive a checksum from the integrations a stack's components use.

              Args:
                  stack: The stack for which to compute the checksum.

              Returns:
                  The MD5 hex digest over the sorted, de-duplicated integration
                  names of all stack components, ignoring components without an
                  integration and those of the ``"built-in"`` integration.
              """
              # This checksum is used to see if the stack has been updated since a
              # build was created for it. It deliberately covers integration names
              # rather than specific requirements: requirements might change with
              # new ZenML releases without invalidating existing Docker images.
              integration_names = set()
              for component_list in stack.components.values():
                  for component in component_list:
                      name = component.integration
                      if name and name != "built-in":
                          integration_names.add(name)

              # Hashing the concatenation is equivalent to updating the digest with
              # each name in sorted order.
              payload = "".join(sorted(integration_names)).encode()
              return hashlib.md5(payload).hexdigest()  # nosec
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)
    Builds images and registers the output in the server.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The pipeline deployment. | required | 
| pipeline_id | Optional[uuid.UUID] | The ID of the pipeline. | None | 
| code_repository | Optional[BaseCodeRepository] | If provided, this code repository will be used to download code inside the build images. | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineBuildResponse] | The build output. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If multiple builds with the same key but different settings were specified. | 
Source code in zenml/new/pipelines/build_utils.py
          def create_pipeline_build(
              deployment: "PipelineDeploymentBase",
              pipeline_id: Optional[UUID] = None,
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> Optional["PipelineBuildResponse"]:
              """Builds images and registers the output in the server.

              For every Docker build the active stack requires, an image is built
              unless an image with the same settings checksum was already built
              during this call, in which case that image is reused under the new
              key.

              Args:
                  deployment: The pipeline deployment.
                  pipeline_id: The ID of the pipeline.
                  code_repository: If provided, this code repository will be used to
                      download code inside the build images.

              Returns:
                  The build output, or None if the active stack requires no Docker
                  builds for the deployment.

              Raises:
                  RuntimeError: If multiple builds with the same key but different
                      settings were specified.
              """
              client = Client()
              stack_model = Client().active_stack_model
              stack = client.active_stack
              required_builds = stack.get_docker_builds(deployment=deployment)

              if not required_builds:
                  logger.debug("No docker builds required.")
                  return None

              logger.info(
                  "Building Docker image(s) for pipeline `%s`.",
                  deployment.pipeline_configuration.name,
              )

              docker_image_builder = PipelineDockerImageBuilder()
              # Built items by combined image key.
              images: Dict[str, BuildItem] = {}
              # Settings checksum -> combined key of the first item built with it.
              checksums: Dict[str, str] = {}

              for build_config in required_builds:
                  combined_key = PipelineBuildBase.get_image_key(
                      component_key=build_config.key, step=build_config.step_name
                  )
                  checksum = build_config.compute_settings_checksum(
                      stack=stack, code_repository=code_repository
                  )

                  if combined_key in images:
                      previous_checksum = images[combined_key].settings_checksum

                      if previous_checksum != checksum:
                          raise RuntimeError(
                              f"Trying to build image for key `{combined_key}` but "
                              "an image for this key was already built with a "
                              "different configuration. This happens if multiple "
                              "stack components specified Docker builds for the same "
                              "key in the `StackComponent.get_docker_builds(...)` "
                              "method. If you're using custom components, make sure "
                              "to provide unique keys when returning your build "
                              "configurations to avoid this error."
                          )
                      else:
                          # Same key and same settings: nothing new to build.
                          continue

                  if checksum in checksums:
                      # An image with identical settings was already built under a
                      # different key: reuse all of its properties. This includes
                      # `requires_code_download`, which must come from the reused
                      # item and not from whichever image happened to be built in
                      # the most recent iteration.
                      reused_item = images[checksums[checksum]]
                      image_name_or_digest = reused_item.image
                      contains_code = reused_item.contains_code
                      requires_code_download = reused_item.requires_code_download
                      dockerfile = reused_item.dockerfile
                      requirements = reused_item.requirements
                  else:
                      tag = deployment.pipeline_configuration.name
                      if build_config.step_name:
                          tag += f"-{build_config.step_name}"
                      tag += f"-{build_config.key}"

                      include_files = build_config.should_include_files(
                          code_repository=code_repository,
                      )
                      download_files = build_config.should_download_files(
                          code_repository=code_repository,
                      )
                      pass_code_repo = (
                          build_config.should_download_files_from_code_repository(
                              code_repository=code_repository
                          )
                      )

                      (
                          image_name_or_digest,
                          dockerfile,
                          requirements,
                      ) = docker_image_builder.build_docker_image(
                          docker_settings=build_config.settings,
                          tag=tag,
                          stack=stack,
                          include_files=include_files,
                          download_files=download_files,
                          entrypoint=build_config.entrypoint,
                          extra_files=build_config.extra_files,
                          code_repository=code_repository if pass_code_repo else None,
                      )
                      contains_code = include_files
                      requires_code_download = download_files

                  images[combined_key] = BuildItem(
                      image=image_name_or_digest,
                      dockerfile=dockerfile,
                      requirements=requirements,
                      settings_checksum=checksum,
                      contains_code=contains_code,
                      requires_code_download=requires_code_download,
                  )
                  checksums[checksum] = combined_key

              logger.info("Finished building Docker image(s).")

              # Without a container registry the images only exist locally.
              is_local = stack.container_registry is None
              contains_code = any(item.contains_code for item in images.values())
              build_checksum = compute_build_checksum(
                  required_builds, stack=stack, code_repository=code_repository
              )
              stack_checksum = compute_stack_checksum(stack=stack_model)

              build_request = PipelineBuildRequest(
                  user=client.active_user.id,
                  workspace=client.active_workspace.id,
                  stack=stack_model.id,
                  pipeline=pipeline_id,
                  is_local=is_local,
                  contains_code=contains_code,
                  images=images,
                  zenml_version=zenml.__version__,
                  python_version=platform.python_version(),
                  checksum=build_checksum,
                  stack_checksum=stack_checksum,
              )
              return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository=None)
    Find an existing build for a deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment for which to find an existing build. | required | 
| code_repository | Optional[BaseCodeRepository] | The code repository that will be used to download files in the images. | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineBuildResponse] | The existing build to reuse if found. | 
Source code in zenml/new/pipelines/build_utils.py
          def find_existing_build(
              deployment: "PipelineDeploymentBase",
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> Optional["PipelineBuildResponse"]:
              """Look up a previously registered build that can be reused.

              Args:
                  deployment: The deployment for which to find an existing build.
                  code_repository: The code repository that will be used to download
                      files in the images.

              Returns:
                  The most recently created matching build, or None if the active
                  stack requires no builds or no matching build exists.
              """
              client = Client()
              stack = client.active_stack
              # Only major and minor version are compared, e.g. "3.10".
              major_minor = platform.python_version_tuple()[:2]
              python_version_prefix = ".".join(major_minor)

              required_builds = stack.get_docker_builds(deployment=deployment)
              if not required_builds:
                  return None

              build_checksum = compute_build_checksum(
                  required_builds, stack=stack, code_repository=code_repository
              )

              matches = client.list_builds(
                  # Newest first, and only the newest one is of interest.
                  sort_by="desc:created",
                  size=1,
                  stack_id=stack.id,
                  # Local builds are excluded: it's not clear whether the images
                  # exist on the current machine or if they've been overwritten.
                  # TODO: Should we support this by storing the unique Docker ID for
                  #   the image and checking if an image with that ID exists locally?
                  is_local=False,
                  # Builds that contain code are excluded: that code might be
                  # different from the local code the user is expecting to run.
                  contains_code=False,
                  zenml_version=zenml.__version__,
                  # Match all patch versions of the same Python major + minor
                  python_version=f"startswith:{python_version_prefix}",
                  checksum=build_checksum,
              )

              return matches[0] if matches.items else None
requires_download_from_code_repository(deployment)
    Checks whether the deployment needs to download code from a repository.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | If the deployment needs to download code from a code repository. | 
Source code in zenml/new/pipelines/build_utils.py
          def requires_download_from_code_repository(
              deployment: "PipelineDeploymentBase",
          ) -> bool:
              """Tell whether the deployment can only get its code from a repository.

              The steps are inspected in order and the first step that enables any
              of the three code options decides the result.

              Args:
                  deployment: The deployment.

              Returns:
                  If the deployment needs to download code from a code repository.
              """
              for step_config in deployment.step_configurations.values():
                  settings = step_config.config.docker_settings

                  # Either alternative makes a code repository optional.
                  if (
                      settings.allow_download_from_artifact_store
                      or settings.allow_including_files_in_images
                  ):
                      return False

                  # Both alternatives are disabled here, so if the code repository
                  # is allowed, it is the only remaining option.
                  if settings.allow_download_from_code_repository:
                      return True

              return False
requires_included_code(deployment, code_repository=None)
    Checks whether the deployment requires included code.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment. | required | 
| code_repository | Optional[BaseCodeRepository] | If provided, this code repository can be used to download the code inside the container images. | None | 
Returns:
| Type | Description | 
|---|---|
| bool | If the deployment requires code included in the container images. | 
Source code in zenml/new/pipelines/build_utils.py
          def requires_included_code(
              deployment: "PipelineDeploymentBase",
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> bool:
              """Tell whether code has to be baked into the container images.

              The steps are inspected in order. A step that may download from the
              artifact store ends the check with False. A step that may download
              from a code repository is skipped when a repository is available.
              Otherwise a step that allows including files ends the check with
              True.

              Args:
                  deployment: The deployment.
                  code_repository: If provided, this code repository can be used to
                      download the code inside the container images.

              Returns:
                  If the deployment requires code included in the container images.
              """
              for step_config in deployment.step_configurations.values():
                  settings = step_config.config.docker_settings

                  if settings.allow_download_from_artifact_store:
                      return False

                  # The code can be fetched from the repository for this step.
                  if settings.allow_download_from_code_repository and code_repository:
                      continue

                  if settings.allow_including_files_in_images:
                      return True

              return False
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)
    Loads or creates a pipeline build.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The pipeline deployment for which to load or create the build. | required | 
| allow_build_reuse | bool | If True, the build is allowed to reuse an existing build. | required | 
| pipeline_id | Optional[uuid.UUID] | Optional ID of the pipeline to reference in the build. | None | 
| build | Union[UUID, PipelineBuildBase] | Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. | None | 
| code_repository | Optional[BaseCodeRepository] | If provided, this code repository can be used to download code inside the container images. | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineBuildResponse] | The build response. | 
Source code in zenml/new/pipelines/build_utils.py
          def reuse_or_create_pipeline_build(
              deployment: "PipelineDeploymentBase",
              allow_build_reuse: bool,
              pipeline_id: Optional[UUID] = None,
              build: Union["UUID", "PipelineBuildBase", None] = None,
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> Optional["PipelineBuildResponse"]:
              """Resolve the build to use for a deployment.

              If ``build`` is given, it is fetched by ID or registered as a new
              build, and then verified against the deployment. Otherwise an
              existing build is reused when permitted and available, and a new
              build is created when not.

              Args:
                  deployment: The pipeline deployment for which to load or create the
                      build.
                  allow_build_reuse: If True, the build is allowed to reuse an
                      existing build.
                  pipeline_id: Optional ID of the pipeline to reference in the build.
                  build: Optional existing build. If given, the build will be fetched
                      (or registered) in the database. If not given, a new build will
                      be created.
                  code_repository: If provided, this code repository can be used to
                      download code inside the container images.

              Returns:
                  The build response.
              """
              if build:
                  # An explicit build was supplied: load or register it, then verify.
                  if isinstance(build, UUID):
                      build_model = Client().zen_store.get_build(build_id=build)
                  else:
                      build_request = PipelineBuildRequest(
                          user=Client().active_user.id,
                          workspace=Client().active_workspace.id,
                          stack=Client().active_stack_model.id,
                          pipeline=pipeline_id,
                          **build.model_dump(),
                      )
                      build_model = Client().zen_store.create_build(
                          build=build_request
                      )

                  verify_custom_build(
                      build=build_model,
                      deployment=deployment,
                      code_repository=code_repository,
                  )
                  return build_model

              reuse_is_an_option = (
                  allow_build_reuse
                  and not deployment.should_prevent_build_reuse
                  and not requires_included_code(
                      deployment=deployment, code_repository=code_repository
                  )
                  and build_required(deployment=deployment)
              )

              if reuse_is_an_option:
                  existing_build = find_existing_build(
                      deployment=deployment, code_repository=code_repository
                  )

                  if existing_build:
                      logger.info(
                          "Reusing existing build `%s` for stack `%s`.",
                          existing_build.id,
                          Client().active_stack.name,
                      )
                      return existing_build

                  logger.info(
                      "Unable to find a build to reuse. A previous build can be "
                      "reused when the following conditions are met:\n"
                      "  * The existing build was created for the same stack, "
                      "ZenML version and Python version\n"
                      "  * The stack contains a container registry\n"
                      "  * The Docker settings of the pipeline and all its steps "
                      "are the same as for the existing build."
                  )

              return create_pipeline_build(
                  deployment=deployment,
                  pipeline_id=pipeline_id,
                  code_repository=code_repository,
              )
should_upload_code(deployment, build, code_reference)
    Checks whether the current code should be uploaded for the deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment. | required | 
| build | Optional[zenml.models.v2.core.pipeline_build.PipelineBuildResponse] | The build for the deployment. | required | 
| code_reference | Optional[zenml.models.v2.core.code_reference.CodeReferenceRequest] | The code reference for the deployment. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the current code should be uploaded for the deployment. | 
Source code in zenml/new/pipelines/build_utils.py
          def should_upload_code(
              deployment: PipelineDeploymentBase,
              build: Optional[PipelineBuildResponse],
              code_reference: Optional[CodeReferenceRequest],
          ) -> bool:
              """Decide whether the current code has to be uploaded for the deployment.

              Args:
                  deployment: The deployment.
                  build: The build for the deployment.
                  code_reference: The code reference for the deployment.

              Returns:
                  True if there is a build and at least one step that is not covered
                  by the code reference allows downloading from the artifact store,
                  False otherwise.
              """
              if not build:
                  # No build means we don't need to download code into a Docker
                  # container for step execution. In other remote orchestrators that
                  # don't use Docker containers but instead use e.g. Wheels to run,
                  # the code should already be included.
                  return False

              all_settings = (
                  step.config.docker_settings
                  for step in deployment.step_configurations.values()
              )
              return any(
                  settings.allow_download_from_artifact_store
                  for settings in all_settings
                  # Steps that can download via the code reference need no upload.
                  if not (
                      code_reference
                      and settings.allow_download_from_code_repository
                  )
              )
verify_custom_build(build, deployment, code_repository=None)
    Verify a custom build for a pipeline deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| build | PipelineBuildResponse | The build to verify. | required | 
| deployment | PipelineDeploymentBase | The deployment for which to verify the build. | required | 
| code_repository | Optional[BaseCodeRepository] | Code repository that will be used to download files for the deployment. | None | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the build can't be used for the deployment. | 
Source code in zenml/new/pipelines/build_utils.py
          def verify_custom_build(
              build: "PipelineBuildResponse",
              deployment: "PipelineDeploymentBase",
              code_repository: Optional["BaseCodeRepository"] = None,
          ) -> None:
              """Check that a user-specified build can be used for a deployment.

              Mismatches that may still work (different stack, included code,
              outdated settings, local build) are logged as warnings. Conflicts
              between the build and the deployment's Docker settings raise an
              error.

              Args:
                  build: The build to verify.
                  deployment: The deployment for which to verify the build.
                  code_repository: Code repository that will be used to download files
                      for the deployment.

              Raises:
                  RuntimeError: If the build can't be used for the deployment.
              """
              active_stack = Client().active_stack
              expected_builds = active_stack.get_docker_builds(deployment=deployment)

              if build.stack and build.stack.id != active_stack.id:
                  logger.warning(
                      "The stack `%s` used for the build `%s` is not the same as the "
                      "stack `%s` that the pipeline will run on. This could lead "
                      "to issues if the stacks have different build requirements.",
                      build.stack.name,
                      build.id,
                      active_stack.name,
                  )

              if build.contains_code:
                  logger.warning(
                      "The build you specified for this run contains code and will run "
                      "with the step code that was included in the Docker images which "
                      "might differ from the local code in your client environment."
                  )

              if build.requires_code_download:
                  # The build expects code to be downloaded at runtime, so the
                  # Docker settings must not contradict that.
                  code_must_be_included = requires_included_code(
                      deployment=deployment, code_repository=code_repository
                  )
                  if code_must_be_included:
                      raise RuntimeError(
                          "The `DockerSettings` of the pipeline or one of its "
                          "steps specify that code should be included in the Docker "
                          "image, but the build you "
                          "specified requires code download. Either update your "
                          "`DockerSettings` or specify a different build and try "
                          "again."
                      )

                  repository_is_mandatory = requires_download_from_code_repository(
                      deployment=deployment
                  )
                  if repository_is_mandatory and not code_repository:
                      raise RuntimeError(
                          "The `DockerSettings` of the pipeline or one of its "
                          "steps specify that code should be downloaded from a "
                          "code repository but "
                          "there is no code repository active at your current source "
                          f"root `{source_utils.get_source_root()}`."
                      )

                  download_is_possible = code_download_possible(
                      deployment=deployment, code_repository=code_repository
                  )
                  if not download_is_possible:
                      raise RuntimeError(
                          "The `DockerSettings` of the pipeline or one of its "
                          "steps specify that code can not be downloaded from the "
                          "artifact store, but the build you specified requires code "
                          "download. Either update your `DockerSettings` or specify a "
                          "different build and try again."
                      )

              if not build.checksum:
                  # No checksum given for the entire build, so check manually that
                  # all the images exist and that their settings match.
                  for build_config in expected_builds:
                      try:
                          image = build.get_image(
                              component_key=build_config.key,
                              step=build_config.step_name,
                          )
                      except KeyError:
                          raise RuntimeError(
                              "The build you specified is missing an image for key: "
                              f"{build_config.key}."
                          )

                      current_checksum = build_config.compute_settings_checksum(
                          stack=active_stack, code_repository=code_repository
                      )
                      stored_checksum = build.get_settings_checksum(
                          component_key=build_config.key, step=build_config.step_name
                      )
                      if current_checksum != stored_checksum:
                          logger.warning(
                              "The Docker settings used to build the image `%s` are "
                              "not the same as currently specified for your pipeline. "
                              "This means that the build you specified to run this "
                              "pipeline might be outdated and most likely contains "
                              "outdated code or requirements.",
                              image,
                          )
              elif (
                  compute_build_checksum(
                      expected_builds,
                      stack=active_stack,
                      code_repository=code_repository,
                  )
                  != build.checksum
              ):
                  logger.warning(
                      "The Docker settings used for the build `%s` are "
                      "not the same as currently specified for your pipeline. "
                      "This means that the build you specified to run this "
                      "pipeline might be outdated and most likely contains "
                      "outdated requirements.",
                      build.id,
                  )

              if build.is_local:
                  logger.warning(
                      "You manually specified a local build to run your pipeline. "
                      "This might lead to errors if the images don't exist on "
                      "your local machine or the image tags have been "
                      "overwritten since the original build happened."
                  )
verify_local_repository_context(deployment, local_repo_context)
    Verifies the local repository.
If the local repository exists and has no local changes, code download inside the images is possible.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The pipeline deployment. | required | 
| local_repo_context | Optional[LocalRepositoryContext] | The local repository active at the source root. | required | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the deployment requires code download but code download is not possible. | 
Returns:
| Type | Description | 
|---|---|
| Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository] | The code repository from which to download files for the runs of the deployment, or None if code download is not possible. | 
Source code in zenml/new/pipelines/build_utils.py
          def verify_local_repository_context(
              deployment: "PipelineDeploymentBase",
              local_repo_context: Optional["LocalRepositoryContext"],
          ) -> Optional[BaseCodeRepository]:
              """Check the local repository and resolve the usable code repository.

              If the local repository exists and has no local changes, code download
              inside the images is possible.

              Args:
                  deployment: The pipeline deployment.
                  local_repo_context: The local repository active at the source root.

              Raises:
                  RuntimeError: If the deployment requires code download but code
                      download is not possible.

              Returns:
                  The code repository from which to download files for the runs of the
                  deployment, or None if code download is not possible.
              """
              if build_required(deployment=deployment):
                  if requires_download_from_code_repository(deployment=deployment):
                      # A clean, fully pushed repository is mandatory in this case.
                      if not local_repo_context:
                          raise RuntimeError(
                              "The `DockerSettings` of the pipeline or one of its "
                              "steps specify that code should be downloaded from a "
                              "code repository, but "
                              "there is no code repository active at your current source "
                              f"root `{source_utils.get_source_root()}`."
                          )
                      if local_repo_context.is_dirty:
                          raise RuntimeError(
                              "The `DockerSettings` of the pipeline or one of its "
                              "steps specify that code should be downloaded from a "
                              "code repository, but "
                              "the code repository active at your current source root "
                              f"`{source_utils.get_source_root()}` has uncommitted "
                              "changes."
                          )
                      if local_repo_context.has_local_changes:
                          raise RuntimeError(
                              "The `DockerSettings` of the pipeline or one of its "
                              "steps specify that code should be downloaded from a "
                              "code repository, but "
                              "the code repository active at your current source root "
                              f"`{source_utils.get_source_root()}` has unpushed "
                              "changes."
                          )

                  if local_repo_context:
                      # The repository is optional here, so only warn about why it
                      # can't be used.
                      if local_repo_context.is_dirty:
                          logger.warning(
                              "Unable to use code repository to download code for this "
                              "run as there are uncommitted changes."
                          )
                      elif local_repo_context.has_local_changes:
                          logger.warning(
                              "Unable to use code repository to download code for this "
                              "run as there are unpushed changes."
                          )

              if not local_repo_context or local_repo_context.has_local_changes:
                  return None

              repository_model = Client().get_code_repository(
                  local_repo_context.code_repository_id
              )
              return BaseCodeRepository.from_model(repository_model)
        code_archive
    Code archive.
        
CodeArchive            (Archivable)
        
    Code archive class.
This class is used to archive user code before uploading it to the artifact store. If the user code is stored in a Git repository, only files not excluded by gitignores will be included in the archive.
Source code in zenml/new/pipelines/code_archive.py
          class CodeArchive(Archivable):
    """Code archive class.
    This class is used to archive user code before uploading it to the artifact
    store. If the user code is stored in a Git repository, only files not
    excluded by gitignores will be included in the archive.
    """
    def __init__(self, root: str) -> None:
        """Initialize the object.
        Args:
            root: Root directory of the archive.
        """
        super().__init__()
        self._root = root
    @property
    def git_repo(self) -> Optional["Repo"]:
        """Git repository active at the code archive root.
        Returns:
            The git repository if available.
        """
        try:
            # These imports fail when git is not installed on the machine
            from git.exc import InvalidGitRepositoryError
            from git.repo.base import Repo
        except ImportError:
            return None
        try:
            git_repo = Repo(path=self._root, search_parent_directories=True)
        except InvalidGitRepositoryError:
            return None
        return git_repo
    def _get_all_files(self) -> Dict[str, str]:
        """Get all files inside the archive root.
        Returns:
            All files inside the archive root.
        """
        all_files = {}
        for root, _, files in os.walk(self._root):
            for file in files:
                file_path = os.path.join(root, file)
                path_in_archive = os.path.relpath(file_path, self._root)
                all_files[path_in_archive] = file_path
        return all_files
    def get_files(self) -> Dict[str, str]:
        """Gets all regular files that should be included in the archive.
        Raises:
            RuntimeError: If the code archive would not include any files.
        Returns:
            A dict {path_in_archive: path_on_filesystem} for all regular files
            in the archive.
        """
        all_files = {}
        if repo := self.git_repo:
            try:
                result = repo.git.ls_files(
                    "--cached",
                    "--others",
                    "--modified",
                    "--exclude-standard",
                    self._root,
                )
            except Exception as e:
                logger.warning(
                    "Failed to get non-ignored files from git: %s", str(e)
                )
                all_files = self._get_all_files()
            else:
                for file in result.split():
                    file_path = os.path.join(repo.working_dir, file)
                    path_in_archive = os.path.relpath(file_path, self._root)
                    if os.path.exists(file_path):
                        all_files[path_in_archive] = file_path
        else:
            all_files = self._get_all_files()
        if not all_files:
            raise RuntimeError(
                "The code archive to be uploaded does not contain any files. "
                "This is probably because all files in your source root "
                f"`{self._root}` are ignored by a .gitignore file."
            )
        # Explicitly remove .zen directories as we write an updated version
        # to disk everytime ZenML is called. This updates the mtime of the
        # file, which invalidates the code upload caching. The values in
        # the .zen directory are not needed anyway as we set them as
        # environment variables.
        all_files = {
            path_in_archive: file_path
            for path_in_archive, file_path in sorted(all_files.items())
            if ".zen" not in Path(path_in_archive).parts[:-1]
        }
        return all_files
    def write_archive(
        self, output_file: IO[bytes], use_gzip: bool = True
    ) -> None:
        """Writes an archive of the build context to the given file.
        Args:
            output_file: The file to write the archive to.
            use_gzip: Whether to use `gzip` to compress the file.
        """
        super().write_archive(output_file=output_file, use_gzip=use_gzip)
        archive_size = os.path.getsize(output_file.name)
        if archive_size > 20 * 1024 * 1024:
            logger.warning(
                "Code archive size: `%s`. If you believe this is "
                "unreasonably large, make sure to version your code in git and "
                "ignore unnecessary files using a `.gitignore` file.",
                string_utils.get_human_readable_filesize(archive_size),
            )
git_repo: Optional[Repo]
  
      property
      readonly
  
    Git repository active at the code archive root.
Returns:
| Type | Description | 
|---|---|
| Optional[Repo] | The git repository if available. | 
__init__(self, root)
  
      special
  
    Initialize the object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| root | str | Root directory of the archive. | required | 
Source code in zenml/new/pipelines/code_archive.py
          def __init__(self, root: str) -> None:
    """Initialize the object.
    Runs the parent class initializer and remembers the archive root.
    Args:
        root: Root directory of the archive.
    """
    super().__init__()
    # Stored privately; the other methods of the class read `self._root`.
    self._root = root
get_files(self)
    Gets all regular files that should be included in the archive.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the code archive would not include any files. | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, str] | A dict {path_in_archive: path_on_filesystem} for all regular files in the archive. | 
Source code in zenml/new/pipelines/code_archive.py
          def get_files(self) -> Dict[str, str]:
    """Gets all regular files that should be included in the archive.
    Raises:
        RuntimeError: If the code archive would not include any files.
    Returns:
        A dict {path_in_archive: path_on_filesystem} for all regular files
        in the archive.
    """
    all_files = {}
    if repo := self.git_repo:
        try:
            result = repo.git.ls_files(
                "--cached",
                "--others",
                "--modified",
                "--exclude-standard",
                self._root,
            )
        except Exception as e:
            logger.warning(
                "Failed to get non-ignored files from git: %s", str(e)
            )
            all_files = self._get_all_files()
        else:
            for file in result.split():
                file_path = os.path.join(repo.working_dir, file)
                path_in_archive = os.path.relpath(file_path, self._root)
                if os.path.exists(file_path):
                    all_files[path_in_archive] = file_path
    else:
        all_files = self._get_all_files()
    if not all_files:
        raise RuntimeError(
            "The code archive to be uploaded does not contain any files. "
            "This is probably because all files in your source root "
            f"`{self._root}` are ignored by a .gitignore file."
        )
    # Explicitly remove .zen directories as we write an updated version
    # to disk everytime ZenML is called. This updates the mtime of the
    # file, which invalidates the code upload caching. The values in
    # the .zen directory are not needed anyway as we set them as
    # environment variables.
    all_files = {
        path_in_archive: file_path
        for path_in_archive, file_path in sorted(all_files.items())
        if ".zen" not in Path(path_in_archive).parts[:-1]
    }
    return all_files
write_archive(self, output_file, use_gzip=True)
    Writes an archive of the build context to the given file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_file | IO[bytes] | The file to write the archive to. | required | 
| use_gzip | bool | Whether to use `gzip` to compress the file. | True | 
Source code in zenml/new/pipelines/code_archive.py
          def write_archive(
    self, output_file: IO[bytes], use_gzip: bool = True
) -> None:
    """Writes an archive of the build context to the given file.
    Args:
        output_file: The file to write the archive to.
        use_gzip: Whether to use `gzip` to compress the file.
    """
    super().write_archive(output_file=output_file, use_gzip=use_gzip)
    archive_size = os.path.getsize(output_file.name)
    if archive_size > 20 * 1024 * 1024:
        logger.warning(
            "Code archive size: `%s`. If you believe this is "
            "unreasonably large, make sure to version your code in git and "
            "ignore unnecessary files using a `.gitignore` file.",
            string_utils.get_human_readable_filesize(archive_size),
        )
        pipeline
    Definition of a ZenML pipeline.
        
Pipeline        
    ZenML pipeline class.
Source code in zenml/new/pipelines/pipeline.py
          class Pipeline:
    """ZenML pipeline class."""
    # The active pipeline is the pipeline to which step invocations will be
    # added when a step is called. It is set using a context manager when a
    # pipeline is called (see Pipeline.__call__ for more context)
    ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None
    def __init__(
        self,
        name: str,
        entrypoint: F,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
        model: Optional["Model"] = None,
    ) -> None:
        """Initializes a pipeline.
        Only the name is stored directly in the configuration object; every
        other option is applied through `configure(...)`.
        Args:
            name: The name of the pipeline.
            entrypoint: The entrypoint function of the pipeline.
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline.
            enable_step_logs: If step logs should be enabled for this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step. Can
                be a function with a single argument of type `BaseException`, or
                a source path to such a function (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step. Can
                be a function with no arguments, or a source path to such a
                function (e.g. `module.my_function`).
            model: configuration of the model in the Model Control Plane.
        """
        # Step invocations, filled once the entrypoint has been called.
        self._invocations: Dict[str, StepInvocation] = {}
        self._run_args: Dict[str, Any] = {}
        self._configuration = PipelineConfiguration(
            name=name,
        )
        # Values loaded from a configuration file; `configure` compares new
        # values against these to warn about overrides.
        self._from_config_file: Dict[str, Any] = {}
        # This call comes from internal code, so the override warnings of
        # `configure` are suppressed while it runs.
        with self.__suppress_configure_warnings__():
            self.configure(
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                enable_artifact_visualization=enable_artifact_visualization,
                enable_step_logs=enable_step_logs,
                settings=settings,
                extra=extra,
                on_failure=on_failure,
                on_success=on_success,
                model=model,
            )
        self.entrypoint = entrypoint
        self._parameters: Dict[str, Any] = {}
        # Explicit default for the flag toggled by the context manager above.
        self.__suppress_warnings_flag__ = False
    @property
    def name(self) -> str:
        """Name of this pipeline, read from its configuration.
        Returns:
            The pipeline name.
        """
        pipeline_name = self.configuration.name
        return pipeline_name
    @property
    def enable_cache(self) -> Optional[bool]:
        """Caching flag of this pipeline, read from its configuration.
        Returns:
            The configured `enable_cache` value, which may be `None`.
        """
        cache_setting = self.configuration.enable_cache
        return cache_setting
    @property
    def configuration(self) -> PipelineConfiguration:
        """Read-only access to the pipeline configuration object.
        Returns:
            The configuration held by this pipeline instance.
        """
        current_configuration = self._configuration
        return current_configuration
    @property
    def invocations(self) -> Dict[str, StepInvocation]:
        """Step invocations registered on this pipeline.
        The dictionary stays empty until the pipeline has been called.
        Returns:
            The step invocations.
        """
        step_invocations = self._invocations
        return step_invocations
    def resolve(self) -> "Source":
        """Resolve the pipeline entrypoint into a source.
        Validation is skipped when resolving the entrypoint.
        Returns:
            The pipeline source.
        """
        pipeline_source = source_utils.resolve(
            self.entrypoint, skip_validation=True
        )
        return pipeline_source
    @property
    def source_object(self) -> Any:
        """Object whose source represents this pipeline.
        Returns:
            The pipeline entrypoint.
        """
        entrypoint_object = self.entrypoint
        return entrypoint_object
    @property
    def source_code(self) -> str:
        """Source code of the pipeline's source object.
        Returns:
            The text returned by `inspect.getsource` for the source object.
        """
        code_text = inspect.getsource(self.source_object)
        return code_text
    @property
    def model(self) -> "PipelineResponse":
        """Look up the registered pipeline model for this instance.
        Pipelines are searched by name; the lookup only succeeds when exactly
        one pipeline matches.
        Returns:
            The registered pipeline model.
        Raises:
            RuntimeError: If the pipeline has not been registered yet.
        """
        self._prepare_if_possible()
        matching_pipelines = Client().list_pipelines(name=self.name)
        if len(matching_pipelines) != 1:
            raise RuntimeError(
                f"Cannot get the model of pipeline '{self.name}' because it has "
                f"not been registered yet. Please ensure that the pipeline has "
                f"been run or built and try again."
            )
        return matching_pipelines.items[0]
    @contextmanager
    def __suppress_configure_warnings__(self) -> Iterator[Any]:
        """Context manager to suppress warnings in `Pipeline.configure(...)`.
        Used to suppress warnings when called from inner code and not
        user-facing code. The suppression flag is set on entry and always
        cleared on exit, including when the wrapped code raises.
        Yields:
            Nothing.
        """
        self.__suppress_warnings_flag__ = True
        try:
            yield
        finally:
            # Without the `finally`, an exception raised inside the `with`
            # body would leave the flag set and silence all later warnings.
            self.__suppress_warnings_flag__ = False
    def configure(
        self: T,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
        model: Optional["Model"] = None,
        parameters: Optional[Dict[str, Any]] = None,
        merge: bool = True,
    ) -> T:
        """Applies configuration values to the pipeline.
        Arguments left as `None` are ignored. Unless warnings are suppressed,
        a warning is logged for every given value that differs from a value
        loaded from a configuration file.
        Configuration merging example:
        * `merge==True`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=True)
            pipeline.configuration.extra # {"key1": 1, "key2": 2}
        * `merge==False`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=False)
            pipeline.configuration.extra # {"key2": 2}
        Args:
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline.
            enable_step_logs: If step logs should be enabled for this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step. Can
                be a function with a single argument of type `BaseException`, or
                a source path to such a function (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step. Can
                be a function with no arguments, or a source path to such a
                function (e.g. `module.my_function`).
            merge: If `True`, will merge the given dictionary configurations
                like `extra` and `settings` with existing
                configurations. If `False` the given configurations will
                overwrite all existing ones. See the general description of this
                method for an example.
            model: configuration of the model version in the Model Control Plane.
            parameters: input parameters for the pipeline.
        Returns:
            The pipeline instance that this method was called on.
        """
        # Hooks are only resolved when one was actually passed.
        failure_hook_source = (
            resolve_and_validate_hook(on_failure) if on_failure else None
        )
        success_hook_source = (
            resolve_and_validate_hook(on_success) if on_success else None
        )
        candidate_values = {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model": model,
            "parameters": parameters,
        }
        values = dict_utils.remove_none_values(candidate_values)
        if not self.__suppress_warnings_flag__:
            # Collect (name, value from file, new value) for every option
            # that differs from what the configuration file provided.
            overridden = [
                (key, self._from_config_file[key], given)
                for key, given in values.items()
                if key in PipelineRunConfiguration.model_fields
                and key in self._from_config_file
                and given != self._from_config_file[key]
            ]
            if overridden:
                line_template = (
                    "The value of parameter '{name}' has changed from "
                    "'{file_value}' to '{new_value}' set in your configuration "
                    "file.\n"
                )
                warning_lines = [
                    line_template.format(
                        name=name, file_value=file_value, new_value=new_value
                    )
                    for name, file_value, new_value in overridden
                ]
                closing_hint = (
                    "Configuration file value will be used during pipeline "
                    "run, so you change will not be efficient. Consider "
                    "updating your configuration file instead."
                )
                logger.warning("".join(warning_lines) + closing_hint)
        update = PipelineConfigurationUpdate(**values)
        self._apply_configuration(update, merge=merge)
        return self
    @property
    def requires_parameters(self) -> bool:
        """Whether the entrypoint has a parameter without a default value.
        Returns:
            `True` if at least one parameter of the entrypoint signature has
            no default, `False` otherwise.
        """
        entrypoint_parameters = inspect.signature(
            self.entrypoint, follow_wrapped=True
        ).parameters
        for entrypoint_parameter in entrypoint_parameters.values():
            if entrypoint_parameter.default is inspect.Parameter.empty:
                return True
        return False
    @property
    def is_prepared(self) -> bool:
        """Whether the pipeline has been prepared.
        A pipeline counts as prepared once its entrypoint has been called and
        at least one step invocation has been recorded.
        Returns:
            If the pipeline is prepared.
        """
        recorded_invocations = len(self.invocations)
        return recorded_invocations > 0
    def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepares the pipeline.
        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments.
        Raises:
            RuntimeError: If the pipeline has parameters configured differently in
                configuration file and code.
        """
        # Clear existing parameters and invocations
        self._parameters = {}
        self._invocations = {}
        conflicting_parameters = {}
        parameters_ = (self.configuration.parameters or {}).copy()
        if from_file_ := self._from_config_file.get("parameters", None):
            parameters_ = dict_utils.recursive_update(parameters_, from_file_)
        if parameters_:
            for k, v_runtime in kwargs.items():
                if k in parameters_:
                    v_config = parameters_[k]
                    if v_config != v_runtime:
                        conflicting_parameters[k] = (v_config, v_runtime)
            if conflicting_parameters:
                is_plural = "s" if len(conflicting_parameters) > 1 else ""
                msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
                for key, values in conflicting_parameters.items():
                    msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
                msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
    parameters:
        param_name: value1
# pipeline.py
@pipeline
def pipeline_(param_name: str):
    step_name()
if __name__=="__main__":
    pipeline_.with_options(config_file="config.yaml")(param_name="value2")
```
To avoid this consider setting pipeline parameters only in one place (config or code).
"""
                raise RuntimeError(msg)
            for k, v_config in parameters_.items():
                if k not in kwargs:
                    kwargs[k] = v_config
        with self:
            # Enter the context manager, so we become the active pipeline. This
            # means that all steps that get called while the entrypoint function
            # is executed will be added as invocation to this pipeline instance.
            self._call_entrypoint(*args, **kwargs)
    def register(self) -> "PipelineResponse":
        """Registers this pipeline in the server.
        A warning is logged when the pipeline carries configuration values
        other than its name that differ from the defaults.
        Returns:
            The registered pipeline model.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry
        self._prepare_if_possible()
        integration_registry.activate_integrations()
        custom_configuration = self.configuration.model_dump(
            exclude_defaults=True, exclude={"name"}
        )
        if custom_configuration:
            logger.warning(
                f"The pipeline `{self.name}` that you're registering has "
                "custom configurations applied to it. These will not be "
                "registered with the pipeline and won't be set when you build "
                "images or run the pipeline from the CLI. To provide these "
                "configurations, use the `--config` option of the `zenml "
                "pipeline build/run` commands."
            )
        registered_pipeline = self._register()
        return registered_pipeline
    def build(
        self,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
    ) -> Optional["PipelineBuildResponse"]:
        """Builds Docker images for the pipeline.
        The pipeline is compiled and registered, the local code repository
        context is verified, and a pipeline build is created from the result.
        Args:
            settings: Settings for the pipeline.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
        Returns:
            The build output.
        """
        with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
            self._prepare_if_possible()
            # Only the deployment of the compiled triple is needed here.
            compiled_deployment, _, _ = self._compile(
                config_path=config_path,
                steps=step_configurations,
                settings=settings,
            )
            registered_pipeline_id = self._register().id
            active_repo_context = (
                code_repository_utils.find_active_code_repository()
            )
            verified_repository = build_utils.verify_local_repository_context(
                deployment=compiled_deployment,
                local_repo_context=active_repo_context,
            )
            pipeline_build = build_utils.create_pipeline_build(
                deployment=compiled_deployment,
                pipeline_id=registered_pipeline_id,
                code_repository=verified_repository,
            )
            return pipeline_build
    def _run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBase", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
    ) -> Optional[PipelineRunResponse]:
        """Runs the pipeline on the active stack.
        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline run.
            enable_step_logs: If step logs should be enabled for this pipeline.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: DEPRECATED: Use
                `DockerSettings.prevent_build_reuse` instead.
        Returns:
            Model of the pipeline run if running without a schedule, `None` if
            running with a schedule.
        Raises:
            ValueError: if the orchestrator doesn't support scheduling, but schedule was given
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls when importing modules needed to run a step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return None
        logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")
        with track_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
            deployment, schedule, build = self._compile(
                config_path=config_path,
                run_name=run_name,
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                enable_artifact_visualization=enable_artifact_visualization,
                enable_step_logs=enable_step_logs,
                steps=step_configurations,
                settings=settings,
                schedule=schedule,
                build=build,
                extra=extra,
            )
            skip_pipeline_registration = constants.handle_bool_env_var(
                constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
                default=False,
            )
            register_pipeline = not (skip_pipeline_registration or unlisted)
            pipeline_id = None
            if register_pipeline:
                pipeline_id = self._register().id
            else:
                logger.debug(f"Pipeline {self.name} is unlisted.")
            stack = Client().active_stack
            stack.validate()
            schedule_id = None
            if schedule:
                if not stack.orchestrator.config.is_schedulable:
                    raise ValueError(
                        f"Stack {stack.name} does not support scheduling. "
                        "Not all orchestrator types support scheduling, "
                        "kindly consult with "
                        "https://docs.zenml.io/how-to/build-pipelines/schedule-a-pipeline "
                        "for details."
                    )
                if schedule.name:
                    schedule_name = schedule.name
                else:
                    schedule_name = format_name_template(
                        deployment.run_name_template
                    )
                components = Client().active_stack_model.components
                orchestrator = components[StackComponentType.ORCHESTRATOR][0]
                schedule_model = ScheduleRequest(
                    workspace=Client().active_workspace.id,
                    user=Client().active_user.id,
                    pipeline_id=pipeline_id,
                    orchestrator_id=orchestrator.id,
                    name=schedule_name,
                    active=True,
                    cron_expression=schedule.cron_expression,
                    start_time=schedule.start_time,
                    end_time=schedule.end_time,
                    interval_second=schedule.interval_second,
                    catchup=schedule.catchup,
                    run_once_start_time=schedule.run_once_start_time,
                )
                schedule_id = (
                    Client().zen_store.create_schedule(schedule_model).id
                )
                logger.info(
                    f"Created schedule `{schedule_name}` for pipeline "
                    f"`{deployment.pipeline_configuration.name}`."
                )
            stack = Client().active_stack
            stack.validate()
            upload_notebook_cell_code_if_necessary(
                deployment=deployment, stack=stack
            )
            local_repo_context = (
                code_repository_utils.find_active_code_repository()
            )
            code_repository = build_utils.verify_local_repository_context(
                deployment=deployment, local_repo_context=local_repo_context
            )
            if prevent_build_reuse:
                logger.warning(
                    "Passing `prevent_build_reuse=True` to "
                    "`pipeline.with_opitions(...)` is deprecated. Use "
                    "`DockerSettings.prevent_build_reuse` instead."
                )
            build_model = build_utils.reuse_or_create_pipeline_build(
                deployment=deployment,
                pipeline_id=pipeline_id,
                allow_build_reuse=not prevent_build_reuse,
                build=build,
                code_repository=code_repository,
            )
            build_id = build_model.id if build_model else None
            code_reference = None
            if local_repo_context and not local_repo_context.is_dirty:
                source_root = source_utils.get_source_root()
                subdirectory = (
                    Path(source_root)
                    .resolve()
                    .relative_to(local_repo_context.root)
                )
                code_reference = CodeReferenceRequest(
                    commit=local_repo_context.current_commit,
                    subdirectory=subdirectory.as_posix(),
                    code_repository=local_repo_context.code_repository_id,
                )
            code_path = None
            if build_utils.should_upload_code(
                deployment=deployment,
                build=build_model,
                code_reference=code_reference,
            ):
                code_archive = code_utils.CodeArchive(
                    root=source_utils.get_source_root()
                )
                logger.info("Archiving pipeline code...")
                code_path = code_utils.upload_code_if_necessary(code_archive)
            deployment_request = PipelineDeploymentRequest(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=stack.id,
                pipeline=pipeline_id,
                build=build_id,
                schedule=schedule_id,
                code_reference=code_reference,
                code_path=code_path,
                **deployment.model_dump(),
            )
            deployment_model = Client().zen_store.create_deployment(
                deployment=deployment_request
            )
            self.log_pipeline_deployment_metadata(deployment_model)
            run = create_placeholder_run(deployment=deployment_model)
            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
                deployment=deployment_model,
                stack=stack,
                run_id=run.id if run else None,
            )
            if run:
                run_url = dashboard_utils.get_run_url(run)
                if run_url:
                    logger.info(f"Dashboard URL for Pipeline Run: {run_url}")
                else:
                    logger.info(
                        "You can visualize your pipeline runs in the `ZenML "
                        "Dashboard`. In order to try it locally, please run "
                        "`zenml up`."
                    )
            deploy_pipeline(
                deployment=deployment_model, stack=stack, placeholder_run=run
            )
            if run:
                return Client().get_pipeline_run(run.id)
            return None
    @staticmethod
    def log_pipeline_deployment_metadata(
        deployment_model: PipelineDeploymentResponse,
    ) -> None:
        """Logs a summary of the deployment when a pipeline is run.

        The summary covers the schedule (if any), a disabled cache, the build
        that is used (images plus ZenML/Python versions that differ from the
        local ones), and the user, stack and stack components. Logging is
        best-effort: any exception raised while building the summary is
        caught and only reported at debug level.

        Args:
            deployment_model: The model for the pipeline deployment
        """
        try:
            # Scheduled run vs. immediate run
            schedule = deployment_model.schedule
            if schedule:
                logger.info(
                    "Scheduling a run with the schedule: "
                    f"`{schedule.name}`"
                )
            else:
                logger.info("Executing a new run.")
            # Only an explicit `False` counts as disabled caching
            pipeline_config = deployment_model.pipeline_configuration
            if pipeline_config.enable_cache is False:
                logger.info(
                    f"Caching is disabled by default for "
                    f"`{pipeline_config.name}`."
                )
            # Details of the build that is used, if any
            build = deployment_model.build
            if build:
                logger.info("Using a build:")
                image_names = ", ".join(
                    [item.image for item in build.images.values()]
                )
                logger.info(f" Image(s): {image_names}")
                # Report versions that differ between the build and the
                # local environment
                from zenml import __version__
                build_zenml_version = build.zenml_version
                if build_zenml_version != __version__:
                    logger.info(
                        f"ZenML version (different than the local version): "
                        f"{build_zenml_version}"
                    )
                import platform
                build_python_version = build.python_version
                if build_python_version != platform.python_version():
                    logger.info(
                        f"Python version (different than the local version): "
                        f"{build_python_version}"
                    )
            # User, stack and the first component of every component type
            user = deployment_model.user
            if user is not None:
                logger.info(f"Using user: `{user.name}`")
            stack = deployment_model.stack
            if stack is not None:
                logger.info(f"Using stack: `{stack.name}`")
                for kind, models in stack.components.items():
                    logger.info(f"  {kind.value}: `{models[0].name}`")
        except Exception as e:
            logger.debug(f"Logging pipeline deployment metadata failed: {e}")
    def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
        """(Deprecated) Fetch the runs of this pipeline.

        Logs a deprecation warning and forwards the call to
        `self.model.get_runs()`.

        Args:
            **kwargs: Further arguments for filtering or pagination that are
                passed to `client.list_pipeline_runs()`.

        Returns:
            List of runs of this pipeline.
        """
        deprecation_notice = (
            "`Pipeline.get_runs()` is deprecated and will be removed in a "
            "future version. Please use `Pipeline.model.get_runs()` instead."
        )
        logger.warning(deprecation_notice)
        runs = self.model.get_runs(**kwargs)
        return runs
    def write_run_configuration_template(
        self, path: str, stack: Optional["Stack"] = None
    ) -> None:
        """Writes a run configuration yaml template.

        The template lists the pipeline-level settings and, for every step
        invocation, its parameters, the step-level settings and its outputs.
        The dumped YAML is passed through `yaml_utils.comment_out_yaml`
        before being written.

        Args:
            path: The path where the template will be written.
            stack: The stack for which the template should be generated. If
                not given, the active stack will be used.
        """
        from zenml.config.base_settings import ConfigurationLevel
        from zenml.config.step_configurations import (
            PartialArtifactConfiguration,
        )
        self._prepare_if_possible()
        if not stack:
            stack = Client().active_stack
        available_settings = stack.setting_classes
        available_settings.update(settings_utils.get_general_settings())
        # Sort the settings templates by the level(s) they can be applied at
        pipeline_level = {}
        step_level = {}
        for settings_key, settings_cls in available_settings.items():
            template_fields = pydantic_utils.TemplateGenerator(
                settings_cls
            ).run()
            levels = settings_cls.LEVEL
            if ConfigurationLevel.PIPELINE in levels:
                pipeline_level[settings_key] = template_fields
            if ConfigurationLevel.STEP in levels:
                step_level[settings_key] = template_fields
        # One configuration template per step invocation
        step_templates = {}
        for invocation_id, invocation in self.invocations.items():
            entrypoint_definition = invocation.step.entrypoint_definition
            legacy_params = entrypoint_definition.legacy_params
            if legacy_params:
                step_parameters = pydantic_utils.TemplateGenerator(
                    legacy_params.annotation
                ).run()
            else:
                step_parameters = {}
            step_outputs = {}
            for output_name in entrypoint_definition.outputs:
                step_outputs[output_name] = PartialArtifactConfiguration()
            step_templates[invocation_id] = StepConfigurationUpdate(
                parameters=step_parameters,
                settings=step_level,
                outputs=step_outputs,
            )
        run_config = PipelineRunConfiguration(
            settings=pipeline_level, steps=step_templates
        )
        template = pydantic_utils.TemplateGenerator(run_config).run()
        commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(template))
        with open(path, "w") as f:
            f.write(commented_yaml)
    def _apply_configuration(
        self,
        config: PipelineConfigurationUpdate,
        merge: bool = True,
    ) -> None:
        """Validates a configuration update and applies it to the pipeline.

        Args:
            config: The configuration update.
            merge: Whether to merge the updates with the existing configuration
                or not. Forwarded as the `recursive` flag of
                `pydantic_utils.update_model`; see the `configure(...)` method
                for a detailed explanation.
        """
        self._validate_configuration(config)
        updated_configuration = pydantic_utils.update_model(
            self._configuration, update=config, recursive=merge
        )
        self._configuration = updated_configuration
        logger.debug("Updated pipeline configuration:")
        logger.debug(self._configuration)
    @staticmethod
    def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
        """Validates the settings keys of a configuration update.

        Args:
            config: The configuration update to validate.
        """
        setting_keys = list(config.settings)
        settings_utils.validate_setting_keys(setting_keys)
    def _get_pipeline_analytics_metadata(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        run_id: Optional[UUID] = None,
    ) -> Dict[str, Any]:
        """Collects the analytics metadata of a pipeline deployment.

        Args:
            deployment: The pipeline deployment to track.
            stack: The stack on which the pipeline will be deployed.
            run_id: The ID of the pipeline run.

        Returns:
            the metadata about the pipeline deployment
        """
        # A single non-internal materializer source on any step output is
        # enough to flag the deployment
        custom_materializer = any(
            not source.is_internal
            for step in deployment.step_configurations.values()
            for output in step.config.outputs.values()
            for source in output.materializer_source
        )
        stack_creator = Client().get_stack(stack.id).user
        active_user = Client().active_user
        own_stack = stack_creator and (stack_creator.id == active_user.id)
        # Maps every stack component type to the flavor of its component
        stack_metadata = {}
        for component_type, component in stack.components.items():
            stack_metadata[component_type.value] = component.flavor
        pipeline_run_id = str(run_id) if run_id else None
        return {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "total_steps": len(self.invocations),
            "schedule": bool(deployment.schedule),
            "custom_materializer": custom_materializer,
            "own_stack": own_stack,
            "pipeline_run_id": pipeline_run_id,
        }
    def _compile(
        self, config_path: Optional[str] = None, **run_configuration_args: Any
    ) -> Tuple[
        "PipelineDeploymentBase",
        Optional["Schedule"],
        Union["PipelineBuildBase", UUID, None],
    ]:
        """Compiles the pipeline.

        Builds a `PipelineRunConfiguration` from the config file (if given),
        overrides it with the run configuration arguments passed in code and
        compiles the pipeline for the active stack.

        Args:
            config_path: Path to a config file.
            **run_configuration_args: Configurations for the pipeline run.
                Arguments with a `None` value are dropped before they are
                applied.

        Returns:
            A tuple containing the deployment, schedule and build of
            the compiled pipeline.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry
        integration_registry.activate_integrations()
        # Only keys that are fields of `PipelineRunConfiguration` are kept
        # from the config file.
        # NOTE(review): `_parse_config_file` reads `self._from_config_file`
        # and the call below overwrites that attribute, so the relative
        # order of these two calls looks significant - confirm before
        # reordering.
        _from_config_file = self._parse_config_file(
            config_path=config_path,
            matcher=list(PipelineRunConfiguration.model_fields.keys()),
        )
        self._reconfigure_from_file_with_overrides(config_path=config_path)
        run_config = PipelineRunConfiguration(**_from_config_file)
        new_values = dict_utils.remove_none_values(run_configuration_args)
        update = PipelineRunConfiguration.model_validate(new_values)
        # Update with the values in code so they take precedence
        run_config = pydantic_utils.update_model(run_config, update=update)
        deployment = Compiler().compile(
            pipeline=self,
            stack=Client().active_stack,
            run_configuration=run_config,
        )
        return deployment, run_config.schedule, run_config.build
    def _register(self) -> "PipelineResponse":
        """Fetches the pipeline from the server, registering it if needed.

        If no pipeline with this name exists yet, a new one is created. When
        the creation fails because the pipeline already exists, the existing
        pipeline is fetched again.

        Returns:
            The registered pipeline model.
        """
        client = Client()
        def _fetch_latest() -> PipelineResponse:
            """Returns the most recently created pipeline with this name."""
            page = client.list_pipelines(
                name=self.name,
                size=1,
                sort_by="desc:created",
            )
            if not page.total:
                raise RuntimeError("No matching pipelines found.")
            return page.items[0]
        try:
            return _fetch_latest()
        except RuntimeError:
            creation_request = PipelineRequest(
                workspace=client.active_workspace.id,
                user=client.active_user.id,
                name=self.name,
            )
            try:
                created_pipeline = client.zen_store.create_pipeline(
                    pipeline=creation_request
                )
            except EntityExistsError:
                # The pipeline already exists on the server, fetch it instead
                return _fetch_latest()
            else:
                logger.info(
                    "Registered new pipeline: `%s`.",
                    created_pipeline.name,
                )
                return created_pipeline
    def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
        """Computes a unique identifier from the pipeline spec and steps.

        The identifier is the MD5 hex digest over the serialized spec, the
        pipeline source code (for spec versions `0.4` and above) and the
        source code of every step in the spec.

        Args:
            pipeline_spec: Compiled spec of the pipeline.

        Returns:
            The unique identifier of the pipeline.
        """
        from packaging import version
        digest = hashlib.md5()  # nosec
        digest.update(pipeline_spec.json_with_string_sources.encode())
        spec_version = version.parse(pipeline_spec.version)
        if spec_version >= version.parse("0.4"):
            # Only add this for newer versions to keep backwards compatibility
            digest.update(self.source_code.encode())
        for step_spec in pipeline_spec.steps:
            step = self.invocations[step_spec.pipeline_parameter_name].step
            digest.update(step.source_code.encode())
        return digest.hexdigest()
    def add_step_invocation(
        self,
        step: "BaseStep",
        input_artifacts: Dict[str, StepArtifact],
        external_artifacts: Dict[str, "ExternalArtifact"],
        model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
        client_lazy_loaders: Dict[str, "ClientLazyLoader"],
        parameters: Dict[str, Any],
        default_parameters: Dict[str, Any],
        upstream_steps: Set[str],
        custom_id: Optional[str] = None,
        allow_id_suffix: bool = True,
    ) -> str:
        """Registers a new step invocation on this pipeline.

        Args:
            step: The step for which to add an invocation.
            input_artifacts: The input artifacts for the invocation.
            external_artifacts: The external artifacts for the invocation.
            model_artifacts_or_metadata: The model artifacts or metadata for
                the invocation.
            client_lazy_loaders: The client lazy loaders for the invocation.
            parameters: The parameters for the invocation.
            default_parameters: The default parameters for the invocation.
            upstream_steps: The upstream steps for the invocation.
            custom_id: Custom ID to use for the invocation.
            allow_id_suffix: Whether a suffix can be appended to the invocation
                ID.

        Raises:
            RuntimeError: If the method is called on an inactive pipeline.
            RuntimeError: If the invocation was called with an artifact from
                a different pipeline.

        Returns:
            The step invocation ID.
        """
        if Pipeline.ACTIVE_PIPELINE != self:
            raise RuntimeError(
                "A step invocation can only be added to an active pipeline."
            )
        # Find the first input artifact that belongs to another pipeline
        foreign_artifact = next(
            (
                candidate
                for candidate in input_artifacts.values()
                if candidate.pipeline is not self
            ),
            None,
        )
        if foreign_artifact is not None:
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {foreign_artifact.pipeline.name}."
            )
        new_id = self._compute_invocation_id(
            step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
        )
        self._invocations[new_id] = StepInvocation(
            id=new_id,
            step=step,
            input_artifacts=input_artifacts,
            external_artifacts=external_artifacts,
            model_artifacts_or_metadata=model_artifacts_or_metadata,
            client_lazy_loaders=client_lazy_loaders,
            parameters=parameters,
            default_parameters=default_parameters,
            upstream_steps=upstream_steps,
            pipeline=self,
        )
        return new_id
    def _compute_invocation_id(
        self,
        step: "BaseStep",
        custom_id: Optional[str] = None,
        allow_suffix: bool = True,
    ) -> str:
        """Compute the invocation ID.
        Args:
            step: The step for which to compute the ID.
            custom_id: Custom ID to use for the invocation.
            allow_suffix: Whether a suffix can be appended to the invocation
                ID.
        Raises:
            RuntimeError: If no ID suffix is allowed and an invocation for the
                same ID already exists.
            RuntimeError: If no unique invocation ID can be found.
        Returns:
            The invocation ID.
        """
        base_id = id_ = custom_id or step.name
        if id_ not in self.invocations:
            return id_
        if not allow_suffix:
            raise RuntimeError("Duplicate step ID")
        for index in range(2, 10000):
            id_ = f"{base_id}_{index}"
            if id_ not in self.invocations:
                return id_
        raise RuntimeError("Unable to find step ID")
    def __enter__(self: T) -> T:
        """Makes this pipeline the active pipeline.

        Raises:
            RuntimeError: If a different pipeline is already active.

        Returns:
            The pipeline instance.
        """
        active_pipeline = Pipeline.ACTIVE_PIPELINE
        if active_pipeline:
            raise RuntimeError(
                "Unable to enter pipeline context. A different pipeline "
                f"{active_pipeline.name} is already active."
            )
        Pipeline.ACTIVE_PIPELINE = self
        return self
    def __exit__(self, *args: Any) -> None:
        """Leaves the pipeline context.

        Args:
            *args: The arguments passed to the context exit handler. They
                are not used.
        """
        # Reset the class-level marker so no pipeline is active anymore
        Pipeline.ACTIVE_PIPELINE = None
    def _parse_config_file(
        self, config_path: Optional[str], matcher: List[str]
    ) -> Dict[str, Any]:
        """Parses the given configuration file and sets `self._from_config_file`.
        Args:
            config_path: Path to a yaml configuration file.
            matcher: List of keys to match in the configuration file.
        Returns:
            Parsed config file according to matcher settings.
        """
        _from_config_file: Dict[str, Any] = {}
        if config_path:
            with open(config_path, "r") as f:
                _from_config_file = yaml.load(f, Loader=yaml.SafeLoader)
            _from_config_file = dict_utils.remove_none_values(
                {k: v for k, v in _from_config_file.items() if k in matcher}
            )
            # TODO: deprecate me
            if "model_version" in _from_config_file:
                logger.warning(
                    "YAML config option `model_version` is deprecated. Please use `model`."
                )
                _from_config_file["model"] = _from_config_file["model_version"]
                del _from_config_file["model_version"]
            if "model" in _from_config_file:
                if "model" in self._from_config_file:
                    _from_config_file["model"] = self._from_config_file[
                        "model"
                    ]
                else:
                    from zenml.model.model import Model
                    _from_config_file["model"] = Model.model_validate(
                        _from_config_file["model"]
                    )
        return _from_config_file
    def with_options(
        self,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBase", None] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
        **kwargs: Any,
    ) -> "Pipeline":
        """Creates a configured copy of the pipeline.

        The pipeline itself is left untouched: the configuration and the run
        arguments are applied to a copy, which is returned.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            step_configurations: Configurations for steps of the pipeline.
            steps: Configurations for steps of the pipeline. This is equivalent
                to `step_configurations`, and will be ignored if
                `step_configurations` is set as well.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: DEPRECATED: Use
                `DockerSettings.prevent_build_reuse` instead.
            **kwargs: Pipeline configuration options. These will be passed
                to the `pipeline.configure(...)` method.

        Returns:
            The copied pipeline instance.
        """
        if step_configurations and steps:
            logger.warning(
                "Step configurations were passed using both the "
                "`step_configurations` and `steps` keywords, ignoring the "
                "values passed using the `steps` keyword."
            )
        configured_copy = self.copy()
        configured_copy._reconfigure_from_file_with_overrides(
            config_path=config_path, **kwargs
        )
        # `step_configurations` wins over `steps` if both are given
        effective_step_configurations = step_configurations or steps
        requested_run_args = {
            "run_name": run_name,
            "schedule": schedule,
            "build": build,
            "step_configurations": effective_step_configurations,
            "config_path": config_path,
            "unlisted": unlisted,
            "prevent_build_reuse": prevent_build_reuse,
        }
        configured_copy._run_args.update(
            dict_utils.remove_none_values(requested_run_args)
        )
        return configured_copy
    def copy(self) -> "Pipeline":
        """Creates a deep copy of the pipeline.

        Returns:
            The pipeline copy.
        """
        duplicate = copy.deepcopy(self)
        return duplicate
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Handle a call of the pipeline.

        This method does one of two things:
        * If there is an active pipeline context, it calls the pipeline
          entrypoint function within that context and the step invocations
          will be added to the active pipeline.
        * If no pipeline is active, it prepares this pipeline with the given
          arguments and runs it with the stored run arguments.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Returns:
            The outputs of the entrypoint function call.
        """
        if not Pipeline.ACTIVE_PIPELINE:
            self.prepare(*args, **kwargs)
            return self._run(**self._run_args)
        # Calling a pipeline inside a pipeline, we return the potential
        # outputs of the entrypoint function
        # TODO: This currently ignores the configuration of the pipeline
        #   and instead applies the configuration of the previously active
        #   pipeline. Is this what we want?
        return self.entrypoint(*args, **kwargs)
    def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
        """Calls the pipeline entrypoint function with the given arguments.

        The arguments are first validated against the entrypoint function
        with arbitrary types disallowed. The validated arguments are stored
        in `self._parameters` and then passed to the entrypoint as keyword
        arguments.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Raises:
            ValueError: If an input argument is missing or not JSON
                serializable.
        """
        try:
            validated_args = pydantic_utils.validate_function_args(
                self.entrypoint,
                ConfigDict(arbitrary_types_allowed=False),
                *args,
                **kwargs,
            )
        except ValidationError as e:
            # The adjacent literals are concatenated, so each one needs its
            # own trailing space to keep the sentences apart.
            raise ValueError(
                "Invalid or missing pipeline function entrypoint arguments. "
                "Only JSON serializable inputs are allowed as pipeline inputs. "
                "Check out the pydantic error above for more details."
            ) from e
        self._parameters = validated_args
        self.entrypoint(**validated_args)
    def _prepare_if_possible(self) -> None:
        """Prepares the pipeline if possible.
        Raises:
            RuntimeError: If the pipeline is not prepared and the preparation
                requires parameters.
        """
        if not self.is_prepared:
            if self.requires_parameters:
                raise RuntimeError(
                    f"Failed while trying to prepare pipeline {self.name}. "
                    "The entrypoint function of the pipeline requires "
                    "arguments. Please prepare the pipeline by calling "
                    "`pipeline_instance.prepare(...)` and try again."
                )
            else:
                self.prepare()
    def _reconfigure_from_file_with_overrides(
        self,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Reconfigures the pipeline from a config file and overrides.

        The values parsed from the file are stored in
        `self._from_config_file`, updated with the keyword overrides and
        passed to `self.configure(...)`.

        Args:
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            **kwargs: Pipeline configuration options. These will be passed
                to the `pipeline.configure(...)` method.
        """
        # Reset before parsing: `_parse_config_file` reads this attribute
        self._from_config_file = {}
        if config_path:
            configure_arg_names = inspect.getfullargspec(self.configure).args
            # TODO: deprecate `model_version` later on
            accepted_keys = configure_arg_names + ["model_version"]
            self._from_config_file = self._parse_config_file(
                config_path=config_path,
                matcher=accepted_keys,
            )
        configure_kwargs = dict_utils.recursive_update(
            self._from_config_file, kwargs
        )
        with self.__suppress_configure_warnings__():
            self.configure(**configure_kwargs)
configuration: PipelineConfiguration
  
      property
      readonly
  
    The configuration of the pipeline.
Returns:
| Type | Description | 
|---|---|
| PipelineConfiguration | The configuration of the pipeline. | 
enable_cache: Optional[bool]
  
      property
      readonly
  
    If caching is enabled for the pipeline.
Returns:
| Type | Description | 
|---|---|
| Optional[bool] | If caching is enabled for the pipeline. | 
invocations: Dict[str, zenml.steps.step_invocation.StepInvocation]
  
      property
      readonly
  
    Returns the step invocations of this pipeline.
This dictionary will only be populated once the pipeline has been called.
Returns:
| Type | Description | 
|---|---|
| Dict[str, zenml.steps.step_invocation.StepInvocation] | The step invocations. | 
is_prepared: bool
  
      property
      readonly
  
    If the pipeline is prepared.
Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.
Returns:
| Type | Description | 
|---|---|
| bool | If the pipeline is prepared. | 
model: PipelineResponse
  
      property
      readonly
  
    Gets the registered pipeline model for this instance.
Returns:
| Type | Description | 
|---|---|
| PipelineResponse | The registered pipeline model. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the pipeline has not been registered yet. | 
name: str
  
      property
      readonly
  
    The name of the pipeline.
Returns:
| Type | Description | 
|---|---|
| str | The name of the pipeline. | 
requires_parameters: bool
  
      property
      readonly
  
    If the pipeline entrypoint requires parameters.
Returns:
| Type | Description | 
|---|---|
| bool | If the pipeline entrypoint requires parameters. | 
source_code: str
  
      property
      readonly
  
    The source code of this pipeline.
Returns:
| Type | Description | 
|---|---|
| str | The source code of this pipeline. | 
source_object: Any
  
      property
      readonly
  
    The source object of this pipeline.
Returns:
| Type | Description | 
|---|---|
| Any | The source object of this pipeline. | 
__call__(self, *args, **kwargs)
  
      special
  
    Handle a call of the pipeline.
This method does one of two things:
* If there is an active pipeline context, it calls the pipeline entrypoint function within that context and the step invocations will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling the entrypoint function.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Entrypoint function arguments. | () | 
| **kwargs | Any | Entrypoint function keyword arguments. | {} | 
Returns:
| Type | Description | 
|---|---|
| Any | The outputs of the entrypoint function call. | 
Source code in zenml/new/pipelines/pipeline.py
          def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Handle a call of the pipeline.

    This method does one of two things:
    * If there is an active pipeline context, it calls the pipeline
      entrypoint function within that context and the step invocations
      will be added to the active pipeline.
    * If no pipeline is active, it activates this pipeline before calling
      the entrypoint function.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The outputs of the entrypoint function call.
    """
    if Pipeline.ACTIVE_PIPELINE:
        # Calling a pipeline inside a pipeline, we return the potential
        # outputs of the entrypoint function
        # TODO: This currently ignores the configuration of the pipeline
        #   and instead applies the configuration of the previously active
        #   pipeline. Is this what we want?
        return self.entrypoint(*args, **kwargs)
    # No pipeline context is active: prepare this pipeline with the given
    # arguments, then run it with the stored run arguments.
    self.prepare(*args, **kwargs)
    return self._run(**self._run_args)
__enter__(self)
  
      special
  
    Activate the pipeline context.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If a different pipeline is already active. | 
Returns:
| Type | Description | 
|---|---|
| ~T | The pipeline instance. | 
Source code in zenml/new/pipelines/pipeline.py
          def __enter__(self: T) -> T:
    """Activate the pipeline context.

    Raises:
        RuntimeError: If a different pipeline is already active.

    Returns:
        The pipeline instance.
    """
    # Only one pipeline can be active at a time: the active pipeline is
    # tracked in a single class-level attribute.
    if Pipeline.ACTIVE_PIPELINE:
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{Pipeline.ACTIVE_PIPELINE.name} is already active."
        )
    Pipeline.ACTIVE_PIPELINE = self
    return self
__exit__(self, *args)
  
      special
  
    Deactivates the pipeline context.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | The arguments passed to the context exit handler. | () | 
Source code in zenml/new/pipelines/pipeline.py
          def __exit__(self, *args: Any) -> None:
    """Deactivates the pipeline context.

    Args:
        *args: The arguments passed to the context exit handler. They are
            not used.
    """
    # Reset the class-level marker so no pipeline is active anymore.
    Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None)
  
      special
  
    Initializes a pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| name | str | The name of the pipeline. | required | 
| entrypoint | ~F | The entrypoint function of the pipeline. | required | 
| enable_cache | Optional[bool] | If caching should be enabled for this pipeline. | None | 
| enable_artifact_metadata | Optional[bool] | If artifact metadata should be enabled for this pipeline. | None | 
| enable_artifact_visualization | Optional[bool] | If artifact visualization should be enabled for this pipeline. | None | 
| enable_step_logs | Optional[bool] | If step logs should be enabled for this pipeline. | None | 
| settings | Optional[Mapping[str, SettingsOrDict]] | settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
| on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). | None | 
| on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). | None | 
| model | Optional[Model] | configuration of the model in the Model Control Plane. | None | 
Source code in zenml/new/pipelines/pipeline.py
          def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
) -> None:
    """Initializes a pipeline.
    Creates the empty invocation/run-argument state, builds a
    `PipelineConfiguration` holding only the name, and then applies all
    remaining options through `configure(...)` with its warnings suppressed.
    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
    """
    # Step invocations, keyed by invocation ID.
    self._invocations: Dict[str, StepInvocation] = {}
    # Run options; `with_options(...)` updates this dictionary.
    self._run_args: Dict[str, Any] = {}
    self._configuration = PipelineConfiguration(
        name=name,
    )
    # `configure(...)` compares incoming values against this dictionary to
    # warn about overrides; it starts out empty.
    self._from_config_file: Dict[str, Any] = {}
    # This call comes from inner code rather than user-facing code, so the
    # override warnings of `configure(...)` are silenced for it.
    with self.__suppress_configure_warnings__():
        self.configure(
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
        )
    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}
    # Explicit initial value for the flag that `configure(...)` reads.
    self.__suppress_warnings_flag__ = False
__suppress_configure_warnings__(self)
  
      special
  
    Context manager to suppress warnings in Pipeline.configure(...).
Used to suppress warnings when called from inner code and not user-facing code.
Yields:
| Type | Description | 
|---|---|
| Iterator[Any] | Nothing. | 
Source code in zenml/new/pipelines/pipeline.py
          @contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
    """Context manager to suppress warnings in `Pipeline.configure(...)`.
    Used to suppress warnings when called from inner code and not user-facing code.
    Yields:
        Nothing.
    """
    self.__suppress_warnings_flag__ = True
    yield
    self.__suppress_warnings_flag__ = False
add_step_invocation(self, step, input_artifacts, external_artifacts, model_artifacts_or_metadata, client_lazy_loaders, parameters, default_parameters, upstream_steps, custom_id=None, allow_id_suffix=True)
    Adds a step invocation to the pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| step | BaseStep | The step for which to add an invocation. | required | 
| input_artifacts | Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact] | The input artifacts for the invocation. | required | 
| external_artifacts | Dict[str, ExternalArtifact] | The external artifacts for the invocation. | required | 
| model_artifacts_or_metadata | Dict[str, ModelVersionDataLazyLoader] | The model artifacts or metadata for the invocation. | required | 
| client_lazy_loaders | Dict[str, ClientLazyLoader] | The client lazy loaders for the invocation. | required | 
| parameters | Dict[str, Any] | The parameters for the invocation. | required | 
| default_parameters | Dict[str, Any] | The default parameters for the invocation. | required | 
| upstream_steps | Set[str] | The upstream steps for the invocation. | required | 
| custom_id | Optional[str] | Custom ID to use for the invocation. | None | 
| allow_id_suffix | bool | Whether a suffix can be appended to the invocation ID. | True | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the method is called on an inactive pipeline. | 
| RuntimeError | If the invocation was called with an artifact from a different pipeline. | 
Returns:
| Type | Description | 
|---|---|
| str | The step invocation ID. | 
Source code in zenml/new/pipelines/pipeline.py
          def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[str, "ExternalArtifact"],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Adds a step invocation to the pipeline.
    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        model_artifacts_or_metadata: The model artifacts or metadata for
            the invocation.
        client_lazy_loaders: The client lazy loaders for the invocation.
        parameters: The parameters for the invocation.
        default_parameters: The default parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.
    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.
    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )
    for artifact in input_artifacts.values():
        if artifact.pipeline is not self:
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {artifact.pipeline.name}."
            )
    invocation_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    invocation = StepInvocation(
        id=invocation_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        model_artifacts_or_metadata=model_artifacts_or_metadata,
        client_lazy_loaders=client_lazy_loaders,
        parameters=parameters,
        default_parameters=default_parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    self._invocations[invocation_id] = invocation
    return invocation_id
build(self, settings=None, step_configurations=None, config_path=None)
    Builds Docker images for the pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| settings | Optional[Mapping[str, SettingsOrDict]] | Settings for the pipeline. | None | 
| step_configurations | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. | None | 
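| config_path | Optional[str] | Path to a yaml configuration file. This file will be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration` object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method. | None | 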
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineBuildResponse] | The build output. | 
Source code in zenml/new/pipelines/pipeline.py
          def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds Docker images for the pipeline.
    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()
        deployment, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        pipeline_id = self._register().id
        local_repo = code_repository_utils.find_active_code_repository()
        code_repository = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=local_repo
        )
        return build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            code_repository=code_repository,
        )
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, parameters=None, merge=True)
    Configures the pipeline.
Configuration merging example:
* merge==True:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=True)
    pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=False)
    pipeline.configuration.extra # {"key2": 2}
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| enable_cache | Optional[bool] | If caching should be enabled for this pipeline. | None | 
| enable_artifact_metadata | Optional[bool] | If artifact metadata should be enabled for this pipeline. | None | 
| enable_artifact_visualization | Optional[bool] | If artifact visualization should be enabled for this pipeline. | None | 
| enable_step_logs | Optional[bool] | If step logs should be enabled for this pipeline. | None | 
| settings | Optional[Mapping[str, SettingsOrDict]] | settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
| on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). | None | 
| on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). | None | 
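| merge | bool | If `True`, will merge the given dictionary configurations like `extra` and `settings` with existing configurations. If `False` the given configurations will overwrite all existing ones. See the general description of this method for an example. | True | 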
| model | Optional[Model] | configuration of the model version in the Model Control Plane. | None | 
| parameters | Optional[Dict[str, Any]] | input parameters for the pipeline. | None | 
Returns:
| Type | Description | 
|---|---|
| ~T | The pipeline instance that this method was called on. | 
Source code in zenml/new/pipelines/pipeline.py
          def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    parameters: Optional[Dict[str, Any]] = None,
    merge: bool = True,
) -> T:
    """Configures the pipeline.
    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}
    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        model: configuration of the model version in the Model Control Plane.
        parameters: input parameters for the pipeline.
    Returns:
        The pipeline instance that this method was called on.
    """
    failure_hook_source = None
    if on_failure:
        # string of on_failure hook function to be used for this pipeline
        failure_hook_source = resolve_and_validate_hook(on_failure)
    success_hook_source = None
    if on_success:
        # string of on_success hook function to be used for this pipeline
        success_hook_source = resolve_and_validate_hook(on_success)
    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model": model,
            "parameters": parameters,
        }
    )
    if not self.__suppress_warnings_flag__:
        to_be_reapplied = []
        for param_, value_ in values.items():
            if (
                param_ in PipelineRunConfiguration.model_fields
                and param_ in self._from_config_file
                and value_ != self._from_config_file[param_]
            ):
                to_be_reapplied.append(
                    (param_, self._from_config_file[param_], value_)
                )
        if to_be_reapplied:
            msg = ""
            reapply_during_run_warning = (
                "The value of parameter '{name}' has changed from "
                "'{file_value}' to '{new_value}' set in your configuration "
                "file.\n"
            )
            for name, file_value, new_value in to_be_reapplied:
                msg += reapply_during_run_warning.format(
                    name=name, file_value=file_value, new_value=new_value
                )
            msg += (
                "Configuration file value will be used during pipeline "
                "run, so you change will not be efficient. Consider "
                "updating your configuration file instead."
            )
            logger.warning(msg)
    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
copy(self)
    Copies the pipeline.
Returns:
| Type | Description | 
|---|---|
| Pipeline | The pipeline copy. | 
Source code in zenml/new/pipelines/pipeline.py
          def copy(self) -> "Pipeline":
    """Copies the pipeline.
    Returns:
        The pipeline copy.
    """
    return copy.deepcopy(self)
get_runs(self, **kwargs)
    (Deprecated) Get runs of this pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| **kwargs | Any | Further arguments for filtering or pagination that are passed to `client.list_pipeline_runs()`. | {} | 
Returns:
| Type | Description | 
|---|---|
| List[PipelineRunResponse] | List of runs of this pipeline. | 
Source code in zenml/new/pipelines/pipeline.py
          def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
    """(Deprecated) Get runs of this pipeline.
    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.
    Returns:
        List of runs of this pipeline.
    """
    logger.warning(
        "`Pipeline.get_runs()` is deprecated and will be removed in a "
        "future version. Please use `Pipeline.model.get_runs()` instead."
    )
    return self.model.get_runs(**kwargs)
log_pipeline_deployment_metadata(deployment_model)
  
      staticmethod
  
    Displays logs based on the deployment model upon running a pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment_model | PipelineDeploymentResponse | The model for the pipeline deployment | required | 
Source code in zenml/new/pipelines/pipeline.py
          @staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponse,
) -> None:
    """Displays logs based on the deployment model upon running a pipeline.
    Args:
        deployment_model: The model for the pipeline deployment
    """
    try:
        # Log about the schedule/run
        if deployment_model.schedule:
            logger.info(
                "Scheduling a run with the schedule: "
                f"`{deployment_model.schedule.name}`"
            )
        else:
            logger.info("Executing a new run.")
        # Log about the caching status
        if deployment_model.pipeline_configuration.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{deployment_model.pipeline_configuration.name}`."
            )
        # Log about the used builds
        if deployment_model.build:
            logger.info("Using a build:")
            logger.info(
                " Image(s): "
                f"{', '.join([i.image for i in deployment_model.build.images.values()])}"
            )
            # Log about version mismatches between local and build
            from zenml import __version__
            if deployment_model.build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{deployment_model.build.zenml_version}"
                )
            import platform
            if (
                deployment_model.build.python_version
                != platform.python_version()
            ):
                logger.info(
                    f"Python version (different than the local version): "
                    f"{deployment_model.build.python_version}"
                )
        # Log about the user, stack and components
        if deployment_model.user is not None:
            logger.info(f"Using user: `{deployment_model.user.name}`")
        if deployment_model.stack is not None:
            logger.info(f"Using stack: `{deployment_model.stack.name}`")
            for (
                component_type,
                component_models,
            ) in deployment_model.stack.components.items():
                logger.info(
                    f"  {component_type.value}: `{component_models[0].name}`"
                )
    except Exception as e:
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
prepare(self, *args, **kwargs)
    Prepares the pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Pipeline entrypoint input arguments. | () | 
| **kwargs | Any | Pipeline entrypoint input keyword arguments. | {} | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the pipeline has parameters configured differently in configuration file and code. | 
Source code in zenml/new/pipelines/pipeline.py
              def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepares the pipeline.
        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments.
        Raises:
            RuntimeError: If the pipeline has parameters configured differently in
                configuration file and code.
        """
        # Clear existing parameters and invocations
        self._parameters = {}
        self._invocations = {}
        conflicting_parameters = {}
        parameters_ = (self.configuration.parameters or {}).copy()
        if from_file_ := self._from_config_file.get("parameters", None):
            parameters_ = dict_utils.recursive_update(parameters_, from_file_)
        if parameters_:
            for k, v_runtime in kwargs.items():
                if k in parameters_:
                    v_config = parameters_[k]
                    if v_config != v_runtime:
                        conflicting_parameters[k] = (v_config, v_runtime)
            if conflicting_parameters:
                is_plural = "s" if len(conflicting_parameters) > 1 else ""
                msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
                for key, values in conflicting_parameters.items():
                    msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
                msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
    parameters:
        param_name: value1
# pipeline.py
@pipeline
def pipeline_(param_name: str):
    step_name()
if __name__=="__main__":
    pipeline_.with_options(config_file="config.yaml")(param_name="value2")
```
To avoid this consider setting pipeline parameters only in one place (config or code).
"""
                raise RuntimeError(msg)
            for k, v_config in parameters_.items():
                if k not in kwargs:
                    kwargs[k] = v_config
        with self:
            # Enter the context manager, so we become the active pipeline. This
            # means that all steps that get called while the entrypoint function
            # is executed will be added as invocation to this pipeline instance.
            self._call_entrypoint(*args, **kwargs)
register(self)
    Register the pipeline in the server.
Returns:
| Type | Description | 
|---|---|
| PipelineResponse | The registered pipeline model. | 
Source code in zenml/new/pipelines/pipeline.py
          def register(self) -> "PipelineResponse":
    """Register the pipeline in the server.
    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry
    self._prepare_if_possible()
    integration_registry.activate_integrations()
    if self.configuration.model_dump(
        exclude_defaults=True, exclude={"name"}
    ):
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )
    return self._register()
resolve(self)
    Resolves the pipeline.
Returns:
| Type | Description | 
|---|---|
| Source | The pipeline source. | 
Source code in zenml/new/pipelines/pipeline.py
          def resolve(self) -> "Source":
    """Resolves the pipeline.
    Returns:
        The pipeline source.
    """
    return source_utils.resolve(self.entrypoint, skip_validation=True)
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, steps=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)
    Copies the pipeline and applies the given configurations.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_name | Optional[str] | Name of the pipeline run. | None | 
| schedule | Optional[zenml.config.schedule.Schedule] | Optional schedule to use for the run. | None | 
| build | Union[str, UUID, PipelineBuildBase] | Optional build to use for the run. | None | 
| step_configurations | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. | None | 
| steps | Optional[Mapping[str, StepConfigurationUpdateOrDict]] | Configurations for steps of the pipeline. This is equivalent to `step_configurations`, and will be ignored if `step_configurations` is set as well. | None | 
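| config_path | Optional[str] | Path to a yaml configuration file. This file will be parsed as a `zenml.config.pipeline_configurations.PipelineRunConfiguration` object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method. | None | 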
| unlisted | bool | Whether the pipeline run should be unlisted (not assigned to any pipeline). | False | 
| prevent_build_reuse | bool | DEPRECATED: Use `DockerSettings.prevent_build_reuse` instead. | False | 
| **kwargs | Any | Pipeline configuration options. These will be passed to the `pipeline.configure(...)` method. | {} | 
Returns:
| Type | Description | 
|---|---|
| Pipeline | The copied pipeline instance. | 
Source code in zenml/new/pipelines/pipeline.py
          def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Copies the pipeline and applies the given configurations.
    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        steps: Configurations for steps of the pipeline. This is equivalent
            to `step_configurations`, and will be ignored if
            `step_configurations` is set as well.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.
    Returns:
        The copied pipeline instance.
    """
    if steps and step_configurations:
        logger.warning(
            "Step configurations were passed using both the "
            "`step_configurations` and `steps` keywords, ignoring the "
            "values passed using the `steps` keyword."
        )
    pipeline_copy = self.copy()
    pipeline_copy._reconfigure_from_file_with_overrides(
        config_path=config_path, **kwargs
    )
    run_args = dict_utils.remove_none_values(
        {
            "run_name": run_name,
            "schedule": schedule,
            "build": build,
            "step_configurations": step_configurations or steps,
            "config_path": config_path,
            "unlisted": unlisted,
            "prevent_build_reuse": prevent_build_reuse,
        }
    )
    pipeline_copy._run_args.update(run_args)
    return pipeline_copy
write_run_configuration_template(self, path, stack=None)
    Writes a run configuration yaml template.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | The path where the template will be written. | required | 
| stack | Optional[Stack] | The stack for which the template should be generated. If not given, the active stack will be used. | None | 
Source code in zenml/new/pipelines/pipeline.py
          def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Writes a run configuration yaml template.
    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )
    self._prepare_if_possible()
    stack = stack or Client().active_stack
    setting_classes = stack.setting_classes
    setting_classes.update(settings_utils.get_general_settings())
    pipeline_settings = {}
    step_settings = {}
    for key, setting_class in setting_classes.items():
        fields = pydantic_utils.TemplateGenerator(setting_class).run()
        if ConfigurationLevel.PIPELINE in setting_class.LEVEL:
            pipeline_settings[key] = fields
        if ConfigurationLevel.STEP in setting_class.LEVEL:
            step_settings[key] = fields
    steps = {}
    for step_name, invocation in self.invocations.items():
        step = invocation.step
        parameters = (
            pydantic_utils.TemplateGenerator(
                step.entrypoint_definition.legacy_params.annotation
            ).run()
            if step.entrypoint_definition.legacy_params
            else {}
        )
        outputs = {
            name: PartialArtifactConfiguration()
            for name in step.entrypoint_definition.outputs
        }
        step_template = StepConfigurationUpdate(
            parameters=parameters,
            settings=step_settings,
            outputs=outputs,
        )
        steps[step_name] = step_template
    run_config = PipelineRunConfiguration(
        settings=pipeline_settings, steps=steps
    )
    template = pydantic_utils.TemplateGenerator(run_config).run()
    yaml_string = yaml.dump(template)
    yaml_string = yaml_utils.comment_out_yaml(yaml_string)
    with open(path, "w") as f:
        f.write(yaml_string)
        pipeline_context
    Pipeline context class.
        
PipelineContext        
    Provides pipeline configuration context.
Usage example:
from zenml import get_pipeline_context
...
@pipeline(
    extra={
        "complex_parameter": [
            ("sklearn.tree", "DecisionTreeClassifier"),
            ("sklearn.ensemble", "RandomForestClassifier"),
        ]
    }
)
def my_pipeline():
    context = get_pipeline_context()
    after = []
    search_steps_prefix = "hp_tuning_search_"
    for i, model_search_configuration in enumerate(
        context.extra["complex_parameter"]
    ):
        step_name = f"{search_steps_prefix}{i}"
        cross_validation(
            model_package=model_search_configuration[0],
            model_class=model_search_configuration[1],
            id=step_name
        )
        after.append(step_name)
    select_best_model(
        search_steps_prefix=search_steps_prefix,
        after=after,
    )
Source code in zenml/new/pipelines/pipeline_context.py
          class PipelineContext:
              """Exposes selected fields of a pipeline configuration as attributes.

              Usage example:

              ```python
              from zenml import get_pipeline_context

              ...

              @pipeline(
                  extra={
                      "complex_parameter": [
                          ("sklearn.tree", "DecisionTreeClassifier"),
                          ("sklearn.ensemble", "RandomForestClassifier"),
                      ]
                  }
              )
              def my_pipeline():
                  context = get_pipeline_context()

                  after = []
                  search_steps_prefix = "hp_tuning_search_"
                  for i, model_search_configuration in enumerate(
                      context.extra["complex_parameter"]
                  ):
                      step_name = f"{search_steps_prefix}{i}"
                      cross_validation(
                          model_package=model_search_configuration[0],
                          model_class=model_search_configuration[1],
                          id=step_name
                      )
                      after.append(step_name)
                  select_best_model(
                      search_steps_prefix=search_steps_prefix,
                      after=after,
                  )
              ```
              """

              def __init__(self, pipeline_configuration: "PipelineConfiguration"):
                  """Copy the relevant configuration fields onto the context.

                  Args:
                      pipeline_configuration: The configuration of the pipeline derived
                          from Pipeline class.
                  """
                  config = pipeline_configuration

                  # General pipeline information.
                  self.name = config.name
                  self.settings = config.settings
                  self.extra = config.extra

                  # Feature toggles.
                  self.enable_cache = config.enable_cache
                  self.enable_artifact_metadata = config.enable_artifact_metadata
                  self.enable_artifact_visualization = config.enable_artifact_visualization
                  self.enable_step_logs = config.enable_step_logs

                  # The same configured model is stored under both names.
                  self.model = config.model
                  self._model_version = config.model

              # TODO: deprecate me
              @property
              def model_version(self) -> Optional["Model"]:
                  """DEPRECATED, use `model` instead.

                  Every access logs a deprecation warning before returning `self.model`.

                  Returns:
                      The `Model` object associated with the current pipeline.
                  """
                  deprecation_message = "Pipeline context `model_version` is deprecated. Please use `model` instead."
                  logger.warning(deprecation_message)
                  return self.model
model_version: Optional[Model]
  
      property
      readonly
  
    DEPRECATED, use model instead.
Returns:
| Type | Description | 
|---|---|
| Optional[Model] | The `Model` object associated with the current pipeline. | 
__init__(self, pipeline_configuration)
  
      special
  
    Initialize the context of the current pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pipeline_configuration | PipelineConfiguration | The configuration of the pipeline derived from Pipeline class. | required | 
Source code in zenml/new/pipelines/pipeline_context.py
          def __init__(self, pipeline_configuration: "PipelineConfiguration"):
              """Copy the relevant configuration fields onto the context.

              Args:
                  pipeline_configuration: The configuration of the pipeline derived
                      from Pipeline class.
              """
              config = pipeline_configuration

              # General pipeline information.
              self.name = config.name
              self.settings = config.settings
              self.extra = config.extra

              # Feature toggles.
              self.enable_cache = config.enable_cache
              self.enable_artifact_metadata = config.enable_artifact_metadata
              self.enable_artifact_visualization = config.enable_artifact_visualization
              self.enable_step_logs = config.enable_step_logs

              # The same configured model is stored under both names.
              self.model = config.model
              self._model_version = config.model
get_pipeline_context()
    Get the context of the current pipeline.
Returns:
| Type | Description | 
|---|---|
| PipelineContext | The context of the current pipeline. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If no active pipeline is found. | 
| RuntimeError | If inside a running step. | 
Source code in zenml/new/pipelines/pipeline_context.py
          def get_pipeline_context() -> "PipelineContext":
              """Build a `PipelineContext` for the currently active pipeline.

              Returns:
                  The context of the current pipeline.

              Raises:
                  RuntimeError: If no active pipeline is found.
                  RuntimeError: If inside a running step.
              """
              from zenml.new.pipelines.pipeline import Pipeline

              active_pipeline = Pipeline.ACTIVE_PIPELINE
              if active_pipeline is not None:
                  return PipelineContext(
                      pipeline_configuration=active_pipeline.configuration
                  )

              # No active pipeline: probe for a step context only to choose which
              # error message to raise.
              try:
                  from zenml.new.steps.step_context import get_step_context

                  get_step_context()
              except RuntimeError:
                  raise RuntimeError("No active pipeline found.")

              # The probe succeeded, so a step context is available.
              raise RuntimeError(
                  "Inside a step use `from zenml import get_step_context` "
                  "instead."
              )
        pipeline_decorator
    ZenML pipeline decorator definition.
pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, model_version=None)
    Decorator to create a pipeline.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| _func | Optional[F] | The decorated function. | None | 
| name | Optional[str] | The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. | None | 
| enable_cache | Optional[bool] | Whether to use caching or not. | None | 
| enable_artifact_metadata | Optional[bool] | Whether to enable artifact metadata or not. | None | 
| enable_step_logs | Optional[bool] | If step logs should be enabled for this pipeline. | None | 
| settings | Optional[Dict[str, SettingsOrDict]] | Settings for this pipeline. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this pipeline. | None | 
| on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). | None | 
| on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). | None | 
| model | Optional[Model] | configuration of the model in the Model Control Plane. | None | 
| model_version | Optional[Model] | DEPRECATED, please use `model` instead. | None | 
Returns:
| Type | Description | 
|---|---|
| Union[Pipeline, Callable[[F], Pipeline]] | A pipeline instance. | 
Source code in zenml/new/pipelines/pipeline_decorator.py
          def pipeline(
              _func: Optional["F"] = None,
              *,
              name: Optional[str] = None,
              enable_cache: Optional[bool] = None,
              enable_artifact_metadata: Optional[bool] = None,
              enable_step_logs: Optional[bool] = None,
              settings: Optional[Dict[str, "SettingsOrDict"]] = None,
              extra: Optional[Dict[str, Any]] = None,
              on_failure: Optional["HookSpecification"] = None,
              on_success: Optional["HookSpecification"] = None,
              model: Optional["Model"] = None,
              model_version: Optional["Model"] = None,  # TODO: deprecate me
          ) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
              """Decorator to create a pipeline.

              Works both bare (`@pipeline`) and with keyword arguments
              (`@pipeline(name=...)`): when `_func` is `None`, the actual decorator is
              returned, otherwise it is applied to `_func` right away.

              Args:
                  _func: The decorated function.
                  name: The name of the pipeline. If left empty, the name of the
                      decorated function will be used as a fallback.
                  enable_cache: Whether to use caching or not.
                  enable_artifact_metadata: Whether to enable artifact metadata or not.
                  enable_step_logs: If step logs should be enabled for this pipeline.
                  settings: Settings for this pipeline.
                  extra: Extra configurations for this pipeline.
                  on_failure: Callback function in event of failure of the step. Can be a
                      function with a single argument of type `BaseException`, or a source
                      path to such a function (e.g. `module.my_function`).
                  on_success: Callback function in event of success of the step. Can be a
                      function with no arguments, or a source path to such a function
                      (e.g. `module.my_function`).
                  model: configuration of the model in the Model Control Plane.
                  model_version: DEPRECATED, please use `model` instead.

              Returns:
                  A pipeline instance.
              """

              def inner_decorator(func: "F") -> "Pipeline":
                  from zenml.new.pipelines.pipeline import Pipeline

                  if model_version:
                      logger.warning(
                          "Pipeline decorator argument `model_version` is deprecated. Please use `model` instead."
                      )

                  pipeline_name = name or func.__name__
                  # `model` wins; the deprecated argument is only a fallback.
                  resolved_model = model or model_version

                  pipeline_instance = Pipeline(
                      name=pipeline_name,
                      enable_cache=enable_cache,
                      enable_artifact_metadata=enable_artifact_metadata,
                      enable_step_logs=enable_step_logs,
                      settings=settings,
                      extra=extra,
                      on_failure=on_failure,
                      on_success=on_success,
                      model=resolved_model,
                      entrypoint=func,
                  )
                  pipeline_instance.__doc__ = func.__doc__
                  return pipeline_instance

              if _func is None:
                  return inner_decorator
              return inner_decorator(_func)
        run_utils
    Utility functions for running pipelines.
create_placeholder_run(deployment)
    Create a placeholder run for the deployment.
If the deployment contains a schedule, no placeholder run will be created.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentResponse | The deployment for which to create the placeholder run. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineRunResponse] | The placeholder run or `None` if no run was created. | 
Source code in zenml/new/pipelines/run_utils.py
          def create_placeholder_run(
              deployment: "PipelineDeploymentResponse",
          ) -> Optional["PipelineRunResponse"]:
              """Create a placeholder run for the deployment.

              If the deployment contains a schedule, no placeholder run will be
              created. Otherwise a run request in the `INITIALIZING` status, without an
              orchestrator run ID, is sent to the zen store.

              Args:
                  deployment: The deployment for which to create the placeholder run.

              Returns:
                  The placeholder run or `None` if no run was created.
              """
              assert deployment.user

              if deployment.schedule:
                  return None

              run_name = get_run_name(run_name_template=deployment.run_name_template)
              # We set the start time on the placeholder run already to
              # make it consistent with the {time} placeholder in the
              # run name. This means the placeholder run will usually
              # have longer durations than scheduled runs, as for them
              # the start_time is only set once the first step starts
              # running.
              started_at = datetime.utcnow()
              pipeline_id = deployment.pipeline.id if deployment.pipeline else None

              placeholder_request = PipelineRunRequest(
                  name=run_name,
                  start_time=started_at,
                  orchestrator_run_id=None,
                  user=deployment.user.id,
                  workspace=deployment.workspace.id,
                  deployment=deployment.id,
                  pipeline=pipeline_id,
                  status=ExecutionStatus.INITIALIZING,
              )
              return Client().zen_store.create_run(placeholder_request)
deploy_pipeline(deployment, stack, placeholder_run=None)
    Run a deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentResponse | The deployment to run. | required | 
| stack | Stack | The stack on which to run the deployment. | required | 
| placeholder_run | Optional[PipelineRunResponse] | An optional placeholder run for the deployment. This will be deleted in case the pipeline deployment failed. | None | 
Exceptions:
| Type | Description | 
|---|---|
| Exception | Any exception that happened while deploying or running (in case it happens synchronously) the pipeline. | 
Source code in zenml/new/pipelines/run_utils.py
          def deploy_pipeline(
              deployment: "PipelineDeploymentResponse",
              stack: "Stack",
              placeholder_run: Optional["PipelineRunResponse"] = None,
          ) -> None:
              """Prepare and run a deployment on the given stack.

              While the stack deploys the pipeline,
              `constants.SHOULD_PREVENT_PIPELINE_EXECUTION` is set to `True`; its
              previous value is restored afterwards, whether or not the deployment
              succeeded.

              Args:
                  deployment: The deployment to run.
                  stack: The stack on which to run the deployment.
                  placeholder_run: An optional placeholder run for the deployment. This
                      will be deleted in case the pipeline deployment failed.

              Raises:
                  Exception: Any exception that happened while deploying or running
                      (in case it happens synchronously) the pipeline.
              """
              stack.prepare_pipeline_deployment(deployment=deployment)

              # Prevent execution of nested pipelines which might lead to
              # unexpected behavior
              flag_before_deploy = constants.SHOULD_PREVENT_PIPELINE_EXECUTION
              constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
              try:
                  stack.deploy_pipeline(deployment=deployment)
              except Exception as e:
                  if placeholder_run:
                      latest_run = Client().get_pipeline_run(placeholder_run.id)
                      if latest_run.status == ExecutionStatus.INITIALIZING:
                          # The run hasn't actually started yet, which means that we
                          # failed during initialization -> We don't want the
                          # placeholder run to stay in the database
                          Client().delete_pipeline_run(placeholder_run.id)
                  raise e
              finally:
                  constants.SHOULD_PREVENT_PIPELINE_EXECUTION = flag_before_deploy
get_all_sources_from_value(value)
    Get all source objects from a value.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | Any | The value from which to get all the source objects. | required | 
Returns:
| Type | Description | 
|---|---|
| List[zenml.config.source.Source] | List of source objects for the given value. | 
Source code in zenml/new/pipelines/run_utils.py
          def get_all_sources_from_value(value: Any) -> List[Source]:
              """Recursively collect every `Source` object contained in a value.

              A `Source` is returned as-is. Pydantic models, dictionaries, lists, sets
              and tuples are searched recursively through their attribute values,
              dictionary values or items. Any other value yields no sources.

              Args:
                  value: The value from which to get all the source objects.

              Returns:
                  List of source objects for the given value.
              """
              # A source is a leaf: it is reported itself and not searched further.
              if isinstance(value, Source):
                  return [value]

              if isinstance(value, BaseModel):
                  children = value.__dict__.values()
              elif isinstance(value, Dict):
                  children = value.values()
              elif isinstance(value, (List, Set, tuple)):
                  children = value
              else:
                  return []

              collected = []
              for child in children:
                  collected.extend(get_all_sources_from_value(child))
              return collected
get_default_run_name(pipeline_name)
    Gets the default name for a pipeline run.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pipeline_name | str | Name of the pipeline which will be run. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Run name. | 
Source code in zenml/new/pipelines/run_utils.py
          def get_default_run_name(pipeline_name: str) -> str:
              """Builds the default run name template for a pipeline.

              The result is the pipeline name followed by the literal `{date}` and
              `{time}` placeholders, which are not substituted here.

              Args:
                  pipeline_name: Name of the pipeline which will be run.

              Returns:
                  Run name.
              """
              # Doubled braces are emitted as literal `{date}` / `{time}`.
              return "{}-{{date}}-{{time}}".format(pipeline_name)
get_placeholder_run(deployment_id)
    Get the placeholder run for a deployment.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment_id | UUID | ID of the deployment for which to get the placeholder run. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[PipelineRunResponse] | The placeholder run or `None` if there exists no placeholder run for the deployment. | 
Source code in zenml/new/pipelines/run_utils.py
          def get_placeholder_run(
              deployment_id: UUID,
          ) -> Optional["PipelineRunResponse"]:
              """Get the placeholder run for a deployment.

              Only the oldest run of the deployment that is still in the `INITIALIZING`
              status is considered, and it is returned only if it has no orchestrator
              run ID.

              Args:
                  deployment_id: ID of the deployment for which to get the placeholder
                      run.

              Returns:
                  The placeholder run or `None` if there exists no placeholder run for the
                  deployment.
              """
              page = Client().list_pipeline_runs(
                  sort_by="asc:created",
                  size=1,
                  deployment_id=deployment_id,
                  status=ExecutionStatus.INITIALIZING,
              )
              candidates = page.items

              if candidates and candidates[0].orchestrator_run_id is None:
                  return candidates[0]
              return None
upload_notebook_cell_code_if_necessary(deployment, stack)
    Upload notebook cell code if necessary.
This function checks if any of the steps of the pipeline that will be executed in a different process are defined in a notebook. If that is the case, it will extract that notebook cell code into python files and upload an archive of all the necessary files to the artifact store.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentBase | The deployment. | required | 
| stack | Stack | The stack on which the deployment will happen. | required | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the code for one of the steps that will run out of process cannot be extracted into a python file. | 
Source code in zenml/new/pipelines/run_utils.py
          def upload_notebook_cell_code_if_necessary(
              deployment: "PipelineDeploymentBase", stack: "Stack"
          ) -> None:
              """Upload notebook cell code if necessary.

              This function checks if any of the steps of the pipeline that will be
              executed in a different process are defined in a notebook. If that is the
              case, it will extract that notebook cell code into python files and upload
              an archive of all the necessary files to the artifact store.

              Args:
                  deployment: The deployment.
                  stack: The stack on which the deployment will happen.

              Raises:
                  RuntimeError: If the code for one of the steps that will run out of
                      process cannot be extracted into a python file.
              """
              should_upload = False
              resolved_notebook_sources = source_utils.get_resolved_notebook_sources()

              for step in deployment.step_configurations.values():
                  source = step.spec.source
                  if source.type != SourceType.NOTEBOOK:
                      continue

                  runs_out_of_process = (
                      stack.orchestrator.flavor != "local" or step.config.step_operator
                  )
                  if not runs_out_of_process:
                      continue

                  should_upload = True
                  cell_code = resolved_notebook_sources.get(source.import_path, None)

                  # Code does not run in-process, which means we need to
                  # extract the step code into a python file
                  if not cell_code:
                      raise RuntimeError(
                          f"Unable to run step {step.config.name}. This step is "
                          "defined in a notebook and you're trying to run it "
                          "in a remote environment, but ZenML was not able to "
                          "detect the step code in the notebook. To fix "
                          "this error, define your step in a python file instead "
                          "of a notebook."
                      )

              if not should_upload:
                  return

              logger.info("Uploading notebook code...")

              # Every resolved notebook cell is uploaded, not only the ones that
              # triggered the upload.
              for cell_code in resolved_notebook_sources.values():
                  notebook_utils.warn_about_notebook_cell_magic_commands(
                      cell_code=cell_code
                  )
                  module_name = notebook_utils.compute_cell_replacement_module_name(
                      cell_code=cell_code
                  )
                  code_utils.upload_notebook_code(
                      artifact_store=stack.artifact_store,
                      cell_code=cell_code,
                      file_name=f"{module_name}.py",
                  )

              # Record on each notebook source of the deployment which artifact store
              # the code was uploaded to.
              for source in get_all_sources_from_value(deployment):
                  if source.type == SourceType.NOTEBOOK:
                      setattr(source, "artifact_store_id", stack.artifact_store.id)

              logger.info("Upload finished.")
validate_run_config_is_runnable_from_server(run_configuration)
    Validates that the run configuration can be used to run from the server.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_configuration | PipelineRunConfiguration | The run configuration to validate. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If there are values in the run configuration that are not allowed when running a pipeline from the server. | 
Source code in zenml/new/pipelines/run_utils.py
          def validate_run_config_is_runnable_from_server(
              run_configuration: "PipelineRunConfiguration",
          ) -> None:
              """Validates that the run configuration can be used to run from the server.

              Parameters, build and schedule must not be set, and neither the pipeline
              settings nor any step settings may contain a `docker` entry. The first
              violation found is reported.

              Args:
                  run_configuration: The run configuration to validate.

              Raises:
                  ValueError: If there are values in the run configuration that are not
                      allowed when running a pipeline from the server.
              """
              # Checked in this order; attributes are read lazily, one at a time.
              forbidden_fields = (
                  (
                      "parameters",
                      "Can't set parameters when running pipeline via Rest API.",
                  ),
                  ("build", "Can't set build when running pipeline via Rest API."),
                  (
                      "schedule",
                      "Can't set schedule when running pipeline via Rest API.",
                  ),
              )
              for field_name, message in forbidden_fields:
                  if getattr(run_configuration, field_name):
                      raise ValueError(message)

              docker_message = (
                  "Can't set DockerSettings when running pipeline via Rest API."
              )
              if run_configuration.settings.get("docker"):
                  raise ValueError(docker_message)

              if any(
                  step_update.settings.get("docker")
                  for step_update in run_configuration.steps.values()
              ):
                  raise ValueError(docker_message)
validate_stack_is_runnable_from_server(zen_store, stack)
    Validate if a stack model is runnable from the server.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| zen_store | BaseZenStore | ZenStore to use for listing flavors. | required | 
| stack | StackResponse | The stack to validate. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If the stack has components of a custom flavor or local components. | 
Source code in zenml/new/pipelines/run_utils.py
          def validate_stack_is_runnable_from_server(
              zen_store: BaseZenStore, stack: StackResponse
          ) -> None:
              """Validate if a stack model is runnable from the server.

              For every component of the stack, its flavor is looked up in the store
              and the component configuration is instantiated with the flavor's config
              class.

              Args:
                  zen_store: ZenStore to use for listing flavors.
                  stack: The stack to validate.

              Raises:
                  ValueError: If the stack has components of a custom flavor or local
                      components.
              """
              for component_list in stack.components.values():
                  assert len(component_list) == 1
                  component = component_list[0]

                  flavor_filter = FlavorFilter(
                      name=component.flavor, type=component.type
                  )
                  matching_flavors = zen_store.list_flavors(flavor_filter)
                  assert len(matching_flavors) == 1
                  flavor_model = matching_flavors[0]

                  # A flavor with a workspace set is rejected as a custom flavor.
                  if flavor_model.workspace is not None:
                      raise ValueError("No custom stack component flavors allowed.")

                  config_class = Flavor.from_model(flavor_model).config_class
                  if config_class(**component.configuration).is_local:
                      raise ValueError("No local stack components allowed.")
wait_for_pipeline_run_to_finish(run_id)
    Waits until a pipeline run is finished.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_id | UUID | ID of the run for which to wait. | required | 
Returns:
| Type | Description | 
|---|---|
| PipelineRunResponse | Model of the finished run. | 
Source code in zenml/new/pipelines/run_utils.py
          def wait_for_pipeline_run_to_finish(run_id: UUID) -> "PipelineRunResponse":
              """Waits until a pipeline run is finished.

              The run is polled with a sleep between attempts that starts at one
              second and doubles after every attempt until it reaches 64 seconds.

              Args:
                  run_id: ID of the run for which to wait.

              Returns:
                  Model of the finished run.
              """
              sleep_interval = 1
              max_sleep_interval = 64

              run = Client().get_pipeline_run(run_id)
              while not run.status.is_finished:
                  logger.info(
                      "Waiting for pipeline run with ID %s to finish (current status: %s)",
                      run_id,
                      run.status,
                  )
                  time.sleep(sleep_interval)
                  # Exponential backoff, capped at the maximum interval.
                  sleep_interval = min(sleep_interval * 2, max_sleep_interval)
                  run = Client().get_pipeline_run(run_id)

              return run
        steps
  
      special
  
    
        decorated_step
    Internal BaseStep subclass used by the step decorator.
        step_context
    Step context class.
        
StepContext        
    Provides additional context inside a step function.
This singleton class is used to access information about the current run, step run, or its outputs inside a step function.
Usage example:
from zenml.steps import get_step_context
@step
def my_trainer_step() -> Any:
    context = get_step_context()
    # get info about the current pipeline run
    current_pipeline_run = context.pipeline_run
    # get info about the current step run
    current_step_run = context.step_run
    # get info about the future output artifacts of this step
    output_artifact_uri = context.get_output_artifact_uri()
    ...
Source code in zenml/new/steps/step_context.py
          class StepContext(metaclass=SingletonMetaClass):
    """Provides additional context inside a step function.
    This singleton class is used to access information about the current run,
    step run, or its outputs inside a step function.
    Usage example:
    ```python
    from zenml.steps import get_step_context
    @step
    def my_trainer_step() -> Any:
        context = get_step_context()
        # get info about the current pipeline run
        current_pipeline_run = context.pipeline_run
        # get info about the current step run
        current_step_run = context.step_run
        # get info about the future output artifacts of this step
        output_artifact_uri = context.get_output_artifact_uri()
        ...
    ```
    """
    def __init__(
        self,
        pipeline_run: "PipelineRunResponse",
        step_run: "StepRunResponse",
        output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
        output_artifact_uris: Mapping[str, str],
        output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
        step_run_info: "StepRunInfo",
        cache_enabled: bool,
    ) -> None:
        """Initialize the context of the currently running step.

        The pipeline run and step run are re-fetched through the client; if a
        lookup raises a `KeyError`, the model that was passed in is used
        instead. One `StepContextOutput` is then built per output name.

        Args:
            pipeline_run: The model of the current pipeline run.
            step_run: The model of the current step run.
            output_materializers: The output materializers of the step that
                this context is used in, keyed by output name.
            output_artifact_uris: The output artifact URIs of the step that
                this context is used in, keyed by output name.
            output_artifact_configs: The outputs' ArtifactConfigs of the step
                that this context is used in, keyed by output name.
            step_run_info: (Deprecated) info about the currently running step.
            cache_enabled: (Deprecated) Whether caching is enabled for the step.

        Raises:
            StepContextError: If the keys of the output materializers and
                output artifacts do not match.
        """
        from zenml.client import Client
        # Best effort: replace the passed-in pipeline run with the version
        # returned by the client, keeping the argument if the lookup fails
        # with a KeyError.
        try:
            pipeline_run = Client().get_pipeline_run(pipeline_run.id)
        except KeyError:
            pass
        self.pipeline_run = pipeline_run
        # Same best-effort refresh for the step run.
        try:
            step_run = Client().get_run_step(step_run.id)
        except KeyError:
            pass
        self.step_run = step_run
        self._step_run_info = step_run_info
        self._cache_enabled = cache_enabled
        # Get the stack that we are running in
        self._stack = Client().active_stack
        self.step_name = self.step_run.name
        # set outputs
        # Only the materializer and URI key sets are compared here.
        # NOTE(review): `output_artifact_configs` is indexed with the same
        # keys below without a matching check - presumably callers always
        # pass a config entry per output; confirm.
        if output_materializers.keys() != output_artifact_uris.keys():
            raise StepContextError(
                f"Mismatched keys in output materializers and output artifact "
                f"URIs for step `{self.step_name}`. Output materializer "
                f"keys: {set(output_materializers)}, output artifact URI "
                f"keys: {set(output_artifact_uris)}"
            )
        self._outputs = {
            key: StepContextOutput(
                materializer_classes=output_materializers[key],
                artifact_uri=output_artifact_uris[key],
                artifact_config=output_artifact_configs[key],
            )
            for key in output_materializers.keys()
        }
    @property
    def pipeline(self) -> "PipelineResponse":
        """Returns the pipeline of the current pipeline run.

        Returns:
            The pipeline associated with the current pipeline run.

        Raises:
            StepContextError: If the pipeline run does not have a pipeline.
        """
        current_pipeline = self.pipeline_run.pipeline
        if not current_pipeline:
            raise StepContextError(
                f"Unable to get pipeline in step `{self.step_name}` of pipeline "
                f"run '{self.pipeline_run.id}': This pipeline run does not have "
                f"a pipeline associated with it."
            )
        return current_pipeline
    @property
    def model(self) -> "Model":
        """Returns configured Model.

        Order of resolution to search for Model is:
            1. Model from @step (used only if the step run also has a model
               version)
            2. Model from @pipeline (the pipeline run's model version if there
               is one, otherwise the configured model itself)

        Returns:
            The `Model` object associated with the current step.

        Raises:
            StepContextError: If the `Model` object is not set in `@step` or `@pipeline`.
        """
        step_run = self.step_run
        if (
            step_run.config.model is not None
            and step_run.model_version is not None
        ):
            return step_run.model_version.to_model_class()

        pipeline_run = self.pipeline_run
        if pipeline_run.config.model is None:
            raise StepContextError(
                f"Unable to get Model in step `{self.step_name}` of pipeline "
                f"run '{self.pipeline_run.id}': it was not set in `@step` or `@pipeline`."
            )

        if pipeline_run.model_version:
            return pipeline_run.model_version.to_model_class()
        return pipeline_run.config.model
    # TODO: deprecate me
    @property
    def model_version(self) -> "Model":
        """DEPRECATED, use `model` instead.

        Every access logs a deprecation warning before returning `self.model`.

        Returns:
            The `Model` object associated with the current step.
        """
        deprecation_message = "Step context `model_version` is deprecated. Please use `model` instead."
        logger.warning(deprecation_message)
        return self.model
    @property
    def inputs(self) -> Dict[str, "ArtifactVersionResponse"]:
        """Returns the input artifacts of the current step.

        Returns:
            The `inputs` of the current step run.
        """
        current_step_run = self.step_run
        return current_step_run.inputs
    def _get_output(
        self, output_name: Optional[str] = None
    ) -> "StepContextOutput":
        """Returns the `StepContextOutput` for a given step output.

        Args:
            output_name: Optional name of the output to look up. May be
                omitted if the step has exactly one output.

        Returns:
            The `StepContextOutput` stored for the given output, or for the
            only output of the step if no name was given.

        Raises:
            StepContextError: If the step has no outputs, no output for
                the given `output_name` or if no `output_name` was given but
                the step has multiple outputs.
        """
        outputs = self._outputs

        if len(outputs) == 0:
            raise StepContextError(
                f"Unable to get step output for step `{self.step_name}`: "
                f"This step does not have any outputs."
            )

        if output_name:
            if output_name not in outputs:
                raise StepContextError(
                    f"Unable to get step output '{output_name}' for "
                    f"step `{self.step_name}`. This step does not have an "
                    f"output with the given name, please specify one of the "
                    f"available outputs: {set(outputs)}."
                )
            return outputs[output_name]

        # No name given: only unambiguous when there is exactly one output.
        if len(outputs) > 1:
            raise StepContextError(
                f"Unable to get step output for step `{self.step_name}`: "
                f"This step has multiple outputs ({set(outputs)}), "
                f"please specify which output to return."
            )
        return next(iter(outputs.values()))
    def get_output_materializer(
        self,
        output_name: Optional[str] = None,
        custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
        data_type: Optional[Type[Any]] = None,
    ) -> "BaseMaterializer":
        """Builds a materializer instance for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                materializer. If no name is given and the step only has a
                single output, the materializer of this output will be
                returned. If the step has multiple outputs, an exception
                will be raised.
            custom_materializer_class: If given, this `BaseMaterializer`
                subclass will be initialized with the output artifact instead
                of the materializer that was registered for this step output.
            data_type: If the output annotation is of type `Union` and the step
                therefore has multiple materializers configured, you can provide
                a data type for the output which will be used to select the
                correct materializer. If not provided, the first materializer
                will be used.

        Returns:
            A materializer initialized with the output artifact for
            the given output.
        """
        from zenml.utils import materializer_utils

        step_output = self._get_output(output_name)
        candidates = step_output.materializer_classes
        uri = step_output.artifact_uri

        if custom_materializer_class:
            # An explicit override always wins over the registered classes.
            chosen = custom_materializer_class
        elif data_type and len(candidates) != 1:
            # Several candidates and a data type to pick between them.
            chosen = materializer_utils.select_materializer(
                data_type=data_type, materializer_classes=candidates
            )
        else:
            # Single candidate, or nothing to select by: use the first one.
            chosen = candidates[0]
        return chosen(uri)
    def get_output_artifact_uri(
        self, output_name: Optional[str] = None
    ) -> str:
        """Looks up the artifact URI of a given step output.

        Args:
            output_name: Optional name of the output for which to get the URI.
                If no name is given and the step only has a single output,
                the URI of this output will be returned. If the step has
                multiple outputs, an exception will be raised.

        Returns:
            Artifact URI for the given output.
        """
        resolved_output = self._get_output(output_name)
        return resolved_output.artifact_uri
    def get_output_metadata(
        self, output_name: Optional[str] = None
    ) -> Dict[str, "MetadataType"]:
        """Returns the metadata for a given step output.

        The result combines the metadata stored on the output with the
        `run_metadata` of its artifact config (if any). On key collisions the
        artifact config value wins.

        Args:
            output_name: Optional name of the output for which to get the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be returned. If the
                step has multiple outputs, an exception will be raised.

        Returns:
            A new dictionary with the metadata for the given output. Changing
            it does not affect the metadata stored on the output.
        """
        output = self._get_output(output_name)
        # Merge into a copy: updating `output.run_metadata` directly would make
        # this getter silently rewrite the state stored on the output.
        merged_metadata: Dict[str, "MetadataType"] = dict(
            output.run_metadata or {}
        )
        if output.artifact_config:
            merged_metadata.update(output.artifact_config.run_metadata or {})
        return merged_metadata
    def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
        """Returns the tags for a given step output.

        The result combines the tags of the output's artifact config (if any)
        with the tags stored on the output itself, without duplicates.

        Args:
            output_name: Optional name of the output for which to get the
                tags. If no name is given and the step only has a single
                output, the tags of this output will be returned. If the
                step has multiple outputs, an exception will be raised.

        Returns:
            Tags for the given output: artifact config tags first, then the
            tags stored on the output, each in first-seen order.
        """
        output = self._get_output(output_name)
        config_tags = (
            output.artifact_config.tags if output.artifact_config else None
        )
        # `dict.fromkeys` de-duplicates while keeping insertion order, so the
        # result is stable across runs (plain `set` iteration order is not).
        return list(
            dict.fromkeys([*(config_tags or []), *(output.tags or [])])
        )
    def add_output_metadata(
        self,
        metadata: Dict[str, "MetadataType"],
        output_name: Optional[str] = None,
    ) -> None:
        """Attaches additional metadata to a given step output.

        Args:
            metadata: The metadata to add.
            output_name: Optional name of the output for which to add the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be added. If the
                step has multiple outputs, an exception will be raised.
        """
        target = self._get_output(output_name)
        # Start from a fresh dict when nothing has been stored yet.
        target.run_metadata = target.run_metadata or {}
        target.run_metadata.update(**metadata)
    def add_output_tags(
        self,
        tags: List[str],
        output_name: Optional[str] = None,
    ) -> None:
        """Appends tags to a given step output.

        Args:
            tags: The tags to add.
            output_name: Optional name of the output for which to add the
                tags. If no name is given and the step only has a single
                output, the tags of this output will be added. If the
                step has multiple outputs, an exception will be raised.
        """
        target = self._get_output(output_name)
        # Start from a fresh list when no tags have been stored yet.
        target.tags = target.tags or []
        target.tags += tags
    def _set_artifact_config(
        self,
        artifact_config: "ArtifactConfig",
        output_name: Optional[str] = None,
    ) -> None:
        """Stores the artifact config for a given step output.

        Args:
            artifact_config: The artifact config of the output to set.
            output_name: Optional name of the output for which to set the
                artifact config. If no name is given and the step only has a
                single output, the config is set on that output. If the
                step has multiple outputs, an exception will be raised.

        Raises:
            EntityExistsError: If the output already has an artifact config.
        """
        target = self._get_output(output_name)
        # An existing config is never overwritten.
        if target.artifact_config is not None:
            raise EntityExistsError(
                f"Output with name '{output_name}' already has artifact config."
            )
        target.artifact_config = artifact_config
inputs: Dict[str, ArtifactVersionResponse]
  
      property
      readonly
  
    Returns the input artifacts of the current step.
Returns:
| Type | Description | 
|---|---|
| Dict[str, ArtifactVersionResponse] | The input artifacts of the current step. | 
model: Model
  
      property
      readonly
  
    Returns configured Model.
Order of resolution to search for Model is: 1. Model from @step 2. Model from @pipeline
Returns:
| Type | Description | 
|---|---|
| Model | The `Model` object associated with the current step. | 
Exceptions:
| Type | Description | 
|---|---|
| StepContextError | If the `Model` was not set in `@step` or `@pipeline`. | 
model_version: Model
  
      property
      readonly
  
    DEPRECATED, use model instead.
Returns:
| Type | Description | 
|---|---|
| Model | The `Model` object associated with the current step. | 
pipeline: PipelineResponse
  
      property
      readonly
  
    Returns the current pipeline.
Returns:
| Type | Description | 
|---|---|
| PipelineResponse | The current pipeline or None. | 
Exceptions:
| Type | Description | 
|---|---|
| StepContextError | If the pipeline run does not have a pipeline. | 
__init__(self, pipeline_run, step_run, output_materializers, output_artifact_uris, output_artifact_configs, step_run_info, cache_enabled)
  
      special
  
    Initialize the context of the currently running step.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pipeline_run | PipelineRunResponse | The model of the current pipeline run. | required | 
| step_run | StepRunResponse | The model of the current step run. | required | 
| output_materializers | Mapping[str, Sequence[Type[BaseMaterializer]]] | The output materializers of the step that this context is used in. | required | 
| output_artifact_uris | Mapping[str, str] | The output artifacts of the step that this context is used in. | required | 
| output_artifact_configs | Mapping[str, Optional[ArtifactConfig]] | The outputs' ArtifactConfigs of the step that this context is used in. | required | 
| step_run_info | StepRunInfo | (Deprecated) info about the currently running step. | required | 
| cache_enabled | bool | (Deprecated) Whether caching is enabled for the step. | required | 
Exceptions:
| Type | Description | 
|---|---|
| StepContextError | If the keys of the output materializers and output artifacts do not match. | 
Source code in zenml/new/steps/step_context.py
          def __init__(
    self,
    pipeline_run: "PipelineRunResponse",
    step_run: "StepRunResponse",
    output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
    output_artifact_uris: Mapping[str, str],
    output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
    step_run_info: "StepRunInfo",
    cache_enabled: bool,
) -> None:
    """Initialize the context of the currently running step.

    The pipeline run and step run are re-fetched through the client by ID;
    if a lookup raises `KeyError`, the instance passed in is kept instead.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        output_materializers: The output materializers of the step that
            this context is used in.
        output_artifact_uris: The output artifacts of the step that this
            context is used in. Must have the same keys as
            `output_materializers`.
        output_artifact_configs: The outputs' ArtifactConfigs of the step that this
            context is used in. It is indexed with every key of
            `output_materializers`; its keys are not validated here.
        step_run_info: (Deprecated) info about the currently running step.
        cache_enabled: (Deprecated) Whether caching is enabled for the step.

    Raises:
        StepContextError: If the keys of the output materializers and
            output artifacts do not match.
    """
    # NOTE(review): presumably imported locally to avoid a circular import
    # at module load time -- confirm.
    from zenml.client import Client
    # Best-effort refresh of the pipeline run: on KeyError the passed-in
    # model is used unchanged.
    try:
        pipeline_run = Client().get_pipeline_run(pipeline_run.id)
    except KeyError:
        pass
    self.pipeline_run = pipeline_run
    # Same best-effort refresh for the step run.
    try:
        step_run = Client().get_run_step(step_run.id)
    except KeyError:
        pass
    self.step_run = step_run
    self._step_run_info = step_run_info
    self._cache_enabled = cache_enabled
    # Get the stack that we are running in
    self._stack = Client().active_stack
    self.step_name = self.step_run.name
    # set outputs
    # Materializers and URIs must describe exactly the same set of outputs.
    if output_materializers.keys() != output_artifact_uris.keys():
        raise StepContextError(
            f"Mismatched keys in output materializers and output artifact "
            f"URIs for step `{self.step_name}`. Output materializer "
            f"keys: {set(output_materializers)}, output artifact URI "
            f"keys: {set(output_artifact_uris)}"
        )
    # One StepContextOutput per output name, keyed by that name.
    self._outputs = {
        key: StepContextOutput(
            materializer_classes=output_materializers[key],
            artifact_uri=output_artifact_uris[key],
            artifact_config=output_artifact_configs[key],
        )
        for key in output_materializers.keys()
    }
add_output_metadata(self, metadata, output_name=None)
    Adds metadata for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| metadata | Dict[str, MetadataType] | The metadata to add. | required | 
| output_name | Optional[str] | Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised. | None | 
Source code in zenml/new/steps/step_context.py
          def add_output_metadata(
              self,
              metadata: Dict[str, "MetadataType"],
              output_name: Optional[str] = None,
          ) -> None:
              """Attaches additional metadata to a given step output.

              Args:
                  metadata: The metadata to add.
                  output_name: Optional name of the output for which to add the
                      metadata. If no name is given and the step only has a single
                      output, the metadata of this output will be added. If the
                      step has multiple outputs, an exception will be raised.
              """
              target = self._get_output(output_name)
              # Start from a fresh dict when nothing has been stored yet.
              target.run_metadata = target.run_metadata or {}
              target.run_metadata.update(**metadata)
add_output_tags(self, tags, output_name=None)
    Adds tags for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tags | List[str] | The tags to add. | required | 
| output_name | Optional[str] | Optional name of the output for which to add the tags. If no name is given and the step only has a single output, the tags of this output will be added. If the step has multiple outputs, an exception will be raised. | None | 
Source code in zenml/new/steps/step_context.py
          def add_output_tags(
              self,
              tags: List[str],
              output_name: Optional[str] = None,
          ) -> None:
              """Appends tags to a given step output.

              Args:
                  tags: The tags to add.
                  output_name: Optional name of the output for which to add the
                      tags. If no name is given and the step only has a single
                      output, the tags of this output will be added. If the
                      step has multiple outputs, an exception will be raised.
              """
              target = self._get_output(output_name)
              # Start from a fresh list when no tags have been stored yet.
              target.tags = target.tags or []
              target.tags += tags
get_output_artifact_uri(self, output_name=None)
    Returns the artifact URI for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_name | Optional[str] | Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised. | None | 
Returns:
| Type | Description | 
|---|---|
| str | Artifact URI for the given output. | 
Source code in zenml/new/steps/step_context.py
          def get_output_artifact_uri(
              self, output_name: Optional[str] = None
          ) -> str:
              """Looks up the artifact URI of a given step output.

              Args:
                  output_name: Optional name of the output for which to get the URI.
                      If no name is given and the step only has a single output,
                      the URI of this output will be returned. If the step has
                      multiple outputs, an exception will be raised.

              Returns:
                  Artifact URI for the given output.
              """
              resolved_output = self._get_output(output_name)
              return resolved_output.artifact_uri
get_output_materializer(self, output_name=None, custom_materializer_class=None, data_type=None)
    Returns a materializer for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_name | Optional[str] | Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised. | None | 
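| custom_materializer_class | Optional[Type[BaseMaterializer]] | If given, this `BaseMaterializer` subclass will be initialized with the output artifact instead of the materializer that was registered for this step output. | None | 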
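| data_type | Optional[Type[Any]] | If the output annotation is of type `Union` and the step therefore has multiple materializers configured, you can provide a data type for the output which will be used to select the correct materializer. If not provided, the first materializer will be used. | None | 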
Returns:
| Type | Description | 
|---|---|
| BaseMaterializer | A materializer initialized with the output artifact for the given output. | 
Source code in zenml/new/steps/step_context.py
          def get_output_materializer(
              self,
              output_name: Optional[str] = None,
              custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
              data_type: Optional[Type[Any]] = None,
          ) -> "BaseMaterializer":
              """Builds a materializer instance for a given step output.

              Args:
                  output_name: Optional name of the output for which to get the
                      materializer. If no name is given and the step only has a
                      single output, the materializer of this output will be
                      returned. If the step has multiple outputs, an exception
                      will be raised.
                  custom_materializer_class: If given, this `BaseMaterializer`
                      subclass will be initialized with the output artifact instead
                      of the materializer that was registered for this step output.
                  data_type: If the output annotation is of type `Union` and the step
                      therefore has multiple materializers configured, you can provide
                      a data type for the output which will be used to select the
                      correct materializer. If not provided, the first materializer
                      will be used.

              Returns:
                  A materializer initialized with the output artifact for
                  the given output.
              """
              from zenml.utils import materializer_utils

              step_output = self._get_output(output_name)
              candidates = step_output.materializer_classes
              uri = step_output.artifact_uri

              if custom_materializer_class:
                  # An explicit override always wins over the registered classes.
                  chosen = custom_materializer_class
              elif data_type and len(candidates) != 1:
                  # Several candidates and a data type to pick between them.
                  chosen = materializer_utils.select_materializer(
                      data_type=data_type, materializer_classes=candidates
                  )
              else:
                  # Single candidate, or nothing to select by: use the first one.
                  chosen = candidates[0]
              return chosen(uri)
get_output_metadata(self, output_name=None)
    Returns the metadata for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_name | Optional[str] | Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised. | None | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, MetadataType] | Metadata for the given output. | 
Source code in zenml/new/steps/step_context.py
          def get_output_metadata(
              self, output_name: Optional[str] = None
          ) -> Dict[str, "MetadataType"]:
              """Returns the metadata for a given step output.

              The result combines the metadata stored on the output with the
              `run_metadata` of its artifact config (if any). On key collisions
              the artifact config value wins.

              Args:
                  output_name: Optional name of the output for which to get the
                      metadata. If no name is given and the step only has a single
                      output, the metadata of this output will be returned. If the
                      step has multiple outputs, an exception will be raised.

              Returns:
                  A new dictionary with the metadata for the given output.
                  Changing it does not affect the metadata stored on the output.
              """
              output = self._get_output(output_name)
              # Merge into a copy: updating `output.run_metadata` directly would
              # make this getter silently rewrite the state stored on the output.
              merged_metadata: Dict[str, "MetadataType"] = dict(
                  output.run_metadata or {}
              )
              if output.artifact_config:
                  merged_metadata.update(output.artifact_config.run_metadata or {})
              return merged_metadata
get_output_tags(self, output_name=None)
    Returns the tags for a given step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_name | Optional[str] | Optional name of the output for which to get the tags. If no name is given and the step only has a single output, the tags of this output will be returned. If the step has multiple outputs, an exception will be raised. | None | 
Returns:
| Type | Description | 
|---|---|
| List[str] | Tags for the given output. | 
Source code in zenml/new/steps/step_context.py
          def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
              """Returns the tags for a given step output.

              The result combines the tags of the output's artifact config (if
              any) with the tags stored on the output itself, without duplicates.

              Args:
                  output_name: Optional name of the output for which to get the
                      tags. If no name is given and the step only has a single
                      output, the tags of this output will be returned. If the
                      step has multiple outputs, an exception will be raised.

              Returns:
                  Tags for the given output: artifact config tags first, then the
                  tags stored on the output, each in first-seen order.
              """
              output = self._get_output(output_name)
              config_tags = (
                  output.artifact_config.tags if output.artifact_config else None
              )
              # `dict.fromkeys` de-duplicates while keeping insertion order, so
              # the result is stable across runs (plain `set` iteration is not).
              return list(
                  dict.fromkeys([*(config_tags or []), *(output.tags or [])])
              )
        
StepContextOutput        
    Represents a step output in the step context.
Source code in zenml/new/steps/step_context.py
          class StepContextOutput:
              """Record describing a single step output inside the step context.

              Holds the materializer classes, artifact URI and optional
              `ArtifactConfig` given at construction time. `run_metadata` and
              `tags` are not constructor arguments and default to `None`.
              """

              # Materializer classes configured for the output.
              materializer_classes: Sequence[Type["BaseMaterializer"]]
              # URI of the output artifact.
              artifact_uri: str
              # Metadata attached to the output; `None` until something is set.
              run_metadata: Optional[Dict[str, "MetadataType"]] = None
              # ArtifactConfig object of the output, if any.
              artifact_config: Optional["ArtifactConfig"]
              # Tags attached to the output; `None` until something is set.
              tags: Optional[List[str]] = None

              def __init__(
                  self,
                  materializer_classes: Sequence[Type["BaseMaterializer"]],
                  artifact_uri: str,
                  artifact_config: Optional["ArtifactConfig"],
              ):
                  """Stores the constructor arguments on the new instance.

                  Args:
                      materializer_classes: The materializer classes for the output.
                      artifact_uri: The artifact URI for the output.
                      artifact_config: The ArtifactConfig object of the output.
                  """
                  self.artifact_config = artifact_config
                  self.artifact_uri = artifact_uri
                  self.materializer_classes = materializer_classes
__init__(self, materializer_classes, artifact_uri, artifact_config)
  
      special
  
    Initialize the step output.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| materializer_classes | Sequence[Type[BaseMaterializer]] | The materializer classes for the output. | required | 
| artifact_uri | str | The artifact URI for the output. | required | 
| artifact_config | Optional[ArtifactConfig] | The ArtifactConfig object of the output. | required | 
Source code in zenml/new/steps/step_context.py
          def __init__(
              self,
              materializer_classes: Sequence[Type["BaseMaterializer"]],
              artifact_uri: str,
              artifact_config: Optional["ArtifactConfig"],
          ):
              """Stores the constructor arguments on the new instance.

              Args:
                  materializer_classes: The materializer classes for the output.
                  artifact_uri: The artifact URI for the output.
                  artifact_config: The ArtifactConfig object of the output.
              """
              self.artifact_config = artifact_config
              self.artifact_uri = artifact_uri
              self.materializer_classes = materializer_classes
get_step_context()
    Get the context of the currently running step.
Returns:
| Type | Description | 
|---|---|
| StepContext | The context of the currently running step. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If no step is currently running. | 
Source code in zenml/new/steps/step_context.py
          def get_step_context() -> "StepContext":
              """Fetches the context of the step that is currently running.

              Returns:
                  The context of the currently running step.

              Raises:
                  RuntimeError: If no step is currently running.
              """
              # Guard first: without an existing context there is nothing to return.
              if not StepContext._exists():
                  raise RuntimeError(
                      "The step context is only available inside a step function."
                  )
              return StepContext()  # type: ignore
        step_decorator
    Step decorator function.
step(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, experiment_tracker=None, step_operator=None, output_materializers=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, retry=None, model_version=None)
    Decorator to create a ZenML step.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| _func | Optional[F] | The decorated function. | None | 
| name | Optional[str] | The name of the step. If left empty, the name of the decorated function will be used as a fallback. | None | 
| enable_cache | Optional[bool] | Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default. | None | 
| enable_artifact_metadata | Optional[bool] | Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default. | None | 
| enable_artifact_visualization | Optional[bool] | Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default. | None | 
| enable_step_logs | Optional[bool] | Specify whether step logs are enabled for this step. | None | 
| experiment_tracker | Optional[str] | The experiment tracker to use for this step. | None | 
| step_operator | Optional[str] | The step operator to use for this step. | None | 
| output_materializers | Optional[OutputMaterializersSpecification] | Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs. | None | 
| settings | Optional[Dict[str, SettingsOrDict]] | Settings for this step. | None | 
| extra | Optional[Dict[str, Any]] | Extra configurations for this step. | None | 
| on_failure | Optional[HookSpecification] | Callback function in event of failure of the step. Can be a function with a single argument of type `BaseException`, or a source path to such a function (e.g. `module.my_function`). | None | 
| on_success | Optional[HookSpecification] | Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. `module.my_function`). | None | 
| model | Optional[Model] | configuration of the model in the Model Control Plane. | None | 
| retry | Optional[StepRetryConfig] | configuration of step retry in case of step failure. | None | 
| model_version | Optional[Model] | DEPRECATED, please use `model` instead. | None | 
Returns:
| Type | Description | 
|---|---|
| Union[BaseStep, Callable[[F], BaseStep]] | The step instance. | 
Source code in zenml/new/steps/step_decorator.py
          def step(
              _func: Optional["F"] = None,
              *,
              name: Optional[str] = None,
              enable_cache: Optional[bool] = None,
              enable_artifact_metadata: Optional[bool] = None,
              enable_artifact_visualization: Optional[bool] = None,
              enable_step_logs: Optional[bool] = None,
              experiment_tracker: Optional[str] = None,
              step_operator: Optional[str] = None,
              output_materializers: Optional["OutputMaterializersSpecification"] = None,
              settings: Optional[Dict[str, "SettingsOrDict"]] = None,
              extra: Optional[Dict[str, Any]] = None,
              on_failure: Optional["HookSpecification"] = None,
              on_success: Optional["HookSpecification"] = None,
              model: Optional["Model"] = None,
              retry: Optional["StepRetryConfig"] = None,
              model_version: Optional["Model"] = None,  # TODO: deprecate me
          ) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
              """Decorator that turns a function into a ZenML step.

              Usable both bare (`@step`) and with keyword arguments
              (`@step(name=...)`): when no function is passed in, the actual
              decorator is returned instead of a step instance.

              Args:
                  _func: The decorated function.
                  name: The name of the step. If left empty, the name of the decorated
                      function will be used as a fallback.
                  enable_cache: Specify whether caching is enabled for this step. If no
                      value is passed, caching is enabled by default.
                  enable_artifact_metadata: Specify whether metadata is enabled for this
                      step. If no value is passed, metadata is enabled by default.
                  enable_artifact_visualization: Specify whether visualization is enabled
                      for this step. If no value is passed, visualization is enabled by
                      default.
                  enable_step_logs: Specify whether step logs are enabled for this step.
                  experiment_tracker: The experiment tracker to use for this step.
                  step_operator: The step operator to use for this step.
                  output_materializers: Output materializers for this step. If
                      given as a dict, the keys must be a subset of the output names
                      of this step. If a single value (type or string) is given, the
                      materializer will be used for all outputs.
                  settings: Settings for this step.
                  extra: Extra configurations for this step.
                  on_failure: Callback function in event of failure of the step. Can be a
                      function with a single argument of type `BaseException`, or a source
                      path to such a function (e.g. `module.my_function`).
                  on_success: Callback function in event of success of the step. Can be a
                      function with no arguments, or a source path to such a function
                      (e.g. `module.my_function`).
                  model: configuration of the model in the Model Control Plane.
                  retry: configuration of step retry in case of step failure.
                  model_version: DEPRECATED, please use `model` instead.

              Returns:
                  The step instance.
              """

              def inner_decorator(func: "F") -> "BaseStep":
                  from zenml.new.steps.decorated_step import _DecoratedStep

                  # TODO: deprecate me
                  if model_version:
                      logger.warning(
                          "Step decorator argument `model_version` is deprecated. Please use `model` instead."
                      )

                  # Namespace of the dynamically created step class: the decorated
                  # function becomes its static `entrypoint`, and module/docstring
                  # are copied over from the function.
                  class_namespace = {
                      "entrypoint": staticmethod(func),
                      "__module__": func.__module__,
                      "__doc__": func.__doc__,
                  }
                  class_: Type["BaseStep"] = type(
                      func.__name__, (_DecoratedStep,), class_namespace
                  )

                  # `model` takes precedence over the deprecated `model_version`.
                  return class_(
                      name=name or func.__name__,
                      enable_cache=enable_cache,
                      enable_artifact_metadata=enable_artifact_metadata,
                      enable_artifact_visualization=enable_artifact_visualization,
                      enable_step_logs=enable_step_logs,
                      experiment_tracker=experiment_tracker,
                      step_operator=step_operator,
                      output_materializers=output_materializers,
                      settings=settings,
                      extra=extra,
                      on_failure=on_failure,
                      on_success=on_success,
                      model=model or model_version,
                      retry=retry,
                  )

              # Called with keyword arguments only: hand back the decorator itself.
              if _func is None:
                  return inner_decorator
              return inner_decorator(_func)