Skip to content

New

zenml.new special

pipelines special

build_utils

Pipeline build utilities.

build_required(deployment)

Checks whether a build is required for the deployment and active stack.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment for which to check.

required

Returns:

Type Description
bool

If a build is required.

Source code in zenml/new/pipelines/build_utils.py
def build_required(deployment: "PipelineDeploymentBase") -> bool:
    """Tells whether the active stack needs Docker builds for a deployment.

    Args:
        deployment: The deployment to inspect.

    Returns:
        True if the Docker builds reported by the active stack for this
        deployment are truthy (i.e. non-empty), False otherwise.
    """
    required_builds = Client().active_stack.get_docker_builds(
        deployment=deployment
    )
    return True if required_builds else False
code_download_possible(deployment, code_repository=None)

Checks whether code download is possible for the deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment.

required
code_repository Optional[BaseCodeRepository]

If provided, this code repository can be used to download the code inside the container images.

None

Returns:

Type Description
bool

Whether code download is possible for the deployment.

Source code in zenml/new/pipelines/build_utils.py
def code_download_possible(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Checks that every step of the deployment is able to download code.

    A step can download its code if its Docker settings allow downloading
    from the artifact store, or if they allow downloading from a code
    repository and a code repository was actually passed in.

    Args:
        deployment: The deployment whose step configurations are checked.
        code_repository: If provided, this code repository can be used to
            download the code inside the container images.

    Returns:
        True if all steps have a usable download option (also True for a
        deployment without steps), False as soon as one step has none.
    """
    settings_per_step = (
        step.config.docker_settings
        for step in deployment.step_configurations.values()
    )
    return all(
        settings.allow_download_from_artifact_store
        or (settings.allow_download_from_code_repository and code_repository)
        for settings in settings_per_step
    )
compute_build_checksum(items, stack, code_repository=None)

Compute an overall checksum for a pipeline build.

Parameters:

Name Type Description Default
items List[BuildConfiguration]

Items of the build.

required
stack Stack

The stack associated with the build. Will be used to gather its requirements.

required
code_repository Optional[BaseCodeRepository]

The code repository that will be used to download files inside the build. Will be used for its dependency specification.

None

Returns:

Type Description
str

The build checksum.

Source code in zenml/new/pipelines/build_utils.py
def compute_build_checksum(
    items: List["BuildConfiguration"],
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Computes one checksum covering all items of a pipeline build.

    For each item, in the given order, the image key and the item's
    settings checksum are fed into a single MD5 hash.

    Args:
        items: Items of the build.
        stack: The stack associated with the build. It is forwarded to each
            item's settings checksum computation.
        code_repository: The code repository that will be used to download
            files inside the build. It is forwarded to each item's settings
            checksum computation.

    Returns:
        The hex digest of the combined hash.
    """
    digest = hashlib.md5()  # nosec

    for build_item in items:
        image_key = PipelineBuildBase.get_image_key(
            component_key=build_item.key, step=build_item.step_name
        )
        item_checksum = build_item.compute_settings_checksum(
            stack=stack,
            code_repository=code_repository,
        )

        # Feeding the concatenated bytes is equivalent to two successive
        # `update` calls.
        digest.update(image_key.encode() + item_checksum.encode())

    return digest.hexdigest()
compute_stack_checksum(stack)

Compute a stack checksum.

Parameters:

Name Type Description Default
stack StackResponse

The stack for which to compute the checksum.

required

Returns:

Type Description
str

The checksum.

Source code in zenml/new/pipelines/build_utils.py
def compute_stack_checksum(stack: StackResponse) -> str:
    """Computes a checksum from the integrations a stack depends on.

    Args:
        stack: The stack for which to compute the checksum.

    Returns:
        The hex digest of an MD5 hash over the sorted, de-duplicated names
        of all non-"built-in" integrations of the stack's components.
    """
    # This checksum is used to see if the stack has been updated since a build
    # was created for it. Only integration names are hashed, not specific
    # requirements: those might change with new ZenML releases without
    # actually invalidating the Docker images.
    integration_names = set()
    for component_list in stack.components.values():
        for component in component_list:
            name = component.integration
            if name and name != "built-in":
                integration_names.add(name)

    digest = hashlib.md5()  # nosec
    for name in sorted(integration_names):
        digest.update(name.encode())

    return digest.hexdigest()
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)

Builds images and registers the output in the server.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment.

required
pipeline_id Optional[uuid.UUID]

The ID of the pipeline.

None
code_repository Optional[BaseCodeRepository]

If provided, this code repository will be used to download code inside the build images.

None

Returns:

Type Description
Optional[PipelineBuildResponse]

The build output.

Exceptions:

Type Description
RuntimeError

If multiple builds with the same key but different settings were specified.

Source code in zenml/new/pipelines/build_utils.py
def create_pipeline_build(
    deployment: "PipelineDeploymentBase",
    pipeline_id: Optional[UUID] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Builds images and registers the output in the server.

    Every build configuration required by the active stack is built at most
    once: configurations whose settings checksum matches an image that was
    already built in this call reuse that image and its metadata.

    Args:
        deployment: The pipeline deployment.
        pipeline_id: The ID of the pipeline.
        code_repository: If provided, this code repository will be used to
            download code inside the build images.

    Returns:
        The build output, or None if the active stack does not require any
        Docker builds for the deployment.

    Raises:
        RuntimeError: If multiple builds with the same key but different
            settings were specified.
    """
    client = Client()
    stack_model = Client().active_stack_model
    stack = client.active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)

    if not required_builds:
        logger.debug("No docker builds required.")
        return None

    logger.info(
        "Building Docker image(s) for pipeline `%s`.",
        deployment.pipeline_configuration.name,
    )

    docker_image_builder = PipelineDockerImageBuilder()
    # Built images by combined (component key + step) image key.
    images: Dict[str, BuildItem] = {}
    # Maps a settings checksum to the image key it was first built under.
    checksums: Dict[str, str] = {}

    for build_config in required_builds:
        combined_key = PipelineBuildBase.get_image_key(
            component_key=build_config.key, step=build_config.step_name
        )
        checksum = build_config.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )

        if combined_key in images:
            previous_checksum = images[combined_key].settings_checksum

            if previous_checksum != checksum:
                raise RuntimeError(
                    f"Trying to build image for key `{combined_key}` but "
                    "an image for this key was already built with a "
                    "different configuration. This happens if multiple "
                    "stack components specified Docker builds for the same "
                    "key in the `StackComponent.get_docker_builds(...)` "
                    "method. If you're using custom components, make sure "
                    "to provide unique keys when returning your build "
                    "configurations to avoid this error."
                )

            # Same key and same settings: nothing new to build.
            continue

        if checksum in checksums:
            # Identical settings were already built under another key. Reuse
            # that image together with ALL of its metadata. In particular,
            # `requires_code_download` must come from the reused item and not
            # from a `download_files` value left over from whichever earlier
            # iteration happened to run the build branch last.
            reused_item = images[checksums[checksum]]
            image_name_or_digest = reused_item.image
            contains_code = reused_item.contains_code
            requires_code_download = reused_item.requires_code_download
            dockerfile = reused_item.dockerfile
            requirements = reused_item.requirements
        else:
            tag = deployment.pipeline_configuration.name
            if build_config.step_name:
                tag += f"-{build_config.step_name}"
            tag += f"-{build_config.key}"

            include_files = build_config.should_include_files(
                code_repository=code_repository,
            )
            download_files = build_config.should_download_files(
                code_repository=code_repository,
            )
            pass_code_repo = (
                build_config.should_download_files_from_code_repository(
                    code_repository=code_repository
                )
            )

            (
                image_name_or_digest,
                dockerfile,
                requirements,
            ) = docker_image_builder.build_docker_image(
                docker_settings=build_config.settings,
                tag=tag,
                stack=stack,
                include_files=include_files,
                download_files=download_files,
                entrypoint=build_config.entrypoint,
                extra_files=build_config.extra_files,
                code_repository=code_repository if pass_code_repo else None,
            )
            contains_code = include_files
            requires_code_download = download_files

        images[combined_key] = BuildItem(
            image=image_name_or_digest,
            dockerfile=dockerfile,
            requirements=requirements,
            settings_checksum=checksum,
            contains_code=contains_code,
            requires_code_download=requires_code_download,
        )
        checksums[checksum] = combined_key

    logger.info("Finished building Docker image(s).")

    # Without a container registry in the stack the build is marked local.
    is_local = stack.container_registry is None
    contains_code = any(item.contains_code for item in images.values())
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )
    stack_checksum = compute_stack_checksum(stack=stack_model)
    build_request = PipelineBuildRequest(
        user=client.active_user.id,
        workspace=client.active_workspace.id,
        stack=stack_model.id,
        pipeline=pipeline_id,
        is_local=is_local,
        contains_code=contains_code,
        images=images,
        zenml_version=zenml.__version__,
        python_version=platform.python_version(),
        checksum=build_checksum,
        stack_checksum=stack_checksum,
    )
    return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository=None)

Find an existing build for a deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment for which to find an existing build.

required
code_repository Optional[BaseCodeRepository]

The code repository that will be used to download files in the images.

None

Returns:

Type Description
Optional[PipelineBuildResponse]

The existing build to reuse if found.

Source code in zenml/new/pipelines/build_utils.py
def find_existing_build(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Looks up a previously registered build that the deployment can reuse.

    Args:
        deployment: The deployment for which to find an existing build.
        code_repository: The code repository that will be used to download
            files in the images.

    Returns:
        The most recently created matching build, or None if the active
        stack requires no builds or no matching build exists.
    """
    client = Client()
    stack = client.active_stack

    required_builds = stack.get_docker_builds(deployment=deployment)
    if not required_builds:
        return None

    major, minor = platform.python_version_tuple()[:2]
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )

    candidates = client.list_builds(
        sort_by="desc:created",
        size=1,
        stack_id=stack.id,
        # Local builds are excluded: it's not clear whether their images
        # exist on the current machine or if they've been overwritten.
        # TODO: Should we support this by storing the unique Docker ID for
        #   the image and checking if an image with that ID exists locally?
        is_local=False,
        # Builds containing code are excluded: that code might be different
        # from the local code the user is expecting to run
        contains_code=False,
        zenml_version=zenml.__version__,
        # Match all patch versions of the same Python major + minor
        python_version=f"startswith:{major}.{minor}",
        checksum=build_checksum,
    )

    return candidates[0] if candidates.items else None
requires_download_from_code_repository(deployment)

Checks whether the deployment needs to download code from a repository.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment.

required

Returns:

Type Description
bool

If the deployment needs to download code from a code repository.

Source code in zenml/new/pipelines/build_utils.py
def requires_download_from_code_repository(
    deployment: "PipelineDeploymentBase",
) -> bool:
    """Checks whether the deployment needs to download code from a repository.

    The steps are inspected in order and the first step that has any of the
    three code options enabled decides the result.

    Args:
        deployment: The deployment.

    Returns:
        True if the deciding step only allows downloading from a code
        repository. False if it allows downloading from the artifact store
        or including files in the images, or if no step enables any option.
    """
    for step in deployment.step_configurations.values():
        settings = step.config.docker_settings

        has_alternative = (
            settings.allow_download_from_artifact_store
            or settings.allow_including_files_in_images
        )
        if has_alternative:
            return False

        if settings.allow_download_from_code_repository:
            # Neither alternative is allowed, so the code repository is the
            # only way to get the code.
            return True

    return False
requires_included_code(deployment, code_repository=None)

Checks whether the deployment requires included code.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment.

required
code_repository Optional[BaseCodeRepository]

If provided, this code repository can be used to download the code inside the container images.

None

Returns:

Type Description
bool

If the deployment requires code included in the container images.

Source code in zenml/new/pipelines/build_utils.py
def requires_included_code(
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> bool:
    """Checks whether code has to be included in the container images.

    The steps are inspected in order. A step that allows downloading from
    the artifact store ends the check with False. A step that can download
    from the given code repository is skipped. Any other step that allows
    including files in the images ends the check with True.

    Args:
        deployment: The deployment.
        code_repository: If provided, this code repository can be used to
            download the code inside the container images.

    Returns:
        If the deployment requires code included in the container images.
    """
    for step in deployment.step_configurations.values():
        settings = step.config.docker_settings

        if settings.allow_download_from_artifact_store:
            return False

        can_use_repository = (
            settings.allow_download_from_code_repository and code_repository
        )
        if not can_use_repository and settings.allow_including_files_in_images:
            return True

    return False
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)

Loads or creates a pipeline build.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to load or create the build.

required
allow_build_reuse bool

If True, the build is allowed to reuse an existing build.

required
pipeline_id Optional[uuid.UUID]

Optional ID of the pipeline to reference in the build.

None
build Union[UUID, PipelineBuildBase]

Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, an existing build is reused when reuse is allowed and a matching build is found; otherwise a new build will be created.

None
code_repository Optional[BaseCodeRepository]

If provided, this code repository can be used to download code inside the container images.

None

Returns:

Type Description
Optional[PipelineBuildResponse]

The build response.

Source code in zenml/new/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
    deployment: "PipelineDeploymentBase",
    allow_build_reuse: bool,
    pipeline_id: Optional[UUID] = None,
    build: Union["UUID", "PipelineBuildBase", None] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponse"]:
    """Loads, reuses or creates a pipeline build.

    Args:
        deployment: The pipeline deployment for which to load or create the
            build.
        allow_build_reuse: If True, the build is allowed to reuse an
            existing build.
        pipeline_id: Optional ID of the pipeline to reference in the build.
        build: Optional existing build. A UUID is fetched from the store,
            any other given build is registered in the store; in both cases
            the result is verified against the deployment. If not given, an
            existing build is reused when allowed and found, otherwise a new
            build is created.
        code_repository: If provided, this code repository can be used to
            download code inside the container images.

    Returns:
        The build response.
    """
    if build:
        # A build was explicitly specified: load or register it, then verify
        # that it can be used for this deployment.
        if isinstance(build, UUID):
            build_model = Client().zen_store.get_build(build_id=build)
        else:
            build_request = PipelineBuildRequest(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=Client().active_stack_model.id,
                pipeline=pipeline_id,
                **build.model_dump(),
            )
            build_model = Client().zen_store.create_build(build=build_request)

        verify_custom_build(
            build=build_model,
            deployment=deployment,
            code_repository=code_repository,
        )

        return build_model

    may_reuse_build = (
        allow_build_reuse
        and not deployment.should_prevent_build_reuse
        and not requires_included_code(
            deployment=deployment, code_repository=code_repository
        )
        and build_required(deployment=deployment)
    )

    if may_reuse_build:
        existing_build = find_existing_build(
            deployment=deployment, code_repository=code_repository
        )

        if existing_build:
            logger.info(
                "Reusing existing build `%s` for stack `%s`.",
                existing_build.id,
                Client().active_stack.name,
            )
            return existing_build

        logger.info(
            "Unable to find a build to reuse. A previous build can be "
            "reused when the following conditions are met:\n"
            "  * The existing build was created for the same stack, "
            "ZenML version and Python version\n"
            "  * The stack contains a container registry\n"
            "  * The Docker settings of the pipeline and all its steps "
            "are the same as for the existing build."
        )

    return create_pipeline_build(
        deployment=deployment,
        pipeline_id=pipeline_id,
        code_repository=code_repository,
    )
should_upload_code(deployment, build, code_reference)

Checks whether the current code should be uploaded for the deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment.

required
build Optional[zenml.models.v2.core.pipeline_build.PipelineBuildResponse]

The build for the deployment.

required
code_reference Optional[zenml.models.v2.core.code_reference.CodeReferenceRequest]

The code reference for the deployment.

required

Returns:

Type Description
bool

Whether the current code should be uploaded for the deployment.

Source code in zenml/new/pipelines/build_utils.py
def should_upload_code(
    deployment: PipelineDeploymentBase,
    build: Optional[PipelineBuildResponse],
    code_reference: Optional[CodeReferenceRequest],
) -> bool:
    """Decides whether the current code has to be uploaded for a deployment.

    Args:
        deployment: The deployment.
        build: The build for the deployment.
        code_reference: The code reference for the deployment.

    Returns:
        True if there is a build and at least one step that allows
        downloading from the artifact store is not already covered by the
        code reference, False otherwise.
    """
    if not build:
        # No build means we don't need to download code into a Docker container
        # for step execution. In other remote orchestrators that don't use
        # Docker containers but instead use e.g. Wheels to run, the code should
        # already be included.
        return False

    settings_per_step = (
        step.config.docker_settings
        for step in deployment.step_configurations.values()
    )
    return any(
        settings.allow_download_from_artifact_store
        for settings in settings_per_step
        # Steps that can get their code through the code reference don't
        # need an upload.
        if not (
            code_reference and settings.allow_download_from_code_repository
        )
    )
verify_custom_build(build, deployment, code_repository=None)

Verify a custom build for a pipeline deployment.

Parameters:

Name Type Description Default
build PipelineBuildResponse

The build to verify.

required
deployment PipelineDeploymentBase

The deployment for which to verify the build.

required
code_repository Optional[BaseCodeRepository]

Code repository that will be used to download files for the deployment.

None

Exceptions:

Type Description
RuntimeError

If the build can't be used for the deployment.

Source code in zenml/new/pipelines/build_utils.py
def verify_custom_build(
    build: "PipelineBuildResponse",
    deployment: "PipelineDeploymentBase",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
    """Verify a custom build for a pipeline deployment.

    Hard incompatibilities between the build and the deployment raise an
    error, while potential problems (different stack, included code,
    outdated settings, local build) are only logged as warnings.

    Args:
        build: The build to verify.
        deployment: The deployment for which to verify the build.
        code_repository: Code repository that will be used to download files
            for the deployment.

    Raises:
        RuntimeError: If the build can't be used for the deployment.
    """
    stack = Client().active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)

    if build.stack and build.stack.id != stack.id:
        logger.warning(
            "The stack `%s` used for the build `%s` is not the same as the "
            "stack `%s` that the pipeline will run on. This could lead "
            "to issues if the stacks have different build requirements.",
            build.stack.name,
            build.id,
            stack.name,
        )

    if build.contains_code:
        logger.warning(
            "The build you specified for this run contains code and will run "
            "with the step code that was included in the Docker images which "
            "might differ from the local code in your client environment."
        )

    if build.requires_code_download:
        # The build expects code to be downloaded at runtime, so the Docker
        # settings of the deployment must be compatible with that.
        if requires_included_code(
            deployment=deployment, code_repository=code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the Docker "
                "image, but the build you "
                "specified requires code download. Either update your "
                "`DockerSettings` or specify a different build and try "
                "again."
            )

        if (
            requires_download_from_code_repository(deployment=deployment)
            and not code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be downloaded from a "
                "code repository but "
                "there is no code repository active at your current source "
                f"root `{source_utils.get_source_root()}`."
            )

        if not code_download_possible(
            deployment=deployment, code_repository=code_repository
        ):
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code can not be downloaded from the "
                "artifact store, but the build you specified requires code "
                "download. Either update your `DockerSettings` or specify a "
                "different build and try again."
            )

    if build.checksum:
        build_checksum = compute_build_checksum(
            required_builds, stack=stack, code_repository=code_repository
        )
        if build_checksum != build.checksum:
            logger.warning(
                "The Docker settings used for the build `%s` are "
                "not the same as currently specified for your pipeline. "
                "This means that the build you specified to run this "
                "pipeline might be outdated and most likely contains "
                "outdated requirements.",
                build.id,
            )
    else:
        # No checksum given for the entire build, we manually check that
        # all the images exist and the setting match
        for build_config in required_builds:
            try:
                image = build.get_image(
                    component_key=build_config.key,
                    step=build_config.step_name,
                )
            except KeyError as error:
                # Chain explicitly so the traceback shows the failed lookup
                # as the direct cause of the error.
                raise RuntimeError(
                    "The build you specified is missing an image for key: "
                    f"{build_config.key}."
                ) from error

            if build_config.compute_settings_checksum(
                stack=stack, code_repository=code_repository
            ) != build.get_settings_checksum(
                component_key=build_config.key, step=build_config.step_name
            ):
                logger.warning(
                    "The Docker settings used to build the image `%s` are "
                    "not the same as currently specified for your pipeline. "
                    "This means that the build you specified to run this "
                    "pipeline might be outdated and most likely contains "
                    "outdated code or requirements.",
                    image,
                )

    if build.is_local:
        logger.warning(
            "You manually specified a local build to run your pipeline. "
            "This might lead to errors if the images don't exist on "
            "your local machine or the image tags have been "
            "overwritten since the original build happened."
        )
verify_local_repository_context(deployment, local_repo_context)

Verifies the local repository.

If the local repository exists and has no local changes, code download inside the images is possible.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment.

required
local_repo_context Optional[LocalRepositoryContext]

The local repository active at the source root.

required

Exceptions:

Type Description
RuntimeError

If the deployment requires code download but code download is not possible.

Returns:

Type Description
Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository]

The code repository from which to download files for the runs of the deployment, or None if code download is not possible.

Source code in zenml/new/pipelines/build_utils.py
def verify_local_repository_context(
    deployment: "PipelineDeploymentBase",
    local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
    """Verifies the local repository and resolves the usable code repository.

    If the local repository exists and has no local changes, code download
    inside the images is possible.

    Args:
        deployment: The pipeline deployment.
        local_repo_context: The local repository active at the source root.

    Raises:
        RuntimeError: If the deployment requires code download but code download
            is not possible.

    Returns:
        The code repository from which to download files for the runs of the
        deployment, or None if code download is not possible.
    """
    if build_required(deployment=deployment):
        if requires_download_from_code_repository(deployment=deployment):
            # All errors below share the same explanation of why a clean
            # code repository is mandatory here.
            download_required = (
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be downloaded from a "
                "code repository, but "
            )

            if not local_repo_context:
                raise RuntimeError(
                    download_required
                    + "there is no code repository active at your current "
                    f"source root `{source_utils.get_source_root()}`."
                )

            if local_repo_context.is_dirty:
                blocking_changes = "uncommitted"
            elif local_repo_context.has_local_changes:
                blocking_changes = "unpushed"
            else:
                blocking_changes = None

            if blocking_changes:
                raise RuntimeError(
                    download_required
                    + "the code repository active at your current source "
                    f"root `{source_utils.get_source_root()}` has "
                    f"{blocking_changes} changes."
                )

        if local_repo_context:
            # Download from the repository is optional at this point, so
            # local changes only produce a warning.
            warning = None
            if local_repo_context.is_dirty:
                warning = (
                    "Unable to use code repository to download code for this "
                    "run as there are uncommitted changes."
                )
            elif local_repo_context.has_local_changes:
                warning = (
                    "Unable to use code repository to download code for this "
                    "run as there are unpushed changes."
                )

            if warning:
                logger.warning(warning)

    if not local_repo_context or local_repo_context.has_local_changes:
        return None

    model = Client().get_code_repository(
        local_repo_context.code_repository_id
    )
    return BaseCodeRepository.from_model(model)

code_archive

Code archive.

CodeArchive (Archivable)

Code archive class.

This class is used to archive user code before uploading it to the artifact store. If the user code is stored in a Git repository, only files not excluded by gitignores will be included in the archive.

Source code in zenml/new/pipelines/code_archive.py
class CodeArchive(Archivable):
    """Code archive class.

    This class is used to archive user code before uploading it to the artifact
    store. If the user code is stored in a Git repository, only files not
    excluded by gitignores will be included in the archive.
    """

    def __init__(self, root: str) -> None:
        """Initialize the object.

        Args:
            root: Root directory of the archive. Paths inside the archive are
                computed relative to this directory.
        """
        super().__init__()
        self._root = root

    @property
    def git_repo(self) -> Optional["Repo"]:
        """Git repository active at the code archive root.

        The repository is searched for in the archive root and its parent
        directories.

        Returns:
            The git repository if available, `None` if the git library cannot
            be imported or no valid repository was found.
        """
        try:
            # These imports fail when git is not installed on the machine
            from git.exc import InvalidGitRepositoryError
            from git.repo.base import Repo
        except ImportError:
            return None

        try:
            git_repo = Repo(path=self._root, search_parent_directories=True)
        except InvalidGitRepositoryError:
            return None

        return git_repo

    def _get_all_files(self) -> Dict[str, str]:
        """Get all files inside the archive root.

        Returns:
            A dict {path_relative_to_root: path_on_filesystem} for every file
            found while walking the archive root.
        """
        all_files = {}
        for root, _, files in os.walk(self._root):
            for file in files:
                file_path = os.path.join(root, file)
                path_in_archive = os.path.relpath(file_path, self._root)
                all_files[path_in_archive] = file_path

        return all_files

    def get_files(self) -> Dict[str, str]:
        """Gets all regular files that should be included in the archive.

        If a git repository is available, the files are taken from
        `git ls-files` (tracked, untracked and modified files, honoring the
        standard exclude rules). If no repository is available or the git
        call fails, all files below the archive root are used instead.

        Raises:
            RuntimeError: If the code archive would not include any files.

        Returns:
            A dict {path_in_archive: path_on_filesystem} for all regular files
            in the archive, sorted by the path in the archive.
        """
        all_files = {}

        if repo := self.git_repo:
            try:
                result = repo.git.ls_files(
                    "--cached",
                    "--others",
                    "--modified",
                    "--exclude-standard",
                    self._root,
                )
            except Exception as e:
                # Best effort: fall back to a plain directory walk.
                logger.warning(
                    "Failed to get non-ignored files from git: %s", str(e)
                )
                all_files = self._get_all_files()
            else:
                # `git ls-files` prints one path per line. Split on line
                # boundaries instead of on arbitrary whitespace so that paths
                # containing spaces are not torn apart and silently dropped.
                for file in result.splitlines():
                    if not file:
                        continue

                    file_path = os.path.join(repo.working_dir, file)
                    path_in_archive = os.path.relpath(file_path, self._root)

                    # Files deleted in the working tree are still listed by
                    # `--cached`, so only keep paths that actually exist.
                    if os.path.exists(file_path):
                        all_files[path_in_archive] = file_path
        else:
            all_files = self._get_all_files()

        if not all_files:
            raise RuntimeError(
                "The code archive to be uploaded does not contain any files. "
                "This is probably because all files in your source root "
                f"`{self._root}` are ignored by a .gitignore file."
            )

        # Explicitly remove .zen directories as we write an updated version
        # to disk every time ZenML is called. This updates the mtime of the
        # file, which invalidates the code upload caching. The values in
        # the .zen directory are not needed anyway as we set them as
        # environment variables.
        all_files = {
            path_in_archive: file_path
            for path_in_archive, file_path in sorted(all_files.items())
            if ".zen" not in Path(path_in_archive).parts[:-1]
        }

        return all_files

    def write_archive(
        self, output_file: IO[bytes], use_gzip: bool = True
    ) -> None:
        """Writes the code archive to the given file.

        After writing, a warning is logged if the archive is larger than
        20 MiB.

        Args:
            output_file: The file to write the archive to. Its `name` is used
                to look up the size of the written archive on disk.
            use_gzip: Whether to use `gzip` to compress the file.
        """
        super().write_archive(output_file=output_file, use_gzip=use_gzip)
        archive_size = os.path.getsize(output_file.name)
        if archive_size > 20 * 1024 * 1024:
            logger.warning(
                "Code archive size: `%s`. If you believe this is "
                "unreasonably large, make sure to version your code in git and "
                "ignore unnecessary files using a `.gitignore` file.",
                string_utils.get_human_readable_filesize(archive_size),
            )
git_repo: Optional[Repo] property readonly

Git repository active at the code archive root.

Returns:

Type Description
Optional[Repo]

The git repository if available.

__init__(self, root) special

Initialize the object.

Parameters:

Name Type Description Default
root str

Root directory of the archive.

required
Source code in zenml/new/pipelines/code_archive.py
def __init__(self, root: str) -> None:
    """Initialize the code archive.

    Args:
        root: Root directory of the archive.
    """
    # Run the base class initializer first, then remember the archive root.
    super().__init__()
    self._root = root
get_files(self)

Gets all regular files that should be included in the archive.

Exceptions:

Type Description
RuntimeError

If the code archive would not include any files.

Returns:

Type Description
Dict[str, str]

A dict {path_in_archive: path_on_filesystem} for all regular files in the archive.

Source code in zenml/new/pipelines/code_archive.py
def get_files(self) -> Dict[str, str]:
    """Gets all regular files that should be included in the archive.

    If a git repository is available, the files are taken from
    `git ls-files` (tracked, untracked and modified files, honoring the
    standard exclude rules). If no repository is available or the git call
    fails, all files below the archive root are used instead.

    Raises:
        RuntimeError: If the code archive would not include any files.

    Returns:
        A dict {path_in_archive: path_on_filesystem} for all regular files
        in the archive, sorted by the path in the archive.
    """
    all_files = {}

    if repo := self.git_repo:
        try:
            result = repo.git.ls_files(
                "--cached",
                "--others",
                "--modified",
                "--exclude-standard",
                self._root,
            )
        except Exception as e:
            # Best effort: fall back to a plain directory walk.
            logger.warning(
                "Failed to get non-ignored files from git: %s", str(e)
            )
            all_files = self._get_all_files()
        else:
            # `git ls-files` prints one path per line. Split on line
            # boundaries instead of on arbitrary whitespace so that paths
            # containing spaces are not torn apart and silently dropped.
            for file in result.splitlines():
                if not file:
                    continue

                file_path = os.path.join(repo.working_dir, file)
                path_in_archive = os.path.relpath(file_path, self._root)

                # Files deleted in the working tree are still listed by
                # `--cached`, so only keep paths that actually exist.
                if os.path.exists(file_path):
                    all_files[path_in_archive] = file_path
    else:
        all_files = self._get_all_files()

    if not all_files:
        raise RuntimeError(
            "The code archive to be uploaded does not contain any files. "
            "This is probably because all files in your source root "
            f"`{self._root}` are ignored by a .gitignore file."
        )

    # Explicitly remove .zen directories as we write an updated version
    # to disk every time ZenML is called. This updates the mtime of the
    # file, which invalidates the code upload caching. The values in
    # the .zen directory are not needed anyway as we set them as
    # environment variables.
    all_files = {
        path_in_archive: file_path
        for path_in_archive, file_path in sorted(all_files.items())
        if ".zen" not in Path(path_in_archive).parts[:-1]
    }

    return all_files
write_archive(self, output_file, use_gzip=True)

Writes an archive of the build context to the given file.

Parameters:

Name Type Description Default
output_file IO[bytes]

The file to write the archive to.

required
use_gzip bool

Whether to use gzip to compress the file.

True
Source code in zenml/new/pipelines/code_archive.py
def write_archive(
    self, output_file: IO[bytes], use_gzip: bool = True
) -> None:
    """Write the archive to a file and warn if the result is large.

    Args:
        output_file: The file to write the archive to. Its `name` is used to
            look up the size of the written archive on disk.
        use_gzip: Whether to use `gzip` to compress the file.
    """
    super().write_archive(output_file=output_file, use_gzip=use_gzip)

    size_in_bytes = os.path.getsize(output_file.name)
    if size_in_bytes <= 20 * 1024 * 1024:
        # Archives of up to 20 MiB are not reported.
        return

    readable_size = string_utils.get_human_readable_filesize(size_in_bytes)
    logger.warning(
        "Code archive size: `%s`. If you believe this is "
        "unreasonably large, make sure to version your code in git and "
        "ignore unnecessary files using a `.gitignore` file.",
        readable_size,
    )

pipeline

Definition of a ZenML pipeline.

Pipeline

ZenML pipeline class.

Source code in zenml/new/pipelines/pipeline.py
class Pipeline:
    """ZenML pipeline class."""

    # The active pipeline is the pipeline to which step invocations will be
    # added when a step is called. It is set using a context manager when a
    # pipeline is called (see Pipeline.__call__ for more context)
    ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None

    def __init__(
        self,
        name: str,
        entrypoint: F,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
        model: Optional["Model"] = None,
    ) -> None:
        """Creates a pipeline and applies its initial configuration.

        Args:
            name: The name of the pipeline.
            entrypoint: The entrypoint function of the pipeline.
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline.
            enable_step_logs: If step logs should be enabled for this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step. Can
                be a function with a single argument of type `BaseException`, or
                a source path to such a function (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step. Can
                be a function with no arguments, or a source path to such a
                function (e.g. `module.my_function`).
            model: configuration of the model in the Model Control Plane.
        """
        self._invocations: Dict[str, StepInvocation] = {}
        self._run_args: Dict[str, Any] = {}
        self._configuration = PipelineConfiguration(name=name)
        self._from_config_file: Dict[str, Any] = {}

        # Everything except the name and the entrypoint is routed through
        # `configure(...)`; warnings are suppressed because this call does not
        # come from user-facing code.
        initial_options = dict(
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            model=model,
        )
        with self.__suppress_configure_warnings__():
            self.configure(**initial_options)

        self.entrypoint = entrypoint
        self._parameters: Dict[str, Any] = {}

        self.__suppress_warnings_flag__ = False

    @property
    def name(self) -> str:
        """The name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self.configuration.name

    @property
    def enable_cache(self) -> Optional[bool]:
        """If caching is enabled for the pipeline.

        Returns:
            If caching is enabled for the pipeline.
        """
        return self.configuration.enable_cache

    @property
    def configuration(self) -> PipelineConfiguration:
        """The configuration of the pipeline.

        Returns:
            The configuration of the pipeline.
        """
        return self._configuration

    @property
    def invocations(self) -> Dict[str, StepInvocation]:
        """Returns the step invocations of this pipeline.

        This dictionary will only be populated once the pipeline has been
        called.

        Returns:
            The step invocations.
        """
        return self._invocations

    def resolve(self) -> "Source":
        """Resolves the source of the pipeline entrypoint.

        Returns:
            The pipeline source.
        """
        entrypoint = self.entrypoint
        return source_utils.resolve(entrypoint, skip_validation=True)

    @property
    def source_object(self) -> Any:
        """The source object of this pipeline.

        Returns:
            The source object of this pipeline.
        """
        return self.entrypoint

    @property
    def source_code(self) -> str:
        """The source code of this pipeline.

        Returns:
            The source code of this pipeline.
        """
        return inspect.getsource(self.source_object)

    @property
    def model(self) -> "PipelineResponse":
        """Looks up the registered pipeline model for this instance.

        The pipeline is prepared if possible, then the pipelines with the
        name of this instance are listed.

        Returns:
            The registered pipeline model.

        Raises:
            RuntimeError: If listing pipelines by name does not yield exactly
                one result.
        """
        self._prepare_if_possible()

        matches = Client().list_pipelines(name=self.name)
        if len(matches) != 1:
            raise RuntimeError(
                f"Cannot get the model of pipeline '{self.name}' because it has "
                "not been registered yet. Please ensure that the pipeline has "
                "been run or built and try again."
            )

        return matches.items[0]

    @contextmanager
    def __suppress_configure_warnings__(self) -> Iterator[Any]:
        """Context manager to suppress warnings in `Pipeline.configure(...)`.

        Used to suppress warnings when called from inner code and not user-facing code.

        Yields:
            Nothing.
        """
        self.__suppress_warnings_flag__ = True
        yield
        self.__suppress_warnings_flag__ = False

    def configure(
        self: T,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
        model: Optional["Model"] = None,
        parameters: Optional[Dict[str, Any]] = None,
        merge: bool = True,
    ) -> T:
        """Configures the pipeline.

        Configuration merging example:
        * `merge==True`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=True)
            pipeline.configuration.extra # {"key1": 1, "key2": 2}
        * `merge==False`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=False)
            pipeline.configuration.extra # {"key2": 2}

        Unless warnings are suppressed, a warning is logged for every given
        value that differs from a value of the same name that was loaded from
        a configuration file.

        Args:
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled for
                this pipeline.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline.
            enable_step_logs: If step logs should be enabled for this pipeline.
            settings: settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step. Can
                be a function with a single argument of type `BaseException`, or
                a source path to such a function (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step. Can
                be a function with no arguments, or a source path to such a
                function (e.g. `module.my_function`).
            model: configuration of the model version in the Model Control Plane.
            parameters: input parameters for the pipeline.
            merge: If `True`, will merge the given dictionary configurations
                like `extra` and `settings` with existing
                configurations. If `False` the given configurations will
                overwrite all existing ones. See the general description of this
                method for an example.

        Returns:
            The pipeline instance that this method was called on.
        """
        failure_hook_source = None
        if on_failure:
            # string of on_failure hook function to be used for this pipeline
            failure_hook_source = resolve_and_validate_hook(on_failure)

        success_hook_source = None
        if on_success:
            # string of on_success hook function to be used for this pipeline
            success_hook_source = resolve_and_validate_hook(on_success)

        # Only explicitly provided (non-`None`) values take part in the update.
        values = dict_utils.remove_none_values(
            {
                "enable_cache": enable_cache,
                "enable_artifact_metadata": enable_artifact_metadata,
                "enable_artifact_visualization": enable_artifact_visualization,
                "enable_step_logs": enable_step_logs,
                "settings": settings,
                "extra": extra,
                "failure_hook_source": failure_hook_source,
                "success_hook_source": success_hook_source,
                "model": model,
                "parameters": parameters,
            }
        )
        if not self.__suppress_warnings_flag__:
            # Collect (name, value from config file, newly given value) for
            # every value that deviates from the configuration file.
            to_be_reapplied = [
                (param_, self._from_config_file[param_], value_)
                for param_, value_ in values.items()
                if (
                    param_ in PipelineRunConfiguration.model_fields
                    and param_ in self._from_config_file
                    and value_ != self._from_config_file[param_]
                )
            ]
            if to_be_reapplied:
                reapply_during_run_warning = (
                    "The value of parameter '{name}' has changed from "
                    "'{file_value}' (set in your configuration file) to "
                    "'{new_value}'.\n"
                )
                msg = "".join(
                    reapply_during_run_warning.format(
                        name=name, file_value=file_value, new_value=new_value
                    )
                    for name, file_value, new_value in to_be_reapplied
                )
                msg += (
                    "The configuration file value will be used during the "
                    "pipeline run, so your change will not take effect. "
                    "Consider updating your configuration file instead."
                )
                logger.warning(msg)

        config = PipelineConfigurationUpdate(**values)
        self._apply_configuration(config, merge=merge)
        return self

    @property
    def requires_parameters(self) -> bool:
        """If the pipeline entrypoint requires parameters.

        Returns:
            If the pipeline entrypoint requires parameters.
        """
        signature = inspect.signature(self.entrypoint, follow_wrapped=True)
        return any(
            parameter.default is inspect.Parameter.empty
            for parameter in signature.parameters.values()
        )

    @property
    def is_prepared(self) -> bool:
        """If the pipeline is prepared.

        Prepared means that the pipeline entrypoint has been called and the
        pipeline is fully defined.

        Returns:
            If the pipeline is prepared.
        """
        return len(self.invocations) > 0

    def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepares the pipeline.

        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments.

        Raises:
            RuntimeError: If the pipeline has parameters configured differently in
                configuration file and code.
        """
        # Clear existing parameters and invocations
        self._parameters = {}
        self._invocations = {}

        conflicting_parameters = {}
        parameters_ = (self.configuration.parameters or {}).copy()
        if from_file_ := self._from_config_file.get("parameters", None):
            parameters_ = dict_utils.recursive_update(parameters_, from_file_)
        if parameters_:
            for k, v_runtime in kwargs.items():
                if k in parameters_:
                    v_config = parameters_[k]
                    if v_config != v_runtime:
                        conflicting_parameters[k] = (v_config, v_runtime)
            if conflicting_parameters:
                is_plural = "s" if len(conflicting_parameters) > 1 else ""
                msg = f"Configured parameter{is_plural} for the pipeline `{self.name}` conflict{'' if not is_plural else 's'} with parameter{is_plural} passed in runtime:\n"
                for key, values in conflicting_parameters.items():
                    msg += f"`{key}`: config=`{values[0]}` | runtime=`{values[1]}`\n"
                msg += """This happens, if you define values for pipeline parameters in configuration file and pass same parameters from the code. Example:
```
# config.yaml
    parameters:
        param_name: value1


# pipeline.py
@pipeline
def pipeline_(param_name: str):
    step_name()

if __name__=="__main__":
    pipeline_.with_options(config_file="config.yaml")(param_name="value2")
```
To avoid this consider setting pipeline parameters only in one place (config or code).
"""
                raise RuntimeError(msg)
            for k, v_config in parameters_.items():
                if k not in kwargs:
                    kwargs[k] = v_config

        with self:
            # Enter the context manager, so we become the active pipeline. This
            # means that all steps that get called while the entrypoint function
            # is executed will be added as invocation to this pipeline instance.
            self._call_entrypoint(*args, **kwargs)

    def register(self) -> "PipelineResponse":
        """Registers the pipeline in the server.

        A warning is logged if the pipeline configuration contains anything
        other than default values (the name is ignored for this check).

        Returns:
            The registered pipeline model.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        self._prepare_if_possible()
        integration_registry.activate_integrations()

        custom_configuration = self.configuration.model_dump(
            exclude_defaults=True, exclude={"name"}
        )
        if custom_configuration:
            logger.warning(
                f"The pipeline `{self.name}` that you're registering has "
                "custom configurations applied to it. These will not be "
                "registered with the pipeline and won't be set when you build "
                "images or run the pipeline from the CLI. To provide these "
                "configurations, use the `--config` option of the `zenml "
                "pipeline build/run` commands."
            )

        registered_pipeline = self._register()
        return registered_pipeline

    def build(
        self,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
    ) -> Optional["PipelineBuildResponse"]:
        """Builds Docker images for the pipeline.

        The pipeline is prepared if possible, compiled and registered before
        the build is created.

        Args:
            settings: Settings for the pipeline.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.

        Returns:
            The build output.
        """
        with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
            self._prepare_if_possible()

            # Only the compiled deployment is needed here; the other two
            # values returned by `_compile` are ignored.
            deployment, _unused_schedule, _unused_build = self._compile(
                config_path=config_path,
                steps=step_configurations,
                settings=settings,
            )
            registered_pipeline = self._register()
            pipeline_id = registered_pipeline.id

            active_repo_context = (
                code_repository_utils.find_active_code_repository()
            )
            code_repository = build_utils.verify_local_repository_context(
                deployment=deployment, local_repo_context=active_repo_context
            )

            pipeline_build = build_utils.create_pipeline_build(
                deployment=deployment,
                pipeline_id=pipeline_id,
                code_repository=code_repository,
            )
            return pipeline_build

    def _run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        enable_step_logs: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBase", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
    ) -> Optional[PipelineRunResponse]:
        """Runs the pipeline on the active stack.

        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline run.
            enable_step_logs: If step logs should be enabled for this pipeline.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: DEPRECATED: Use
                `DockerSettings.prevent_build_reuse` instead.

        Returns:
            Model of the pipeline run if running without a schedule, `None` if
            running with a schedule.

        Raises:
            ValueError: if the orchestrator doesn't support scheduling, but schedule was given
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls when importing modules needed to run a step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return None

        logger.info(f"Initiating a new run for the pipeline: `{self.name}`.")

        with track_handler(AnalyticsEvent.RUN_PIPELINE) as analytics_handler:
            deployment, schedule, build = self._compile(
                config_path=config_path,
                run_name=run_name,
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                enable_artifact_visualization=enable_artifact_visualization,
                enable_step_logs=enable_step_logs,
                steps=step_configurations,
                settings=settings,
                schedule=schedule,
                build=build,
                extra=extra,
            )

            skip_pipeline_registration = constants.handle_bool_env_var(
                constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
                default=False,
            )

            register_pipeline = not (skip_pipeline_registration or unlisted)

            pipeline_id = None
            if register_pipeline:
                pipeline_id = self._register().id

            else:
                logger.debug(f"Pipeline {self.name} is unlisted.")

            stack = Client().active_stack
            stack.validate()

            schedule_id = None
            if schedule:
                if not stack.orchestrator.config.is_schedulable:
                    raise ValueError(
                        f"Stack {stack.name} does not support scheduling. "
                        "Not all orchestrator types support scheduling, "
                        "kindly consult with "
                        "https://docs.zenml.io/how-to/build-pipelines/schedule-a-pipeline "
                        "for details."
                    )
                if schedule.name:
                    schedule_name = schedule.name
                else:
                    schedule_name = format_name_template(
                        deployment.run_name_template
                    )
                components = Client().active_stack_model.components
                orchestrator = components[StackComponentType.ORCHESTRATOR][0]
                schedule_model = ScheduleRequest(
                    workspace=Client().active_workspace.id,
                    user=Client().active_user.id,
                    pipeline_id=pipeline_id,
                    orchestrator_id=orchestrator.id,
                    name=schedule_name,
                    active=True,
                    cron_expression=schedule.cron_expression,
                    start_time=schedule.start_time,
                    end_time=schedule.end_time,
                    interval_second=schedule.interval_second,
                    catchup=schedule.catchup,
                    run_once_start_time=schedule.run_once_start_time,
                )
                schedule_id = (
                    Client().zen_store.create_schedule(schedule_model).id
                )
                logger.info(
                    f"Created schedule `{schedule_name}` for pipeline "
                    f"`{deployment.pipeline_configuration.name}`."
                )

            stack = Client().active_stack
            stack.validate()
            upload_notebook_cell_code_if_necessary(
                deployment=deployment, stack=stack
            )

            local_repo_context = (
                code_repository_utils.find_active_code_repository()
            )
            code_repository = build_utils.verify_local_repository_context(
                deployment=deployment, local_repo_context=local_repo_context
            )

            if prevent_build_reuse:
                logger.warning(
                    "Passing `prevent_build_reuse=True` to "
                    "`pipeline.with_opitions(...)` is deprecated. Use "
                    "`DockerSettings.prevent_build_reuse` instead."
                )

            build_model = build_utils.reuse_or_create_pipeline_build(
                deployment=deployment,
                pipeline_id=pipeline_id,
                allow_build_reuse=not prevent_build_reuse,
                build=build,
                code_repository=code_repository,
            )
            build_id = build_model.id if build_model else None

            code_reference = None
            if local_repo_context and not local_repo_context.is_dirty:
                source_root = source_utils.get_source_root()
                subdirectory = (
                    Path(source_root)
                    .resolve()
                    .relative_to(local_repo_context.root)
                )

                code_reference = CodeReferenceRequest(
                    commit=local_repo_context.current_commit,
                    subdirectory=subdirectory.as_posix(),
                    code_repository=local_repo_context.code_repository_id,
                )

            code_path = None
            if build_utils.should_upload_code(
                deployment=deployment,
                build=build_model,
                code_reference=code_reference,
            ):
                code_archive = code_utils.CodeArchive(
                    root=source_utils.get_source_root()
                )
                logger.info("Archiving pipeline code...")
                code_path = code_utils.upload_code_if_necessary(code_archive)

            deployment_request = PipelineDeploymentRequest(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=stack.id,
                pipeline=pipeline_id,
                build=build_id,
                schedule=schedule_id,
                code_reference=code_reference,
                code_path=code_path,
                **deployment.model_dump(),
            )
            deployment_model = Client().zen_store.create_deployment(
                deployment=deployment_request
            )

            self.log_pipeline_deployment_metadata(deployment_model)
            run = create_placeholder_run(deployment=deployment_model)

            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
                deployment=deployment_model,
                stack=stack,
                run_id=run.id if run else None,
            )

            if run:
                run_url = dashboard_utils.get_run_url(run)
                if run_url:
                    logger.info(f"Dashboard URL for Pipeline Run: {run_url}")
                else:
                    logger.info(
                        "You can visualize your pipeline runs in the `ZenML "
                        "Dashboard`. In order to try it locally, please run "
                        "`zenml up`."
                    )

            deploy_pipeline(
                deployment=deployment_model, stack=stack, placeholder_run=run
            )
            if run:
                return Client().get_pipeline_run(run.id)
            return None

    @staticmethod
    def log_pipeline_deployment_metadata(
        deployment_model: PipelineDeploymentResponse,
    ) -> None:
        """Logs a summary of the deployment that is about to be run.

        The summary covers, in this order: the schedule (or that a new run is
        executed), the caching status, the build with its images and any
        ZenML/Python version mismatch, the user and the stack components.
        Logging is best-effort: any exception raised while collecting the
        information is caught and only reported on debug level.

        Args:
            deployment_model: The model for the pipeline deployment
        """
        try:
            # Schedule or plain run
            schedule = deployment_model.schedule
            if schedule:
                logger.info(
                    f"Scheduling a run with the schedule: `{schedule.name}`"
                )
            else:
                logger.info("Executing a new run.")

            # Caching status (only mentioned when explicitly disabled)
            pipeline_config = deployment_model.pipeline_configuration
            if pipeline_config.enable_cache is False:
                logger.info(
                    "Caching is disabled by default for "
                    f"`{pipeline_config.name}`."
                )

            # Build, its images and version mismatches
            build = deployment_model.build
            if build:
                logger.info("Using a build:")
                image_names = ", ".join(
                    [item.image for item in build.images.values()]
                )
                logger.info(f" Image(s): {image_names}")

                # Compare the versions stored in the build with the local ones
                from zenml import __version__

                if build.zenml_version != __version__:
                    logger.info(
                        "ZenML version (different than the local version): "
                        f"{build.zenml_version}"
                    )

                import platform

                if build.python_version != platform.python_version():
                    logger.info(
                        "Python version (different than the local version): "
                        f"{build.python_version}"
                    )

            # User, stack and stack components
            user = deployment_model.user
            if user is not None:
                logger.info(f"Using user: `{user.name}`")

            stack = deployment_model.stack
            if stack is not None:
                logger.info(f"Using stack: `{stack.name}`")

                for kind, models in stack.components.items():
                    # Only the first component of each type is displayed
                    logger.info(f"  {kind.value}: `{models[0].name}`")
        except Exception as e:
            logger.debug(f"Logging pipeline deployment metadata failed: {e}")

    def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
        """(Deprecated) Get runs of this pipeline.

        Logs a deprecation warning and then delegates to
        `self.model.get_runs(...)`, forwarding all keyword arguments
        unchanged.

        Args:
            **kwargs: Further arguments for filtering or pagination that are
                passed to `client.list_pipeline_runs()`.

        Returns:
            List of runs of this pipeline.
        """
        # Warn on every call so users migrate to `Pipeline.model.get_runs()`.
        logger.warning(
            "`Pipeline.get_runs()` is deprecated and will be removed in a "
            "future version. Please use `Pipeline.model.get_runs()` instead."
        )
        return self.model.get_runs(**kwargs)

    def write_run_configuration_template(
        self, path: str, stack: Optional["Stack"] = None
    ) -> None:
        """Generates a commented-out run configuration YAML and writes it.

        The template contains the settings available for the stack (plus the
        general settings) split by pipeline and step level, and one entry per
        step invocation with its parameters, settings and outputs.

        Args:
            path: The path where the template will be written.
            stack: The stack for which the template should be generated. If
                not given, the active stack will be used.
        """
        from zenml.config.base_settings import ConfigurationLevel
        from zenml.config.step_configurations import (
            PartialArtifactConfiguration,
        )

        self._prepare_if_possible()

        if not stack:
            stack = Client().active_stack

        available_settings = stack.setting_classes
        available_settings.update(settings_utils.get_general_settings())

        # Sort the generated templates by the level(s) they can be applied on
        pipeline_level = {}
        step_level = {}
        for settings_key, settings_cls in available_settings.items():
            template_fields = pydantic_utils.TemplateGenerator(
                settings_cls
            ).run()
            if ConfigurationLevel.PIPELINE in settings_cls.LEVEL:
                pipeline_level[settings_key] = template_fields
            if ConfigurationLevel.STEP in settings_cls.LEVEL:
                step_level[settings_key] = template_fields

        # Build one configuration template per step invocation
        step_templates = {}
        for invocation_id, invocation in self.invocations.items():
            current_step = invocation.step
            if current_step.entrypoint_definition.legacy_params:
                parameters = pydantic_utils.TemplateGenerator(
                    current_step.entrypoint_definition.legacy_params.annotation
                ).run()
            else:
                parameters = {}

            step_templates[invocation_id] = StepConfigurationUpdate(
                parameters=parameters,
                settings=step_level,
                outputs={
                    output_name: PartialArtifactConfiguration()
                    for output_name in current_step.entrypoint_definition.outputs
                },
            )

        run_config = PipelineRunConfiguration(
            settings=pipeline_level, steps=step_templates
        )
        template = pydantic_utils.TemplateGenerator(run_config).run()
        commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(template))

        with open(path, "w") as template_file:
            template_file.write(commented_yaml)

    def _apply_configuration(
        self,
        config: PipelineConfigurationUpdate,
        merge: bool = True,
    ) -> None:
        """Validates a configuration update and stores the updated result.

        Args:
            config: The configuration update.
            merge: Whether to merge the updates with the existing configuration
                or not. See the `BasePipeline.configure(...)` method for a
                detailed explanation.
        """
        self._validate_configuration(config)

        # `merge` is forwarded as the `recursive` flag of the model update
        updated_configuration = pydantic_utils.update_model(
            self._configuration, update=config, recursive=merge
        )
        self._configuration = updated_configuration

        logger.debug("Updated pipeline configuration:")
        logger.debug(self._configuration)

    @staticmethod
    def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
        """Validates a configuration update.

        Only the setting keys of the update are checked: they are passed to
        `settings_utils.validate_setting_keys`.

        Args:
            config: The configuration update to validate.
        """
        # NOTE(review): presumably raises for invalid keys; confirm against
        # `settings_utils.validate_setting_keys`.
        settings_utils.validate_setting_keys(list(config.settings))

    def _get_pipeline_analytics_metadata(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        run_id: Optional[UUID] = None,
    ) -> Dict[str, Any]:
        """Collects the analytics metadata for a pipeline deployment.

        Args:
            deployment: The pipeline deployment to track.
            stack: The stack on which the pipeline will be deployed.
            run_id: The ID of the pipeline run.

        Returns:
            the metadata about the pipeline deployment
        """
        # True if any output of any step uses a non-internal materializer.
        # The list is built completely so every source is still inspected.
        custom_materializer = any(
            [
                not source.is_internal
                for step in deployment.step_configurations.values()
                for output in step.config.outputs.values()
                for source in output.materializer_source
            ]
        )

        stack_creator = Client().get_stack(stack.id).user
        active_user = Client().active_user
        own_stack = stack_creator and stack_creator.id == active_user.id

        # One entry per stack component type, mapping to the component flavor
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }

        metadata: Dict[str, Any] = {
            "store_type": Client().zen_store.type.value
        }
        metadata.update(stack_metadata)
        metadata.update(
            {
                "total_steps": len(self.invocations),
                "schedule": bool(deployment.schedule),
                "custom_materializer": custom_materializer,
                "own_stack": own_stack,
                "pipeline_run_id": str(run_id) if run_id else None,
            }
        )
        return metadata

    def _compile(
        self, config_path: Optional[str] = None, **run_configuration_args: Any
    ) -> Tuple[
        "PipelineDeploymentBase",
        Optional["Schedule"],
        Union["PipelineBuildBase", UUID, None],
    ]:
        """Compiles the pipeline for the active stack.

        The run configuration is assembled from the config file (if any) and
        then updated with the non-`None` values passed in code.

        Args:
            config_path: Path to a config file.
            **run_configuration_args: Configurations for the pipeline run.

        Returns:
            A tuple containing the deployment, schedule and build of
            the compiled pipeline.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        # Parse before reconfiguring: `_parse_config_file` may reuse the
        # `model` entry stored in `self._from_config_file`, which the
        # reconfigure call below replaces.
        file_values = self._parse_config_file(
            config_path=config_path,
            matcher=list(PipelineRunConfiguration.model_fields),
        )
        self._reconfigure_from_file_with_overrides(config_path=config_path)

        file_config = PipelineRunConfiguration(**file_values)
        code_config = PipelineRunConfiguration.model_validate(
            dict_utils.remove_none_values(run_configuration_args)
        )

        # Update with the values in code so they take precedence
        run_config = pydantic_utils.update_model(
            file_config, update=code_config
        )

        deployment = Compiler().compile(
            pipeline=self,
            stack=Client().active_stack,
            run_configuration=run_config,
        )

        return deployment, run_config.schedule, run_config.build

    def _register(self) -> "PipelineResponse":
        """Fetches the pipeline from the server, registering it if missing.

        The most recently created pipeline with the name of this instance is
        returned if one exists. Otherwise a new pipeline is created. If the
        creation fails because the entity already exists, the lookup is
        repeated.

        Returns:
            The registered pipeline model.
        """
        client = Client()

        def _find_latest() -> PipelineResponse:
            # Newest pipeline with a matching name, or an error if none exists
            page = client.list_pipelines(
                name=self.name,
                size=1,
                sort_by="desc:created",
            )
            if not page.total:
                raise RuntimeError("No matching pipelines found.")
            return page.items[0]

        try:
            return _find_latest()
        except RuntimeError:
            try:
                created = client.zen_store.create_pipeline(
                    pipeline=PipelineRequest(
                        workspace=client.active_workspace.id,
                        user=client.active_user.id,
                        name=self.name,
                    )
                )
            except EntityExistsError:
                # The pipeline exists by now, so look it up again
                return _find_latest()

            logger.info(
                "Registered new pipeline: `%s`.",
                created.name,
            )
            return created

    def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
        """Hashes the pipeline spec and source code into an identifier.

        The MD5 digest is fed with the JSON representation of the spec, the
        pipeline source code (for spec versions >= 0.4) and the source code
        of every step in the order given by the spec.

        Args:
            pipeline_spec: Compiled spec of the pipeline.

        Returns:
            The unique identifier of the pipeline.
        """
        from packaging import version

        digest = hashlib.md5()  # nosec
        digest.update(pipeline_spec.json_with_string_sources.encode())

        # Only add this for newer versions to keep backwards compatibility
        spec_version = version.parse(pipeline_spec.version)
        if spec_version >= version.parse("0.4"):
            digest.update(self.source_code.encode())

        step_sources = (
            self.invocations[spec.pipeline_parameter_name].step.source_code
            for spec in pipeline_spec.steps
        )
        for source in step_sources:
            digest.update(source.encode())

        return digest.hexdigest()

    def add_step_invocation(
        self,
        step: "BaseStep",
        input_artifacts: Dict[str, StepArtifact],
        external_artifacts: Dict[str, "ExternalArtifact"],
        model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
        client_lazy_loaders: Dict[str, "ClientLazyLoader"],
        parameters: Dict[str, Any],
        default_parameters: Dict[str, Any],
        upstream_steps: Set[str],
        custom_id: Optional[str] = None,
        allow_id_suffix: bool = True,
    ) -> str:
        """Registers a new step invocation on this (active) pipeline.

        Args:
            step: The step for which to add an invocation.
            input_artifacts: The input artifacts for the invocation.
            external_artifacts: The external artifacts for the invocation.
            model_artifacts_or_metadata: The model artifacts or metadata for
                the invocation.
            client_lazy_loaders: The client lazy loaders for the invocation.
            parameters: The parameters for the invocation.
            default_parameters: The default parameters for the invocation.
            upstream_steps: The upstream steps for the invocation.
            custom_id: Custom ID to use for the invocation.
            allow_id_suffix: Whether a suffix can be appended to the invocation
                ID.

        Raises:
            RuntimeError: If the method is called on an inactive pipeline.
            RuntimeError: If the invocation was called with an artifact from
                a different pipeline.

        Returns:
            The step invocation ID.
        """
        if Pipeline.ACTIVE_PIPELINE != self:
            raise RuntimeError(
                "A step invocation can only be added to an active pipeline."
            )

        # Every input artifact must have been produced inside this pipeline
        for input_artifact in input_artifacts.values():
            if input_artifact.pipeline is self:
                continue

            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {input_artifact.pipeline.name}."
            )

        new_id = self._compute_invocation_id(
            step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
        )
        self._invocations[new_id] = StepInvocation(
            id=new_id,
            step=step,
            input_artifacts=input_artifacts,
            external_artifacts=external_artifacts,
            model_artifacts_or_metadata=model_artifacts_or_metadata,
            client_lazy_loaders=client_lazy_loaders,
            parameters=parameters,
            default_parameters=default_parameters,
            upstream_steps=upstream_steps,
            pipeline=self,
        )
        return new_id

    def _compute_invocation_id(
        self,
        step: "BaseStep",
        custom_id: Optional[str] = None,
        allow_suffix: bool = True,
    ) -> str:
        """Compute the invocation ID.

        Args:
            step: The step for which to compute the ID.
            custom_id: Custom ID to use for the invocation.
            allow_suffix: Whether a suffix can be appended to the invocation
                ID.

        Raises:
            RuntimeError: If no ID suffix is allowed and an invocation for the
                same ID already exists.
            RuntimeError: If no unique invocation ID can be found.

        Returns:
            The invocation ID.
        """
        base_id = id_ = custom_id or step.name

        if id_ not in self.invocations:
            return id_

        if not allow_suffix:
            raise RuntimeError("Duplicate step ID")

        for index in range(2, 10000):
            id_ = f"{base_id}_{index}"
            if id_ not in self.invocations:
                return id_

        raise RuntimeError("Unable to find step ID")

    def __enter__(self: T) -> T:
        """Marks this pipeline as the active pipeline.

        Raises:
            RuntimeError: If a different pipeline is already active.

        Returns:
            The pipeline instance.
        """
        currently_active = Pipeline.ACTIVE_PIPELINE
        if currently_active:
            raise RuntimeError(
                "Unable to enter pipeline context. A different pipeline "
                f"{currently_active.name} is already active."
            )

        Pipeline.ACTIVE_PIPELINE = self
        return self

    def __exit__(self, *args: Any) -> None:
        """Deactivates the pipeline context.

        Resets `Pipeline.ACTIVE_PIPELINE` to `None`. The exit arguments are
        not inspected and nothing is returned, so an exception raised inside
        the context is not suppressed.

        Args:
            *args: The arguments passed to the context exit handler.
        """
        Pipeline.ACTIVE_PIPELINE = None

    def _parse_config_file(
        self, config_path: Optional[str], matcher: List[str]
    ) -> Dict[str, Any]:
        """Parses the given configuration file and sets `self._from_config_file`.

        Args:
            config_path: Path to a yaml configuration file.
            matcher: List of keys to match in the configuration file.

        Returns:
            Parsed config file according to matcher settings.
        """
        _from_config_file: Dict[str, Any] = {}
        if config_path:
            with open(config_path, "r") as f:
                _from_config_file = yaml.load(f, Loader=yaml.SafeLoader)
            _from_config_file = dict_utils.remove_none_values(
                {k: v for k, v in _from_config_file.items() if k in matcher}
            )

            # TODO: deprecate me
            if "model_version" in _from_config_file:
                logger.warning(
                    "YAML config option `model_version` is deprecated. Please use `model`."
                )
                _from_config_file["model"] = _from_config_file["model_version"]
                del _from_config_file["model_version"]

            if "model" in _from_config_file:
                if "model" in self._from_config_file:
                    _from_config_file["model"] = self._from_config_file[
                        "model"
                    ]
                else:
                    from zenml.model.model import Model

                    _from_config_file["model"] = Model.model_validate(
                        _from_config_file["model"]
                    )
        return _from_config_file

    def with_options(
        self,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBase", None] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
        **kwargs: Any,
    ) -> "Pipeline":
        """Returns a configured copy of the pipeline.

        The original pipeline instance is left untouched; the configuration
        and the run arguments are applied to the copy only.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            step_configurations: Configurations for steps of the pipeline.
            steps: Configurations for steps of the pipeline. This is equivalent
                to `step_configurations`, and will be ignored if
                `step_configurations` is set as well.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: DEPRECATED: Use
                `DockerSettings.prevent_build_reuse` instead.
            **kwargs: Pipeline configuration options. These will be passed
                to the `pipeline.configure(...)` method.

        Returns:
            The copied pipeline instance.
        """
        if steps and step_configurations:
            logger.warning(
                "Step configurations were passed using both the "
                "`step_configurations` and `steps` keywords, ignoring the "
                "values passed using the `steps` keyword."
            )

        configured = self.copy()
        configured._reconfigure_from_file_with_overrides(
            config_path=config_path, **kwargs
        )

        # `step_configurations` wins over its alias `steps`
        candidate_args = {
            "run_name": run_name,
            "schedule": schedule,
            "build": build,
            "step_configurations": step_configurations or steps,
            "config_path": config_path,
            "unlisted": unlisted,
            "prevent_build_reuse": prevent_build_reuse,
        }
        configured._run_args.update(
            dict_utils.remove_none_values(candidate_args)
        )
        return configured

    def copy(self) -> "Pipeline":
        """Copies the pipeline.

        The copy is created with `copy.deepcopy`.

        Returns:
            The pipeline copy.
        """
        return copy.deepcopy(self)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Handle a call of the pipeline.

        Depending on whether a pipeline is currently active:
        * No pipeline is active: this pipeline is prepared with the given
          arguments and then run with the stored run arguments.
        * A pipeline is active: the entrypoint function is called within
          that context and the step invocations will be added to the active
          pipeline.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Returns:
            The outputs of the entrypoint function call.
        """
        if not Pipeline.ACTIVE_PIPELINE:
            self.prepare(*args, **kwargs)
            return self._run(**self._run_args)

        # Calling a pipeline inside a pipeline, we return the potential
        # outputs of the entrypoint function

        # TODO: This currently ignores the configuration of the pipeline
        #   and instead applies the configuration of the previously active
        #   pipeline. Is this what we want?
        return self.entrypoint(*args, **kwargs)

    def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
        """Calls the pipeline entrypoint function with the given arguments.

        The arguments are first validated against the entrypoint using
        `pydantic_utils.validate_function_args`. The validated arguments are
        stored in `self._parameters` and then passed to the entrypoint as
        keyword arguments.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Raises:
            ValueError: If an input argument is missing or not JSON
                serializable.
        """
        try:
            validated_args = pydantic_utils.validate_function_args(
                self.entrypoint,
                ConfigDict(arbitrary_types_allowed=False),
                *args,
                **kwargs,
            )
        except ValidationError as e:
            # Each fragment ends with a space so the concatenated sentences
            # do not run into each other.
            raise ValueError(
                "Invalid or missing pipeline function entrypoint arguments. "
                "Only JSON serializable inputs are allowed as pipeline inputs. "
                "Check out the pydantic error above for more details."
            ) from e

        self._parameters = validated_args
        self.entrypoint(**validated_args)

    def _prepare_if_possible(self) -> None:
        """Prepares the pipeline if possible.

        Raises:
            RuntimeError: If the pipeline is not prepared and the preparation
                requires parameters.
        """
        if not self.is_prepared:
            if self.requires_parameters:
                raise RuntimeError(
                    f"Failed while trying to prepare pipeline {self.name}. "
                    "The entrypoint function of the pipeline requires "
                    "arguments. Please prepare the pipeline by calling "
                    "`pipeline_instance.prepare(...)` and try again."
                )
            else:
                self.prepare()

    def _reconfigure_from_file_with_overrides(
        self,
        config_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Configures the pipeline from a config file and keyword overrides.

        The values parsed from the file are stored in
        `self._from_config_file`, combined with the overrides and passed to
        `self.configure(...)` with configure warnings suppressed.

        Args:
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            **kwargs: Pipeline configuration options. These will be passed
                to the `pipeline.configure(...)` method.
        """
        # Reset first: `_parse_config_file` reads `self._from_config_file`
        self._from_config_file = {}

        if config_path:
            # TODO: deprecate `model_version` later on
            accepted_keys = inspect.getfullargspec(self.configure).args + [
                "model_version"
            ]
            self._from_config_file = self._parse_config_file(
                config_path=config_path,
                matcher=accepted_keys,
            )

        merged_options = dict_utils.recursive_update(
            self._from_config_file, kwargs
        )

        with self.__suppress_configure_warnings__():
            self.configure(**merged_options)
configuration: PipelineConfiguration property readonly

The configuration of the pipeline.

Returns:

Type Description
PipelineConfiguration

The configuration of the pipeline.

enable_cache: Optional[bool] property readonly

If caching is enabled for the pipeline.

Returns:

Type Description
Optional[bool]

If caching is enabled for the pipeline.

invocations: Dict[str, zenml.steps.step_invocation.StepInvocation] property readonly

Returns the step invocations of this pipeline.

This dictionary will only be populated once the pipeline has been called.

Returns:

Type Description
Dict[str, zenml.steps.step_invocation.StepInvocation]

The step invocations.

is_prepared: bool property readonly

If the pipeline is prepared.

Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.

Returns:

Type Description
bool

If the pipeline is prepared.

model: PipelineResponse property readonly

Gets the registered pipeline model for this instance.

Returns:

Type Description
PipelineResponse

The registered pipeline model.

Exceptions:

Type Description
RuntimeError

If the pipeline has not been registered yet.

name: str property readonly

The name of the pipeline.

Returns:

Type Description
str

The name of the pipeline.

requires_parameters: bool property readonly

If the pipeline entrypoint requires parameters.

Returns:

Type Description
bool

If the pipeline entrypoint requires parameters.

source_code: str property readonly

The source code of this pipeline.

Returns:

Type Description
str

The source code of this pipeline.

source_object: Any property readonly

The source object of this pipeline.

Returns:

Type Description
Any

The source object of this pipeline.

__call__(self, *args, **kwargs) special

Handle a call of the pipeline.

This method does one of two things:

* If there is an active pipeline context, it calls the pipeline entrypoint function within that context and the step invocations will be added to the active pipeline.
* If no pipeline is active, it activates this pipeline before calling the entrypoint function.

Parameters:

Name Type Description Default
*args Any

Entrypoint function arguments.

()
**kwargs Any

Entrypoint function keyword arguments.

{}

Returns:

Type Description
Any

The outputs of the entrypoint function call.

Source code in zenml/new/pipelines/pipeline.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Handle a call of the pipeline.

    Depending on whether a pipeline is currently active:
    * No pipeline is active: this pipeline is prepared with the given
      arguments and then run with the stored run arguments.
    * A pipeline is active: the entrypoint function is called within
      that context and the step invocations will be added to the active
      pipeline.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The outputs of the entrypoint function call.
    """
    if not Pipeline.ACTIVE_PIPELINE:
        self.prepare(*args, **kwargs)
        return self._run(**self._run_args)

    # Calling a pipeline inside a pipeline, we return the potential
    # outputs of the entrypoint function

    # TODO: This currently ignores the configuration of the pipeline
    #   and instead applies the configuration of the previously active
    #   pipeline. Is this what we want?
    return self.entrypoint(*args, **kwargs)
__enter__(self) special

Activate the pipeline context.

Exceptions:

Type Description
RuntimeError

If a different pipeline is already active.

Returns:

Type Description
~T

The pipeline instance.

Source code in zenml/new/pipelines/pipeline.py
def __enter__(self: T) -> T:
    """Make this pipeline the active pipeline.

    Raises:
        RuntimeError: If a pipeline is already registered as active.

    Returns:
        The pipeline instance.
    """
    currently_active = Pipeline.ACTIVE_PIPELINE
    if currently_active:
        # Only one pipeline context may be open at a time.
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{currently_active.name} is already active."
        )

    Pipeline.ACTIVE_PIPELINE = self
    return self
__exit__(self, *args) special

Deactivates the pipeline context.

Parameters:

Name Type Description Default
*args Any

The arguments passed to the context exit handler.

()
Source code in zenml/new/pipelines/pipeline.py
def __exit__(self, *args: Any) -> None:
    """Leave the pipeline context.

    Resets the active pipeline marker stored on the `Pipeline` class.

    Args:
        *args: The arguments passed to the context exit handler. They are
            not inspected.
    """
    # No pipeline is active anymore once the context is left.
    Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None) special

Initializes a pipeline.

Parameters:

Name Type Description Default
name str

The name of the pipeline.

required
entrypoint ~F

The entrypoint function of the pipeline.

required
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

configuration of the model in the Model Control Plane.

None
Source code in zenml/new/pipelines/pipeline.py
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
) -> None:
    """Creates a pipeline and applies its initial configuration.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
    """
    self._invocations: Dict[str, StepInvocation] = {}
    self._run_args: Dict[str, Any] = {}
    self._configuration = PipelineConfiguration(name=name)
    self._from_config_file: Dict[str, Any] = {}

    initial_options: Dict[str, Any] = {
        "enable_cache": enable_cache,
        "enable_artifact_metadata": enable_artifact_metadata,
        "enable_artifact_visualization": enable_artifact_visualization,
        "enable_step_logs": enable_step_logs,
        "settings": settings,
        "extra": extra,
        "on_failure": on_failure,
        "on_success": on_success,
        "model": model,
    }
    # This is an internal `configure` call, so its warnings are suppressed.
    with self.__suppress_configure_warnings__():
        self.configure(**initial_options)

    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}

    self.__suppress_warnings_flag__ = False
__suppress_configure_warnings__(self) special

Context manager to suppress warnings in Pipeline.configure(...).

Used to suppress warnings when called from inner code and not user-facing code.

Yields:

Type Description
Iterator[Any]

Nothing.

Source code in zenml/new/pipelines/pipeline.py
@contextmanager
def __suppress_configure_warnings__(self) -> Iterator[Any]:
    """Context manager to suppress warnings in `Pipeline.configure(...)`.

    Used to suppress warnings when called from inner code and not user-facing
    code. The suppression flag is switched on for the duration of the `with`
    block and switched off again afterwards, even if the block raises.

    Yields:
        Nothing.
    """
    self.__suppress_warnings_flag__ = True
    try:
        yield
    finally:
        # Reset in `finally`: without it, an exception raised inside the
        # `with` block would leave warnings suppressed for every later
        # `configure(...)` call on this instance.
        self.__suppress_warnings_flag__ = False
add_step_invocation(self, step, input_artifacts, external_artifacts, model_artifacts_or_metadata, client_lazy_loaders, parameters, default_parameters, upstream_steps, custom_id=None, allow_id_suffix=True)

Adds a step invocation to the pipeline.

Parameters:

Name Type Description Default
step BaseStep

The step for which to add an invocation.

required
input_artifacts Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact]

The input artifacts for the invocation.

required
external_artifacts Dict[str, ExternalArtifact]

The external artifacts for the invocation.

required
model_artifacts_or_metadata Dict[str, ModelVersionDataLazyLoader]

The model artifacts or metadata for the invocation.

required
client_lazy_loaders Dict[str, ClientLazyLoader]

The client lazy loaders for the invocation.

required
parameters Dict[str, Any]

The parameters for the invocation.

required
default_parameters Dict[str, Any]

The default parameters for the invocation.

required
upstream_steps Set[str]

The upstream steps for the invocation.

required
custom_id Optional[str]

Custom ID to use for the invocation.

None
allow_id_suffix bool

Whether a suffix can be appended to the invocation ID.

True

Exceptions:

Type Description
RuntimeError

If the method is called on an inactive pipeline.

RuntimeError

If the invocation was called with an artifact from a different pipeline.

Returns:

Type Description
str

The step invocation ID.

Source code in zenml/new/pipelines/pipeline.py
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[str, "ExternalArtifact"],
    model_artifacts_or_metadata: Dict[str, "ModelVersionDataLazyLoader"],
    client_lazy_loaders: Dict[str, "ClientLazyLoader"],
    parameters: Dict[str, Any],
    default_parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Records a new step invocation on this pipeline.

    Args:
        step: The step for which to add an invocation.
        input_artifacts: The input artifacts for the invocation.
        external_artifacts: The external artifacts for the invocation.
        model_artifacts_or_metadata: The model artifacts or metadata for
            the invocation.
        client_lazy_loaders: The client lazy loaders for the invocation.
        parameters: The parameters for the invocation.
        default_parameters: The default parameters for the invocation.
        upstream_steps: The upstream steps for the invocation.
        custom_id: Custom ID to use for the invocation.
        allow_id_suffix: Whether a suffix can be appended to the invocation
            ID.

    Raises:
        RuntimeError: If the method is called on an inactive pipeline.
        RuntimeError: If the invocation was called with an artifact from
            a different pipeline.

    Returns:
        The step invocation ID.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    # Pick the first input artifact (in dict order) that belongs to another
    # pipeline; `None` means every input was produced by this pipeline.
    foreign_artifact = next(
        (
            candidate
            for candidate in input_artifacts.values()
            if candidate.pipeline is not self
        ),
        None,
    )
    if foreign_artifact is not None:
        raise RuntimeError(
            "Got invalid input artifact for invocation of step "
            f"{step.name}: The input artifact was produced by a step "
            f"inside a different pipeline {foreign_artifact.pipeline.name}."
        )

    new_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    # Store the invocation under its computed ID.
    self._invocations[new_id] = StepInvocation(
        id=new_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        model_artifacts_or_metadata=model_artifacts_or_metadata,
        client_lazy_loaders=client_lazy_loaders,
        parameters=parameters,
        default_parameters=default_parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    return new_id
build(self, settings=None, step_configurations=None, config_path=None)

Builds Docker images for the pipeline.

Parameters:

Name Type Description Default
settings Optional[Mapping[str, SettingsOrDict]]

Settings for the pipeline.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None

Returns:

Type Description
Optional[PipelineBuildResponse]

The build output.

Source code in zenml/new/pipelines/pipeline.py
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponse"]:
    """Creates a pipeline build (Docker images) for this pipeline.

    The pipeline is compiled with the given options and registered, the
    active code repository is verified against the resulting deployment,
    and the build is then created from these pieces.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with track_handler(event=AnalyticsEvent.BUILD_PIPELINE):
        self._prepare_if_possible()

        # Only the deployment of the compilation result is needed here.
        deployment, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        pipeline_id = self._register().id

        repo_context = code_repository_utils.find_active_code_repository()
        verified_repository = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=repo_context
        )

        build_response = build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=pipeline_id,
            code_repository=verified_repository,
        )
        return build_response
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, parameters=None, merge=True)

Configures the pipeline.

Configuration merging example:

* merge==True:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=True)
    pipeline.configuration.extra # {"key1": 1, "key2": 2}
* merge==False:
    pipeline.configure(extra={"key1": 1})
    pipeline.configure(extra={"key2": 2}, merge=False)
    pipeline.configuration.extra # {"key2": 2}

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
merge bool

If True, will merge the given dictionary configurations like extra and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True
model Optional[Model]

configuration of the model version in the Model Control Plane.

None
parameters Optional[Dict[str, Any]]

input parameters for the pipeline.

None

Returns:

Type Description
~T

The pipeline instance that this method was called on.

Source code in zenml/new/pipelines/pipeline.py
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    parameters: Optional[Dict[str, Any]] = None,
    merge: bool = True,
) -> T:
    """Configures the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Unless warnings are suppressed, a warning is logged for every given
    option that is also present in the values loaded from a configuration
    file with a different value.

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with a single argument of type `BaseException`, or
            a source path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.
        model: Configuration of the model version in the Model Control Plane.
        parameters: Input parameters for the pipeline.

    Returns:
        The pipeline instance that this method was called on.
    """
    # Hooks are stored as resolved sources rather than as the raw
    # specification that was passed in.
    failure_hook_source = (
        resolve_and_validate_hook(on_failure) if on_failure else None
    )
    success_hook_source = (
        resolve_and_validate_hook(on_success) if on_success else None
    )

    # Options left at `None` are dropped so they don't touch the
    # existing configuration.
    values = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "enable_step_logs": enable_step_logs,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
            "model": model,
            "parameters": parameters,
        }
    )

    if not self.__suppress_warnings_flag__:
        # Collect (name, value from file, value passed in code) for every
        # run-configuration option whose value differs from the file.
        overridden = [
            (param_, self._from_config_file[param_], value_)
            for param_, value_ in values.items()
            if param_ in PipelineRunConfiguration.model_fields
            and param_ in self._from_config_file
            and value_ != self._from_config_file[param_]
        ]
        if overridden:
            message_parts = [
                f"The value of parameter '{name}' has changed from "
                f"'{file_value}' set in your configuration file to "
                f"'{new_value}'.\n"
                for name, file_value, new_value in overridden
            ]
            message_parts.append(
                "The configuration file value will be used during the "
                "pipeline run, so your change will not take effect. "
                "Consider updating your configuration file instead."
            )
            logger.warning("".join(message_parts))

    config = PipelineConfigurationUpdate(**values)
    self._apply_configuration(config, merge=merge)
    return self
copy(self)

Copies the pipeline.

Returns:

Type Description
Pipeline

The pipeline copy.

Source code in zenml/new/pipelines/pipeline.py
def copy(self) -> "Pipeline":
    """Creates a deep copy of the pipeline.

    Returns:
        The pipeline copy.
    """
    duplicate = copy.deepcopy(self)
    return duplicate
get_runs(self, **kwargs)

(Deprecated) Get runs of this pipeline.

Parameters:

Name Type Description Default
**kwargs Any

Further arguments for filtering or pagination that are passed to client.list_pipeline_runs().

{}

Returns:

Type Description
List[PipelineRunResponse]

List of runs of this pipeline.

Source code in zenml/new/pipelines/pipeline.py
def get_runs(self, **kwargs: Any) -> List["PipelineRunResponse"]:
    """(Deprecated) Get runs of this pipeline.

    Logs a deprecation warning and delegates to `self.model.get_runs()`.

    Args:
        **kwargs: Further arguments for filtering or pagination that are
            passed to `client.list_pipeline_runs()`.

    Returns:
        List of runs of this pipeline.
    """
    deprecation_notice = (
        "`Pipeline.get_runs()` is deprecated and will be removed in a "
        "future version. Please use `Pipeline.model.get_runs()` instead."
    )
    logger.warning(deprecation_notice)

    runs = self.model.get_runs(**kwargs)
    return runs
log_pipeline_deployment_metadata(deployment_model) staticmethod

Displays logs based on the deployment model upon running a pipeline.

Parameters:

Name Type Description Default
deployment_model PipelineDeploymentResponse

The model for the pipeline deployment.

required
Source code in zenml/new/pipelines/pipeline.py
@staticmethod
def log_pipeline_deployment_metadata(
    deployment_model: PipelineDeploymentResponse,
) -> None:
    """Logs a summary of the deployment when a pipeline is run.

    Covers the schedule, the caching status, the build that is used
    (including version mismatches with the local environment), the user,
    and the stack with its components. Any exception raised while logging
    is caught and reported at debug level only.

    Args:
        deployment_model: The model for the pipeline deployment.
    """
    try:
        # Schedule vs. immediate run
        schedule = deployment_model.schedule
        if not schedule:
            logger.info("Executing a new run.")
        else:
            logger.info(
                "Scheduling a run with the schedule: " f"`{schedule.name}`"
            )

        # Caching status: only mentioned when explicitly disabled
        pipeline_config = deployment_model.pipeline_configuration
        if pipeline_config.enable_cache is False:
            logger.info(
                f"Caching is disabled by default for "
                f"`{pipeline_config.name}`."
            )

        # Build that is used, if any
        build = deployment_model.build
        if build:
            logger.info("Using a build:")
            image_names = ", ".join(
                [item.image for item in build.images.values()]
            )
            logger.info(" Image(s): " f"{image_names}")

            # Version mismatches between the local environment and the build
            from zenml import __version__

            if build.zenml_version != __version__:
                logger.info(
                    f"ZenML version (different than the local version): "
                    f"{build.zenml_version}"
                )

            import platform

            if build.python_version != platform.python_version():
                logger.info(
                    f"Python version (different than the local version): "
                    f"{build.python_version}"
                )

        # User, stack and stack components
        user = deployment_model.user
        if user is not None:
            logger.info(f"Using user: `{user.name}`")

        stack = deployment_model.stack
        if stack is not None:
            logger.info(f"Using stack: `{stack.name}`")

            for kind, members in stack.components.items():
                # Only the first component of each type is listed.
                logger.info(f"  {kind.value}: `{members[0].name}`")
    except Exception as e:
        # Best effort only: a logging problem must not break the run.
        logger.debug(f"Logging pipeline deployment metadata failed: {e}")
prepare(self, *args, **kwargs)

Prepares the pipeline.

Parameters:

Name Type Description Default
*args Any

Pipeline entrypoint input arguments.

()
**kwargs Any

Pipeline entrypoint input keyword arguments.

{}

Exceptions:

Type Description
RuntimeError

If the pipeline has parameters configured differently in configuration file and code.

Source code in zenml/new/pipelines/pipeline.py
    def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepares the pipeline.

        Resets the recorded parameters and step invocations, combines the
        parameters configured on the pipeline with those coming from the
        configuration file, and then calls the entrypoint function while
        this pipeline is the active one.

        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments. Configured
                parameters that are not passed here are added before the
                entrypoint is called.

        Raises:
            RuntimeError: If the pipeline has parameters configured differently in
                configuration file and code.
        """
        # Clear existing parameters and invocations
        self._parameters = {}
        self._invocations = {}

        parameters_ = (self.configuration.parameters or {}).copy()
        if from_file_ := self._from_config_file.get("parameters", None):
            parameters_ = dict_utils.recursive_update(parameters_, from_file_)

        if parameters_:
            # Keyword arguments that are also configured, with another value.
            conflicting_parameters = {
                k: (parameters_[k], v_runtime)
                for k, v_runtime in kwargs.items()
                if k in parameters_ and parameters_[k] != v_runtime
            }
            if conflicting_parameters:
                # Subject/verb agreement: "parameter ... conflicts" versus
                # "parameters ... conflict".
                several = len(conflicting_parameters) > 1
                noun = "parameters" if several else "parameter"
                verb = "conflict" if several else "conflicts"

                msg_parts = [
                    f"Configured {noun} for the pipeline `{self.name}` "
                    f"{verb} with {noun} passed in runtime:\n"
                ]
                msg_parts.extend(
                    f"`{key}`: config=`{v_config}` | runtime=`{v_runtime}`\n"
                    for key, (
                        v_config,
                        v_runtime,
                    ) in conflicting_parameters.items()
                )
                # The example uses `config_path`, which is the keyword that
                # `with_options` accepts for the configuration file.
                msg_parts.append(
                    "This happens, if you define values for pipeline "
                    "parameters in configuration file and pass same "
                    "parameters from the code. Example:\n"
                    "```\n"
                    "# config.yaml\n"
                    "    parameters:\n"
                    "        param_name: value1\n"
                    "\n"
                    "\n"
                    "# pipeline.py\n"
                    "@pipeline\n"
                    "def pipeline_(param_name: str):\n"
                    "    step_name()\n"
                    "\n"
                    'if __name__=="__main__":\n'
                    '    pipeline_.with_options(config_path="config.yaml")'
                    '(param_name="value2")\n'
                    "```\n"
                    "To avoid this consider setting pipeline parameters "
                    "only in one place (config or code).\n"
                )
                raise RuntimeError("".join(msg_parts))

            # Fill in configured parameters that were not passed explicitly.
            for k, v_config in parameters_.items():
                kwargs.setdefault(k, v_config)

        with self:
            # Enter the context manager, so we become the active pipeline. This
            # means that all steps that get called while the entrypoint function
            # is executed will be added as invocation to this pipeline instance.
            self._call_entrypoint(*args, **kwargs)
register(self)

Register the pipeline in the server.

Returns:

Type Description
PipelineResponse

The registered pipeline model.

Source code in zenml/new/pipelines/pipeline.py
def register(self) -> "PipelineResponse":
    """Registers the pipeline in the server.

    A warning is logged when the pipeline carries any non-default
    configuration besides its name.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    # Everything except the name that deviates from the defaults.
    custom_configuration = self.configuration.model_dump(
        exclude_defaults=True, exclude={"name"}
    )
    if custom_configuration:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    registered_pipeline = self._register()
    return registered_pipeline
resolve(self)

Resolves the pipeline.

Returns:

Type Description
Source

The pipeline source.

Source code in zenml/new/pipelines/pipeline.py
def resolve(self) -> "Source":
    """Resolves the source of the pipeline entrypoint function.

    Returns:
        The pipeline source.
    """
    pipeline_source = source_utils.resolve(
        self.entrypoint, skip_validation=True
    )
    return pipeline_source
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, steps=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)

Copies the pipeline and applies the given configurations.

Parameters:

Name Type Description Default
run_name Optional[str]

Name of the pipeline run.

None
schedule Optional[zenml.config.schedule.Schedule]

Optional schedule to use for the run.

None
build Union[str, UUID, PipelineBuildBase]

Optional build to use for the run.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
steps Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline. This is equivalent to step_configurations, and will be ignored if step_configurations is set as well.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None
unlisted bool

Whether the pipeline run should be unlisted (not assigned to any pipeline).

False
prevent_build_reuse bool

DEPRECATED: Use DockerSettings.prevent_build_reuse instead.

False
**kwargs Any

Pipeline configuration options. These will be passed to the pipeline.configure(...) method.

{}

Returns:

Type Description
Pipeline

The copied pipeline instance.

Source code in zenml/new/pipelines/pipeline.py
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBase", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    steps: Optional[Mapping[str, "StepConfigurationUpdateOrDict"]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Returns a configured copy of the pipeline.

    The original pipeline is left untouched: the options are applied to a
    copy, and the run-related options are stored on that copy as run
    arguments.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        steps: Configurations for steps of the pipeline. This is equivalent
            to `step_configurations`, and will be ignored if
            `step_configurations` is set as well.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: DEPRECATED: Use
            `DockerSettings.prevent_build_reuse` instead.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    if steps and step_configurations:
        logger.warning(
            "Step configurations were passed using both the "
            "`step_configurations` and `steps` keywords, ignoring the "
            "values passed using the `steps` keyword."
        )

    configured_copy = self.copy()
    configured_copy._reconfigure_from_file_with_overrides(
        config_path=config_path, **kwargs
    )

    # `step_configurations` wins over `steps` when both are given.
    candidate_run_args = {
        "run_name": run_name,
        "schedule": schedule,
        "build": build,
        "step_configurations": step_configurations or steps,
        "config_path": config_path,
        "unlisted": unlisted,
        "prevent_build_reuse": prevent_build_reuse,
    }
    # Only explicitly provided (non-`None`) options become run arguments.
    configured_copy._run_args.update(
        dict_utils.remove_none_values(candidate_run_args)
    )
    return configured_copy
write_run_configuration_template(self, path, stack=None)

Writes a run configuration yaml template.

Parameters:

Name Type Description Default
path str

The path where the template will be written.

required
stack Optional[Stack]

The stack for which the template should be generated. If not given, the active stack will be used.

None
Source code in zenml/new/pipelines/pipeline.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Generate a run configuration yaml template and write it to a file.

    The template is built from the setting classes of the stack (extended by
    the general settings) and from the step invocations of this pipeline.
    Before being written, the yaml is passed through
    `yaml_utils.comment_out_yaml`.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    if not stack:
        stack = Client().active_stack

    available_settings = stack.setting_classes
    available_settings.update(settings_utils.get_general_settings())

    # The two level checks are independent, so one setting class can end up
    # in both the pipeline-level and the step-level template.
    pipeline_level_settings = {}
    step_level_settings = {}
    for settings_key, settings_cls in available_settings.items():
        settings_template = pydantic_utils.TemplateGenerator(
            settings_cls
        ).run()
        if ConfigurationLevel.PIPELINE in settings_cls.LEVEL:
            pipeline_level_settings[settings_key] = settings_template
        if ConfigurationLevel.STEP in settings_cls.LEVEL:
            step_level_settings[settings_key] = settings_template

    step_templates = {}
    for invocation_id, invocation in self.invocations.items():
        entrypoint = invocation.step.entrypoint_definition

        # Only steps with legacy params get a generated parameter template.
        if entrypoint.legacy_params:
            parameter_template = pydantic_utils.TemplateGenerator(
                entrypoint.legacy_params.annotation
            ).run()
        else:
            parameter_template = {}

        output_templates = {}
        for output_name in entrypoint.outputs:
            output_templates[output_name] = PartialArtifactConfiguration()

        step_templates[invocation_id] = StepConfigurationUpdate(
            parameters=parameter_template,
            settings=step_level_settings,
            outputs=output_templates,
        )

    full_template = pydantic_utils.TemplateGenerator(
        PipelineRunConfiguration(
            settings=pipeline_level_settings, steps=step_templates
        )
    ).run()
    commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(full_template))

    with open(path, "w") as template_file:
        template_file.write(commented_yaml)

pipeline_context

Pipeline context class.

PipelineContext

Provides pipeline configuration context.

Usage example:

from zenml import get_pipeline_context

...

@pipeline(
    extra={
        "complex_parameter": [
            ("sklearn.tree", "DecisionTreeClassifier"),
            ("sklearn.ensemble", "RandomForestClassifier"),
        ]
    }
)
def my_pipeline():
    context = get_pipeline_context()

    after = []
    search_steps_prefix = "hp_tuning_search_"
    for i, model_search_configuration in enumerate(
        context.extra["complex_parameter"]
    ):
        step_name = f"{search_steps_prefix}{i}"
        cross_validation(
            model_package=model_search_configuration[0],
            model_class=model_search_configuration[1],
            id=step_name
        )
        after.append(step_name)
    select_best_model(
        search_steps_prefix=search_steps_prefix,
        after=after,
    )
Source code in zenml/new/pipelines/pipeline_context.py
class PipelineContext:
    """Exposes selected values of a pipeline configuration as attributes.

    Usage example:

    ```python
    from zenml import get_pipeline_context

    ...

    @pipeline(
        extra={
            "complex_parameter": [
                ("sklearn.tree", "DecisionTreeClassifier"),
                ("sklearn.ensemble", "RandomForestClassifier"),
            ]
        }
    )
    def my_pipeline():
        context = get_pipeline_context()

        after = []
        search_steps_prefix = "hp_tuning_search_"
        for i, model_search_configuration in enumerate(
            context.extra["complex_parameter"]
        ):
            step_name = f"{search_steps_prefix}{i}"
            cross_validation(
                model_package=model_search_configuration[0],
                model_class=model_search_configuration[1],
                id=step_name
            )
            after.append(step_name)
        select_best_model(
            search_steps_prefix=search_steps_prefix,
            after=after,
        )
    ```
    """

    def __init__(self, pipeline_configuration: "PipelineConfiguration"):
        """Copy the relevant configuration values onto the context.

        Args:
            pipeline_configuration: The configuration of the pipeline derived
                from Pipeline class.
        """
        config = pipeline_configuration

        self.name = config.name
        self.enable_cache = config.enable_cache
        self.enable_artifact_metadata = config.enable_artifact_metadata
        self.enable_artifact_visualization = (
            config.enable_artifact_visualization
        )
        self.enable_step_logs = config.enable_step_logs
        self.settings = config.settings
        self.extra = config.extra
        self.model = config.model
        # Stored under the old name as well; the deprecated `model_version`
        # property below returns `self.model`.
        self._model_version = config.model

    # TODO: deprecate me
    @property
    def model_version(self) -> Optional["Model"]:
        """DEPRECATED alias for `model`; logs a warning on every access.

        Returns:
            The `Model` object associated with the current pipeline.
        """
        deprecation_notice = "Pipeline context `model_version` is deprecated. Please use `model` instead."
        logger.warning(deprecation_notice)
        return self.model
model_version: Optional[Model] property readonly

DEPRECATED, use model instead.

Returns:

Type Description
Optional[Model]

The Model object associated with the current pipeline.

__init__(self, pipeline_configuration) special

Initialize the context of the current pipeline.

Parameters:

Name Type Description Default
pipeline_configuration PipelineConfiguration

The configuration of the pipeline derived from Pipeline class.

required
Source code in zenml/new/pipelines/pipeline_context.py
def __init__(self, pipeline_configuration: "PipelineConfiguration"):
    """Copy the relevant configuration values onto the context.

    Args:
        pipeline_configuration: The configuration of the pipeline derived
            from Pipeline class.
    """
    config = pipeline_configuration

    self.name = config.name
    self.enable_cache = config.enable_cache
    self.enable_artifact_metadata = config.enable_artifact_metadata
    self.enable_artifact_visualization = config.enable_artifact_visualization
    self.enable_step_logs = config.enable_step_logs
    self.settings = config.settings
    self.extra = config.extra
    self.model = config.model
    # The same model is also stored under the old private name.
    self._model_version = config.model
get_pipeline_context()

Get the context of the current pipeline.

Returns:

Type Description
PipelineContext

The context of the current pipeline.

Exceptions:

Type Description
RuntimeError

If no active pipeline is found.

RuntimeError

If inside a running step.

Source code in zenml/new/pipelines/pipeline_context.py
def get_pipeline_context() -> "PipelineContext":
    """Build a context object for the currently active pipeline.

    Returns:
        The context of the current pipeline.

    Raises:
        RuntimeError: If no active pipeline is found.
        RuntimeError: If inside a running step.
    """
    from zenml.new.pipelines.pipeline import Pipeline

    active_pipeline = Pipeline.ACTIVE_PIPELINE
    if active_pipeline is not None:
        return PipelineContext(
            pipeline_configuration=active_pipeline.configuration
        )

    # No active pipeline: probe for a step context only to pick the more
    # helpful of the two error messages.
    try:
        from zenml.new.steps.step_context import get_step_context

        get_step_context()
    except RuntimeError:
        raise RuntimeError("No active pipeline found.")

    # Reaching this point means a step context exists.
    raise RuntimeError(
        "Inside a step use `from zenml import get_step_context` "
        "instead."
    )

pipeline_decorator

ZenML pipeline decorator definition.

pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_step_logs=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, model_version=None)

Decorator to create a pipeline.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
enable_step_logs Optional[bool]

If step logs should be enabled for this pipeline.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function invoked in the event of a step failure. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function invoked in the event of a step success. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

Configuration of the model in the Model Control Plane.

None
model_version Optional[Model]

DEPRECATED, please use model instead.

None

Returns:

Type Description
Union[Pipeline, Callable[[F], Pipeline]]

A pipeline instance.

Source code in zenml/new/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    model_version: Optional["Model"] = None,  # TODO: deprecate me
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Turn a function into a `Pipeline`, usable with or without arguments.

    When called with the decorated function directly, the pipeline is
    returned. When called with keyword options only, a decorator is returned
    that creates the pipeline once it receives the function.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_step_logs: If step logs should be enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be a
            function with a single argument of type `BaseException`, or a source
            path to such a function (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be a
            function with no arguments, or a source path to such a function
            (e.g. `module.my_function`).
        model: configuration of the model in the Model Control Plane.
        model_version: DEPRECATED, please use `model` instead.

    Returns:
        A pipeline instance.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        from zenml.new.pipelines.pipeline import Pipeline

        if model_version:
            deprecation_notice = "Pipeline decorator argument `model_version` is deprecated. Please use `model` instead."
            logger.warning(deprecation_notice)

        pipeline_instance = Pipeline(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_step_logs=enable_step_logs,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            # `model` wins over the deprecated `model_version` if both are set.
            model=model or model_version,
            entrypoint=func,
        )

        # Carry the function's docstring over to the pipeline object.
        pipeline_instance.__doc__ = func.__doc__
        return pipeline_instance

    if _func is None:
        # Used as `@pipeline(...)`: hand back the decorator itself.
        return inner_decorator

    # Used as bare `@pipeline`: decorate right away.
    return inner_decorator(_func)

run_utils

Utility functions for running pipelines.

create_placeholder_run(deployment)

Create a placeholder run for the deployment.

If the deployment contains a schedule, no placeholder run will be created.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment for which to create the placeholder run.

required

Returns:

Type Description
Optional[PipelineRunResponse]

The placeholder run or None if no run was created.

Source code in zenml/new/pipelines/run_utils.py
def create_placeholder_run(
    deployment: "PipelineDeploymentResponse",
) -> Optional["PipelineRunResponse"]:
    """Register a run in `INITIALIZING` status for an unscheduled deployment.

    Deployments that contain a schedule do not get a placeholder run.

    Args:
        deployment: The deployment for which to create the placeholder run.

    Returns:
        The placeholder run or `None` if no run was created.
    """
    assert deployment.user

    if deployment.schedule:
        return None

    pipeline_id = deployment.pipeline.id if deployment.pipeline else None
    run_name = get_run_name(run_name_template=deployment.run_name_template)

    # We set the start time on the placeholder run already to
    # make it consistent with the {time} placeholder in the
    # run name. This means the placeholder run will usually
    # have longer durations than scheduled runs, as for them
    # the start_time is only set once the first step starts
    # running.
    placeholder_request = PipelineRunRequest(
        name=run_name,
        start_time=datetime.utcnow(),
        orchestrator_run_id=None,
        user=deployment.user.id,
        workspace=deployment.workspace.id,
        deployment=deployment.id,
        pipeline=pipeline_id,
        status=ExecutionStatus.INITIALIZING,
    )
    return Client().zen_store.create_run(placeholder_request)
deploy_pipeline(deployment, stack, placeholder_run=None)

Run a deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment to run.

required
stack Stack

The stack on which to run the deployment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment. This will be deleted in case the pipeline deployment failed.

None

Exceptions:

Type Description
Exception

Any exception that happened while deploying or running (in case it happens synchronously) the pipeline.

Source code in zenml/new/pipelines/run_utils.py
def deploy_pipeline(
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> None:
    """Prepare and deploy a pipeline deployment on a stack.

    While the stack deploys the pipeline,
    `constants.SHOULD_PREVENT_PIPELINE_EXECUTION` is set to `True`; its
    previous value is restored afterwards, whether or not deploying failed.

    Args:
        deployment: The deployment to run.
        stack: The stack on which to run the deployment.
        placeholder_run: An optional placeholder run for the deployment. This
            will be deleted in case the pipeline deployment failed.

    Raises:
        Exception: Any exception that happened while deploying or running
            (in case it happens synchronously) the pipeline.
    """
    stack.prepare_pipeline_deployment(deployment=deployment)

    # Prevent execution of nested pipelines which might lead to
    # unexpected behavior
    flag_before_deploy = constants.SHOULD_PREVENT_PIPELINE_EXECUTION
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
    try:
        stack.deploy_pipeline(deployment=deployment)
    except Exception as e:
        if placeholder_run:
            current_status = (
                Client().get_pipeline_run(placeholder_run.id).status
            )
            if current_status == ExecutionStatus.INITIALIZING:
                # The run hasn't actually started yet, which means that we
                # failed during initialization -> We don't want the
                # placeholder run to stay in the database
                Client().delete_pipeline_run(placeholder_run.id)

        raise e
    finally:
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = flag_before_deploy
get_all_sources_from_value(value)

Get all source objects from a value.

Parameters:

Name Type Description Default
value Any

The value from which to get all the source objects.

required

Returns:

Type Description
List[zenml.config.source.Source]

List of source objects for the given value.

Source code in zenml/new/pipelines/run_utils.py
def get_all_sources_from_value(value: Any) -> List[Source]:
    """Recursively collect every `Source` contained in a value.

    A `Source` is returned as is. `BaseModel` instances (via their
    `__dict__`), dicts (values only), lists, sets and tuples are searched
    recursively. Any other value yields no sources.

    Args:
        value: The value from which to get all the source objects.

    Returns:
        List of source objects for the given value.
    """
    if isinstance(value, Source):
        return [value]

    # Pick what to descend into; anything unrecognised has no children.
    if isinstance(value, BaseModel):
        children = value.__dict__.values()
    elif isinstance(value, Dict):
        children = value.values()
    elif isinstance(value, (List, Set, tuple)):
        children = value
    else:
        return []

    collected = []
    for child in children:
        collected.extend(get_all_sources_from_value(child))
    return collected
get_default_run_name(pipeline_name)

Gets the default name for a pipeline run.

Parameters:

Name Type Description Default
pipeline_name str

Name of the pipeline which will be run.

required

Returns:

Type Description
str

Run name.

Source code in zenml/new/pipelines/run_utils.py
def get_default_run_name(pipeline_name: str) -> str:
    """Build the default run name template for a pipeline.

    Args:
        pipeline_name: Name of the pipeline which will be run.

    Returns:
        The pipeline name followed by the literal `-{date}-{time}` suffix.
    """
    # The doubled braces keep `{date}` and `{time}` as literal placeholders
    # in the returned string.
    default_name = f"{pipeline_name}-{{date}}-{{time}}"
    return default_name
get_placeholder_run(deployment_id)

Get the placeholder run for a deployment.

Parameters:

Name Type Description Default
deployment_id UUID

ID of the deployment for which to get the placeholder run.

required

Returns:

Type Description
Optional[PipelineRunResponse]

The placeholder run or None if there exists no placeholder run for the deployment.

Source code in zenml/new/pipelines/run_utils.py
def get_placeholder_run(
    deployment_id: UUID,
) -> Optional["PipelineRunResponse"]:
    """Look up the placeholder run of a deployment.

    Only the first run of the deployment in `INITIALIZING` status (sorted
    ascending by creation) is considered, and it only counts as a
    placeholder if it has no orchestrator run ID.

    Args:
        deployment_id: ID of the deployment for which to get the placeholder
            run.

    Returns:
        The placeholder run or `None` if there exists no placeholder run for the
        deployment.
    """
    page = Client().list_pipeline_runs(
        sort_by="asc:created",
        size=1,
        deployment_id=deployment_id,
        status=ExecutionStatus.INITIALIZING,
    )
    candidates = page.items
    if len(candidates) == 0:
        return None

    candidate = candidates[0]
    return candidate if candidate.orchestrator_run_id is None else None
upload_notebook_cell_code_if_necessary(deployment, stack)

Upload notebook cell code if necessary.

This function checks if any of the steps of the pipeline that will be executed in a different process are defined in a notebook. If that is the case, it will extract that notebook cell code into python files and upload an archive of all the necessary files to the artifact store.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The deployment.

required
stack Stack

The stack on which the deployment will happen.

required

Exceptions:

Type Description
RuntimeError

If the code for one of the steps that will run out of process cannot be extracted into a python file.

Source code in zenml/new/pipelines/run_utils.py
def upload_notebook_cell_code_if_necessary(
    deployment: "PipelineDeploymentBase", stack: "Stack"
) -> None:
    """Upload the code of notebook-defined steps that run out of process.

    A step is considered here if its source is of type `NOTEBOOK` and either
    the orchestrator flavor is not `"local"` or the step has a step operator
    configured. If at least one such step exists, every resolved notebook
    cell is uploaded to the artifact store as a python file, and all
    notebook sources found in the deployment get the artifact store ID set.

    Args:
        deployment: The deployment.
        stack: The stack on which the deployment will happen.

    Raises:
        RuntimeError: If the code for one of the steps that will run out of
            process cannot be extracted into a python file.
    """
    notebook_cells = source_utils.get_resolved_notebook_sources()
    upload_needed = False

    for step in deployment.step_configurations.values():
        step_source = step.spec.source
        if step_source.type != SourceType.NOTEBOOK:
            continue

        runs_out_of_process = (
            stack.orchestrator.flavor != "local" or step.config.step_operator
        )
        if not runs_out_of_process:
            continue

        upload_needed = True

        # Code does not run in-process, which means we need to
        # extract the step code into a python file
        if not notebook_cells.get(step_source.import_path):
            raise RuntimeError(
                f"Unable to run step {step.config.name}. This step is "
                "defined in a notebook and you're trying to run it "
                "in a remote environment, but ZenML was not able to "
                "detect the step code in the notebook. To fix "
                "this error, define your step in a python file instead "
                "of a notebook."
            )

    if not upload_needed:
        return

    logger.info("Uploading notebook code...")

    for cell_code in notebook_cells.values():
        notebook_utils.warn_about_notebook_cell_magic_commands(
            cell_code=cell_code
        )
        replacement_module = (
            notebook_utils.compute_cell_replacement_module_name(
                cell_code=cell_code
            )
        )
        code_utils.upload_notebook_code(
            artifact_store=stack.artifact_store,
            cell_code=cell_code,
            file_name=f"{replacement_module}.py",
        )

    # Mark every notebook source in the deployment with the artifact store
    # the code was uploaded to.
    for deployment_source in get_all_sources_from_value(deployment):
        if deployment_source.type == SourceType.NOTEBOOK:
            setattr(
                deployment_source,
                "artifact_store_id",
                stack.artifact_store.id,
            )

    logger.info("Upload finished.")
validate_run_config_is_runnable_from_server(run_configuration)

Validates that the run configuration can be used to run from the server.

Parameters:

Name Type Description Default
run_configuration PipelineRunConfiguration

The run configuration to validate.

required

Exceptions:

Type Description
ValueError

If there are values in the run configuration that are not allowed when running a pipeline from the server.

Source code in zenml/new/pipelines/run_utils.py
def validate_run_config_is_runnable_from_server(
    run_configuration: "PipelineRunConfiguration",
) -> None:
    """Reject run configuration values that the server cannot handle.

    Parameters, a build, a schedule, and docker settings (on the pipeline or
    on any step) must not be set.

    Args:
        run_configuration: The run configuration to validate.

    Raises:
        ValueError: If there are values in the run configuration that are not
            allowed when running a pipeline from the server.
    """
    # Checked in this order; the first offending field determines the error.
    forbidden_fields = (
        (
            "parameters",
            "Can't set parameters when running pipeline via Rest API.",
        ),
        ("build", "Can't set build when running pipeline via Rest API."),
        (
            "schedule",
            "Can't set schedule when running pipeline via Rest API.",
        ),
    )
    for field_name, error_message in forbidden_fields:
        if getattr(run_configuration, field_name):
            raise ValueError(error_message)

    docker_error = (
        "Can't set DockerSettings when running pipeline via Rest API."
    )
    if run_configuration.settings.get("docker"):
        raise ValueError(docker_error)

    if any(
        step_update.settings.get("docker")
        for step_update in run_configuration.steps.values()
    ):
        raise ValueError(docker_error)
validate_stack_is_runnable_from_server(zen_store, stack)

Validate if a stack model is runnable from the server.

Parameters:

Name Type Description Default
zen_store BaseZenStore

ZenStore to use for listing flavors.

required
stack StackResponse

The stack to validate.

required

Exceptions:

Type Description
ValueError

If the stack has components of a custom flavor or local components.

Source code in zenml/new/pipelines/run_utils.py
def validate_stack_is_runnable_from_server(
    zen_store: BaseZenStore, stack: StackResponse
) -> None:
    """Check that every component of a stack may be used from the server.

    Args:
        zen_store: ZenStore to use for listing flavors.
        stack: The stack to validate.

    Raises:
        ValueError: If the stack has components of a custom flavor or local
            components.
    """
    for components_of_type in stack.components.values():
        assert len(components_of_type) == 1
        stack_component = components_of_type[0]

        flavor_filter = FlavorFilter(
            name=stack_component.flavor, type=stack_component.type
        )
        matching_flavors = zen_store.list_flavors(flavor_filter)
        assert len(matching_flavors) == 1
        flavor_response = matching_flavors[0]

        # A flavor that belongs to a workspace is rejected as custom.
        if flavor_response.workspace is not None:
            raise ValueError("No custom stack component flavors allowed.")

        # Instantiate the flavor's config class to ask whether the
        # component is local.
        config_class = Flavor.from_model(flavor_response).config_class
        if config_class(**stack_component.configuration).is_local:
            raise ValueError("No local stack components allowed.")
wait_for_pipeline_run_to_finish(run_id)

Waits until a pipeline run is finished.

Parameters:

Name Type Description Default
run_id UUID

ID of the run for which to wait.

required

Returns:

Type Description
PipelineRunResponse

Model of the finished run.

Source code in zenml/new/pipelines/run_utils.py
def wait_for_pipeline_run_to_finish(run_id: UUID) -> "PipelineRunResponse":
    """Poll a pipeline run until its status reports it as finished.

    The delay between polls starts at 1 second and doubles after each poll
    until it reaches 64 seconds.

    Args:
        run_id: ID of the run for which to wait.

    Returns:
        Model of the finished run.
    """
    delay = 1
    max_delay = 64

    run = Client().get_pipeline_run(run_id)
    while not run.status.is_finished:
        logger.info(
            "Waiting for pipeline run with ID %s to finish (current status: %s)",
            run_id,
            run.status,
        )
        time.sleep(delay)
        if delay < max_delay:
            delay *= 2

        run = Client().get_pipeline_run(run_id)

    return run

steps special

decorated_step

Internal BaseStep subclass used by the step decorator.

step_context

Step context class.

StepContext

Provides additional context inside a step function.

This singleton class is used to access information about the current run, step run, or its outputs inside a step function.

Usage example:

from zenml.steps import get_step_context

@step
def my_trainer_step() -> Any:
    context = get_step_context()

    # get info about the current pipeline run
    current_pipeline_run = context.pipeline_run

    # get info about the current step run
    current_step_run = context.step_run

    # get info about the future output artifacts of this step
    output_artifact_uri = context.get_output_artifact_uri()

    ...
Source code in zenml/new/steps/step_context.py
class StepContext(metaclass=SingletonMetaClass):
    """Provides additional context inside a step function.

    This singleton class is used to access information about the current run,
    step run, or its outputs inside a step function.

    Usage example:

    ```python
    from zenml.steps import get_step_context

    @step
    def my_trainer_step() -> Any:
        context = get_step_context()

        # get info about the current pipeline run
        current_pipeline_run = context.pipeline_run

        # get info about the current step run
        current_step_run = context.step_run

        # get info about the future output artifacts of this step
        output_artifact_uri = context.get_output_artifact_uri()

        ...
    ```
    """

    def __init__(
        self,
        pipeline_run: "PipelineRunResponse",
        step_run: "StepRunResponse",
        output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
        output_artifact_uris: Mapping[str, str],
        output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
        step_run_info: "StepRunInfo",
        cache_enabled: bool,
    ) -> None:
        """Initialize the context of the currently running step.

        Args:
            pipeline_run: The model of the current pipeline run.
            step_run: The model of the current step run.
            output_materializers: The output materializers of the step that
                this context is used in.
            output_artifact_uris: The output artifacts of the step that this
                context is used in.
            output_artifact_configs: The outputs' ArtifactConfigs of the step
                that this context is used in. It is indexed with every
                output name of `output_materializers`.
            step_run_info: (Deprecated) info about the currently running step.
            cache_enabled: (Deprecated) Whether caching is enabled for the step.

        Raises:
            StepContextError: If the keys of the output materializers and
                output artifacts do not match.
        """
        from zenml.client import Client

        # Re-fetch the run through the client; if the lookup raises a
        # `KeyError`, keep the model that was passed in.
        try:
            pipeline_run = Client().get_pipeline_run(pipeline_run.id)
        except KeyError:
            pass
        self.pipeline_run = pipeline_run
        # Same best-effort refresh for the step run.
        try:
            step_run = Client().get_run_step(step_run.id)
        except KeyError:
            pass
        self.step_run = step_run
        self._step_run_info = step_run_info
        self._cache_enabled = cache_enabled

        # Get the stack that we are running in
        self._stack = Client().active_stack

        self.step_name = self.step_run.name

        # set outputs
        if output_materializers.keys() != output_artifact_uris.keys():
            raise StepContextError(
                f"Mismatched keys in output materializers and output artifact "
                f"URIs for step `{self.step_name}`. Output materializer "
                f"keys: {set(output_materializers)}, output artifact URI "
                f"keys: {set(output_artifact_uris)}"
            )
        self._outputs = {
            key: StepContextOutput(
                materializer_classes=output_materializers[key],
                artifact_uri=output_artifact_uris[key],
                artifact_config=output_artifact_configs[key],
            )
            for key in output_materializers.keys()
        }

    @property
    def pipeline(self) -> "PipelineResponse":
        """Returns the current pipeline.

        Returns:
            The current pipeline.

        Raises:
            StepContextError: If the pipeline run does not have a pipeline.
        """
        if self.pipeline_run.pipeline:
            return self.pipeline_run.pipeline
        raise StepContextError(
            f"Unable to get pipeline in step `{self.step_name}` of pipeline "
            f"run '{self.pipeline_run.id}': This pipeline run does not have "
            f"a pipeline associated with it."
        )

    @property
    def model(self) -> "Model":
        """Returns configured Model.

        Order of resolution to search for Model is:
            1. Model from @step
            2. Model from @pipeline

        Returns:
            The `Model` object associated with the current step.

        Raises:
            StepContextError: If the `Model` object is not set in `@step` or `@pipeline`.
        """
        if (
            self.step_run.config.model is not None
            and self.step_run.model_version is not None
        ):
            model = self.step_run.model_version.to_model_class()
        elif self.pipeline_run.config.model is not None:
            # Prefer the model version attached to the run; otherwise fall
            # back to the model from the pipeline run configuration.
            if self.pipeline_run.model_version:
                model = self.pipeline_run.model_version.to_model_class()
            else:
                model = self.pipeline_run.config.model
        else:
            raise StepContextError(
                f"Unable to get Model in step `{self.step_name}` of pipeline "
                f"run '{self.pipeline_run.id}': it was not set in `@step` or `@pipeline`."
            )

        return model

    # TODO: deprecate me
    @property
    def model_version(self) -> "Model":
        """DEPRECATED, use `model` instead.

        Returns:
            The `Model` object associated with the current step.
        """
        logger.warning(
            "Step context `model_version` is deprecated. Please use `model` instead."
        )
        return self.model

    @property
    def inputs(self) -> Dict[str, "ArtifactVersionResponse"]:
        """Returns the input artifacts of the current step.

        Returns:
            The input artifacts of the current step.
        """
        return self.step_run.inputs

    def _get_output(
        self, output_name: Optional[str] = None
    ) -> "StepContextOutput":
        """Returns the step context output for a given step output.

        Args:
            output_name: Optional name of the output to look up. If no name
                is given and the step only has a single output, that output
                is returned.

        Returns:
            The `StepContextOutput` (materializer classes, artifact URI,
            artifact config, metadata and tags) of the given output.

        Raises:
            StepContextError: If the step has no outputs, no output for
                the given `output_name` or if no `output_name` was given but
                the step has multiple outputs.
        """
        output_count = len(self._outputs)
        if output_count == 0:
            raise StepContextError(
                f"Unable to get step output for step `{self.step_name}`: "
                f"This step does not have any outputs."
            )

        if not output_name and output_count > 1:
            raise StepContextError(
                f"Unable to get step output for step `{self.step_name}`: "
                f"This step has multiple outputs ({set(self._outputs)}), "
                f"please specify which output to return."
            )

        if output_name:
            if output_name not in self._outputs:
                raise StepContextError(
                    f"Unable to get step output '{output_name}' for "
                    f"step `{self.step_name}`. This step does not have an "
                    f"output with the given name, please specify one of the "
                    f"available outputs: {set(self._outputs)}."
                )
            return self._outputs[output_name]
        else:
            # No name given and exactly one output exists: return it.
            return next(iter(self._outputs.values()))

    def get_output_materializer(
        self,
        output_name: Optional[str] = None,
        custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
        data_type: Optional[Type[Any]] = None,
    ) -> "BaseMaterializer":
        """Returns a materializer for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                materializer. If no name is given and the step only has a
                single output, the materializer of this output will be
                returned. If the step has multiple outputs, an exception
                will be raised.
            custom_materializer_class: If given, this `BaseMaterializer`
                subclass will be initialized with the output artifact instead
                of the materializer that was registered for this step output.
            data_type: If the output annotation is of type `Union` and the step
                therefore has multiple materializers configured, you can provide
                a data type for the output which will be used to select the
                correct materializer. If not provided, the first materializer
                will be used.

        Returns:
            A materializer initialized with the output artifact for
            the given output.
        """
        from zenml.utils import materializer_utils

        output = self._get_output(output_name)
        materializer_classes = output.materializer_classes
        artifact_uri = output.artifact_uri

        if custom_materializer_class:
            materializer_class = custom_materializer_class
        elif len(materializer_classes) == 1 or not data_type:
            materializer_class = materializer_classes[0]
        else:
            materializer_class = materializer_utils.select_materializer(
                data_type=data_type, materializer_classes=materializer_classes
            )

        return materializer_class(artifact_uri)

    def get_output_artifact_uri(
        self, output_name: Optional[str] = None
    ) -> str:
        """Returns the artifact URI for a given step output.

        Args:
            output_name: Optional name of the output for which to get the URI.
                If no name is given and the step only has a single output,
                the URI of this output will be returned. If the step has
                multiple outputs, an exception will be raised.

        Returns:
            Artifact URI for the given output.
        """
        return self._get_output(output_name).artifact_uri

    def get_output_metadata(
        self, output_name: Optional[str] = None
    ) -> Dict[str, "MetadataType"]:
        """Returns the metadata for a given step output.

        The result combines the metadata added to the output with the
        metadata of the output's artifact config; on duplicate keys the
        artifact config value is used. A new dictionary is returned on every
        call, so neither reading nor modifying the result changes the
        metadata stored on the output.

        Args:
            output_name: Optional name of the output for which to get the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be returned. If the
                step has multiple outputs, an exception will be raised.

        Returns:
            Metadata for the given output.
        """
        output = self._get_output(output_name)
        # Copy first: updating `output.run_metadata` directly would leak the
        # artifact config metadata into the stored output metadata.
        custom_metadata = dict(output.run_metadata or {})
        if output.artifact_config:
            custom_metadata.update(
                **(output.artifact_config.run_metadata or {})
            )
        return custom_metadata

    def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
        """Returns the tags for a given step output.

        Args:
            output_name: Optional name of the output for which to get the
                tags. If no name is given and the step only has a single
                output, the tags of this output will be returned. If the
                step has multiple outputs, an exception will be raised.

        Returns:
            Tags for the given output, without duplicates.
        """
        output = self._get_output(output_name)
        custom_tags = set(output.tags or [])
        if output.artifact_config:
            return list(
                set(output.artifact_config.tags or []).union(custom_tags)
            )
        return list(custom_tags)

    def add_output_metadata(
        self,
        metadata: Dict[str, "MetadataType"],
        output_name: Optional[str] = None,
    ) -> None:
        """Adds metadata for a given step output.

        Args:
            metadata: The metadata to add.
            output_name: Optional name of the output for which to add the
                metadata. If no name is given and the step only has a single
                output, the metadata of this output will be added. If the
                step has multiple outputs, an exception will be raised.
        """
        output = self._get_output(output_name)
        if not output.run_metadata:
            output.run_metadata = {}
        output.run_metadata.update(**metadata)

    def add_output_tags(
        self,
        tags: List[str],
        output_name: Optional[str] = None,
    ) -> None:
        """Adds tags for a given step output.

        Args:
            tags: The tags to add.
            output_name: Optional name of the output for which to add the
                tags. If no name is given and the step only has a single
                output, the tags of this output will be added. If the
                step has multiple outputs, an exception will be raised.
        """
        output = self._get_output(output_name)
        if not output.tags:
            output.tags = []
        output.tags += tags

    def _set_artifact_config(
        self,
        artifact_config: "ArtifactConfig",
        output_name: Optional[str] = None,
    ) -> None:
        """Sets the artifact config for a given step output.

        Args:
            artifact_config: The artifact config of the output to set.
            output_name: Optional name of the output for which to set the
                artifact config. If no name is given and the step only has a
                single output, the artifact config of this output will be
                set. If the step has multiple outputs, an exception will be
                raised.

        Raises:
            EntityExistsError: If the output already has an artifact config.
        """
        output = self._get_output(output_name)

        if output.artifact_config is None:
            output.artifact_config = artifact_config
        else:
            raise EntityExistsError(
                f"Output with name '{output_name}' already has artifact config."
            )
inputs: Dict[str, ArtifactVersionResponse] property readonly

Returns the input artifacts of the current step.

Returns:

Type Description
Dict[str, ArtifactVersionResponse]

The input artifacts of the current step.

model: Model property readonly

Returns configured Model.

Order of resolution to search for Model is: 1. Model from @step 2. Model from @pipeline

Returns:

Type Description
Model

The Model object associated with the current step.

Exceptions:

Type Description
StepContextError

If the Model object is not set in @step or @pipeline.

model_version: Model property readonly

DEPRECATED, use model instead.

Returns:

Type Description
Model

The Model object associated with the current step.

pipeline: PipelineResponse property readonly

Returns the current pipeline.

Returns:

Type Description
PipelineResponse

The current pipeline.

Exceptions:

Type Description
StepContextError

If the pipeline run does not have a pipeline.

__init__(self, pipeline_run, step_run, output_materializers, output_artifact_uris, output_artifact_configs, step_run_info, cache_enabled) special

Initialize the context of the currently running step.

Parameters:

Name Type Description Default
pipeline_run PipelineRunResponse

The model of the current pipeline run.

required
step_run StepRunResponse

The model of the current step run.

required
output_materializers Mapping[str, Sequence[Type[BaseMaterializer]]]

The output materializers of the step that this context is used in.

required
output_artifact_uris Mapping[str, str]

The output artifacts of the step that this context is used in.

required
output_artifact_configs Mapping[str, Optional[ArtifactConfig]]

The outputs' ArtifactConfigs of the step that this context is used in.

required
step_run_info StepRunInfo

(Deprecated) info about the currently running step.

required
cache_enabled bool

(Deprecated) Whether caching is enabled for the step.

required

Exceptions:

Type Description
StepContextError

If the keys of the output materializers and output artifacts do not match.

Source code in zenml/new/steps/step_context.py
def __init__(
    self,
    pipeline_run: "PipelineRunResponse",
    step_run: "StepRunResponse",
    output_materializers: Mapping[str, Sequence[Type["BaseMaterializer"]]],
    output_artifact_uris: Mapping[str, str],
    output_artifact_configs: Mapping[str, Optional["ArtifactConfig"]],
    step_run_info: "StepRunInfo",
    cache_enabled: bool,
) -> None:
    """Set up the context for the step that is currently running.

    Args:
        pipeline_run: The model of the current pipeline run.
        step_run: The model of the current step run.
        output_materializers: The output materializers of the step that
            this context is used in.
        output_artifact_uris: The output artifacts of the step that this
            context is used in.
        output_artifact_configs: The outputs' ArtifactConfigs of the step
            that this context is used in.
        step_run_info: (Deprecated) info about the currently running step.
        cache_enabled: (Deprecated) Whether caching is enabled for the step.

    Raises:
        StepContextError: If the keys of the output materializers and
            output artifacts do not match.
    """
    from zenml.client import Client

    # Use the run model fetched through the client when available; if the
    # lookup raises a `KeyError`, keep the model handed to the constructor.
    try:
        self.pipeline_run = Client().get_pipeline_run(pipeline_run.id)
    except KeyError:
        self.pipeline_run = pipeline_run

    # Same best-effort refresh for the step run model.
    try:
        self.step_run = Client().get_run_step(step_run.id)
    except KeyError:
        self.step_run = step_run

    self._step_run_info = step_run_info
    self._cache_enabled = cache_enabled

    # The stack that this step is running in.
    self._stack = Client().active_stack

    self.step_name = self.step_run.name

    # Materializers and artifact URIs must describe the same outputs.
    output_names = output_materializers.keys()
    if output_names != output_artifact_uris.keys():
        raise StepContextError(
            f"Mismatched keys in output materializers and output artifact "
            f"URIs for step `{self.step_name}`. Output materializer "
            f"keys: {set(output_materializers)}, output artifact URI "
            f"keys: {set(output_artifact_uris)}"
        )

    outputs = {}
    for name in output_names:
        outputs[name] = StepContextOutput(
            materializer_classes=output_materializers[name],
            artifact_uri=output_artifact_uris[name],
            artifact_config=output_artifact_configs[name],
        )
    self._outputs = outputs
add_output_metadata(self, metadata, output_name=None)

Adds metadata for a given step output.

Parameters:

Name Type Description Default
metadata Dict[str, MetadataType]

The metadata to add.

required
output_name Optional[str]

Optional name of the output for which to add the metadata. If no name is given and the step only has a single output, the metadata of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in zenml/new/steps/step_context.py
def add_output_metadata(
    self,
    metadata: Dict[str, "MetadataType"],
    output_name: Optional[str] = None,
) -> None:
    """Attach additional metadata to one of the step's outputs.

    Args:
        metadata: The metadata to add.
        output_name: Optional name of the output for which to add the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be added. If the
            step has multiple outputs, an exception will be raised.
    """
    step_output = self._get_output(output_name)
    stored = step_output.run_metadata
    if not stored:
        # Nothing recorded yet (None or empty): start a fresh dictionary.
        stored = step_output.run_metadata = {}
    stored.update(**metadata)
add_output_tags(self, tags, output_name=None)

Adds tags for a given step output.

Parameters:

Name Type Description Default
tags List[str]

The tags to add.

required
output_name Optional[str]

Optional name of the output for which to add the tags. If no name is given and the step only has a single output, the tags of this output will be added. If the step has multiple outputs, an exception will be raised.

None
Source code in zenml/new/steps/step_context.py
def add_output_tags(
    self,
    tags: List[str],
    output_name: Optional[str] = None,
) -> None:
    """Attach additional tags to one of the step's outputs.

    Args:
        tags: The tags to add.
        output_name: Optional name of the output for which to add the
            tags. If no name is given and the step only has a single
            output, the tags of this output will be added. If the
            step has multiple outputs, an exception will be raised.
    """
    step_output = self._get_output(output_name)
    if not step_output.tags:
        # Nothing recorded yet (None or empty): start a fresh list.
        step_output.tags = []
    step_output.tags.extend(tags)
get_output_artifact_uri(self, output_name=None)

Returns the artifact URI for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the URI. If no name is given and the step only has a single output, the URI of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
str

Artifact URI for the given output.

Source code in zenml/new/steps/step_context.py
def get_output_artifact_uri(
    self, output_name: Optional[str] = None
) -> str:
    """Look up the artifact URI of one of the step's outputs.

    Args:
        output_name: Optional name of the output for which to get the URI.
            If no name is given and the step only has a single output,
            the URI of this output will be returned. If the step has
            multiple outputs, an exception will be raised.

    Returns:
        Artifact URI for the given output.
    """
    step_output = self._get_output(output_name)
    return step_output.artifact_uri
get_output_materializer(self, output_name=None, custom_materializer_class=None, data_type=None)

Returns a materializer for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the materializer. If no name is given and the step only has a single output, the materializer of this output will be returned. If the step has multiple outputs, an exception will be raised.

None
custom_materializer_class Optional[Type[BaseMaterializer]]

If given, this BaseMaterializer subclass will be initialized with the output artifact instead of the materializer that was registered for this step output.

None
data_type Optional[Type[Any]]

If the output annotation is of type Union and the step therefore has multiple materializers configured, you can provide a data type for the output which will be used to select the correct materializer. If not provided, the first materializer will be used.

None

Returns:

Type Description
BaseMaterializer

A materializer initialized with the output artifact for the given output.

Source code in zenml/new/steps/step_context.py
def get_output_materializer(
    self,
    output_name: Optional[str] = None,
    custom_materializer_class: Optional[Type["BaseMaterializer"]] = None,
    data_type: Optional[Type[Any]] = None,
) -> "BaseMaterializer":
    """Build a materializer instance for one of the step's outputs.

    Args:
        output_name: Optional name of the output for which to get the
            materializer. If no name is given and the step only has a
            single output, the materializer of this output will be
            returned. If the step has multiple outputs, an exception
            will be raised.
        custom_materializer_class: If given, this `BaseMaterializer`
            subclass will be initialized with the output artifact instead
            of the materializer that was registered for this step output.
        data_type: If the output annotation is of type `Union` and the step
            therefore has multiple materializers configured, you can provide
            a data type for the output which will be used to select the
            correct materializer. If not provided, the first materializer
            will be used.

    Returns:
        A materializer initialized with the output artifact for
        the given output.
    """
    from zenml.utils import materializer_utils

    step_output = self._get_output(output_name)
    candidates = step_output.materializer_classes
    uri = step_output.artifact_uri

    if custom_materializer_class:
        # An explicitly requested class wins over the registered ones.
        chosen = custom_materializer_class
    elif len(candidates) != 1 and data_type:
        # Several registered classes and a data type to choose between them.
        chosen = materializer_utils.select_materializer(
            data_type=data_type, materializer_classes=candidates
        )
    else:
        chosen = candidates[0]

    return chosen(uri)
get_output_metadata(self, output_name=None)

Returns the metadata for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the metadata. If no name is given and the step only has a single output, the metadata of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
Dict[str, MetadataType]

Metadata for the given output.

Source code in zenml/new/steps/step_context.py
def get_output_metadata(
    self, output_name: Optional[str] = None
) -> Dict[str, "MetadataType"]:
    """Returns the metadata for a given step output.

    The result combines the metadata added to the output with the metadata
    of the output's artifact config; on duplicate keys the artifact config
    value is used. A new dictionary is returned on every call, so neither
    reading nor modifying the result changes the metadata stored on the
    output.

    Args:
        output_name: Optional name of the output for which to get the
            metadata. If no name is given and the step only has a single
            output, the metadata of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Metadata for the given output.
    """
    output = self._get_output(output_name)
    # Copy first: updating `output.run_metadata` directly would leak the
    # artifact config metadata into the stored output metadata.
    custom_metadata = dict(output.run_metadata or {})
    if output.artifact_config:
        custom_metadata.update(
            **(output.artifact_config.run_metadata or {})
        )
    return custom_metadata
get_output_tags(self, output_name=None)

Returns the tags for a given step output.

Parameters:

Name Type Description Default
output_name Optional[str]

Optional name of the output for which to get the tags. If no name is given and the step only has a single output, the tags of this output will be returned. If the step has multiple outputs, an exception will be raised.

None

Returns:

Type Description
List[str]

Tags for the given output.

Source code in zenml/new/steps/step_context.py
def get_output_tags(self, output_name: Optional[str] = None) -> List[str]:
    """Collect the tags of one of the step's outputs.

    The tags added to the output are combined with the tags of the
    output's artifact config (if it has one); duplicates are removed.

    Args:
        output_name: Optional name of the output for which to get the
            tags. If no name is given and the step only has a single
            output, the tags of this output will be returned. If the
            step has multiple outputs, an exception will be raised.

    Returns:
        Tags for the given output.
    """
    step_output = self._get_output(output_name)
    merged_tags = set(step_output.tags or [])
    config = step_output.artifact_config
    if config:
        merged_tags = set(config.tags or []).union(merged_tags)
    return list(merged_tags)
StepContextOutput

Represents a step output in the step context.

Source code in zenml/new/steps/step_context.py
class StepContextOutput:
    """Holds what the step context tracks for a single step output."""

    # Materializer classes configured for the output.
    materializer_classes: Sequence[Type["BaseMaterializer"]]
    # URI of the output artifact.
    artifact_uri: str
    # Metadata for the output; stays None until a value is assigned.
    run_metadata: Optional[Dict[str, "MetadataType"]] = None
    # ArtifactConfig of the output, if any.
    artifact_config: Optional["ArtifactConfig"]
    # Tags for the output; stays None until a value is assigned.
    tags: Optional[List[str]] = None

    def __init__(
        self,
        materializer_classes: Sequence[Type["BaseMaterializer"]],
        artifact_uri: str,
        artifact_config: Optional["ArtifactConfig"],
    ) -> None:
        """Create the output record.

        Args:
            materializer_classes: The materializer classes for the output.
            artifact_uri: The artifact URI for the output.
            artifact_config: The ArtifactConfig object of the output.
        """
        self.artifact_config = artifact_config
        self.artifact_uri = artifact_uri
        self.materializer_classes = materializer_classes
__init__(self, materializer_classes, artifact_uri, artifact_config) special

Initialize the step output.

Parameters:

Name Type Description Default
materializer_classes Sequence[Type[BaseMaterializer]]

The materializer classes for the output.

required
artifact_uri str

The artifact URI for the output.

required
artifact_config Optional[ArtifactConfig]

The ArtifactConfig object of the output.

required
Source code in zenml/new/steps/step_context.py
def __init__(
    self,
    materializer_classes: Sequence[Type["BaseMaterializer"]],
    artifact_uri: str,
    artifact_config: Optional["ArtifactConfig"],
) -> None:
    """Create the output record.

    Args:
        materializer_classes: The materializer classes for the output.
        artifact_uri: The artifact URI for the output.
        artifact_config: The ArtifactConfig object of the output.
    """
    self.artifact_config = artifact_config
    self.artifact_uri = artifact_uri
    self.materializer_classes = materializer_classes
get_step_context()

Get the context of the currently running step.

Returns:

Type Description
StepContext

The context of the currently running step.

Exceptions:

Type Description
RuntimeError

If no step is currently running.

Source code in zenml/new/steps/step_context.py
def get_step_context() -> "StepContext":
    """Fetch the context of the step that is currently running.

    Returns:
        The context of the currently running step.

    Raises:
        RuntimeError: If no step is currently running.
    """
    # Guard: without an existing context instance there is nothing to return.
    if not StepContext._exists():
        raise RuntimeError(
            "The step context is only available inside a step function."
        )
    return StepContext()  # type: ignore

step_decorator

Step decorator function.

step(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, enable_step_logs=None, experiment_tracker=None, step_operator=None, output_materializers=None, settings=None, extra=None, on_failure=None, on_success=None, model=None, retry=None, model_version=None)

Decorator to create a ZenML step.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the step. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default.

None
enable_artifact_metadata Optional[bool]

Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default.

None
enable_artifact_visualization Optional[bool]

Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default.

None
enable_step_logs Optional[bool]

Specify whether step logs are enabled for this step.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with a single argument of type BaseException, or a source path to such a function (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with no arguments, or a source path to such a function (e.g. module.my_function).

None
model Optional[Model]

configuration of the model in the Model Control Plane.

None
retry Optional[StepRetryConfig]

configuration of step retry in case of step failure.

None
model_version Optional[Model]

DEPRECATED, please use model instead.

None

Returns:

Type Description
Union[BaseStep, Callable[[F], BaseStep]]

The step instance.

Source code in zenml/new/steps/step_decorator.py
def step(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional["OutputMaterializersSpecification"] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    model: Optional["Model"] = None,
    retry: Optional["StepRetryConfig"] = None,
    model_version: Optional["Model"] = None,  # TODO: deprecate me
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
    """Turn a function into a ZenML step.

    Usable both as a bare decorator (`@step`) and as a decorator factory
    (`@step(...)`): when no function is passed in directly, a decorator is
    returned that builds the step once it receives the function.

    Args:
        _func: The function being decorated.
        name: Name of the step. Falls back to the name of the decorated
            function if left empty.
        enable_cache: Whether caching is enabled for this step. Caching is
            enabled by default if no value is passed.
        enable_artifact_metadata: Whether metadata is enabled for this step.
            Metadata is enabled by default if no value is passed.
        enable_artifact_visualization: Whether visualization is enabled for
            this step. Visualization is enabled by default if no value is
            passed.
        enable_step_logs: Whether step logs are enabled for this step.
        experiment_tracker: Experiment tracker to use for this step.
        step_operator: Step operator to use for this step.
        output_materializers: Output materializers for this step. A dict
            must only use keys that are a subset of the step's output names;
            a single value (type or string) is used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback invoked when the step fails. Either a function
            taking a single `BaseException` argument, or a source path to
            such a function (e.g. `module.my_function`).
        on_success: Callback invoked when the step succeeds. Either a
            function taking no arguments, or a source path to such a
            function (e.g. `module.my_function`).
        model: Configuration of the model in the Model Control Plane.
        retry: Configuration of step retry in case of step failure.
        model_version: DEPRECATED, please use `model` instead.

    Returns:
        The step instance.
    """

    def inner_decorator(func: "F") -> "BaseStep":
        from zenml.new.steps.decorated_step import _DecoratedStep

        # TODO: deprecate me
        if model_version:
            logger.warning(
                "Step decorator argument `model_version` is deprecated. Please use `model` instead."
            )

        func_name = func.__name__

        # The generated class takes over the function's name, module and
        # docstring, and exposes the function itself as its entrypoint.
        class_namespace = {
            "entrypoint": staticmethod(func),
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        step_class: Type["BaseStep"] = type(
            func_name, (_DecoratedStep,), class_namespace
        )

        return step_class(
            name=name or func_name,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            enable_step_logs=enable_step_logs,
            experiment_tracker=experiment_tracker,
            step_operator=step_operator,
            output_materializers=output_materializers,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            # The deprecated `model_version` is only used if `model` is unset.
            model=model or model_version,
            retry=retry,
        )

    # Called with arguments (`@step(...)`): hand back the decorator.
    # Called bare (`@step`): decorate the function right away.
    return inner_decorator if _func is None else inner_decorator(_func)