Utils
        zenml.utils
  
      special
  
    Initialization of the utils module.
The utils module contains utility functions handling analytics, reading and
writing YAML data as well as other general purpose functions.
        archivable
    Archivable mixin.
        
Archivable            (ABC)
        
    Archivable mixin class.
Source code in zenml/utils/archivable.py
          class Archivable(ABC):
    """Archivable mixin class."""
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the object.
        Args:
            *args: Unused args for subclasses.
            **kwargs: Unused keyword args for subclasses.
        """
        self._extra_files: Dict[str, str] = {}
    def add_file(self, source: str, destination: str) -> None:
        """Adds a file to the archive.
        Args:
            source: The source of the file to add. This can either be a path
                or the file content.
            destination: The path inside the archive where the file
                should be added.
        """
        if fileio.exists(source):
            with fileio.open(source) as f:
                self._extra_files[destination] = f.read()
        else:
            self._extra_files[destination] = source
    def add_directory(self, source: str, destination: str) -> None:
        """Adds a directory to the archive.
        Args:
            source: Path to the directory.
            destination: The path inside the build context where the directory
                should be added.
        Raises:
            ValueError: If `source` does not point to a directory.
        """
        if not fileio.isdir(source):
            raise ValueError(
                f"Can't add directory {source} to the build context as it "
                "does not exist or is not a directory."
            )
        for dir, _, files in fileio.walk(source):
            dir_path = Path(fileio.convert_to_str(dir))
            for file_name in files:
                file_name = fileio.convert_to_str(file_name)
                file_source = dir_path / file_name
                file_destination = (
                    Path(destination)
                    / dir_path.relative_to(source)
                    / file_name
                )
                with file_source.open("r") as f:
                    self._extra_files[file_destination.as_posix()] = f.read()
    def write_archive(
        self, output_file: IO[bytes], use_gzip: bool = True
    ) -> None:
        """Writes an archive of the build context to the given file.
        Args:
            output_file: The file to write the archive to.
            use_gzip: Whether to use `gzip` to compress the file.
        """
        files = self.get_files()
        extra_files = self.get_extra_files()
        if use_gzip:
            from gzip import GzipFile
            # We don't use the builtin gzip functionality of the `tarfile`
            # library as that one includes the tar filename and creation
            # timestamp in the archive which causes the hash of the resulting
            # file to be different each time. We use this hash to avoid
            # duplicate uploads, which is why we pass empty values for filename
            # and mtime here.
            fileobj: Any = GzipFile(
                filename="", mode="wb", fileobj=output_file, mtime=0.0
            )
        else:
            fileobj = output_file
        with tarfile.open(mode="w", fileobj=fileobj) as tf:
            for archive_path, file_path in files.items():
                if archive_path in extra_files:
                    continue
                if info := tf.gettarinfo(file_path, arcname=archive_path):
                    if info.isfile():
                        with open(file_path, "rb") as f:
                            tf.addfile(info, f)
                    else:
                        tf.addfile(info, None)
            for archive_path, contents in extra_files.items():
                info = tarfile.TarInfo(archive_path)
                contents_encoded = contents.encode("utf-8")
                info.size = len(contents_encoded)
                tf.addfile(info, io.BytesIO(contents_encoded))
        if use_gzip:
            fileobj.close()
        output_file.seek(0)
    @abstractmethod
    def get_files(self) -> Dict[str, str]:
        """Gets all regular files that should be included in the archive.
        Returns:
            A dict {path_in_archive: path_on_filesystem} for all regular files
            in the archive.
        """
    def get_extra_files(self) -> Dict[str, str]:
        """Gets all extra files that should be included in the archive.
        Returns:
            A dict {path_in_archive: file_content} for all extra files in the
            archive.
        """
        return self._extra_files.copy()
__init__(self, *args, **kwargs)
  
      special
  
    Initialize the object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Unused args for subclasses. | () | 
| **kwargs | Any | Unused keyword args for subclasses. | {} | 
Source code in zenml/utils/archivable.py
          def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the object.
    Args:
        *args: Unused args for subclasses.
        **kwargs: Unused keyword args for subclasses.
    """
    self._extra_files: Dict[str, str] = {}
add_directory(self, source, destination)
    Adds a directory to the archive.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | str | Path to the directory. | required | 
| destination | str | The path inside the build context where the directory should be added. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If  | 
Source code in zenml/utils/archivable.py
          def add_directory(self, source: str, destination: str) -> None:
    """Adds a directory to the archive.
    Args:
        source: Path to the directory.
        destination: The path inside the build context where the directory
            should be added.
    Raises:
        ValueError: If `source` does not point to a directory.
    """
    if not fileio.isdir(source):
        raise ValueError(
            f"Can't add directory {source} to the build context as it "
            "does not exist or is not a directory."
        )
    for dir, _, files in fileio.walk(source):
        dir_path = Path(fileio.convert_to_str(dir))
        for file_name in files:
            file_name = fileio.convert_to_str(file_name)
            file_source = dir_path / file_name
            file_destination = (
                Path(destination)
                / dir_path.relative_to(source)
                / file_name
            )
            with file_source.open("r") as f:
                self._extra_files[file_destination.as_posix()] = f.read()
add_file(self, source, destination)
    Adds a file to the archive.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | str | The source of the file to add. This can either be a path or the file content. | required | 
| destination | str | The path inside the archive where the file should be added. | required | 
Source code in zenml/utils/archivable.py
          def add_file(self, source: str, destination: str) -> None:
    """Adds a file to the archive.
    Args:
        source: The source of the file to add. This can either be a path
            or the file content.
        destination: The path inside the archive where the file
            should be added.
    """
    if fileio.exists(source):
        with fileio.open(source) as f:
            self._extra_files[destination] = f.read()
    else:
        self._extra_files[destination] = source
get_extra_files(self)
    Gets all extra files that should be included in the archive.
Returns:
| Type | Description | 
|---|---|
| A dict {path_in_archive | file_content} for all extra files in the archive. | 
Source code in zenml/utils/archivable.py
          def get_extra_files(self) -> Dict[str, str]:
    """Gets all extra files that should be included in the archive.
    Returns:
        A dict {path_in_archive: file_content} for all extra files in the
        archive.
    """
    return self._extra_files.copy()
get_files(self)
    Gets all regular files that should be included in the archive.
Returns:
| Type | Description | 
|---|---|
| A dict {path_in_archive | path_on_filesystem} for all regular files in the archive. | 
Source code in zenml/utils/archivable.py
          @abstractmethod
def get_files(self) -> Dict[str, str]:
    """Gets all regular files that should be included in the archive.
    Returns:
        A dict {path_in_archive: path_on_filesystem} for all regular files
        in the archive.
    """
write_archive(self, output_file, use_gzip=True)
    Writes an archive of the build context to the given file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_file | IO[bytes] | The file to write the archive to. | required | 
| use_gzip | bool | Whether to use  | True | 
Source code in zenml/utils/archivable.py
          def write_archive(
    self, output_file: IO[bytes], use_gzip: bool = True
) -> None:
    """Writes an archive of the build context to the given file.
    Args:
        output_file: The file to write the archive to.
        use_gzip: Whether to use `gzip` to compress the file.
    """
    files = self.get_files()
    extra_files = self.get_extra_files()
    if use_gzip:
        from gzip import GzipFile
        # We don't use the builtin gzip functionality of the `tarfile`
        # library as that one includes the tar filename and creation
        # timestamp in the archive which causes the hash of the resulting
        # file to be different each time. We use this hash to avoid
        # duplicate uploads, which is why we pass empty values for filename
        # and mtime here.
        fileobj: Any = GzipFile(
            filename="", mode="wb", fileobj=output_file, mtime=0.0
        )
    else:
        fileobj = output_file
    with tarfile.open(mode="w", fileobj=fileobj) as tf:
        for archive_path, file_path in files.items():
            if archive_path in extra_files:
                continue
            if info := tf.gettarinfo(file_path, arcname=archive_path):
                if info.isfile():
                    with open(file_path, "rb") as f:
                        tf.addfile(info, f)
                else:
                    tf.addfile(info, None)
        for archive_path, contents in extra_files.items():
            info = tarfile.TarInfo(archive_path)
            contents_encoded = contents.encode("utf-8")
            info.size = len(contents_encoded)
            tf.addfile(info, io.BytesIO(contents_encoded))
    if use_gzip:
        fileobj.close()
    output_file.seek(0)
        cloud_utils
    Utilities for ZenML Pro.
is_cloud_model_version(model_version)
    Check if a model version is from a ZenML Pro server.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model_version | ModelVersionResponse | The model version to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the model version is from a ZenML Pro server, else False. | 
Source code in zenml/utils/cloud_utils.py
          def is_cloud_model_version(model_version: ModelVersionResponse) -> bool:
    """Check if a model version is from a ZenML Pro server.
    Args:
        model_version: The model version to check.
    Returns:
        True if the model version is from a ZenML Pro server, else False.
    """
    model_version_url = get_model_version_url(model_version.id)
    if model_version_url:
        logger.info(
            f"Dashboard URL for Model Version with name {model_version.name} "
            f": {model_version_url}"
        )
        return True
    else:
        return False
        code_repository_utils
    Utilities for code repositories.
find_active_code_repository(path=None)
    Find the active code repository for a given path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | Optional[str] | Path at which to look for the code repository. If not given, the source root will be used. | None | 
Returns:
| Type | Description | 
|---|---|
| Optional[LocalRepositoryContext] | The local repository context active at that path or None. | 
Source code in zenml/utils/code_repository_utils.py
          def find_active_code_repository(
    path: Optional[str] = None,
) -> Optional["LocalRepositoryContext"]:
    """Find the active code repository for a given path.
    Args:
        path: Path at which to look for the code repository. If not given, the
            source root will be used.
    Returns:
        The local repository context active at that path or None.
    """
    global _CODE_REPOSITORY_CACHE
    from zenml.client import Client
    from zenml.code_repositories import BaseCodeRepository
    path = path or source_utils.get_source_root()
    path = os.path.abspath(path)
    if path in _CODE_REPOSITORY_CACHE:
        return _CODE_REPOSITORY_CACHE[path]
    local_context: Optional["LocalRepositoryContext"] = None
    for model in depaginate(list_method=Client().list_code_repositories):
        try:
            repo = BaseCodeRepository.from_model(model)
        except Exception:
            logger.debug(
                "Failed to instantiate code repository class.", exc_info=True
            )
            continue
        local_context = repo.get_local_context(path)
        if local_context:
            break
    _CODE_REPOSITORY_CACHE[path] = local_context
    return local_context
set_custom_local_repository(root, commit, repo)
    Manually defines a local repository for a path.
To explain what this function does we need to take a dive into source
resolving and what happens inside the Docker image entrypoint:
* When trying to resolve an object to a source, we first determine whether
the file is a user file or not.
* If the file is a user file, we check if that user file is inside a clean
code repository using the
code_repository_utils.find_active_code_repository(...) function. If that
is the case, the object will be resolved to a CodeRepositorySource which
includes additional information about the current commit and the ID of the
code repository.
* The code_repository_utils.find_active_code_repository(...) uses the
code repository implementation classes to check whether the code repository
"exists" at that local path. For git repositories, this check might look as
follows: The code repository first checks if there is a git repository at
that path or in any parent directory. If there is, the remote URLs of this
git repository will be checked to see if one matches the URL defined for
the code repository.
* When running a step inside a Docker image, ZenML potentially downloads
files from a code repository. This usually does not download the entire
repository (and in the case of git might not download a .git directory which
defines a local git repository) but only specific files. If we now try to
resolve any object while running in this container, it will not get resolved
to a CodeRepositorySource as
code_repository_utils.find_active_code_repository(...) won't find an
active repository. As we downloaded these files, we however know that they
belong to a certain code repository at a specific commit, and that's what we
can define using this function.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| root | str | The repository root. | required | 
| commit | str | The commit of the repository. | required | 
| repo | BaseCodeRepository | The code repository associated with the local repository. | required | 
Source code in zenml/utils/code_repository_utils.py
          def set_custom_local_repository(
    root: str, commit: str, repo: "BaseCodeRepository"
) -> None:
    """Manually defines a local repository for a path.
    To explain what this function does we need to take a dive into source
    resolving and what happens inside the Docker image entrypoint:
    * When trying to resolve an object to a source, we first determine whether
    the file is a user file or not.
    * If the file is a user file, we check if that user file is inside a clean
    code repository using the
    `code_repository_utils.find_active_code_repository(...)` function. If that
    is the case, the object will be resolved to a `CodeRepositorySource` which
    includes additional information about the current commit and the ID of the
    code repository.
    * The `code_repository_utils.find_active_code_repository(...)` uses the
    code repository implementation classes to check whether the code repository
    "exists" at that local path. For git repositories, this check might look as
    follows: The code repository first checks if there is a git repository at
    that path or in any parent directory. If there is, the remote URLs of this
    git repository will be checked to see if one matches the URL defined for
    the code repository.
    * When running a step inside a Docker image, ZenML potentially downloads
    files from a code repository. This usually does not download the entire
    repository (and in the case of git might not download a .git directory which
    defines a local git repository) but only specific files. If we now try to
    resolve any object while running in this container, it will not get resolved
    to a `CodeRepositorySource` as
    `code_repository_utils.find_active_code_repository(...)` won't find an
    active repository. As we downloaded these files, we however know that they
    belong to a certain code repository at a specific commit, and that's what we
    can define using this function.
    Args:
        root: The repository root.
        commit: The commit of the repository.
        repo: The code repository associated with the local repository.
    """
    from zenml.utils.downloaded_repository_context import (
        _DownloadedRepositoryContext,
    )
    global _CODE_REPOSITORY_CACHE
    path = os.path.abspath(source_utils.get_source_root())
    _CODE_REPOSITORY_CACHE[path] = _DownloadedRepositoryContext(
        code_repository_id=repo.id, root=root, commit=commit
    )
        code_utils
    Code utilities.
        
CodeArchive            (Archivable)
        
    Code archive class.
This class is used to archive user code before uploading it to the artifact store. If the user code is stored in a Git repository, only files not excluded by gitignores will be included in the archive.
Source code in zenml/utils/code_utils.py
          class CodeArchive(Archivable):
    """Code archive class.
    This class is used to archive user code before uploading it to the artifact
    store. If the user code is stored in a Git repository, only files not
    excluded by gitignores will be included in the archive.
    """
    def __init__(self, root: Optional[str] = None) -> None:
        """Initialize the object.
        Args:
            root: Root directory of the archive.
        """
        super().__init__()
        self._root = root
    @property
    def git_repo(self) -> Optional["Repo"]:
        """Git repository active at the code archive root.
        Returns:
            The git repository if available.
        """
        try:
            # These imports fail when git is not installed on the machine
            from git.exc import InvalidGitRepositoryError
            from git.repo.base import Repo
        except ImportError:
            return None
        try:
            git_repo = Repo(path=self._root, search_parent_directories=True)
        except InvalidGitRepositoryError:
            return None
        return git_repo
    def _get_all_files(self, archive_root: str) -> Dict[str, str]:
        """Get all files inside the archive root.
        Args:
            archive_root: The root directory from which to get all files.
        Returns:
            All files inside the archive root.
        """
        all_files = {}
        for root, _, files in os.walk(archive_root):
            for file in files:
                file_path = os.path.join(root, file)
                path_in_archive = os.path.relpath(file_path, archive_root)
                all_files[path_in_archive] = file_path
        return all_files
    def get_files(self) -> Dict[str, str]:
        """Gets all regular files that should be included in the archive.
        Raises:
            RuntimeError: If the code archive would not include any files.
        Returns:
            A dict {path_in_archive: path_on_filesystem} for all regular files
            in the archive.
        """
        if not self._root:
            return {}
        all_files = {}
        if repo := self.git_repo:
            try:
                result = repo.git.ls_files(
                    "--cached",
                    "--others",
                    "--modified",
                    "--exclude-standard",
                    self._root,
                )
            except Exception as e:
                logger.warning(
                    "Failed to get non-ignored files from git: %s", str(e)
                )
                all_files = self._get_all_files(archive_root=self._root)
            else:
                for file in result.split():
                    file_path = os.path.join(repo.working_dir, file)
                    path_in_archive = os.path.relpath(file_path, self._root)
                    if os.path.exists(file_path):
                        all_files[path_in_archive] = file_path
        else:
            all_files = self._get_all_files(archive_root=self._root)
        if not all_files:
            raise RuntimeError(
                "The code archive to be uploaded does not contain any files. "
                "This is probably because all files in your source root "
                f"`{self._root}` are ignored by a .gitignore file."
            )
        # Explicitly remove .zen directories as we write an updated version
        # to disk everytime ZenML is called. This updates the mtime of the
        # file, which invalidates the code upload caching. The values in
        # the .zen directory are not needed anyway as we set them as
        # environment variables.
        all_files = {
            path_in_archive: file_path
            for path_in_archive, file_path in sorted(all_files.items())
            if ".zen" not in Path(path_in_archive).parts[:-1]
        }
        return all_files
    def write_archive(
        self, output_file: IO[bytes], use_gzip: bool = True
    ) -> None:
        """Writes an archive of the build context to the given file.
        Args:
            output_file: The file to write the archive to.
            use_gzip: Whether to use `gzip` to compress the file.
        """
        super().write_archive(output_file=output_file, use_gzip=use_gzip)
        archive_size = os.path.getsize(output_file.name)
        if archive_size > 20 * 1024 * 1024:
            logger.warning(
                "Code archive size: `%s`. If you believe this is "
                "unreasonably large, make sure to version your code in git and "
                "ignore unnecessary files using a `.gitignore` file.",
                string_utils.get_human_readable_filesize(archive_size),
            )
git_repo: Optional[Repo]
  
      property
      readonly
  
    Git repository active at the code archive root.
Returns:
| Type | Description | 
|---|---|
| Optional[Repo] | The git repository if available. | 
__init__(self, root=None)
  
      special
  
    Initialize the object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| root | Optional[str] | Root directory of the archive. | None | 
Source code in zenml/utils/code_utils.py
          def __init__(self, root: Optional[str] = None) -> None:
    """Initialize the object.
    Args:
        root: Root directory of the archive.
    """
    super().__init__()
    self._root = root
get_files(self)
    Gets all regular files that should be included in the archive.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the code archive would not include any files. | 
Returns:
| Type | Description | 
|---|---|
| A dict {path_in_archive | path_on_filesystem} for all regular files in the archive. | 
Source code in zenml/utils/code_utils.py
          def get_files(self) -> Dict[str, str]:
    """Gets all regular files that should be included in the archive.
    Raises:
        RuntimeError: If the code archive would not include any files.
    Returns:
        A dict {path_in_archive: path_on_filesystem} for all regular files
        in the archive.
    """
    if not self._root:
        return {}
    all_files = {}
    if repo := self.git_repo:
        try:
            result = repo.git.ls_files(
                "--cached",
                "--others",
                "--modified",
                "--exclude-standard",
                self._root,
            )
        except Exception as e:
            logger.warning(
                "Failed to get non-ignored files from git: %s", str(e)
            )
            all_files = self._get_all_files(archive_root=self._root)
        else:
            for file in result.split():
                file_path = os.path.join(repo.working_dir, file)
                path_in_archive = os.path.relpath(file_path, self._root)
                if os.path.exists(file_path):
                    all_files[path_in_archive] = file_path
    else:
        all_files = self._get_all_files(archive_root=self._root)
    if not all_files:
        raise RuntimeError(
            "The code archive to be uploaded does not contain any files. "
            "This is probably because all files in your source root "
            f"`{self._root}` are ignored by a .gitignore file."
        )
    # Explicitly remove .zen directories as we write an updated version
    # to disk everytime ZenML is called. This updates the mtime of the
    # file, which invalidates the code upload caching. The values in
    # the .zen directory are not needed anyway as we set them as
    # environment variables.
    all_files = {
        path_in_archive: file_path
        for path_in_archive, file_path in sorted(all_files.items())
        if ".zen" not in Path(path_in_archive).parts[:-1]
    }
    return all_files
write_archive(self, output_file, use_gzip=True)
    Writes an archive of the build context to the given file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| output_file | IO[bytes] | The file to write the archive to. | required | 
| use_gzip | bool | Whether to use  | True | 
Source code in zenml/utils/code_utils.py
          def write_archive(
    self, output_file: IO[bytes], use_gzip: bool = True
) -> None:
    """Writes an archive of the build context to the given file.
    Args:
        output_file: The file to write the archive to.
        use_gzip: Whether to use `gzip` to compress the file.
    """
    super().write_archive(output_file=output_file, use_gzip=use_gzip)
    archive_size = os.path.getsize(output_file.name)
    if archive_size > 20 * 1024 * 1024:
        logger.warning(
            "Code archive size: `%s`. If you believe this is "
            "unreasonably large, make sure to version your code in git and "
            "ignore unnecessary files using a `.gitignore` file.",
            string_utils.get_human_readable_filesize(archive_size),
        )
download_and_extract_code(code_path, extract_dir)
    Download and extract code.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| code_path | str | Path where the code is uploaded. | required | 
| extract_dir | str | Directory where to code should be extracted to. | required | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the code is stored in an artifact store which is not active. | 
Source code in zenml/utils/code_utils.py
          def download_and_extract_code(code_path: str, extract_dir: str) -> None:
    """Download and extract code.
    Args:
        code_path: Path where the code is uploaded.
        extract_dir: Directory where to code should be extracted to.
    Raises:
        RuntimeError: If the code is stored in an artifact store which is
            not active.
    """
    artifact_store = Client().active_stack.artifact_store
    if not code_path.startswith(artifact_store.path):
        raise RuntimeError("Code stored in different artifact store.")
    download_path = os.path.basename(code_path)
    fileio.copy(code_path, download_path)
    shutil.unpack_archive(filename=download_path, extract_dir=extract_dir)
    os.remove(download_path)
upload_code_if_necessary(code_archive)
    Upload code to the artifact store if necessary.
This function computes a hash of the code to be uploaded, and if an archive with the same hash already exists it will not re-upload but instead return the path to the existing archive.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| code_archive | CodeArchive | The code archive to upload. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The path where to archived code is uploaded. | 
Source code in zenml/utils/code_utils.py
          def upload_code_if_necessary(code_archive: CodeArchive) -> str:
    """Upload code to the artifact store if necessary.
    This function computes a hash of the code to be uploaded, and if an archive
    with the same hash already exists it will not re-upload but instead return
    the path to the existing archive.
    Args:
        code_archive: The code archive to upload.
    Returns:
        The path where to archived code is uploaded.
    """
    artifact_store = Client().active_stack.artifact_store
    with tempfile.NamedTemporaryFile(
        mode="w+b", delete=False, suffix=".tar.gz"
    ) as f:
        code_archive.write_archive(f)
        hash_ = hashlib.sha1()  # nosec
        while True:
            data = f.read(64 * 1024)
            if not data:
                break
            hash_.update(data)
        filename = f"{hash_.hexdigest()}.tar.gz"
        upload_dir = os.path.join(artifact_store.path, "code_uploads")
        fileio.makedirs(upload_dir)
        upload_path = os.path.join(upload_dir, filename)
        if not fileio.exists(upload_path):
            archive_size = string_utils.get_human_readable_filesize(
                os.path.getsize(f.name)
            )
            logger.info(
                "Uploading code to `%s` (Size: %s).", upload_path, archive_size
            )
            fileio.copy(f.name, upload_path)
            logger.info("Code upload finished.")
        else:
            logger.info(
                "Code already exists in artifact store, skipping upload."
            )
    if os.path.exists(f.name):
        os.remove(f.name)
    return upload_path
        cuda_utils
    Utilities for managing GPU memory.
cleanup_gpu_memory(force=False)
    Clean up GPU memory.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| force | bool | whether to force the cleanup of GPU memory (must be passed explicitly) | False | 
Source code in zenml/utils/cuda_utils.py
          def cleanup_gpu_memory(force: bool = False) -> None:
    """Clean up GPU memory.
    Args:
        force: whether to force the cleanup of GPU memory (must be passed explicitly)
    """
    if not force:
        logger.warning(
            "This will clean up all GPU memory on current physical machine. "
            "This action is considered to be dangerous by default, since "
            "it might affect other processes running in the same environment. "
            "If this is intended, please explicitly pass `force=True`."
        )
    else:
        try:
            import torch
        except ModuleNotFoundError:
            logger.warning(
                "No PyTorch installed. Skipping GPU memory cleanup."
            )
            return
        logger.info("Cleaning up GPU memory...")
        while gc.collect():
            torch.cuda.empty_cache()
        daemon
    Utility functions to start/stop daemon processes.
This is only implemented for UNIX systems and therefore doesn't work on Windows. Based on https://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
check_if_daemon_is_running(pid_file)
    Checks whether a daemon process indicated by the PID file is running.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file | str | Path to file containing the PID of the daemon process to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the daemon process is running, otherwise False. | 
Source code in zenml/utils/daemon.py
          def check_if_daemon_is_running(pid_file: str) -> bool:
    """Checks whether a daemon process indicated by the PID file is running.
    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.
    Returns:
        True if the daemon process is running, otherwise False.
    """
    return get_daemon_pid_if_running(pid_file) is not None
daemonize(pid_file, log_file=None, working_directory='/')
    Decorator that executes the decorated function as a daemon process.
Use this decorator to easily transform any function into a daemon process.
For example,
import time
from zenml.utils.daemon import daemonize
@daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
def sleeping_daemon(period: int) -> None:
    print(f"I'm a daemon! I will sleep for {period} seconds.")
    time.sleep(period)
    print("Done sleeping, flying away.")
sleeping_daemon(period=30)
print("I'm the daemon's parent!.")
time.sleep(10) # just to prove that the daemon is running in parallel
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file | str | a file where the PID of the daemon process will be stored. | required | 
| log_file | Optional[str] | file where stdout and stderr are redirected for the daemon process. If not supplied, the daemon will be silenced (i.e. have its stdout/stderr redirected to /dev/null). | None | 
| working_directory | str | working directory for the daemon process, defaults to the root directory. | '/' | 
Returns:
| Type | Description | 
|---|---|
| Callable[[~F], ~F] | Decorated function that, when called, will detach from the current process and continue executing in the background, as a daemon process. | 
Source code in zenml/utils/daemon.py
          def daemonize(
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
) -> Callable[[F], F]:
    """Decorator that executes the decorated function as a daemon process.
    Use this decorator to easily transform any function into a daemon
    process.
    For example,
    ```python
    import time
    from zenml.utils.daemon import daemonize
    @daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
    def sleeping_daemon(period: int) -> None:
        print(f"I'm a daemon! I will sleep for {period} seconds.")
        time.sleep(period)
        print("Done sleeping, flying away.")
    sleeping_daemon(period=30)
    print("I'm the daemon's parent!.")
    time.sleep(10) # just to prove that the daemon is running in parallel
    ```
    Args:
        pid_file: a file where the PID of the daemon process will
            be stored.
        log_file: file where stdout and stderr are redirected for the daemon
            process. If not supplied, the daemon will be silenced (i.e. have
            its stdout/stderr redirected to /dev/null).
        working_directory: working directory for the daemon process,
            defaults to the root directory.
    Returns:
        Decorated function that, when called, will detach from the current
        process and continue executing in the background, as a daemon
        process.
    """
    def inner_decorator(_func: F) -> F:
        def daemon(*args: Any, **kwargs: Any) -> None:
            """Standard daemonization of a process.
            Args:
                *args: Arguments to be passed to the decorated function.
                **kwargs: Keyword arguments to be passed to the decorated
                    function.
            """
            if sys.platform == "win32":
                logger.error(
                    "Daemon functionality is currently not supported on Windows."
                )
            else:
                run_as_daemon(
                    _func,
                    log_file=log_file,
                    pid_file=pid_file,
                    working_directory=working_directory,
                    *args,
                    **kwargs,
                )
        return cast(F, daemon)
    return inner_decorator
get_daemon_pid_if_running(pid_file)
    Read and return the PID value from a PID file.
It does this if the daemon process tracked by the PID file is running.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file | str | Path to file containing the PID of the daemon process to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[int] | The PID of the daemon process if it is running, otherwise None. | 
Source code in zenml/utils/daemon.py
          def get_daemon_pid_if_running(pid_file: str) -> Optional[int]:
    """Read and return the PID value from a PID file.
    It does this if the daemon process tracked by the PID file is running.
    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.
    Returns:
        The PID of the daemon process if it is running, otherwise None.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (IOError, FileNotFoundError):
        logger.debug(
            f"Daemon PID file '{pid_file}' does not exist or cannot be read."
        )
        return None
    if not pid or not psutil.pid_exists(pid):
        logger.debug(f"Daemon with PID '{pid}' is no longer running.")
        return None
    logger.debug(f"Daemon with PID '{pid}' is running.")
    return pid
run_as_daemon(daemon_function, *args, *, pid_file, log_file=None, working_directory='/', **kwargs)
    Runs a function as a daemon process.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| daemon_function | ~F | The function to run as a daemon. | required | 
| pid_file | str | Path to file in which to store the PID of the daemon process. | required | 
| log_file | Optional[str] | Optional file to which the daemons stdout/stderr will be redirected to. | None | 
| working_directory | str | Working directory for the daemon process, defaults to the root directory. | '/' | 
| args | Any | Positional arguments to pass to the daemon function. | () | 
| kwargs | Any | Keyword arguments to pass to the daemon function. | {} | 
Exceptions:
| Type | Description | 
|---|---|
| FileExistsError | If the PID file already exists. | 
Source code in zenml/utils/daemon.py
          def run_as_daemon(
    daemon_function: F,
    *args: Any,
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
    **kwargs: Any,
) -> None:
    """Runs a function as a daemon process.
    Args:
        daemon_function: The function to run as a daemon.
        pid_file: Path to file in which to store the PID of the daemon
            process.
        log_file: Optional file to which the daemons stdout/stderr will be
            redirected to.
        working_directory: Working directory for the daemon process,
            defaults to the root directory.
        args: Positional arguments to pass to the daemon function.
        kwargs: Keyword arguments to pass to the daemon function.
    Raises:
        FileExistsError: If the PID file already exists.
    """
    # convert to absolute path as we will change working directory later
    if pid_file:
        pid_file = os.path.abspath(pid_file)
    if log_file:
        log_file = os.path.abspath(log_file)
    # create parent directory if necessary
    dir_name = os.path.dirname(pid_file)
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)
    # check if PID file exists
    if pid_file and os.path.exists(pid_file):
        pid = get_daemon_pid_if_running(pid_file)
        if pid:
            raise FileExistsError(
                f"The PID file '{pid_file}' already exists and a daemon "
                f"process with the same PID '{pid}' is already running."
                f"Please remove the PID file or kill the daemon process "
                f"before starting a new daemon."
            )
        logger.warning(
            f"Removing left over PID file '{pid_file}' from a previous "
            f"daemon process that didn't shut down correctly."
        )
        os.remove(pid_file)
    # first fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the process that called `run_as_daemon` so we
            # wait for the child process to finish to avoid creating
            # zombie processes. Then we simply return so the current process
            # can continue what it was doing.
            os.wait()
            return
    except OSError as e:
        logger.error("Unable to fork (error code: %d)", e.errno)
        sys.exit(1)
    # decouple from parent environment
    os.chdir(working_directory)
    os.setsid()
    os.umask(0o22)
    # second fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the parent of the future daemon process, kill it
            # so the daemon gets adopted by the init process.
            # we use os._exit here to prevent the inherited code from
            # catching the SystemExit exception and doing something else.
            os._exit(0)
    except OSError as e:
        sys.stderr.write(f"Unable to fork (error code: {e.errno})")
        # we use os._exit here to prevent the inherited code from
        # catching the SystemExit exception and doing something else.
        os._exit(1)
    # redirect standard file descriptors to devnull (or the given logfile)
    devnull = "/dev/null"
    if hasattr(os, "devnull"):
        devnull = os.devnull
    devnull_fd = os.open(devnull, os.O_RDWR)
    log_fd = (
        os.open(log_file, os.O_CREAT | os.O_RDWR | os.O_APPEND)
        if log_file
        else None
    )
    out_fd = log_fd or devnull_fd
    try:
        os.dup2(devnull_fd, sys.stdin.fileno())
    except io.UnsupportedOperation:
        # stdin is not a file descriptor
        pass
    try:
        os.dup2(out_fd, sys.stdout.fileno())
    except io.UnsupportedOperation:
        # stdout is not a file descriptor
        pass
    try:
        os.dup2(out_fd, sys.stderr.fileno())
    except io.UnsupportedOperation:
        # stderr is not a file descriptor
        pass
    if pid_file:
        # write the PID file
        with open(pid_file, "w+") as f:
            f.write(f"{os.getpid()}\n")
    # register actions in case this process exits/gets killed
    def cleanup() -> None:
        """Daemon cleanup."""
        sys.stderr.write("Cleanup: terminating children processes...\n")
        terminate_children()
        if pid_file and os.path.exists(pid_file):
            sys.stderr.write(f"Cleanup: removing PID file {pid_file}...\n")
            os.remove(pid_file)
        sys.stderr.flush()
    def sighndl(signum: int, frame: Optional[types.FrameType]) -> None:
        """Daemon signal handler.
        Args:
            signum: Signal number.
            frame: Frame object.
        """
        sys.stderr.write(f"Handling signal {signum}...\n")
        cleanup()
    signal.signal(signal.SIGTERM, sighndl)
    signal.signal(signal.SIGINT, sighndl)
    atexit.register(cleanup)
    # finally run the actual daemon code
    daemon_function(*args, **kwargs)
    sys.exit(0)
stop_daemon(pid_file)
    Stops a daemon process.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file | str | Path to file containing the PID of the daemon process to kill. | required | 
Source code in zenml/utils/daemon.py
          def stop_daemon(pid_file: str) -> None:
    """Stops a daemon process.
    Args:
        pid_file: Path to file containing the PID of the daemon process to
            kill.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (IOError, FileNotFoundError):
        logger.warning("Daemon PID file '%s' does not exist.", pid_file)
        return
    if psutil.pid_exists(pid):
        process = psutil.Process(pid)
        process.terminate()
    else:
        logger.warning("PID from '%s' does not exist.", pid_file)
terminate_children()
    Terminate all processes that are children of the currently running process.
Source code in zenml/utils/daemon.py
          def terminate_children() -> None:
    """Terminate all processes that are children of the currently running process."""
    pid = os.getpid()
    try:
        parent = psutil.Process(pid)
    except psutil.Error:
        # could not find parent process id
        return
    children = parent.children(recursive=False)
    for p in children:
        sys.stderr.write(
            f"Terminating child process with PID {p.pid}...\n"
        )
        p.terminate()
    _, alive = psutil.wait_procs(
        children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
    )
    for p in alive:
        sys.stderr.write(f"Killing child process with PID {p.pid}...\n")
        p.kill()
    _, alive = psutil.wait_procs(
        children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
    )
        dashboard_utils
    Utility class to help with interacting with the dashboard.
get_cloud_dashboard_url()
    Get the base url of the cloud dashboard if the server is a cloud tenant.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The base url of the cloud dashboard. | 
Source code in zenml/utils/dashboard_utils.py
          def get_cloud_dashboard_url() -> Optional[str]:
    """Get the base url of the cloud dashboard if the server is a cloud tenant.
    Returns:
        The base url of the cloud dashboard.
    """
    client = Client()
    if client.zen_store.type == StoreType.REST:
        server_info = client.zen_store.get_store_info()
        if is_cloud_server(server_info):
            return server_info.dashboard_url
    return None
get_component_url(component)
    Function to get the dashboard URL of a given component model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| component | ComponentResponse | the response model of the given component. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the URL to the component if the dashboard is available, else None. | 
Source code in zenml/utils/dashboard_utils.py
          def get_component_url(component: ComponentResponse) -> Optional[str]:
    """Function to get the dashboard URL of a given component model.
    Args:
        component: the response model of the given component.
    Returns:
        the URL to the component if the dashboard is available, else None.
    """
    base_url, is_legacy_dashboard = get_server_dashboard_url()
    if base_url:
        if is_legacy_dashboard:
            return (
                base_url
                + f"{constants.STACK_COMPONENTS}/{component.type.value}/{component.id}/configuration"
            )
        else:
            return base_url + constants.STACKS
    return None
get_model_version_url(model_version_id)
    Function to get the dashboard URL of a given model version.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model_version_id | UUID | the id of the model version. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the URL to the model version if the dashboard is available, else None. | 
Source code in zenml/utils/dashboard_utils.py
          def get_model_version_url(model_version_id: UUID) -> Optional[str]:
    """Function to get the dashboard URL of a given model version.
    Args:
        model_version_id: the id of the model version.
    Returns:
        the URL to the model version if the dashboard is available, else None.
    """
    base_url = get_cloud_dashboard_url()
    if base_url:
        return f"{base_url}/model-versions/{str(model_version_id)}"
    return None
get_run_url(run)
    Function to get the dashboard URL of a given pipeline run.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run | PipelineRunResponse | the response model of the given pipeline run. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the URL to the pipeline run if the dashboard is available, else None. | 
Source code in zenml/utils/dashboard_utils.py
          def get_run_url(run: PipelineRunResponse) -> Optional[str]:
    """Function to get the dashboard URL of a given pipeline run.
    Args:
        run: the response model of the given pipeline run.
    Returns:
        the URL to the pipeline run if the dashboard is available, else None.
    """
    cloud_url = get_cloud_dashboard_url()
    if cloud_url:
        return f"{cloud_url}{constants.RUNS}/{run.id}"
    dashboard_url, is_legacy_dashboard = get_server_dashboard_url()
    if dashboard_url:
        if is_legacy_dashboard:
            if run.pipeline:
                return f"{dashboard_url}{constants.PIPELINES}/{run.pipeline.id}{constants.RUNS}/{run.id}/dag"
            else:
                return f"{dashboard_url}/all-runs/{run.id}/dag"
        else:
            return f"{dashboard_url}{constants.RUNS}/{run.id}"
    return None
get_server_dashboard_url()
    Get the base url of the dashboard deployed by the server.
Returns:
| Type | Description | 
|---|---|
| Tuple[Optional[str], bool] | The server dashboard url and whether the dashboard is the legacy dashboard or not. | 
Source code in zenml/utils/dashboard_utils.py
          def get_server_dashboard_url() -> Tuple[Optional[str], bool]:
    """Get the base url of the dashboard deployed by the server.
    Returns:
        The server dashboard url and whether the dashboard is the legacy
        dashboard or not.
    """
    client = Client()
    if client.zen_store.type == StoreType.REST:
        server_info = client.zen_store.get_store_info()
        if server_info.use_legacy_dashboard:
            suffix = f"{constants.WORKSPACES}/{client.active_workspace.name}"
        else:
            suffix = ""
        if server_info.server_url:
            url = server_info.server_url
        else:
            url = client.zen_store.url
        return url + suffix, server_info.use_legacy_dashboard
    return None, False
get_stack_url(stack)
    Function to get the dashboard URL of a given stack model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack | StackResponse | the response model of the given stack. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | the URL to the stack if the dashboard is available, else None. | 
Source code in zenml/utils/dashboard_utils.py
          def get_stack_url(stack: StackResponse) -> Optional[str]:
    """Function to get the dashboard URL of a given stack model.
    Args:
        stack: the response model of the given stack.
    Returns:
        the URL to the stack if the dashboard is available, else None.
    """
    base_url, is_legacy_dashboard = get_server_dashboard_url()
    if base_url:
        if is_legacy_dashboard:
            return base_url + f"{constants.STACKS}/{stack.id}/configuration"
        else:
            return base_url + constants.STACKS
    return None
is_cloud_server(server_info)
    Checks whether the server info refers to a ZenML Pro server.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| server_info | ServerModel | Info of the server. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the server info refers to a ZenML Pro server. | 
Source code in zenml/utils/dashboard_utils.py
          def is_cloud_server(server_info: ServerModel) -> bool:
    """Checks whether the server info refers to a ZenML Pro server.
    Args:
        server_info: Info of the server.
    Returns:
        Whether the server info refers to a ZenML Pro server.
    """
    return (
        "organization_id" in server_info.metadata
        and "cloud.zenml.io" in server_info.dashboard_url
    )
show_dashboard(url)
    Show the ZenML dashboard at the given URL.
In native environments, the dashboard is opened in the default browser. In notebook environments, the dashboard is embedded in an iframe.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| url | str | URL of the ZenML dashboard. | required | 
Source code in zenml/utils/dashboard_utils.py
          def show_dashboard(url: str) -> None:
    """Show the ZenML dashboard at the given URL.
    In native environments, the dashboard is opened in the default browser.
    In notebook environments, the dashboard is embedded in an iframe.
    Args:
        url: URL of the ZenML dashboard.
    """
    environment = get_environment()
    if environment in (EnvironmentType.NOTEBOOK, EnvironmentType.COLAB):
        from IPython.core.display import display
        from IPython.display import IFrame
        display(IFrame(src=url, width="100%", height=720))
    elif environment in (EnvironmentType.NATIVE, EnvironmentType.WSL):
        if constants.handle_bool_env_var(
            constants.ENV_AUTO_OPEN_DASHBOARD, default=True
        ):
            try:
                import webbrowser
                if environment == EnvironmentType.WSL:
                    webbrowser.get("wslview %s").open(url)
                else:
                    webbrowser.open(url)
                logger.info(
                    "Automatically opening the dashboard in your "
                    "browser. To disable this, set the env variable "
                    "AUTO_OPEN_DASHBOARD=false."
                )
            except Exception as e:
                logger.error(e)
        else:
            logger.info(
                "To open the dashboard in a browser automatically, "
                "set the env variable AUTO_OPEN_DASHBOARD=true."
            )
    else:
        logger.info(f"The ZenML dashboard is available at {url}.")
        deprecation_utils
    Deprecation utilities.
deprecate_pydantic_attributes(*attributes)
    Utility function for deprecating and migrating pydantic attributes.
Usage: To use this, you can specify it on any pydantic BaseModel subclass like this (all the deprecated attributes need to be non-required):
from pydantic import BaseModel
from typing import Optional
class MyModel(BaseModel):
    deprecated: Optional[int] = None
    old_name: Optional[str] = None
    new_name: str
    _deprecation_validator = deprecate_pydantic_attributes(
        "deprecated", ("old_name", "new_name")
    )
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *attributes | Union[str, Tuple[str, str]] | List of attributes to deprecate. This is either the name of the attribute to deprecate, or a tuple containing the name of the deprecated attribute, and it's replacement. | () | 
Returns:
| Type | Description | 
|---|---|
| Any | Pydantic validator class method to be used on BaseModel subclasses to deprecate or migrate attributes. | 
Source code in zenml/utils/deprecation_utils.py
          def deprecate_pydantic_attributes(
    *attributes: Union[str, Tuple[str, str]],
) -> Any:
    """Utility function for deprecating and migrating pydantic attributes.
    **Usage**:
    To use this, you can specify it on any pydantic BaseModel subclass like
    this (all the deprecated attributes need to be non-required):
    ```python
    from pydantic import BaseModel
    from typing import Optional
    class MyModel(BaseModel):
        deprecated: Optional[int] = None
        old_name: Optional[str] = None
        new_name: str
        _deprecation_validator = deprecate_pydantic_attributes(
            "deprecated", ("old_name", "new_name")
        )
    ```
    Args:
        *attributes: List of attributes to deprecate. This is either the name
            of the attribute to deprecate, or a tuple containing the name of
            the deprecated attribute, and it's replacement.
    Returns:
        Pydantic validator class method to be used on BaseModel subclasses
        to deprecate or migrate attributes.
    """
    @model_validator(mode="before")  # type: ignore[misc]
    @classmethod
    @before_validator_handler
    def _deprecation_validator(
        cls: Type[BaseModel], data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Pydantic validator function for deprecating pydantic attributes.
        Args:
            cls: The class on which the attributes are defined.
            data: All values passed at model initialization.
        Raises:
            AssertionError: If either the deprecated or replacement attribute
                don't exist.
            TypeError: If the deprecated attribute is a required attribute.
            ValueError: If the deprecated attribute and replacement attribute
                contain different values.
        Returns:
            Input values with potentially migrated values.
        """
        previous_deprecation_warnings: Set[str] = getattr(
            cls, PREVIOUS_DEPRECATION_WARNINGS_ATTRIBUTE, set()
        )
        def _warn(message: str, attribute: str) -> None:
            """Logs and raises a warning for a deprecated attribute.
            Args:
                message: The warning message.
                attribute: The name of the attribute.
            """
            if attribute not in previous_deprecation_warnings:
                logger.warning(message)
                previous_deprecation_warnings.add(attribute)
            warnings.warn(
                message,
                DeprecationWarning,
            )
        for attribute in attributes:
            if isinstance(attribute, str):
                deprecated_attribute = attribute
                replacement_attribute = None
            else:
                deprecated_attribute, replacement_attribute = attribute
                assert (
                    replacement_attribute in cls.model_fields
                ), f"Unable to find attribute {replacement_attribute}."
            assert (
                deprecated_attribute in cls.model_fields
            ), f"Unable to find attribute {deprecated_attribute}."
            if cls.model_fields[deprecated_attribute].is_required():
                raise TypeError(
                    f"Unable to deprecate attribute '{deprecated_attribute}' "
                    f"of class {cls.__name__}. In order to deprecate an "
                    "attribute, it needs to be a non-required attribute. "
                    "To do so, mark the attribute with an `Optional[...] type "
                    "annotation."
                )
            if data.get(deprecated_attribute, None) is None:
                continue
            if replacement_attribute is None:
                _warn(
                    message=f"The attribute `{deprecated_attribute}` of class "
                    f"`{cls.__name__}` will be deprecated soon.",
                    attribute=deprecated_attribute,
                )
                continue
            _warn(
                message=f"The attribute `{deprecated_attribute}` of class "
                f"`{cls.__name__}` will be deprecated soon. Use the "
                f"attribute `{replacement_attribute}` instead.",
                attribute=deprecated_attribute,
            )
            if data.get(replacement_attribute, None) is None:
                logger.debug(
                    "Migrating value of deprecated attribute %s to "
                    "replacement attribute %s.",
                    deprecated_attribute,
                    replacement_attribute,
                )
                data[replacement_attribute] = data.pop(deprecated_attribute)
            elif data[deprecated_attribute] != data[replacement_attribute]:
                raise ValueError(
                    "Got different values for deprecated attribute "
                    f"{deprecated_attribute} and replacement "
                    f"attribute {replacement_attribute}."
                )
            else:
                # Both values are identical, no need to do anything
                pass
        setattr(
            cls,
            PREVIOUS_DEPRECATION_WARNINGS_ATTRIBUTE,
            previous_deprecation_warnings,
        )
        return data
    return _deprecation_validator
        dict_utils
    Util functions for dictionaries.
dict_to_bytes(dict_)
    Converts a dictionary to bytes.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dict_ | Dict[str, Any] | The dictionary to convert. | required | 
Returns:
| Type | Description | 
|---|---|
| bytes | The dictionary as bytes. | 
Source code in zenml/utils/dict_utils.py
          def dict_to_bytes(dict_: Dict[str, Any]) -> bytes:
    """Converts a dictionary to bytes.
    Args:
        dict_: The dictionary to convert.
    Returns:
        The dictionary as bytes.
    """
    return base64.b64encode(
        json.dumps(
            dict_,
            sort_keys=False,
            default=pydantic_encoder,
        ).encode("utf-8")
    )
recursive_update(original, update)
    Recursively updates a dictionary.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| original | Dict[str, Any] | The dictionary to update. | required | 
| update | Dict[str, Any] | The dictionary containing the updated values. | required | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | The updated dictionary. | 
Source code in zenml/utils/dict_utils.py
          def recursive_update(
    original: Dict[str, Any], update: Dict[str, Any]
) -> Dict[str, Any]:
    """Recursively updates a dictionary.
    Args:
        original: The dictionary to update.
        update: The dictionary containing the updated values.
    Returns:
        The updated dictionary.
    """
    for key, value in update.items():
        if isinstance(value, Dict):
            original_value = original.get(key, None) or {}
            if isinstance(original_value, Dict):
                original[key] = recursive_update(original_value, value)
            else:
                original[key] = value
        else:
            original[key] = value
    return original
remove_none_values(dict_, recursive=False)
    Removes all key-value pairs with None value.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dict_ | Dict[str, Any] | The dict from which the key-value pairs should be removed. | required | 
| recursive | bool | If  | False | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | The updated dictionary. | 
Source code in zenml/utils/dict_utils.py
          def remove_none_values(
    dict_: Dict[str, Any], recursive: bool = False
) -> Dict[str, Any]:
    """Removes all key-value pairs with `None` value.
    Args:
        dict_: The dict from which the key-value pairs should be removed.
        recursive: If `True`, will recursively remove `None` values in all
            child dicts.
    Returns:
        The updated dictionary.
    """
    def _maybe_recurse(value: Any) -> Any:
        """Calls `remove_none_values` recursively if required.
        Args:
            value: A dictionary value.
        Returns:
            The updated dictionary value.
        """
        if recursive and isinstance(value, Dict):
            return remove_none_values(value, recursive=True)
        else:
            return value
    return {k: _maybe_recurse(v) for k, v in dict_.items() if v is not None}
        docker_utils
    Utility functions relating to Docker.
build_image(image_name, dockerfile, build_context_root=None, dockerignore=None, extra_files=(), **custom_build_options)
    Builds a docker image.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| image_name | str | The name to use for the built docker image. | required | 
| dockerfile | Union[str, List[str]] | Path to a dockerfile or a list of strings representing the Dockerfile lines/commands. | required | 
| build_context_root | Optional[str] | Optional path to a directory that will be sent to the Docker daemon as build context. If left empty, the Docker build context will be empty. | None | 
| dockerignore | Optional[str] | Optional path to a dockerignore file. If no value is
given, the .dockerignore in the root of the build context will be
used if it exists. Otherwise, all files inside  | None | 
| extra_files | Sequence[Tuple[str, str]] | Additional files to include in the build context. The files should be passed as a tuple (filepath_inside_build_context, file_content) and will overwrite existing files in the build context if they share the same path. | () | 
| **custom_build_options | Any | Additional options that will be passed unmodified to the Docker build call when building the image. You can use this to for example specify build args or a target stage. See https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build for a full list of available options. | {} | 
Source code in zenml/utils/docker_utils.py
          def build_image(
    image_name: str,
    dockerfile: Union[str, List[str]],
    build_context_root: Optional[str] = None,
    dockerignore: Optional[str] = None,
    extra_files: Sequence[Tuple[str, str]] = (),
    **custom_build_options: Any,
) -> None:
    """Builds a docker image.
    Args:
        image_name: The name to use for the built docker image.
        dockerfile: Path to a dockerfile or a list of strings representing the
            Dockerfile lines/commands.
        build_context_root: Optional path to a directory that will be sent to
            the Docker daemon as build context. If left empty, the Docker build
            context will be empty.
        dockerignore: Optional path to a dockerignore file. If no value is
            given, the .dockerignore in the root of the build context will be
            used if it exists. Otherwise, all files inside `build_context_root`
            are included in the build context.
        extra_files: Additional files to include in the build context. The
            files should be passed as a tuple
            (filepath_inside_build_context, file_content) and will overwrite
            existing files in the build context if they share the same path.
        **custom_build_options: Additional options that will be passed
            unmodified to the Docker build call when building the image. You
            can use this to for example specify build args or a target stage.
            See https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
            for a full list of available options.
    """
    if isinstance(dockerfile, str):
        dockerfile_contents = io_utils.read_file_contents_as_string(dockerfile)
        logger.info("Using Dockerfile `%s`.", os.path.abspath(dockerfile))
    else:
        dockerfile_contents = "\n".join(dockerfile)
    build_context = _create_custom_build_context(
        dockerfile_contents=dockerfile_contents,
        build_context_root=build_context_root,
        dockerignore=dockerignore,
        extra_files=extra_files,
    )
    build_options = {
        "rm": False,  # don't remove intermediate containers to improve caching
        "pull": True,  # always pull parent images
        **custom_build_options,
    }
    logger.info("Building Docker image `%s`.", image_name)
    logger.debug("Docker build options: %s", build_options)
    logger.info("Building the image might take a while...")
    docker_client = _try_get_docker_client_from_env()
    # We use the client api directly here, so we can stream the logs
    output_stream = docker_client.images.client.api.build(
        fileobj=build_context,
        custom_context=True,
        tag=image_name,
        **build_options,
    )
    _process_stream(output_stream)
    logger.info("Finished building Docker image `%s`.", image_name)
check_docker()
    Checks if Docker is installed and running.
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/utils/docker_utils.py
          def check_docker() -> bool:
    """Checks if Docker is installed and running.
    Returns:
        `True` if Docker is installed, `False` otherwise.
    """
    # Try to ping Docker, to see if it's running
    try:
        docker_client = DockerClient.from_env()
        docker_client.ping()
        return True
    except Exception:
        logger.debug("Docker is not running.", exc_info=True)
    return False
get_image_digest(image_name)
    Gets the digest of an image.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| image_name | str | Name of the image to get the digest for. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | Returns the repo digest for the given image if there exists exactly one.
If there are zero or multiple repo digests, returns  | 
Source code in zenml/utils/docker_utils.py
          def get_image_digest(image_name: str) -> Optional[str]:
    """Gets the digest of an image.
    Args:
        image_name: Name of the image to get the digest for.
    Returns:
        Returns the repo digest for the given image if there exists exactly one.
        If there are zero or multiple repo digests, returns `None`.
    """
    docker_client = _try_get_docker_client_from_env()
    image = docker_client.images.get(image_name)
    repo_digests = image.attrs["RepoDigests"]
    if len(repo_digests) == 1:
        return cast(str, repo_digests[0])
    else:
        logger.debug(
            "Found zero or more repo digests for docker image '%s': %s",
            image_name,
            repo_digests,
        )
        return None
is_local_image(image_name)
    Returns whether an image was pulled from a registry or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| image_name | str | Name of the image to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/utils/docker_utils.py
          def is_local_image(image_name: str) -> bool:
    """Returns whether an image was pulled from a registry or not.
    Args:
        image_name: Name of the image to check.
    Returns:
        `True` if the image was pulled from a registry, `False` otherwise.
    """
    docker_client = _try_get_docker_client_from_env()
    images = docker_client.images.list(name=image_name)
    if images:
        # An image with this name is available locally -> now check whether it
        # was pulled from a repo or built locally (in which case the repo
        # digest is empty)
        return get_image_digest(image_name) is None
    else:
        # no image with this name found locally
        return False
push_image(image_name, docker_client=None)
    Pushes an image to a container registry.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| image_name | str | The full name (including a tag) of the image to push. | required | 
| docker_client | Optional[docker.client.DockerClient] | Optional Docker client to use for pushing the image. If no client is given, a new client will be created using the default Docker environment. | None | 
Returns:
| Type | Description | 
|---|---|
| str | The Docker repository digest of the pushed image. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If fetching the repository digest of the image failed. | 
Source code in zenml/utils/docker_utils.py
          def push_image(
    image_name: str, docker_client: Optional[DockerClient] = None
) -> str:
    """Pushes an image to a container registry.
    Args:
        image_name: The full name (including a tag) of the image to push.
        docker_client: Optional Docker client to use for pushing the image. If
            no client is given, a new client will be created using the default
            Docker environment.
    Returns:
        The Docker repository digest of the pushed image.
    Raises:
        RuntimeError: If fetching the repository digest of the image failed.
    """
    logger.info("Pushing Docker image `%s`.", image_name)
    docker_client = docker_client or _try_get_docker_client_from_env()
    output_stream = docker_client.images.push(image_name, stream=True)
    aux_info = _process_stream(output_stream)
    logger.info("Finished pushing Docker image.")
    image_name_without_tag, _ = image_name.rsplit(":", maxsplit=1)
    for info in reversed(aux_info):
        try:
            repo_digest = info["Digest"]
            return f"{image_name_without_tag}@{repo_digest}"
        except KeyError:
            pass
    else:
        raise RuntimeError(
            f"Unable to find repo digest after pushing image {image_name}."
        )
tag_image(image_name, target)
    Tags an image.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| image_name | str | The name of the image to tag. | required | 
| target | str | The full target name including a tag. | required | 
Source code in zenml/utils/docker_utils.py
          def tag_image(image_name: str, target: str) -> None:
    """Tags an image.
    Args:
        image_name: The name of the image to tag.
        target: The full target name including a tag.
    """
    docker_client = _try_get_docker_client_from_env()
    image = docker_client.images.get(image_name)
    image.tag(target)
        downloaded_repository_context
    Downloaded code repository.
        enum_utils
    Util functions for enums.
        
StrEnum            (str, Enum)
        
    Base enum type for string enum values.
Source code in zenml/utils/enum_utils.py
          class StrEnum(str, Enum):
    """Base enum type for string enum values."""
    def __str__(self) -> str:
        """Returns the enum string value.
        Returns:
            The enum string value.
        """
        return self.value  # type: ignore
    @classmethod
    def names(cls) -> List[str]:
        """Get all enum names as a list of strings.
        Returns:
            A list of all enum names.
        """
        return [c.name for c in cls]
    @classmethod
    def values(cls) -> List[str]:
        """Get all enum values as a list of strings.
        Returns:
            A list of all enum values.
        """
        return [c.value for c in cls]
__format__(self, format_spec)
  
      special
  
    Default object formatter.
Source code in zenml/utils/enum_utils.py
          def __format__(self, format_spec):
    return str.__format__(str(self), format_spec)
__str__(self)
  
      special
  
    Returns the enum string value.
Returns:
| Type | Description | 
|---|---|
| str | The enum string value. | 
Source code in zenml/utils/enum_utils.py
          def __str__(self) -> str:
    """Returns the enum string value.
    Returns:
        The enum string value.
    """
    return self.value  # type: ignore
        env_utils
    Utility functions for handling environment variables.
reconstruct_environment_variables(env=None)
    Reconstruct environment variables that were split into chunks.
Reconstructs the environment variables with values that were split into individual chunks because they were too large. The input environment variables are modified in-place.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| env | Optional[Dict[str, str]] | Input environment variables dictionary. If not supplied, the OS environment variables are used. | None | 
Source code in zenml/utils/env_utils.py
          def reconstruct_environment_variables(
    env: Optional[Dict[str, str]] = None,
) -> None:
    """Reconstruct environment variables that were split into chunks.
    Reconstructs the environment variables with values that were split into
    individual chunks because they were too large. The input environment
    variables are modified in-place.
    Args:
        env: Input environment variables dictionary. If not supplied, the OS
            environment variables are used.
    """
    if env is None:
        env = cast(Dict[str, str], os.environ)
    chunks: Dict[str, List[str]] = {}
    for key in env.keys():
        if not key[:-1].endswith(ENV_VAR_CHUNK_SUFFIX):
            continue
        # Collect all chunks of the same environment variable
        original_key = key[: -(len(ENV_VAR_CHUNK_SUFFIX) + 1)]
        chunks.setdefault(original_key, [])
        chunks[original_key].append(key)
    # Reconstruct the environment variables from their chunks
    for key, chunk_keys in chunks.items():
        chunk_keys.sort()
        value = "".join([env[key] for key in chunk_keys])
        env[key] = value
        # Remove the chunk environment variables
        for key in chunk_keys:
            env.pop(key)
split_environment_variables(size_limit, env=None)
    Split long environment variables into chunks.
Splits the input environment variables with values that exceed the supplied maximum length into individual components. The input environment variables are modified in-place.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| size_limit | int | Maximum length of an environment variable value. | required | 
| env | Optional[Dict[str, str]] | Input environment variables dictionary. If not supplied, the OS environment variables are used. | None | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If an environment variable value is too large and requires more than 10 chunks. | 
Source code in zenml/utils/env_utils.py
          def split_environment_variables(
    size_limit: int,
    env: Optional[Dict[str, str]] = None,
) -> None:
    """Split long environment variables into chunks.
    Splits the input environment variables with values that exceed the supplied
    maximum length into individual components. The input environment variables
    are modified in-place.
    Args:
        size_limit: Maximum length of an environment variable value.
        env: Input environment variables dictionary. If not supplied, the
            OS environment variables are used.
    Raises:
        RuntimeError: If an environment variable value is too large and requires
            more than 10 chunks.
    """
    if env is None:
        env = cast(Dict[str, str], os.environ)
    for key, value in env.copy().items():
        if len(value) <= size_limit:
            continue
        # We keep the number of chunks to a maximum of 10 to avoid generating
        # too many environment variables chunks and also to make the
        # reconstruction easier to implement
        if len(value) > size_limit * 10:
            raise RuntimeError(
                f"Environment variable {key} exceeds the maximum length of "
                f"{size_limit * 10} characters."
            )
        env.pop(key)
        # Split the environment variable into chunks
        chunks = [
            value[i : i + size_limit] for i in range(0, len(value), size_limit)
        ]
        for i, chunk in enumerate(chunks):
            env[f"{key}{ENV_VAR_CHUNK_SUFFIX}{i}"] = chunk
        filesync_model
    Filesync utils for ZenML.
        
FileSyncModel            (BaseModel)
        
    Pydantic model synchronized with a configuration file.
Use this class as a base Pydantic model that is automatically synchronized with a configuration file on disk.
This class overrides the setattr and getattr magic methods to ensure that the FileSyncModel instance acts as an in-memory cache of the information stored in the associated configuration file.
Source code in zenml/utils/filesync_model.py
          class FileSyncModel(BaseModel):
    """Pydantic model synchronized with a configuration file.
    Use this class as a base Pydantic model that is automatically synchronized
    with a configuration file on disk.
    This class overrides the __setattr__ and __getattr__ magic methods to
    ensure that the FileSyncModel instance acts as an in-memory cache of the
    information stored in the associated configuration file.
    """
    _config_file: str
    _config_file_timestamp: Optional[float] = None
    @model_validator(mode="wrap")
    @classmethod
    def config_validator(
        cls,
        data: Any,
        handler: ValidatorFunctionWrapHandler,
        info: ValidationInfo,
    ) -> "FileSyncModel":
        """Wrap model validator to infer the config_file during initialization.
        Args:
            data: The raw data that is provided before the validation.
            handler: The actual validation function pydantic would use for the
                built-in validation function.
            info: The context information during the execution of this
                validation function.
        Returns:
            the actual instance after the validation
        Raises:
            ValidationError: if you try to validate through a JSON string. You
                need to provide a config_file path when you create a
                FileSyncModel.
            AssertionError: if the raw input does not include a config_file
                path for the configuration file.
        """
        # Disable json validation
        if info.mode == "json":
            raise ValidationError(
                "You can not instantiate filesync models using the JSON mode."
            )
        if isinstance(data, dict):
            # Assert that the config file is defined
            assert (
                "config_file" in data
            ), "You have to provide a path for the configuration file."
            config_file = data.pop("config_file")
            # Load the current values and update with new values
            config_dict = {}
            if fileio.exists(config_file):
                config_dict = yaml_utils.read_yaml(config_file)
            config_dict.update(data)
            # Execute the regular validation
            model = handler(config_dict)
            assert isinstance(model, cls)
            # Assign the private attribute and save the config
            model._config_file = config_file
            model.write_config()
        else:
            # If the raw value is not a dict, apply proper validation.
            model = handler(data)
            assert isinstance(model, cls)
        return model
    def __setattr__(self, key: str, value: Any) -> None:
        """Sets an attribute on the model and persists it in the configuration file.
        Args:
            key: attribute name.
            value: attribute value.
        """
        super(FileSyncModel, self).__setattr__(key, value)
        if key.startswith("_"):
            return
        self.write_config()
    def __getattribute__(self, key: str) -> Any:
        """Gets an attribute value for a specific key.
        Args:
            key: attribute name.
        Returns:
            attribute value.
        """
        if not key.startswith("_") and key in self.__dict__:
            self.load_config()
        return super(FileSyncModel, self).__getattribute__(key)
    def write_config(self) -> None:
        """Writes the model to the configuration file."""
        yaml_utils.write_yaml(self._config_file, self.model_dump(mode="json"))
        self._config_file_timestamp = os.path.getmtime(self._config_file)
    def load_config(self) -> None:
        """Loads the model from the configuration file on disk."""
        if not fileio.exists(self._config_file):
            return
        # don't reload the configuration if the file hasn't
        # been updated since the last load
        file_timestamp = os.path.getmtime(self._config_file)
        if file_timestamp == self._config_file_timestamp:
            return
        if self._config_file_timestamp is not None:
            logger.info(f"Reloading configuration file {self._config_file}")
        # refresh the model from the configuration file values
        config_dict = yaml_utils.read_yaml(self._config_file)
        for key, value in config_dict.items():
            super(FileSyncModel, self).__setattr__(key, value)
        self._config_file_timestamp = file_timestamp
__getattribute__(self, key)
  
      special
  
    Gets an attribute value for a specific key.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | attribute name. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | attribute value. | 
Source code in zenml/utils/filesync_model.py
          def __getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.
    Args:
        key: attribute name.
    Returns:
        attribute value.
    """
    if not key.startswith("_") and key in self.__dict__:
        self.load_config()
    return super(FileSyncModel, self).__getattribute__(key)
__setattr__(self, key, value)
  
      special
  
    Sets an attribute on the model and persists it in the configuration file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | attribute name. | required | 
| value | Any | attribute value. | required | 
Source code in zenml/utils/filesync_model.py
          def __setattr__(self, key: str, value: Any) -> None:
    """Sets an attribute on the model and persists it in the configuration file.
    Args:
        key: attribute name.
        value: attribute value.
    """
    super(FileSyncModel, self).__setattr__(key, value)
    if key.startswith("_"):
        return
    self.write_config()
config_validator(data, handler, info)
  
      classmethod
  
    Wrap model validator to infer the config_file during initialization.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| data | Any | The raw data that is provided before the validation. | required | 
| handler | ValidatorFunctionWrapHandler | The actual validation function pydantic would use for the built-in validation function. | required | 
| info | ValidationInfo | The context information during the execution of this validation function. | required | 
Returns:
| Type | Description | 
|---|---|
| FileSyncModel | the actual instance after the validation | 
Exceptions:
| Type | Description | 
|---|---|
| ValidationError | if you try to validate through a JSON string. You need to provide a config_file path when you create a FileSyncModel. | 
| AssertionError | if the raw input does not include a config_file path for the configuration file. | 
Source code in zenml/utils/filesync_model.py
          @model_validator(mode="wrap")
@classmethod
def config_validator(
    cls,
    data: Any,
    handler: ValidatorFunctionWrapHandler,
    info: ValidationInfo,
) -> "FileSyncModel":
    """Wrap model validator to infer the config_file during initialization.
    Args:
        data: The raw data that is provided before the validation.
        handler: The actual validation function pydantic would use for the
            built-in validation function.
        info: The context information during the execution of this
            validation function.
    Returns:
        the actual instance after the validation
    Raises:
        ValidationError: if you try to validate through a JSON string. You
            need to provide a config_file path when you create a
            FileSyncModel.
        AssertionError: if the raw input does not include a config_file
            path for the configuration file.
    """
    # Disable json validation
    if info.mode == "json":
        raise ValidationError(
            "You can not instantiate filesync models using the JSON mode."
        )
    if isinstance(data, dict):
        # Assert that the config file is defined
        assert (
            "config_file" in data
        ), "You have to provide a path for the configuration file."
        config_file = data.pop("config_file")
        # Load the current values and update with new values
        config_dict = {}
        if fileio.exists(config_file):
            config_dict = yaml_utils.read_yaml(config_file)
        config_dict.update(data)
        # Execute the regular validation
        model = handler(config_dict)
        assert isinstance(model, cls)
        # Assign the private attribute and save the config
        model._config_file = config_file
        model.write_config()
    else:
        # If the raw value is not a dict, apply proper validation.
        model = handler(data)
        assert isinstance(model, cls)
    return model
load_config(self)
    Loads the model from the configuration file on disk.
Source code in zenml/utils/filesync_model.py
          def load_config(self) -> None:
    """Loads the model from the configuration file on disk."""
    if not fileio.exists(self._config_file):
        return
    # don't reload the configuration if the file hasn't
    # been updated since the last load
    file_timestamp = os.path.getmtime(self._config_file)
    if file_timestamp == self._config_file_timestamp:
        return
    if self._config_file_timestamp is not None:
        logger.info(f"Reloading configuration file {self._config_file}")
    # refresh the model from the configuration file values
    config_dict = yaml_utils.read_yaml(self._config_file)
    for key, value in config_dict.items():
        super(FileSyncModel, self).__setattr__(key, value)
    self._config_file_timestamp = file_timestamp
model_post_init(self, __context)
    This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| self | BaseModel | The BaseModel instance. | required | 
| __context | Any | The context. | required | 
Source code in zenml/utils/filesync_model.py
          def init_private_attributes(self: BaseModel, __context: Any) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.
    It takes context as an argument since that's what pydantic-core passes when calling it.
    Args:
        self: The BaseModel instance.
        __context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
write_config(self)
    Writes the model to the configuration file.
Source code in zenml/utils/filesync_model.py
          def write_config(self) -> None:
    """Writes the model to the configuration file."""
    yaml_utils.write_yaml(self._config_file, self.model_dump(mode="json"))
    self._config_file_timestamp = os.path.getmtime(self._config_file)
        function_utils
    Utility functions for python functions.
create_cli_wrapped_script(func, flavour='accelerate')
    Create a script with the CLI-wrapped function.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| func | ~F | The function to use. | required | 
| flavour | str | The flavour to use. | 'accelerate' | 
Yields:
| Type | Description | 
|---|---|
| Iterator[Tuple[pathlib.Path, pathlib.Path]] | The paths of the script and the output. | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If the function is not defined in a module. | 
Source code in zenml/utils/function_utils.py
          @contextmanager
def create_cli_wrapped_script(
    func: F, flavour: str = "accelerate"
) -> Iterator[Tuple[Path, Path]]:
    """Create a script with the CLI-wrapped function.
    Args:
        func: The function to use.
        flavour: The flavour to use.
    Yields:
        The paths of the script and the output.
    Raises:
        ValueError: If the function is not defined in a module.
    """
    try:
        random_name = random_str(20)
        script_path = Path(random_name + ".py")
        output_path = Path(random_name + ".out")
        module = inspect.getmodule(func)
        if module is None:
            raise ValueError(
                f"Function `{func.__name__}` must be defined in a "
                "module to be used with Accelerate."
            )
        with open(script_path, "w") as f:
            if path := module.__file__:
                func_path = str(Path(path).parent)
                relative_path = path.replace(func_path, "").lstrip(os.sep)
                relative_path = os.path.splitext(relative_path)[0]
                clean_module_name = ".".join(relative_path.split(os.sep))
                script = _CLI_WRAPPED_SCRIPT_TEMPLATE_HEADER.format(
                    func_path=func_path,
                    func_module=clean_module_name,
                    func_name=func.__name__,
                )
                script += _CLI_WRAPPED_MAINS[flavour].format(
                    output_file=str(output_path.absolute())
                )
                f.write(script)
            else:
                raise ValueError(
                    f"Cannot find module file path for function `{func.__name__}`."
                )
        logger.debug(f"Created script:\n\n{script}")
        yield script_path, output_path
    finally:
        if script_path.exists():
            script_path.unlink()
        if output_path.exists():
            output_path.unlink()
        git_utils
    Utility function to clone a Git repository.
clone_git_repository(url, to_path, branch=None, commit=None)
    Clone a Git repository.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| url | str | URL of the repository to clone. | required | 
| to_path | str | Path to clone the repository to. | required | 
| branch | Optional[str] | Branch to clone. Defaults to "main". | None | 
| commit | Optional[str] | Commit to checkout. If specified, the branch argument is ignored. | None | 
Returns:
| Type | Description | 
|---|---|
| Repo | The cloned repository. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the repository could not be cloned. | 
Source code in zenml/utils/git_utils.py
          def clone_git_repository(
    url: str,
    to_path: str,
    branch: Optional[str] = None,
    commit: Optional[str] = None,
) -> Repo:
    """Clone a Git repository.
    Args:
        url: URL of the repository to clone.
        to_path: Path to clone the repository to.
        branch: Branch to clone. Defaults to "main".
        commit: Commit to checkout. If specified, the branch argument is
            ignored.
    Returns:
        The cloned repository.
    Raises:
        RuntimeError: If the repository could not be cloned.
    """
    os.makedirs(os.path.basename(to_path), exist_ok=True)
    try:
        if commit:
            repo = Repo.clone_from(
                url=url,
                to_path=to_path,
                no_checkout=True,
            )
            repo.git.checkout(commit)
        else:
            repo = Repo.clone_from(
                url=url,
                to_path=to_path,
                branch=branch or "main",
            )
        return repo
    except GitCommandError as e:
        raise RuntimeError from e
        integration_utils
    Util functions for integration.
parse_requirement(requirement)
    Parse a requirement string into name and extras.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| requirement | str | A requirement string. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Optional[str], Optional[str]] | A tuple of name and extras. | 
Source code in zenml/utils/integration_utils.py
          def parse_requirement(requirement: str) -> Tuple[Optional[str], Optional[str]]:
    """Parse a requirement string into name and extras.
    Args:
        requirement: A requirement string.
    Returns:
        A tuple of name and extras.
    """
    if match := re.match(
        r"([a-zA-Z0-9\-_]+)(?![^\[]*\s)(\[[a-zA-Z0-9\-_,]+\])?", requirement
    ):
        name, extras = match.groups()
        return (None, None) if " " in requirement else (name, extras)
    return None, None
        io_utils
    Various utility functions for the io module.
copy_dir(source_dir, destination_dir, overwrite=False)
    Copies dir from source to destination.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source_dir | str | Path to copy from. | required | 
| destination_dir | str | Path to copy to. | required | 
| overwrite | bool | Boolean. If false, function throws an error before overwrite. | False | 
Source code in zenml/utils/io_utils.py
          def copy_dir(
    source_dir: str, destination_dir: str, overwrite: bool = False
) -> None:
    """Copies dir from source to destination.
    Args:
        source_dir: Path to copy from.
        destination_dir: Path to copy to.
        overwrite: Boolean. If false, function throws an error before overwrite.
    """
    for source_file in listdir(source_dir):
        source_path = os.path.join(source_dir, convert_to_str(source_file))
        destination_path = os.path.join(
            destination_dir, convert_to_str(source_file)
        )
        if isdir(source_path):
            if source_path == destination_dir:
                # if the destination is a subdirectory of the source, we skip
                # copying it to avoid an infinite loop.
                continue
            copy_dir(source_path, destination_path, overwrite)
        else:
            create_dir_recursive_if_not_exists(
                os.path.dirname(destination_path)
            )
            copy(str(source_path), str(destination_path), overwrite)
create_dir_if_not_exists(dir_path)
    Creates directory if it does not exist.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dir_path | str | Local path in filesystem. | required | 
Source code in zenml/utils/io_utils.py
          def create_dir_if_not_exists(dir_path: str) -> None:
    """Creates directory if it does not exist.
    Args:
        dir_path: Local path in filesystem.
    """
    if not isdir(dir_path):
        mkdir(dir_path)
create_dir_recursive_if_not_exists(dir_path)
    Creates directory recursively if it does not exist.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dir_path | str | Local path in filesystem. | required | 
Source code in zenml/utils/io_utils.py
          def create_dir_recursive_if_not_exists(dir_path: str) -> None:
    """Creates directory recursively if it does not exist.
    Args:
        dir_path: Local path in filesystem.
    """
    if not isdir(dir_path):
        makedirs(dir_path)
create_file_if_not_exists(file_path, file_contents='{}')
    Creates file if it does not exist.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Local path in filesystem. | required | 
| file_contents | str | Contents of file. | '{}' | 
Source code in zenml/utils/io_utils.py
          def create_file_if_not_exists(
    file_path: str, file_contents: str = "{}"
) -> None:
    """Creates file if it does not exist.
    Args:
        file_path: Local path in filesystem.
        file_contents: Contents of file.
    """
    full_path = Path(file_path)
    if not exists(file_path):
        create_dir_recursive_if_not_exists(str(full_path.parent))
        with open(str(full_path), "w") as f:
            f.write(file_contents)
find_files(dir_path, pattern)
    Find files in a directory that match pattern.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dir_path | PathType | The path to directory. | required | 
| pattern | str | pattern like *.png. | required | 
Yields:
| Type | Description | 
|---|---|
| Iterable[str] | All matching filenames in the directory. | 
Source code in zenml/utils/io_utils.py
          def find_files(dir_path: "PathType", pattern: str) -> Iterable[str]:
    """Find files in a directory that match pattern.
    Args:
        dir_path: The path to directory.
        pattern: pattern like *.png.
    Yields:
        All matching filenames in the directory.
    """
    for root, _, files in walk(dir_path):
        for basename in files:
            if fnmatch.fnmatch(convert_to_str(basename), pattern):
                filename = os.path.join(
                    convert_to_str(root), convert_to_str(basename)
                )
                yield filename
get_global_config_directory()
    Gets the global config directory for ZenML.
Returns:
| Type | Description | 
|---|---|
| str | The global config directory for ZenML. | 
Source code in zenml/utils/io_utils.py
          def get_global_config_directory() -> str:
    """Gets the global config directory for ZenML.
    Returns:
        The global config directory for ZenML.
    """
    env_var_path = os.getenv(ENV_ZENML_CONFIG_PATH)
    if env_var_path:
        return str(Path(env_var_path).resolve())
    return click.get_app_dir(APP_NAME)
get_grandparent(dir_path)
    Get grandparent of dir.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dir_path | str | The path to directory. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The input paths parents parent. | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If dir_path does not exist. | 
Source code in zenml/utils/io_utils.py
          def get_grandparent(dir_path: str) -> str:
    """Get grandparent of dir.
    Args:
        dir_path: The path to directory.
    Returns:
        The input paths parents parent.
    Raises:
        ValueError: If dir_path does not exist.
    """
    if not os.path.exists(dir_path):
        raise ValueError(f"Path '{dir_path}' does not exist.")
    return Path(dir_path).parent.parent.stem
get_parent(dir_path)
    Get parent of dir.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dir_path | str | The path to directory. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Parent (stem) of the dir as a string. | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If dir_path does not exist. | 
Source code in zenml/utils/io_utils.py
          def get_parent(dir_path: str) -> str:
    """Get parent of dir.
    Args:
        dir_path: The path to directory.
    Returns:
        Parent (stem) of the dir as a string.
    Raises:
        ValueError: If dir_path does not exist.
    """
    if not os.path.exists(dir_path):
        raise ValueError(f"Path '{dir_path}' does not exist.")
    return Path(dir_path).parent.stem
is_remote(path)
    Returns True if path exists remotely.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | Any path as a string. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if remote path, else False. | 
Source code in zenml/utils/io_utils.py
          def is_remote(path: str) -> bool:
    """Returns True if path exists remotely.
    Args:
        path: Any path as a string.
    Returns:
        True if remote path, else False.
    """
    return any(path.startswith(prefix) for prefix in REMOTE_FS_PREFIX)
is_root(path)
    Returns true if path has no parent in local filesystem.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | Local path in filesystem. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if root, else False. | 
Source code in zenml/utils/io_utils.py
          def is_root(path: str) -> bool:
    """Returns true if path has no parent in local filesystem.
    Args:
        path: Local path in filesystem.
    Returns:
        True if root, else False.
    """
    return Path(path).parent == Path(path)
move(source, destination, overwrite=False)
    Moves dir or file from source to destination. Can be used to rename.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | str | Local path to copy from. | required | 
| destination | str | Local path to copy to. | required | 
| overwrite | bool | boolean, if false, then throws an error before overwrite. | False | 
Source code in zenml/utils/io_utils.py
          def move(source: str, destination: str, overwrite: bool = False) -> None:
    """Moves dir or file from source to destination. Can be used to rename.
    Args:
        source: Local path to copy from.
        destination: Local path to copy to.
        overwrite: boolean, if false, then throws an error before overwrite.
    """
    rename(source, destination, overwrite)
read_file_contents_as_string(file_path)
    Reads contents of file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to file. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Contents of file. | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | If file does not exist. | 
Source code in zenml/utils/io_utils.py
          def read_file_contents_as_string(file_path: str) -> str:
    """Reads contents of file.
    Args:
        file_path: Path to file.
    Returns:
        Contents of file.
    Raises:
        FileNotFoundError: If file does not exist.
    """
    if not exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist!")
    with open(file_path) as f:
        return f.read()  # type: ignore[no-any-return]
resolve_relative_path(path)
    Takes relative path and resolves it absolutely.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | Local path in filesystem. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Resolved path. | 
Source code in zenml/utils/io_utils.py
          def resolve_relative_path(path: str) -> str:
    """Takes relative path and resolves it absolutely.
    Args:
        path: Local path in filesystem.
    Returns:
        Resolved path.
    """
    if is_remote(path):
        return path
    return str(Path(path).resolve())
write_file_contents_as_string(file_path, content)
    Writes contents of file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to file. | required | 
| content | str | Contents of file. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If content is not of type str. | 
Source code in zenml/utils/io_utils.py
          def write_file_contents_as_string(file_path: str, content: str) -> None:
    """Writes contents of file.
    Args:
        file_path: Path to file.
        content: Contents of file.
    Raises:
        ValueError: If content is not of type str.
    """
    if not isinstance(content, str):
        raise ValueError(f"Content must be of type str, got {type(content)}")
    with open(file_path, "w") as f:
        f.write(content)
        json_utils
    Carried over version of some functions from the pydantic v1 json module.
Check out the latest version here: https://github.com/pydantic/pydantic/blob/v1.10.15/pydantic/json.py
decimal_encoder(dec_value)
    Encodes a Decimal as int of there's no exponent, otherwise float.
This is useful when we use ConstrainedDecimal to represent Numeric(x,0) where an integer (but not int typed) is used. Encoding this as a float results in failed round-tripping between encode and parse. Our ID type is a prime example of this.
decimal_encoder(Decimal("1.0")) 1.0
decimal_encoder(Decimal("1")) 1
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| dec_value | Decimal | The input Decimal value | required | 
Returns:
| Type | Description | 
|---|---|
| Union[int, float] | the encoded result | 
Source code in zenml/utils/json_utils.py
          def decimal_encoder(dec_value: Decimal) -> Union[int, float]:
    """Encodes a Decimal as int of there's no exponent, otherwise float.
    This is useful when we use ConstrainedDecimal to represent Numeric(x,0)
    where an integer (but not int typed) is used. Encoding this as a float
    results in failed round-tripping between encode and parse.
    Our ID type is a prime example of this.
    >>> decimal_encoder(Decimal("1.0"))
    1.0
    >>> decimal_encoder(Decimal("1"))
    1
    Args:
        dec_value: The input Decimal value
    Returns:
        the encoded result
    """
    if dec_value.as_tuple().exponent >= 0:  # type: ignore[operator]
        return int(dec_value)
    else:
        return float(dec_value)
isoformat(obj)
    Function to convert a datetime into iso format.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Union[datetime.date, datetime.time] | input datetime | required | 
Returns:
| Type | Description | 
|---|---|
| str | the corresponding time in iso format. | 
Source code in zenml/utils/json_utils.py
          def isoformat(obj: Union[datetime.date, datetime.time]) -> str:
    """Function to convert a datetime into iso format.
    Args:
        obj: input datetime
    Returns:
        the corresponding time in iso format.
    """
    return obj.isoformat()
        materializer_utils
    Util functions for materializers.
select_materializer(data_type, materializer_classes)
    Select a materializer for a given data type.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| data_type | Type[Any] | The data type for which to select the materializer. | required | 
| materializer_classes | Sequence[Type[BaseMaterializer]] | Available materializer classes. | required | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If no materializer can handle the given data type. | 
Returns:
| Type | Description | 
|---|---|
| Type[BaseMaterializer] | The first materializer that can handle the given data type. | 
Source code in zenml/utils/materializer_utils.py
          def select_materializer(
    data_type: Type[Any],
    materializer_classes: Sequence[Type["BaseMaterializer"]],
) -> Type["BaseMaterializer"]:
    """Select a materializer for a given data type.
    Args:
        data_type: The data type for which to select the materializer.
        materializer_classes: Available materializer classes.
    Raises:
        RuntimeError: If no materializer can handle the given data type.
    Returns:
        The first materializer that can handle the given data type.
    """
    fallback: Optional[Type["BaseMaterializer"]] = None
    for class_ in data_type.__mro__:
        for materializer_class in materializer_classes:
            if class_ in materializer_class.ASSOCIATED_TYPES:
                return materializer_class
            elif not fallback and materializer_class.can_handle_type(class_):
                fallback = materializer_class
    if fallback:
        return fallback
    raise RuntimeError(f"No materializer found for type {data_type}.")
        mlstacks_utils
    Functionality to handle interaction with the mlstacks package.
convert_click_params_to_mlstacks_primitives(params, zenml_component_deploy=False)
    Converts raw Click CLI params to mlstacks primitives.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| params | Dict[str, Any] | Raw Click CLI params. | required | 
| zenml_component_deploy | bool | A boolean indicating whether the stack
contains a single component and is being deployed through  | False | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Stack, List[Component]] | A tuple of Stack and List[Component] objects. | 
Source code in zenml/utils/mlstacks_utils.py
          def convert_click_params_to_mlstacks_primitives(
    params: Dict[str, Any],
    zenml_component_deploy: bool = False,
) -> Tuple["Stack", List["Component"]]:
    """Converts raw Click CLI params to mlstacks primitives.
    Args:
        params: Raw Click CLI params.
        zenml_component_deploy: A boolean indicating whether the stack
            contains a single component and is being deployed through `zenml
            component deploy`
    Returns:
        A tuple of Stack and List[Component] objects.
    """
    from zenml.cli.utils import verify_mlstacks_prerequisites_installation
    verify_mlstacks_prerequisites_installation()
    from mlstacks.constants import MLSTACKS_PACKAGE_NAME
    from mlstacks.models import Component, Stack
    stack: Stack = _construct_base_stack(params)
    components: List[Component] = _construct_components(
        params, zenml_component_deploy
    )
    # writes the file names to the stack spec
    # using format '<provider>-<component_name>.yaml'
    for component in components:
        stack.components.append(
            os.path.join(
                click.get_app_dir(MLSTACKS_PACKAGE_NAME),
                "stack_specs",
                stack.name,
                f"{component.name}.yaml",
            )
        )
    return stack, components
convert_mlstacks_primitives_to_dicts(stack, components)
    Converts mlstacks Stack and Components to dicts.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack | Stack | A mlstacks Stack object. | required | 
| components | List[Component] | A list of mlstacks Component objects. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Dict[str, Any], List[Dict[str, Any]]] | A tuple of Stack and List[Component] dicts. | 
Source code in zenml/utils/mlstacks_utils.py
          def convert_mlstacks_primitives_to_dicts(
    stack: "Stack", components: List["Component"]
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
    """Converts mlstacks Stack and Components to dicts.
    Args:
        stack: A mlstacks Stack object.
        components: A list of mlstacks Component objects.
    Returns:
        A tuple of Stack and List[Component] dicts.
    """
    from zenml.cli.utils import verify_mlstacks_prerequisites_installation
    verify_mlstacks_prerequisites_installation()
    # convert to json first to strip out Enums objects
    stack_dict = json.loads(stack.model_dump_json())
    components_dicts = [
        json.loads(component.model_dump_json()) for component in components
    ]
    return stack_dict, components_dicts
deploy_mlstacks_stack(spec_file_path, stack_name, stack_provider, debug_mode=False, no_import_stack_flag=False, user_created_spec=False)
    Deploys an MLStacks stack given a spec file path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| spec_file_path | str | The path to the spec file. | required | 
| stack_name | str | The name of the stack. | required | 
| stack_provider | str | The cloud provider for which the stack is deployed. | required | 
| debug_mode | bool | A boolean indicating whether to run in debug mode. | False | 
| no_import_stack_flag | bool | A boolean indicating whether to import the stack into ZenML. | False | 
| user_created_spec | bool | A boolean indicating whether the user created the spec file. | False | 
Source code in zenml/utils/mlstacks_utils.py
          def deploy_mlstacks_stack(
    spec_file_path: str,
    stack_name: str,
    stack_provider: str,
    debug_mode: bool = False,
    no_import_stack_flag: bool = False,
    user_created_spec: bool = False,
) -> None:
    """Deploys an MLStacks stack given a spec file path.
    Args:
        spec_file_path: The path to the spec file.
        stack_name: The name of the stack.
        stack_provider: The cloud provider for which the stack is deployed.
        debug_mode: A boolean indicating whether to run in debug mode.
        no_import_stack_flag: A boolean indicating whether to import the stack
            into ZenML.
        user_created_spec: A boolean indicating whether the user created the
            spec file.
    """
    from zenml.cli.utils import (
        declare,
        verify_mlstacks_prerequisites_installation,
    )
    verify_mlstacks_prerequisites_installation()
    from mlstacks.constants import MLSTACKS_PACKAGE_NAME
    from mlstacks.utils import terraform_utils
    spec_dir = os.path.join(
        click.get_app_dir(MLSTACKS_PACKAGE_NAME), "stack_specs", stack_name
    )
    declare("Deploying stack using Terraform...")
    terraform_utils.deploy_stack(spec_file_path, debug_mode=debug_mode)
    declare("Stack successfully deployed.")
    if not no_import_stack_flag:
        declare(f"Importing stack '{stack_name}' into ZenML...")
        import_new_mlstacks_stack(
            stack_name=stack_name,
            provider=stack_provider,
            stack_spec_dir=spec_dir,
            user_stack_spec_file=spec_file_path if user_created_spec else None,
        )
        declare("Stack successfully imported into ZenML.")
get_stack_spec_file_path(stack_name)
    Gets the path to the stack spec file for the given stack name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_name | str | The name of the stack spec to get the path for. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The path to the stack spec file for the given stack name. | 
Exceptions:
| Type | Description | 
|---|---|
| KeyError | If the stack does not exist. | 
Source code in zenml/utils/mlstacks_utils.py
          def get_stack_spec_file_path(stack_name: str) -> str:
    """Gets the path to the stack spec file for the given stack name.
    Args:
        stack_name: The name of the stack spec to get the path for.
    Returns:
        The path to the stack spec file for the given stack name.
    Raises:
        KeyError: If the stack does not exist.
    """
    try:
        stack = Client().get_stack(
            name_id_or_prefix=stack_name, allow_name_prefix_match=False
        )
    except KeyError as e:
        raise e
    return stack.stack_spec_path or ""
import_new_mlstacks_component(stack_name, component_name, provider, stack_spec_dir)
    Import a new component deployed for a particular cloud provider.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_name | str | The name of the stack to import. | required | 
| component_name | str | The name of the component to import. | required | 
| provider | str | The cloud provider for which the stack is deployed. | required | 
| stack_spec_dir | str | The path to the directory containing the stack spec. | required | 
Source code in zenml/utils/mlstacks_utils.py
          def import_new_mlstacks_component(
    stack_name: str, component_name: str, provider: str, stack_spec_dir: str
) -> None:
    """Import a new component deployed for a particular cloud provider.
    Args:
        stack_name: The name of the stack to import.
        component_name: The name of the component to import.
        provider: The cloud provider for which the stack is deployed.
        stack_spec_dir: The path to the directory containing the stack spec.
    """
    data, _ = _setup_import(provider, stack_name, stack_spec_dir)
    component_ids = _import_components(
        data, stack_spec_dir, component_name=component_name
    )
    component_type = list(component_ids.keys())[
        0
    ]  # Assuming only one component is imported
    component = Client().get_stack_component(
        component_type, component_ids[component_type]
    )
    from zenml.cli.utils import print_model_url
    print_model_url(get_component_url(component))
import_new_mlstacks_stack(stack_name, provider, stack_spec_dir, user_stack_spec_file=None)
    Import a new stack deployed for a particular cloud provider.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_name | str | The name of the stack to import. | required | 
| provider | str | The cloud provider for which the stack is deployed. | required | 
| stack_spec_dir | str | The path to the directory containing the stack spec. | required | 
| user_stack_spec_file | Optional[str] | The path to the user-created stack spec file. | None | 
Source code in zenml/utils/mlstacks_utils.py
          def import_new_mlstacks_stack(
    stack_name: str,
    provider: str,
    stack_spec_dir: str,
    user_stack_spec_file: Optional[str] = None,
) -> None:
    """Import a new stack deployed for a particular cloud provider.
    Args:
        stack_name: The name of the stack to import.
        provider: The cloud provider for which the stack is deployed.
        stack_spec_dir: The path to the directory containing the stack spec.
        user_stack_spec_file: The path to the user-created stack spec file.
    """
    data, stack_spec_file = _setup_import(
        provider, stack_name, stack_spec_dir, user_stack_spec_file
    )
    component_ids = _import_components(
        data=data, stack_spec_dir=stack_spec_dir
    )
    imported_stack = Client().create_stack(
        name=stack_name,
        components=component_ids,
        stack_spec_file=stack_spec_file,
    )
    from zenml.cli.utils import print_model_url
    print_model_url(get_stack_url(imported_stack))
stack_exists(stack_name)
    Checks whether a stack with that name exists or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_name | str | The name of the stack to check for. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | A boolean indicating whether the stack exists or not. | 
Source code in zenml/utils/mlstacks_utils.py
          def stack_exists(stack_name: str) -> bool:
    """Checks whether a stack with that name exists or not.
    Args:
        stack_name: The name of the stack to check for.
    Returns:
        A boolean indicating whether the stack exists or not.
    """
    try:
        Client().get_stack(
            name_id_or_prefix=stack_name, allow_name_prefix_match=False
        )
    except KeyError:
        return False
    return True
stack_spec_exists(stack_name)
    Checks whether a stack spec with that name exists or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_name | str | The name of the stack spec to check for. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | A boolean indicating whether the stack spec exists or not. | 
Source code in zenml/utils/mlstacks_utils.py
          def stack_spec_exists(stack_name: str) -> bool:
    """Checks whether a stack spec with that name exists or not.
    Args:
        stack_name: The name of the stack spec to check for.
    Returns:
        A boolean indicating whether the stack spec exists or not.
    """
    from zenml.cli.utils import verify_mlstacks_prerequisites_installation
    verify_mlstacks_prerequisites_installation()
    from mlstacks.constants import MLSTACKS_PACKAGE_NAME
    spec_dir = os.path.join(
        click.get_app_dir(MLSTACKS_PACKAGE_NAME), "stack_specs", stack_name
    )
    return Path(spec_dir).exists()
verify_spec_and_tf_files_exist(spec_file_path, tf_file_path)
    Checks whether both the spec and tf files exist.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| spec_file_path | str | The path to the spec file. | required | 
| tf_file_path | str | The path to the tf file. | required | 
Source code in zenml/utils/mlstacks_utils.py
          def verify_spec_and_tf_files_exist(
    spec_file_path: str, tf_file_path: str
) -> None:
    """Checks whether both the spec and tf files exist.
    Args:
        spec_file_path: The path to the spec file.
        tf_file_path: The path to the tf file.
    """
    from zenml.cli.utils import error
    if not Path(spec_file_path).exists():
        error(f"Could not find the Stack spec file at {spec_file_path}.")
    elif not Path(tf_file_path).exists():
        error(
            f"Could not find the Terraform files for the stack "
            f"at {tf_file_path}."
        )
        networking_utils
    Utility functions for networking.
find_available_port()
    Finds a local random unoccupied TCP port.
Returns:
| Type | Description | 
|---|---|
| int | A random unoccupied TCP port. | 
Source code in zenml/utils/networking_utils.py
          def find_available_port() -> int:
    """Finds a local random unoccupied TCP port.
    Returns:
        A random unoccupied TCP port.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        _, port = s.getsockname()
    return cast(int, port)
get_or_create_ngrok_tunnel(ngrok_token, port)
    Get or create an ngrok tunnel at the given port.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| ngrok_token | str | The ngrok auth token. | required | 
| port | int | The port to tunnel. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The public URL of the ngrok tunnel. | 
Exceptions:
| Type | Description | 
|---|---|
| ImportError | If the  | 
Source code in zenml/utils/networking_utils.py
          def get_or_create_ngrok_tunnel(ngrok_token: str, port: int) -> str:
    """Get or create an ngrok tunnel at the given port.
    Args:
        ngrok_token: The ngrok auth token.
        port: The port to tunnel.
    Returns:
        The public URL of the ngrok tunnel.
    Raises:
        ImportError: If the `pyngrok` package is not installed.
    """
    try:
        from pyngrok import ngrok as ngrok_client
    except ImportError:
        raise ImportError(
            "The `pyngrok` package is required to create ngrok tunnels. "
            "Please install it by running `pip install pyngrok`."
        )
    # Check if ngrok is already tunneling the port
    tunnels = ngrok_client.get_tunnels()
    for tunnel in tunnels:
        if tunnel.config and isinstance(tunnel.config, dict):
            tunnel_protocol = tunnel.config.get("proto")
            tunnel_port = tunnel.config.get("addr")
            if tunnel_protocol == "http" and tunnel_port == port:
                return str(tunnel.public_url)
    # Create new tunnel
    ngrok_client.set_auth_token(ngrok_token)
    return str(ngrok_client.connect(port).public_url)
port_available(port, address='127.0.0.1')
    Checks if a local port is available.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| port | int | TCP port number | required | 
| address | str | IP address on the local machine | '127.0.0.1' | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the port is available, otherwise False | 
Source code in zenml/utils/networking_utils.py
          def port_available(port: int, address: str = "127.0.0.1") -> bool:
    """Checks if a local port is available.
    Args:
        port: TCP port number
        address: IP address on the local machine
    Returns:
        True if the port is available, otherwise False
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if hasattr(socket, "SO_REUSEPORT"):
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            else:
                # The SO_REUSEPORT socket option is not supported on Windows.
                # This if clause exists just for mypy to not complain about
                # missing code paths.
                pass
            s.bind((address, port))
    except socket.error as e:
        logger.debug("Port %d unavailable on %s: %s", port, address, e)
        return False
    return True
port_is_open(hostname, port)
    Check if a TCP port is open on a remote host.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| hostname | str | hostname of the remote machine | required | 
| port | int | TCP port number | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the port is open, False otherwise | 
Source code in zenml/utils/networking_utils.py
          def port_is_open(hostname: str, port: int) -> bool:
    """Check if a TCP port is open on a remote host.
    Args:
        hostname: hostname of the remote machine
        port: TCP port number
    Returns:
        True if the port is open, False otherwise
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            result = sock.connect_ex((hostname, port))
            return result == 0
    except socket.error as e:
        logger.debug(
            f"Error checking TCP port {port} on host {hostname}: {str(e)}"
        )
        return False
replace_internal_hostname_with_localhost(hostname)
    Replaces an internal Docker or K3D hostname with localhost.
Localhost URLs that are directly accessible on the host machine are not
accessible from within a Docker or K3D container running on that same
machine, but there are special hostnames featured by both Docker
(host.docker.internal) and K3D (host.k3d.internal) that can be used to
access host services from within the containers.
Use this method to replace one of these special hostnames with localhost if used outside a container or in a container where special hostnames are not available.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| hostname | str | The hostname to replace. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The original or replaced hostname. | 
Source code in zenml/utils/networking_utils.py
          def replace_internal_hostname_with_localhost(hostname: str) -> str:
    """Replaces an internal Docker or K3D hostname with localhost.
    Localhost URLs that are directly accessible on the host machine are not
    accessible from within a Docker or K3D container running on that same
    machine, but there are special hostnames featured by both Docker
    (`host.docker.internal`) and K3D (`host.k3d.internal`) that can be used to
    access host services from within the containers.
    Use this method to replace one of these special hostnames with localhost
    if used outside a container or in a container where special hostnames are
    not available.
    Args:
        hostname: The hostname to replace.
    Returns:
        The original or replaced hostname.
    """
    if hostname not in ("host.docker.internal", "host.k3d.internal"):
        return hostname
    if Environment.in_container():
        # Try to resolve one of the special hostnames to see if it is available
        # inside the container and use that if it is.
        for internal_hostname in (
            "host.docker.internal",
            "host.k3d.internal",
        ):
            try:
                socket.gethostbyname(internal_hostname)
                if internal_hostname != hostname:
                    logger.debug(
                        f"Replacing internal hostname {hostname} with "
                        f"{internal_hostname}"
                    )
                return internal_hostname
            except socket.gaierror:
                continue
    logger.debug(f"Replacing internal hostname {hostname} with localhost.")
    return "127.0.0.1"
replace_localhost_with_internal_hostname(url)
    Replaces the localhost with an internal Docker or K3D hostname in a given URL.
Localhost URLs that are directly accessible on the host machine are not
accessible from within a Docker or K3D container running on that same
machine, but there are special hostnames featured by both Docker
(host.docker.internal) and K3D (host.k3d.internal) that can be used to
access host services from within the containers.
Use this method to attempt to replace localhost in a URL with one of these
special hostnames, if they are available inside a container.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| url | str | The URL to update. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The updated URL. | 
Source code in zenml/utils/networking_utils.py
          def replace_localhost_with_internal_hostname(url: str) -> str:
    """Replaces the localhost with an internal Docker or K3D hostname in a given URL.
    Localhost URLs that are directly accessible on the host machine are not
    accessible from within a Docker or K3D container running on that same
    machine, but there are special hostnames featured by both Docker
    (`host.docker.internal`) and K3D (`host.k3d.internal`) that can be used to
    access host services from within the containers.
    Use this method to attempt to replace `localhost` in a URL with one of these
    special hostnames, if they are available inside a container.
    Args:
        url: The URL to update.
    Returns:
        The updated URL.
    """
    if not Environment.in_container():
        return url
    parsed_url = urlparse(url)
    if parsed_url.hostname in ("localhost", "127.0.0.1"):
        for internal_hostname in (
            "host.docker.internal",
            "host.k3d.internal",
        ):
            try:
                socket.gethostbyname(internal_hostname)
                parsed_url = parsed_url._replace(
                    netloc=parsed_url.netloc.replace(
                        parsed_url.hostname,
                        internal_hostname,
                    )
                )
                logger.debug(
                    f"Replacing localhost with {internal_hostname} in URL: "
                    f"{url}"
                )
                return parsed_url.geturl()
            except socket.gaierror:
                continue
    return url
scan_for_available_port(start=8000, stop=65535)
    Scan the local network for an available port in the given range.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| start | int | the beginning of the port range value to scan | 8000 | 
| stop | int | the (inclusive) end of the port range value to scan | 65535 | 
Returns:
| Type | Description | 
|---|---|
| Optional[int] | The first available port in the given range, or None if no available port is found. | 
Source code in zenml/utils/networking_utils.py
          def scan_for_available_port(
    start: int = SCAN_PORT_RANGE[0], stop: int = SCAN_PORT_RANGE[1]
) -> Optional[int]:
    """Scan the local network for an available port in the given range.
    Args:
        start: the beginning of the port range value to scan
        stop: the (inclusive) end of the port range value to scan
    Returns:
        The first available port in the given range, or None if no available
        port is found.
    """
    for port in range(start, stop + 1):
        if port_available(port):
            return port
    logger.debug(
        "No free TCP ports found in the range %d - %d",
        start,
        stop,
    )
    return None
        notebook_utils
    Notebook utilities.
enable_notebook_code_extraction(_obj=None)
    Decorator to enable code extraction from notebooks.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| _obj | Optional[AnyObject] | The class or function for which to enable code extraction. | None | 
Returns:
| Type | Description | 
|---|---|
| Union[AnyObject, Callable[[AnyObject], AnyObject]] | The decorated class or function. | 
Source code in zenml/utils/notebook_utils.py
          def enable_notebook_code_extraction(
    _obj: Optional["AnyObject"] = None,
) -> Union["AnyObject", Callable[["AnyObject"], "AnyObject"]]:
    """Decorator to enable code extraction from notebooks.
    Args:
        _obj: The class or function for which to enable code extraction.
    Returns:
        The decorated class or function.
    """
    def inner_decorator(obj: "AnyObject") -> "AnyObject":
        try_to_save_notebook_cell_code(obj)
        return obj
    if _obj is None:
        return inner_decorator
    else:
        return inner_decorator(_obj)
get_active_notebook_cell_code()
    Get the code of the currently active notebook cell.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The code of the currently active notebook cell. | 
Source code in zenml/utils/notebook_utils.py
          def get_active_notebook_cell_code() -> Optional[str]:
    """Get the code of the currently active notebook cell.
    Returns:
        The code of the currently active notebook cell.
    """
    cell_code = None
    try:
        ipython = get_ipython()  # type: ignore[name-defined]
        cell_code = ipython.get_parent()["content"]["code"]
    except (NameError, KeyError) as e:
        logger.warning("Unable to extract cell code: %s.", str(e))
    return cell_code
is_defined_in_notebook_cell(obj)
    Check whether an object is defined in a notebook cell.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Any | The object to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the object is defined in a notebook cell. | 
Source code in zenml/utils/notebook_utils.py
          def is_defined_in_notebook_cell(obj: Any) -> bool:
    """Check whether an object is defined in a notebook cell.
    Args:
        obj: The object to check.
    Returns:
        Whether the object is defined in a notebook cell.
    """
    if not Environment.in_notebook():
        return False
    module_name = getattr(obj, "__module__", None)
    return module_name == "__main__"
load_notebook_cell_code(obj)
    Load the notebook cell code for an object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Any | The object for which to load the cell code. | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[str] | The notebook cell code if it was saved. | 
Source code in zenml/utils/notebook_utils.py
          def load_notebook_cell_code(obj: Any) -> Optional[str]:
    """Load the notebook cell code for an object.
    Args:
        obj: The object for which to load the cell code.
    Returns:
        The notebook cell code if it was saved.
    """
    return getattr(obj, ZENML_NOTEBOOK_CELL_CODE_ATTRIBUTE_NAME, None)
try_to_save_notebook_cell_code(obj)
    Try to save the notebook cell code for an object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Any | The object for which to save the notebook cell code. | required | 
Source code in zenml/utils/notebook_utils.py
          def try_to_save_notebook_cell_code(obj: Any) -> None:
    """Try to save the notebook cell code for an object.
    Args:
        obj: The object for which to save the notebook cell code.
    """
    if is_defined_in_notebook_cell(obj):
        if cell_code := get_active_notebook_cell_code():
            setattr(
                obj,
                ZENML_NOTEBOOK_CELL_CODE_ATTRIBUTE_NAME,
                cell_code,
            )
warn_about_notebook_cell_magic_commands(cell_code)
    Warn about magic commands in the cell code.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cell_code | str | The cell code. | required | 
Source code in zenml/utils/notebook_utils.py
          def warn_about_notebook_cell_magic_commands(cell_code: str) -> None:
    """Warn about magic commands in the cell code.
    Args:
        cell_code: The cell code.
    """
    if any(line.startswith(("%", "!")) for line in cell_code.splitlines()):
        logger.warning(
            "Some lines in your notebook cell start with a `!` or `%` "
            "character. Running a ZenML step remotely from a notebook "
            "only works if the cell only contains python code. If any "
            "of these lines contain Jupyter notebook magic commands, "
            "remove them and try again."
        )
        package_utils
    Utility functions for the package.
clean_requirements(requirements)
    Clean requirements list from redundant requirements.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| requirements | List[str] | List of requirements. | required | 
Returns:
| Type | Description | 
|---|---|
| List[str] | Cleaned list of requirements | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | If input is not a list | 
| ValueError | If any element in the list is not a string | 
Source code in zenml/utils/package_utils.py
          def clean_requirements(requirements: List[str]) -> List[str]:
    """Clean requirements list from redundant requirements.
    Args:
        requirements: List of requirements.
    Returns:
        Cleaned list of requirements
    Raises:
        TypeError: If input is not a list
        ValueError: If any element in the list is not a string
    """
    if not isinstance(requirements, list):
        raise TypeError("Input must be a list")
    if not all(isinstance(req, str) for req in requirements):
        raise ValueError("All elements in the list must be strings")
    cleaned = {}
    for req in requirements:
        package = (
            req.split(">=")[0]
            .split("==")[0]
            .split("<")[0]
            .split("~=")[0]
            .split("^=")[0]
            .split("[")[0]
            .strip()
        )
        if package not in cleaned or any(
            op in req for op in ["=", ">", "<", "~", "^"]
        ):
            cleaned[package] = req
    return sorted(cleaned.values())
is_latest_zenml_version()
    Checks if the currently running ZenML package is on the latest version.
Returns:
| Type | Description | 
|---|---|
| bool | True in case the current running zenml code is the latest available version on PYPI, otherwise False. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | In case something goe wrong | 
Source code in zenml/utils/package_utils.py
          def is_latest_zenml_version() -> bool:
    """Checks if the currently running ZenML package is on the latest version.
    Returns:
        True in case the current running zenml code is the latest available version on PYPI, otherwise False.
    Raises:
        RuntimeError: In case something goe wrong
    """
    from zenml import __version__
    # Get the current version of the package
    current_local_version = __version__
    # Get the latest version from PyPI
    try:
        response = requests.get("https://pypi.org/pypi/zenml/json", timeout=60)
        response.raise_for_status()
        latest_published_version = response.json()["info"]["version"]
    except Exception as e:
        raise RuntimeError(
            f"Failed to fetch the latest version from PyPI: {e}"
        )
    # Compare versions
    if version.parse(latest_published_version) > version.parse(
        current_local_version
    ):
        return False
    else:
        return True
        pagination_utils
    Pagination utilities.
depaginate(list_method, **kwargs)
    Depaginate the results from a client or store method that returns pages.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| list_method | Callable[..., zenml.models.v2.base.page.Page[~AnyResponse]] | The list method to depaginate. | required | 
| **kwargs | Any | Arguments for the list method. | {} | 
Returns:
| Type | Description | 
|---|---|
| List[~AnyResponse] | A list of the corresponding Response Models. | 
Source code in zenml/utils/pagination_utils.py
          def depaginate(
    list_method: Callable[..., Page[AnyResponse]], **kwargs: Any
) -> List[AnyResponse]:
    """Depaginate the results from a client or store method that returns pages.
    Args:
        list_method: The list method to depaginate.
        **kwargs: Arguments for the list method.
    Returns:
        A list of the corresponding Response Models.
    """
    page = list_method(**kwargs)
    items = list(page.items)
    while page.index < page.total_pages:
        kwargs["page"] = page.index + 1
        page = list_method(**kwargs)
        items += list(page.items)
    return items
        pipeline_docker_image_builder
    Implementation of Docker image builds to run ZenML pipelines.
        
PipelineDockerImageBuilder        
    Builds Docker images to run a ZenML pipeline.
Source code in zenml/utils/pipeline_docker_image_builder.py
          class PipelineDockerImageBuilder:
    """Builds Docker images to run a ZenML pipeline."""
    def build_docker_image(
        self,
        docker_settings: "DockerSettings",
        tag: str,
        stack: "Stack",
        include_files: bool,
        download_files: bool,
        entrypoint: Optional[str] = None,
        extra_files: Optional[Dict[str, str]] = None,
        code_repository: Optional["BaseCodeRepository"] = None,
    ) -> Tuple[str, Optional[str], Optional[str]]:
        """Builds (and optionally pushes) a Docker image to run a pipeline.
        Use the image name returned by this method whenever you need to uniquely
        reference the pushed image in order to pull or run it.
        Args:
            docker_settings: The settings for the image build.
            tag: The tag to use for the image.
            stack: The stack on which the pipeline will be deployed.
            include_files: Whether to include files in the build context.
            download_files: Whether to download files in the build context.
            entrypoint: Entrypoint to use for the final image. If left empty,
                no entrypoint will be included in the image.
            extra_files: Extra files to add to the build context. Keys are the
                path inside the build context, values are either the file
                content or a file path.
            code_repository: The code repository from which files will be
                downloaded.
        Returns:
            A tuple (image_digest, dockerfile, requirements):
            - The Docker image repo digest or local name, depending on whether
            the image was pushed or is just stored locally.
            - Dockerfile will contain the contents of the Dockerfile used to
            build the image.
            - Requirements is a string with a single pip requirement per line.
        Raises:
            RuntimeError: If the stack does not contain an image builder.
            ValueError: If no Dockerfile and/or custom parent image is
                specified and the Docker configuration doesn't require an
                image build.
            ValueError: If the specified Dockerfile does not exist.
        """
        requirements: Optional[str] = None
        dockerfile: Optional[str] = None
        if docker_settings.skip_build:
            assert (
                docker_settings.parent_image
            )  # checked via validator already
            # Should we tag this here and push it to the container registry of
            # the stack to make sure it's always accessible when running the
            # pipeline?
            return docker_settings.parent_image, dockerfile, requirements
        if docker_settings.dockerfile and not os.path.isfile(
            docker_settings.dockerfile
        ):
            raise ValueError(
                "Dockerfile at path "
                f"{os.path.abspath(docker_settings.dockerfile)} not found."
            )
        stack.validate()
        image_builder = stack.image_builder
        if not image_builder:
            raise RuntimeError(
                "Unable to build Docker images without an image builder in the "
                f"stack `{stack.name}`."
            )
        container_registry = stack.container_registry
        build_context_class = image_builder.build_context_class
        target_image_name = self._get_target_image_name(
            docker_settings=docker_settings,
            tag=tag,
            container_registry=container_registry,
        )
        requires_zenml_build = any(
            [
                docker_settings.requirements,
                docker_settings.required_integrations,
                docker_settings.required_hub_plugins,
                docker_settings.replicate_local_python_environment,
                docker_settings.install_stack_requirements,
                docker_settings.apt_packages,
                docker_settings.environment,
                include_files,
                download_files,
                entrypoint,
                extra_files,
            ]
        )
        # Fallback to the value defined on the stack component if the
        # pipeline configuration doesn't have a configured value
        parent_image = (
            docker_settings.parent_image or DEFAULT_DOCKER_PARENT_IMAGE
        )
        if docker_settings.dockerfile:
            if parent_image != DEFAULT_DOCKER_PARENT_IMAGE:
                logger.warning(
                    "You've specified both a Dockerfile and a custom parent "
                    "image, ignoring the parent image."
                )
            push = (
                not image_builder.is_building_locally
                or not requires_zenml_build
            )
            if requires_zenml_build:
                # We will build an additional image on top of this one later
                # to include user files and/or install requirements. The image
                # we build now will be used as the parent for the next build.
                repository = docker_settings.target_repository
                if not repository:
                    if container_registry:
                        repository = (
                            container_registry.config.default_repository
                        )
                repository = repository or DEFAULT_ZENML_DOCKER_REPOSITORY
                user_image_name = f"{repository}:" f"{tag}-intermediate-build"
                if push and container_registry:
                    user_image_name = (
                        f"{container_registry.config.uri}/{user_image_name}"
                    )
                parent_image = user_image_name
            else:
                # The image we'll build from the custom Dockerfile will be
                # used directly, so we tag it with the requested target name.
                user_image_name = target_image_name
            build_config = (
                docker_settings.parent_image_build_config
                or DockerBuildConfig()
            )
            build_context = build_context_class(
                root=docker_settings.build_context_root,
                dockerignore_file=build_config.dockerignore,
            )
            build_context.add_file(
                source=docker_settings.dockerfile, destination="Dockerfile"
            )
            logger.info("Building Docker image `%s`.", user_image_name)
            image_name_or_digest = image_builder.build(
                image_name=user_image_name,
                build_context=build_context,
                docker_build_options=build_config.build_options
                or docker_settings.build_options,
                container_registry=container_registry if push else None,
            )
        elif not requires_zenml_build:
            if parent_image == DEFAULT_DOCKER_PARENT_IMAGE:
                raise ValueError(
                    "Unable to run a ZenML pipeline with the given Docker "
                    "settings: No Dockerfile or custom parent image "
                    "specified and no files will be copied or requirements "
                    "installed."
                )
            else:
                # The parent image will be used directly to run the pipeline and
                # needs to be tagged/pushed
                docker_utils.tag_image(parent_image, target=target_image_name)
                if container_registry:
                    image_name_or_digest = container_registry.push_image(
                        target_image_name
                    )
                else:
                    image_name_or_digest = target_image_name
        if requires_zenml_build:
            logger.info("Building Docker image `%s`.", target_image_name)
            build_config = docker_settings.build_config or DockerBuildConfig()
            # Leave the build context empty if we don't want to include any files
            build_context_root = (
                source_utils.get_source_root() if include_files else None
            )
            dockerignore = (
                build_config.dockerignore or docker_settings.dockerignore
            )
            build_context = build_context_class(
                root=build_context_root,
                dockerignore_file=dockerignore,
            )
            requirements_files = self.gather_requirements_files(
                docker_settings=docker_settings,
                stack=stack,
                code_repository=code_repository,
            )
            self._add_requirements_files(
                requirements_files=requirements_files,
                build_context=build_context,
            )
            requirements = (
                "\n".join(
                    file_content for _, file_content, _ in requirements_files
                )
                or None
            )
            apt_packages = docker_settings.apt_packages.copy()
            if docker_settings.install_stack_requirements:
                apt_packages += stack.apt_packages
            # include apt packages from all required integrations
            for integration in docker_settings.required_integrations:
                # get the integration
                integration_cls = integration_registry.integrations[
                    integration
                ]
                apt_packages += integration_cls.APT_PACKAGES
            if apt_packages:
                logger.info(
                    "Including apt packages: %s",
                    ", ".join(f"`{p}`" for p in apt_packages),
                )
            if parent_image == DEFAULT_DOCKER_PARENT_IMAGE:
                # The default parent image is static and doesn't require a pull
                # each time
                pull_parent_image = False
            elif docker_settings.dockerfile and not container_registry:
                # We built a custom parent image and there was no container
                # registry in the stack to push to, this is a local image
                pull_parent_image = False
            elif not image_builder.is_building_locally:
                # Remote image builders always need to pull the image
                pull_parent_image = True
            else:
                # If the image is local, we don't need to pull it. Otherwise
                # we play it safe and always pull in case the user pushed a new
                # image for the given name and tag
                pull_parent_image = not docker_utils.is_local_image(
                    parent_image
                )
            build_options = {
                "pull": pull_parent_image,
                "rm": False,
                **build_config.build_options,
            }
            dockerfile = self._generate_zenml_pipeline_dockerfile(
                parent_image=parent_image,
                docker_settings=docker_settings,
                download_files=download_files,
                requirements_files=requirements_files,
                apt_packages=apt_packages,
                entrypoint=entrypoint,
            )
            build_context.add_file(destination="Dockerfile", source=dockerfile)
            if extra_files:
                for destination, source in extra_files.items():
                    build_context.add_file(
                        destination=destination, source=source
                    )
            image_name_or_digest = image_builder.build(
                image_name=target_image_name,
                build_context=build_context,
                docker_build_options=build_options,
                container_registry=container_registry,
            )
        return image_name_or_digest, dockerfile, requirements
    @staticmethod
    def _get_target_image_name(
        docker_settings: "DockerSettings",
        tag: str,
        container_registry: Optional["BaseContainerRegistry"] = None,
    ) -> str:
        """Returns the target image name.
        If a container registry is given, the image name will include the
        registry URI
        Args:
            docker_settings: The settings for the image build.
            tag: The tag to use for the image.
            container_registry: Optional container registry to which this
                image will be pushed.
        Returns:
            The docker image name.
        """
        repository = docker_settings.target_repository
        if not repository:
            if container_registry:
                repository = container_registry.config.default_repository
        repository = repository or DEFAULT_ZENML_DOCKER_REPOSITORY
        target_image_name = f"{repository}:{tag}"
        if container_registry:
            target_image_name = (
                f"{container_registry.config.uri}/{target_image_name}"
            )
        return target_image_name
    @classmethod
    def _add_requirements_files(
        cls,
        requirements_files: List[Tuple[str, str, List[str]]],
        build_context: "BuildContext",
    ) -> None:
        """Adds requirements files to the build context.
        Args:
            requirements_files: List of tuples
                (filename, file_content, pip_options).
            build_context: Build context to add the requirements files to.
        """
        for filename, file_content, _ in requirements_files:
            build_context.add_file(source=file_content, destination=filename)
    @staticmethod
    def gather_requirements_files(
        docker_settings: DockerSettings,
        stack: "Stack",
        code_repository: Optional["BaseCodeRepository"] = None,
        log: bool = True,
    ) -> List[Tuple[str, str, List[str]]]:
        """Gathers and/or generates pip requirements files.
        This method is called in `PipelineDockerImageBuilder.build_docker_image`
        but it is also called by other parts of the codebase, e.g. the
        `AzureMLStepOperator`, which needs to upload the requirements files to
        AzureML where the step image is then built.
        Args:
            docker_settings: Docker settings that specifies which
                requirements to install.
            stack: The stack on which the pipeline will run.
            code_repository: The code repository from which files will be
                downloaded.
            log: If True, will log the requirements.
        Raises:
            RuntimeError: If the command to export the local python packages
                failed.
            FileNotFoundError: If the specified requirements file does not
                exist.
        Returns:
            List of tuples (filename, file_content, pip_options) of all
            requirements files.
            The files will be in the following order:
            - Packages installed in the local Python environment
            - Requirements defined by stack integrations
            - Requirements defined by user integrations
            - User-defined requirements
        """
        requirements_files: List[Tuple[str, str, List[str]]] = []
        # Generate requirements file for the local environment if configured
        if docker_settings.replicate_local_python_environment:
            if isinstance(
                docker_settings.replicate_local_python_environment,
                PythonEnvironmentExportMethod,
            ):
                command = (
                    docker_settings.replicate_local_python_environment.command
                )
            else:
                command = " ".join(
                    docker_settings.replicate_local_python_environment
                )
            try:
                local_requirements = subprocess.check_output(
                    command,
                    shell=True,  # nosec
                ).decode()
            except subprocess.CalledProcessError as e:
                raise RuntimeError(
                    "Unable to export local python packages."
                ) from e
            requirements_files.append(
                (".zenml_local_requirements", local_requirements, [])
            )
            if log:
                logger.info(
                    "- Including python packages from local environment"
                )
        if docker_settings.install_stack_requirements:
            stack_requirements = stack.requirements()
            if code_repository:
                stack_requirements.update(code_repository.requirements)
            if stack_requirements:
                stack_requirements_list = sorted(stack_requirements)
                stack_requirements_file = "\n".join(stack_requirements_list)
                requirements_files.append(
                    (
                        ".zenml_stack_integration_requirements",
                        stack_requirements_file,
                        [],
                    )
                )
                if log:
                    logger.info(
                        "- Including stack requirements: %s",
                        ", ".join(f"`{r}`" for r in stack_requirements_list),
                    )
        # Generate requirements file for all required integrations
        integration_requirements = set(
            itertools.chain.from_iterable(
                integration_registry.select_integration_requirements(
                    integration_name=integration,
                    target_os=OperatingSystemType.LINUX,
                )
                for integration in docker_settings.required_integrations
            )
        )
        if integration_requirements:
            integration_requirements_list = sorted(integration_requirements)
            integration_requirements_file = "\n".join(
                integration_requirements_list
            )
            requirements_files.append(
                (
                    ".zenml_integration_requirements",
                    integration_requirements_file,
                    [],
                )
            )
            if log:
                logger.info(
                    "- Including integration requirements: %s",
                    ", ".join(f"`{r}`" for r in integration_requirements_list),
                )
        # Generate/Read requirements file for user-defined requirements
        if isinstance(docker_settings.requirements, str):
            path = os.path.abspath(docker_settings.requirements)
            try:
                user_requirements = io_utils.read_file_contents_as_string(path)
            except FileNotFoundError as e:
                raise FileNotFoundError(
                    f"Requirements file {path} does not exist."
                ) from e
            if log:
                logger.info(
                    "- Including user-defined requirements from file `%s`",
                    path,
                )
        elif isinstance(docker_settings.requirements, List):
            user_requirements = "\n".join(docker_settings.requirements)
            if log:
                logger.info(
                    "- Including user-defined requirements: %s",
                    ", ".join(f"`{r}`" for r in docker_settings.requirements),
                )
        else:
            user_requirements = None
        if user_requirements:
            requirements_files.append(
                (".zenml_user_requirements", user_requirements, [])
            )
        return requirements_files
    @staticmethod
    def _generate_zenml_pipeline_dockerfile(
        parent_image: str,
        docker_settings: DockerSettings,
        download_files: bool,
        requirements_files: Sequence[Tuple[str, str, List[str]]] = (),
        apt_packages: Sequence[str] = (),
        entrypoint: Optional[str] = None,
    ) -> str:
        """Generates a Dockerfile.
        Args:
            parent_image: The image to use as parent for the Dockerfile.
            docker_settings: Docker settings for this image build.
            download_files: Whether to download files in the build context.
            requirements_files: List of tuples that contain three items:
                - the name of a requirements file,
                - the content of that file,
                - options that should be passed to pip when installing the
                    requirements file.
            apt_packages: APT packages to install.
            entrypoint: The default entrypoint command that gets executed when
                running a container of an image created by this Dockerfile.
        Raises:
            ValueError: If an unsupported python package installer was
                configured.
        Returns:
            The generated Dockerfile.
        """
        lines = [f"FROM {parent_image}", f"WORKDIR {DOCKER_IMAGE_WORKDIR}"]
        # Set color logging to whatever is locally configured
        lines.append(
            f"ENV {ENV_ZENML_LOGGING_COLORS_DISABLED}={str(handle_bool_env_var(ENV_ZENML_LOGGING_COLORS_DISABLED, False))}"
        )
        for key, value in docker_settings.environment.items():
            lines.append(f"ENV {key.upper()}={value}")
        if apt_packages:
            apt_packages = " ".join(f"'{p}'" for p in apt_packages)
            lines.append(
                "RUN apt-get update && apt-get install -y "
                f"--no-install-recommends {apt_packages}"
            )
        if (
            docker_settings.python_package_installer
            == PythonPackageInstaller.PIP
        ):
            install_command = "pip install"
            default_installer_args: Dict[str, Any] = PIP_DEFAULT_ARGS
        elif (
            docker_settings.python_package_installer
            == PythonPackageInstaller.UV
        ):
            lines.append("RUN pip install uv")
            install_command = "uv pip install"
            default_installer_args = UV_DEFAULT_ARGS
        else:
            raise ValueError("Unsupported python package installer.")
        installer_args = {
            **default_installer_args,
            **docker_settings.python_package_installer_args,
        }
        installer_args_string = " ".join(
            f"--{key}" if value is None else f"--{key}={value}"
            for key, value in installer_args.items()
        )
        for file, _, options in requirements_files:
            lines.append(f"COPY {file} .")
            option_string = " ".join(options)
            lines.append(
                f"RUN {install_command} {installer_args_string}"
                f"{option_string} -r {file}"
            )
        lines.append(f"ENV {ENV_ZENML_ENABLE_REPO_INIT_WARNINGS}=False")
        if download_files:
            lines.append(f"ENV {ENV_ZENML_REQUIRES_CODE_DOWNLOAD}=True")
        lines.append(
            f"ENV {ENV_ZENML_CONFIG_PATH}={DOCKER_IMAGE_ZENML_CONFIG_PATH}"
        )
        lines.append("COPY . .")
        lines.append("RUN chmod -R a+rw .")
        if docker_settings.user:
            # Change file ownership to specified user
            lines.append(f"RUN chown -R {docker_settings.user} .")
            # Switch back to specified user for subsequent instructions
            lines.append(f"USER {docker_settings.user}")
        if entrypoint:
            lines.append(f"ENTRYPOINT {entrypoint}")
        return "\n".join(lines)
build_docker_image(self, docker_settings, tag, stack, include_files, download_files, entrypoint=None, extra_files=None, code_repository=None)
    Builds (and optionally pushes) a Docker image to run a pipeline.
Use the image name returned by this method whenever you need to uniquely reference the pushed image in order to pull or run it.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| docker_settings | DockerSettings | The settings for the image build. | required | 
| tag | str | The tag to use for the image. | required | 
| stack | Stack | The stack on which the pipeline will be deployed. | required | 
| include_files | bool | Whether to include files in the build context. | required | 
| download_files | bool | Whether to download files in the build context. | required | 
| entrypoint | Optional[str] | Entrypoint to use for the final image. If left empty, no entrypoint will be included in the image. | None | 
| extra_files | Optional[Dict[str, str]] | Extra files to add to the build context. Keys are the path inside the build context, values are either the file content or a file path. | None | 
| code_repository | Optional[BaseCodeRepository] | The code repository from which files will be downloaded. | None | 
Returns:
| Type | Description | 
|---|---|
| A tuple (image_digest, dockerfile, requirements) | 
 | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the stack does not contain an image builder. | 
| ValueError | If no Dockerfile and/or custom parent image is specified and the Docker configuration doesn't require an image build. | 
| ValueError | If the specified Dockerfile does not exist. | 
Source code in zenml/utils/pipeline_docker_image_builder.py
          def build_docker_image(
    self,
    docker_settings: "DockerSettings",
    tag: str,
    stack: "Stack",
    include_files: bool,
    download_files: bool,
    entrypoint: Optional[str] = None,
    extra_files: Optional[Dict[str, str]] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Tuple[str, Optional[str], Optional[str]]:
    """Builds (and optionally pushes) a Docker image to run a pipeline.
    Use the image name returned by this method whenever you need to uniquely
    reference the pushed image in order to pull or run it.
    Args:
        docker_settings: The settings for the image build.
        tag: The tag to use for the image.
        stack: The stack on which the pipeline will be deployed.
        include_files: Whether to include files in the build context.
        download_files: Whether to download files in the build context.
        entrypoint: Entrypoint to use for the final image. If left empty,
            no entrypoint will be included in the image.
        extra_files: Extra files to add to the build context. Keys are the
            path inside the build context, values are either the file
            content or a file path.
        code_repository: The code repository from which files will be
            downloaded.
    Returns:
        A tuple (image_digest, dockerfile, requirements):
        - The Docker image repo digest or local name, depending on whether
        the image was pushed or is just stored locally.
        - Dockerfile will contain the contents of the Dockerfile used to
        build the image.
        - Requirements is a string with a single pip requirement per line.
    Raises:
        RuntimeError: If the stack does not contain an image builder.
        ValueError: If no Dockerfile and/or custom parent image is
            specified and the Docker configuration doesn't require an
            image build.
        ValueError: If the specified Dockerfile does not exist.
    """
    requirements: Optional[str] = None
    dockerfile: Optional[str] = None
    if docker_settings.skip_build:
        assert (
            docker_settings.parent_image
        )  # checked via validator already
        # Should we tag this here and push it to the container registry of
        # the stack to make sure it's always accessible when running the
        # pipeline?
        return docker_settings.parent_image, dockerfile, requirements
    if docker_settings.dockerfile and not os.path.isfile(
        docker_settings.dockerfile
    ):
        raise ValueError(
            "Dockerfile at path "
            f"{os.path.abspath(docker_settings.dockerfile)} not found."
        )
    stack.validate()
    image_builder = stack.image_builder
    if not image_builder:
        raise RuntimeError(
            "Unable to build Docker images without an image builder in the "
            f"stack `{stack.name}`."
        )
    container_registry = stack.container_registry
    build_context_class = image_builder.build_context_class
    target_image_name = self._get_target_image_name(
        docker_settings=docker_settings,
        tag=tag,
        container_registry=container_registry,
    )
    requires_zenml_build = any(
        [
            docker_settings.requirements,
            docker_settings.required_integrations,
            docker_settings.required_hub_plugins,
            docker_settings.replicate_local_python_environment,
            docker_settings.install_stack_requirements,
            docker_settings.apt_packages,
            docker_settings.environment,
            include_files,
            download_files,
            entrypoint,
            extra_files,
        ]
    )
    # Fallback to the value defined on the stack component if the
    # pipeline configuration doesn't have a configured value
    parent_image = (
        docker_settings.parent_image or DEFAULT_DOCKER_PARENT_IMAGE
    )
    if docker_settings.dockerfile:
        if parent_image != DEFAULT_DOCKER_PARENT_IMAGE:
            logger.warning(
                "You've specified both a Dockerfile and a custom parent "
                "image, ignoring the parent image."
            )
        push = (
            not image_builder.is_building_locally
            or not requires_zenml_build
        )
        if requires_zenml_build:
            # We will build an additional image on top of this one later
            # to include user files and/or install requirements. The image
            # we build now will be used as the parent for the next build.
            repository = docker_settings.target_repository
            if not repository:
                if container_registry:
                    repository = (
                        container_registry.config.default_repository
                    )
            repository = repository or DEFAULT_ZENML_DOCKER_REPOSITORY
            user_image_name = f"{repository}:" f"{tag}-intermediate-build"
            if push and container_registry:
                user_image_name = (
                    f"{container_registry.config.uri}/{user_image_name}"
                )
            parent_image = user_image_name
        else:
            # The image we'll build from the custom Dockerfile will be
            # used directly, so we tag it with the requested target name.
            user_image_name = target_image_name
        build_config = (
            docker_settings.parent_image_build_config
            or DockerBuildConfig()
        )
        build_context = build_context_class(
            root=docker_settings.build_context_root,
            dockerignore_file=build_config.dockerignore,
        )
        build_context.add_file(
            source=docker_settings.dockerfile, destination="Dockerfile"
        )
        logger.info("Building Docker image `%s`.", user_image_name)
        image_name_or_digest = image_builder.build(
            image_name=user_image_name,
            build_context=build_context,
            docker_build_options=build_config.build_options
            or docker_settings.build_options,
            container_registry=container_registry if push else None,
        )
    elif not requires_zenml_build:
        if parent_image == DEFAULT_DOCKER_PARENT_IMAGE:
            raise ValueError(
                "Unable to run a ZenML pipeline with the given Docker "
                "settings: No Dockerfile or custom parent image "
                "specified and no files will be copied or requirements "
                "installed."
            )
        else:
            # The parent image will be used directly to run the pipeline and
            # needs to be tagged/pushed
            docker_utils.tag_image(parent_image, target=target_image_name)
            if container_registry:
                image_name_or_digest = container_registry.push_image(
                    target_image_name
                )
            else:
                image_name_or_digest = target_image_name
    if requires_zenml_build:
        logger.info("Building Docker image `%s`.", target_image_name)
        build_config = docker_settings.build_config or DockerBuildConfig()
        # Leave the build context empty if we don't want to include any files
        build_context_root = (
            source_utils.get_source_root() if include_files else None
        )
        dockerignore = (
            build_config.dockerignore or docker_settings.dockerignore
        )
        build_context = build_context_class(
            root=build_context_root,
            dockerignore_file=dockerignore,
        )
        requirements_files = self.gather_requirements_files(
            docker_settings=docker_settings,
            stack=stack,
            code_repository=code_repository,
        )
        self._add_requirements_files(
            requirements_files=requirements_files,
            build_context=build_context,
        )
        requirements = (
            "\n".join(
                file_content for _, file_content, _ in requirements_files
            )
            or None
        )
        apt_packages = docker_settings.apt_packages.copy()
        if docker_settings.install_stack_requirements:
            apt_packages += stack.apt_packages
        # include apt packages from all required integrations
        for integration in docker_settings.required_integrations:
            # get the integration
            integration_cls = integration_registry.integrations[
                integration
            ]
            apt_packages += integration_cls.APT_PACKAGES
        if apt_packages:
            logger.info(
                "Including apt packages: %s",
                ", ".join(f"`{p}`" for p in apt_packages),
            )
        if parent_image == DEFAULT_DOCKER_PARENT_IMAGE:
            # The default parent image is static and doesn't require a pull
            # each time
            pull_parent_image = False
        elif docker_settings.dockerfile and not container_registry:
            # We built a custom parent image and there was no container
            # registry in the stack to push to, this is a local image
            pull_parent_image = False
        elif not image_builder.is_building_locally:
            # Remote image builders always need to pull the image
            pull_parent_image = True
        else:
            # If the image is local, we don't need to pull it. Otherwise
            # we play it safe and always pull in case the user pushed a new
            # image for the given name and tag
            pull_parent_image = not docker_utils.is_local_image(
                parent_image
            )
        build_options = {
            "pull": pull_parent_image,
            "rm": False,
            **build_config.build_options,
        }
        dockerfile = self._generate_zenml_pipeline_dockerfile(
            parent_image=parent_image,
            docker_settings=docker_settings,
            download_files=download_files,
            requirements_files=requirements_files,
            apt_packages=apt_packages,
            entrypoint=entrypoint,
        )
        build_context.add_file(destination="Dockerfile", source=dockerfile)
        if extra_files:
            for destination, source in extra_files.items():
                build_context.add_file(
                    destination=destination, source=source
                )
        image_name_or_digest = image_builder.build(
            image_name=target_image_name,
            build_context=build_context,
            docker_build_options=build_options,
            container_registry=container_registry,
        )
    return image_name_or_digest, dockerfile, requirements
gather_requirements_files(docker_settings, stack, code_repository=None, log=True)
  
      staticmethod
  
    Gathers and/or generates pip requirements files.
This method is called in PipelineDockerImageBuilder.build_docker_image
but it is also called by other parts of the codebase, e.g. the
AzureMLStepOperator, which needs to upload the requirements files to
AzureML where the step image is then built.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| docker_settings | DockerSettings | Docker settings that specifies which requirements to install. | required | 
| stack | Stack | The stack on which the pipeline will run. | required | 
| code_repository | Optional[BaseCodeRepository] | The code repository from which files will be downloaded. | None | 
| log | bool | If True, will log the requirements. | True | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the command to export the local python packages failed. | 
| FileNotFoundError | If the specified requirements file does not exist. | 
Returns:
| Type | Description | 
|---|---|
| List of tuples (filename, file_content, pip_options) of all
requirements files.
The files will be in the following order | 
 | 
Source code in zenml/utils/pipeline_docker_image_builder.py
          @staticmethod
def gather_requirements_files(
    docker_settings: DockerSettings,
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
    log: bool = True,
) -> List[Tuple[str, str, List[str]]]:
    """Gathers and/or generates pip requirements files.
    This method is called in `PipelineDockerImageBuilder.build_docker_image`
    but it is also called by other parts of the codebase, e.g. the
    `AzureMLStepOperator`, which needs to upload the requirements files to
    AzureML where the step image is then built.
    Args:
        docker_settings: Docker settings that specifies which
            requirements to install.
        stack: The stack on which the pipeline will run.
        code_repository: The code repository from which files will be
            downloaded.
        log: If True, will log the requirements.
    Raises:
        RuntimeError: If the command to export the local python packages
            failed.
        FileNotFoundError: If the specified requirements file does not
            exist.
    Returns:
        List of tuples (filename, file_content, pip_options) of all
        requirements files.
        The files will be in the following order:
        - Packages installed in the local Python environment
        - Requirements defined by stack integrations
        - Requirements defined by user integrations
        - User-defined requirements
    """
    requirements_files: List[Tuple[str, str, List[str]]] = []
    # Generate requirements file for the local environment if configured
    if docker_settings.replicate_local_python_environment:
        if isinstance(
            docker_settings.replicate_local_python_environment,
            PythonEnvironmentExportMethod,
        ):
            command = (
                docker_settings.replicate_local_python_environment.command
            )
        else:
            command = " ".join(
                docker_settings.replicate_local_python_environment
            )
        try:
            local_requirements = subprocess.check_output(
                command,
                shell=True,  # nosec
            ).decode()
        except subprocess.CalledProcessError as e:
            raise RuntimeError(
                "Unable to export local python packages."
            ) from e
        requirements_files.append(
            (".zenml_local_requirements", local_requirements, [])
        )
        if log:
            logger.info(
                "- Including python packages from local environment"
            )
    if docker_settings.install_stack_requirements:
        stack_requirements = stack.requirements()
        if code_repository:
            stack_requirements.update(code_repository.requirements)
        if stack_requirements:
            stack_requirements_list = sorted(stack_requirements)
            stack_requirements_file = "\n".join(stack_requirements_list)
            requirements_files.append(
                (
                    ".zenml_stack_integration_requirements",
                    stack_requirements_file,
                    [],
                )
            )
            if log:
                logger.info(
                    "- Including stack requirements: %s",
                    ", ".join(f"`{r}`" for r in stack_requirements_list),
                )
    # Generate requirements file for all required integrations
    integration_requirements = set(
        itertools.chain.from_iterable(
            integration_registry.select_integration_requirements(
                integration_name=integration,
                target_os=OperatingSystemType.LINUX,
            )
            for integration in docker_settings.required_integrations
        )
    )
    if integration_requirements:
        integration_requirements_list = sorted(integration_requirements)
        integration_requirements_file = "\n".join(
            integration_requirements_list
        )
        requirements_files.append(
            (
                ".zenml_integration_requirements",
                integration_requirements_file,
                [],
            )
        )
        if log:
            logger.info(
                "- Including integration requirements: %s",
                ", ".join(f"`{r}`" for r in integration_requirements_list),
            )
    # Generate/Read requirements file for user-defined requirements
    if isinstance(docker_settings.requirements, str):
        path = os.path.abspath(docker_settings.requirements)
        try:
            user_requirements = io_utils.read_file_contents_as_string(path)
        except FileNotFoundError as e:
            raise FileNotFoundError(
                f"Requirements file {path} does not exist."
            ) from e
        if log:
            logger.info(
                "- Including user-defined requirements from file `%s`",
                path,
            )
    elif isinstance(docker_settings.requirements, List):
        user_requirements = "\n".join(docker_settings.requirements)
        if log:
            logger.info(
                "- Including user-defined requirements: %s",
                ", ".join(f"`{r}`" for r in docker_settings.requirements),
            )
    else:
        user_requirements = None
    if user_requirements:
        requirements_files.append(
            (".zenml_user_requirements", user_requirements, [])
        )
    return requirements_files
        proxy_utils
    Proxy design pattern utils.
make_proxy_class(interface, attribute)
    Proxy class decorator.
Use this decorator to transform the decorated class into a proxy that
forwards all calls defined in the interface interface to the attribute
class attribute that implements the same interface.
This class is useful in cases where you need to have a base class that acts as a proxy or facade for one or more other classes. Both the decorated class and the class attribute must inherit from the same ABC interface for this to work. Only regular methods are supported, not class methods or attributes.
Example: Let's say you have an interface called BodyBuilder, a base class
called FatBob and another class called BigJim. BigJim implements the
BodyBuilder interface, but FatBob does not. And let's say you want
FatBob to look as if it implements the BodyBuilder interface, but in
fact it just forwards all calls to BigJim. You could do this:
from abc import ABC, abstractmethod
class BodyBuilder(ABC):
    @abstractmethod
    def build_body(self):
        pass
class BigJim(BodyBuilder):
    def build_body(self):
        print("Looks fit!")
class FatBob(BodyBuilder)
    def __init__(self):
        self.big_jim = BigJim()
    def build_body(self):
        self.big_jim.build_body()
fat_bob = FatBob()
fat_bob.build_body()
But this leads to a lot of boilerplate code with bigger interfaces and makes everything harder to maintain. This is where the proxy class decorator comes in handy. Here's how to use it:
from zenml.utils.proxy_utils import make_proxy_class
from typing import Optional
@make_proxy_class(BodyBuilder, "big_jim")
class FatBob(BodyBuilder)
    big_jim: Optional[BodyBuilder] = None
    def __init__(self):
        self.big_jim = BigJim()
fat_bob = FatBob()
fat_bob.build_body()
This is the same as implementing FatBob to call BigJim explicitly, but it has the advantage that you don't need to write a lot of boilerplate code of modify the FatBob class every time you change something in the BodyBuilder interface.
This proxy decorator also allows to extend classes dynamically at runtime:
if the attribute class attribute is set to None, the proxy class
will assume that the interface is not implemented by the class and will
raise a NotImplementedError:
@make_proxy_class(BodyBuilder, "big_jim")
class FatBob(BodyBuilder)
    big_jim: Optional[BodyBuilder] = None
    def __init__(self):
        self.big_jim = None
fat_bob = FatBob()
# Raises NotImplementedError, class not extended yet:
fat_bob.build_body()
fat_bob.big_jim = BigJim()
# Now it works:
fat_bob.build_body()
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| interface | Type[abc.ABC] | The interface to implement. | required | 
| attribute | str | The attribute of the base class to forward calls to. | required | 
Returns:
| Type | Description | 
|---|---|
| Callable[[~C], ~C] | The proxy class. | 
Source code in zenml/utils/proxy_utils.py
          def make_proxy_class(interface: Type[ABC], attribute: str) -> Callable[[C], C]:
    """Proxy class decorator.
    Use this decorator to transform the decorated class into a proxy that
    forwards all calls defined in the `interface` interface to the `attribute`
    class attribute that implements the same interface.
    This class is useful in cases where you need to have a base class that acts
    as a proxy or facade for one or more other classes. Both the decorated class
    and the class attribute must inherit from the same ABC interface for this to
    work. Only regular methods are supported, not class methods or attributes.
    Example: Let's say you have an interface called `BodyBuilder`, a base class
    called `FatBob` and another class called `BigJim`. `BigJim` implements the
    `BodyBuilder` interface, but `FatBob` does not. And let's say you want
    `FatBob` to look as if it implements the `BodyBuilder` interface, but in
    fact it just forwards all calls to `BigJim`. You could do this:
    ```python
    from abc import ABC, abstractmethod
    class BodyBuilder(ABC):
        @abstractmethod
        def build_body(self):
            pass
    class BigJim(BodyBuilder):
        def build_body(self):
            print("Looks fit!")
    class FatBob(BodyBuilder)
        def __init__(self):
            self.big_jim = BigJim()
        def build_body(self):
            self.big_jim.build_body()
    fat_bob = FatBob()
    fat_bob.build_body()
    ```
    But this leads to a lot of boilerplate code with bigger interfaces and
    makes everything harder to maintain. This is where the proxy class
    decorator comes in handy. Here's how to use it:
    ```python
    from zenml.utils.proxy_utils import make_proxy_class
    from typing import Optional
    @make_proxy_class(BodyBuilder, "big_jim")
    class FatBob(BodyBuilder)
        big_jim: Optional[BodyBuilder] = None
        def __init__(self):
            self.big_jim = BigJim()
    fat_bob = FatBob()
    fat_bob.build_body()
    ```
    This is the same as implementing FatBob to call BigJim explicitly, but it
    has the advantage that you don't need to write a lot of boilerplate code
    of modify the FatBob class every time you change something in the
    BodyBuilder interface.
    This proxy decorator also allows to extend classes dynamically at runtime:
    if the `attribute` class attribute is set to None, the proxy class
    will assume that the interface is not implemented by the class and will
    raise a NotImplementedError:
    ```python
    @make_proxy_class(BodyBuilder, "big_jim")
    class FatBob(BodyBuilder)
        big_jim: Optional[BodyBuilder] = None
        def __init__(self):
            self.big_jim = None
    fat_bob = FatBob()
    # Raises NotImplementedError, class not extended yet:
    fat_bob.build_body()
    fat_bob.big_jim = BigJim()
    # Now it works:
    fat_bob.build_body()
    ```
    Args:
        interface: The interface to implement.
        attribute: The attribute of the base class to forward calls to.
    Returns:
        The proxy class.
    """
    def make_proxy_method(cls: C, _method: F) -> F:
        """Proxy method decorator.
        Used to transform a method into a proxy that forwards all calls to the
        given class attribute.
        Args:
            cls: The class to use as the base.
            _method: The method to replace.
        Returns:
            The proxy method.
        """
        @wraps(_method)
        def proxy_method(*args: Any, **kw: Any) -> Any:
            """Proxy method.
            Args:
                *args: The arguments to pass to the method.
                **kw: The keyword arguments to pass to the method.
            Returns:
                The return value of the proxied method.
            Raises:
                TypeError: If the class does not have the attribute specified
                    in the decorator or if the attribute does not implement
                    the specified interface.
                NotImplementedError: If the attribute specified in the
                    decorator is None, i.e. the interface is not implemented.
            """
            self = args[0]
            if not hasattr(self, attribute):
                raise TypeError(
                    f"Class '{cls.__name__}' does not have a '{attribute}' "
                    f"as specified in the 'make_proxy_class' decorator."
                )
            proxied_obj = getattr(self, attribute)
            if proxied_obj is None:
                raise NotImplementedError(
                    f"This '{cls.__name__}' instance does not implement the "
                    f"'{interface.__name__}' interface."
                )
            if not isinstance(proxied_obj, interface):
                raise TypeError(
                    f"Interface '{interface.__name__}' must be implemented by "
                    f"the '{cls.__name__}' '{attribute}' attribute."
                )
            proxied_method = getattr(proxied_obj, _method.__name__)
            return proxied_method(*args[1:], **kw)
        return cast(F, proxy_method)
    def _inner_decorator(_cls: C) -> C:
        """Inner proxy class decorator.
        Args:
            _cls: The class to decorate.
        Returns:
            The decorated class.
        Raises:
            TypeError: If the decorated class does not implement the specified
                interface.
        """
        if not issubclass(_cls, interface):
            raise TypeError(
                f"Interface '{interface.__name__}' must be implemented by "
                f"the '{_cls.__name__}' class."
            )
        for method_name in interface.__abstractmethods__:
            original_method = getattr(_cls, method_name)
            method_proxy = make_proxy_method(_cls, original_method)
            # Make sure the proxy method is not considered abstract.
            method_proxy.__isabstractmethod__ = False
            setattr(_cls, method_name, method_proxy)
        # Remove the abstract methods in the interface from the decorated class.
        _cls.__abstractmethods__ = frozenset(
            method_name
            for method_name in _cls.__abstractmethods__
            if method_name not in interface.__abstractmethods__
        )
        return cast(C, _cls)
    return _inner_decorator
        pydantic_utils
    Utilities for pydantic models.
        
TemplateGenerator        
    Class to generate templates for pydantic models or classes.
Source code in zenml/utils/pydantic_utils.py
          class TemplateGenerator:
    """Class to generate templates for pydantic models or classes."""
    def __init__(
        self, instance_or_class: Union[BaseModel, Type[BaseModel]]
    ) -> None:
        """Initializes the template generator.
        Args:
            instance_or_class: The pydantic model or model class for which to
                generate a template.
        """
        self.instance_or_class = instance_or_class
    def run(self) -> Dict[str, Any]:
        """Generates the template.
        Returns:
            The template dictionary.
        """
        if isinstance(self.instance_or_class, BaseModel):
            template = self._generate_template_for_model(
                self.instance_or_class
            )
        else:
            template = self._generate_template_for_model_class(
                self.instance_or_class
            )
        # Convert to json in an intermediate step, so we can leverage Pydantic's
        # encoder to support types like UUID and datetime
        json_string = json.dumps(template, default=pydantic_encoder)
        return cast(Dict[str, Any], json.loads(json_string))
    def _generate_template_for_model(self, model: BaseModel) -> Dict[str, Any]:
        """Generates a template for a pydantic model.
        Args:
            model: The model for which to generate the template.
        Returns:
            The model template.
        """
        template = self._generate_template_for_model_class(model.__class__)
        for name in model.model_fields_set:
            value = getattr(model, name)
            template[name] = self._generate_template_for_value(value)
        return template
    def _generate_template_for_model_class(
        self,
        model_class: Type[BaseModel],
    ) -> Dict[str, Any]:
        """Generates a template for a pydantic model class.
        Args:
            model_class: The model class for which to generate the template.
        Returns:
            The model class template.
        """
        template: Dict[str, Any] = {}
        for name, field in model_class.model_fields.items():
            annotation = field.annotation
            if annotation is not None:
                if self._is_model_class(annotation):
                    template[name] = self._generate_template_for_model_class(
                        annotation
                    )
                elif typing_utils.is_optional(
                    annotation
                ) and self._is_model_class(
                    typing_utils.get_args(annotation)[0]
                ):
                    template[name] = self._generate_template_for_model_class(
                        typing_utils.get_args(annotation)[0]
                    )
                else:
                    template[name] = pydantic_repr.display_as_type(annotation)
        return template
    def _generate_template_for_value(self, value: Any) -> Any:
        """Generates a template for an arbitrary value.
        Args:
            value: The value for which to generate the template.
        Returns:
            The value template.
        """
        if isinstance(value, Dict):
            return {
                k: self._generate_template_for_value(v)
                for k, v in value.items()
            }
        elif sequence_like(value):
            return [self._generate_template_for_value(v) for v in value]
        elif isinstance(value, BaseModel):
            return self._generate_template_for_model(value)
        else:
            return value
    @staticmethod
    def _is_model_class(value: Any) -> bool:
        """Checks if the given value is a pydantic model class.
        Args:
            value: The value to check.
        Returns:
            If the value is a pydantic model class.
        """
        return isinstance(value, type) and issubclass(value, BaseModel)
__init__(self, instance_or_class)
  
      special
  
    Initializes the template generator.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| instance_or_class | Union[pydantic.main.BaseModel, Type[pydantic.main.BaseModel]] | The pydantic model or model class for which to generate a template. | required | 
Source code in zenml/utils/pydantic_utils.py
          def __init__(
    self, instance_or_class: Union[BaseModel, Type[BaseModel]]
) -> None:
    """Initializes the template generator.
    Args:
        instance_or_class: The pydantic model or model class for which to
            generate a template.
    """
    self.instance_or_class = instance_or_class
run(self)
    Generates the template.
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | The template dictionary. | 
Source code in zenml/utils/pydantic_utils.py
          def run(self) -> Dict[str, Any]:
    """Generates the template.
    Returns:
        The template dictionary.
    """
    if isinstance(self.instance_or_class, BaseModel):
        template = self._generate_template_for_model(
            self.instance_or_class
        )
    else:
        template = self._generate_template_for_model_class(
            self.instance_or_class
        )
    # Convert to json in an intermediate step, so we can leverage Pydantic's
    # encoder to support types like UUID and datetime
    json_string = json.dumps(template, default=pydantic_encoder)
    return cast(Dict[str, Any], json.loads(json_string))
        
YAMLSerializationMixin            (BaseModel)
        
    Class to serialize/deserialize pydantic models to/from YAML.
Source code in zenml/utils/pydantic_utils.py
          class YAMLSerializationMixin(BaseModel):
    """Class to serialize/deserialize pydantic models to/from YAML."""
    def yaml(self, sort_keys: bool = False, **kwargs: Any) -> str:
        """YAML string representation..
        Args:
            sort_keys: Whether to sort the keys in the YAML representation.
            **kwargs: Kwargs to pass to the pydantic model_dump(...) method.
        Returns:
            YAML string representation.
        """
        dict_ = json.loads(
            json.dumps(
                self.model_dump(mode="json", **kwargs), sort_keys=sort_keys
            )
        )
        return yaml.dump(dict_, sort_keys=sort_keys)
    @classmethod
    def from_yaml(cls: Type[M], path: str) -> M:
        """Creates an instance from a YAML file.
        Args:
            path: Path to a YAML file.
        Returns:
            The model instance.
        """
        dict_ = yaml_utils.read_yaml(path)
        return cls.model_validate(dict_)
from_yaml(path)
  
      classmethod
  
    Creates an instance from a YAML file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | Path to a YAML file. | required | 
Returns:
| Type | Description | 
|---|---|
| ~M | The model instance. | 
Source code in zenml/utils/pydantic_utils.py
          @classmethod
def from_yaml(cls: Type[M], path: str) -> M:
    """Creates an instance from a YAML file.
    Args:
        path: Path to a YAML file.
    Returns:
        The model instance.
    """
    dict_ = yaml_utils.read_yaml(path)
    return cls.model_validate(dict_)
yaml(self, sort_keys=False, **kwargs)
    YAML string representation..
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| sort_keys | bool | Whether to sort the keys in the YAML representation. | False | 
| **kwargs | Any | Kwargs to pass to the pydantic model_dump(...) method. | {} | 
Returns:
| Type | Description | 
|---|---|
| str | YAML string representation. | 
Source code in zenml/utils/pydantic_utils.py
          def yaml(self, sort_keys: bool = False, **kwargs: Any) -> str:
    """YAML string representation..
    Args:
        sort_keys: Whether to sort the keys in the YAML representation.
        **kwargs: Kwargs to pass to the pydantic model_dump(...) method.
    Returns:
        YAML string representation.
    """
    dict_ = json.loads(
        json.dumps(
            self.model_dump(mode="json", **kwargs), sort_keys=sort_keys
        )
    )
    return yaml.dump(dict_, sort_keys=sort_keys)
before_validator_handler(method)
    Decorator to handle the raw input data for pydantic model validators.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| method | Callable[..., Any] | the class method with the actual validation logic. | required | 
Returns:
| Type | Description | 
|---|---|
| Callable[[Any, Any, Any], Any] | the validator method | 
Source code in zenml/utils/pydantic_utils.py
          def before_validator_handler(
    method: Callable[..., Any],
) -> Callable[[Any, Any, Any], Any]:
    """Decorator to handle the raw input data for pydantic model validators.
    Args:
        method: the class method with the actual validation logic.
    Returns:
        the validator method
    """
    def before_validator(
        cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
    ) -> Any:
        """Wrapper method to handle the raw data.
        Args:
            cls: the class handler
            data: the raw input data
            validation_info: the context of the validation.
        Returns:
            the validated data
        """
        data = model_validator_data_handler(
            raw_data=data, base_class=cls, validation_info=validation_info
        )
        return method(cls=cls, data=data)
    return before_validator
has_validators(pydantic_class, field_name=None)
    Function to check if a Pydantic model or a pydantic field has validators.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pydantic_class | Type[pydantic.main.BaseModel] | The class defining the pydantic model. | required | 
| field_name | Optional[str] | Optional, field info. If specified, this function will focus on a singular field within the class. If not specified, it will check model validators. | None | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the specified field or class has a validator or not. | 
Source code in zenml/utils/pydantic_utils.py
          def has_validators(
    pydantic_class: Type[BaseModel],
    field_name: Optional[str] = None,
) -> bool:
    """Function to check if a Pydantic model or a pydantic field has validators.
    Args:
        pydantic_class: The class defining the pydantic model.
        field_name: Optional, field info. If specified, this function will focus
            on a singular field within the class. If not specified, it will
            check model validators.
    Returns:
        Whether the specified field or class has a validator or not.
    """
    # If field is not specified check model validators
    if field_name is None:
        if pydantic_class.__pydantic_decorators__.model_validators:
            return True
    # Else, check field validators
    else:
        # 1. Field validators can be defined through @field_validator decorators
        f_validators = pydantic_class.__pydantic_decorators__.field_validators
        for name, f_v in f_validators.items():
            if field_name in f_v.info.fields:
                return True
        # 2. Field validators can be defined through the Annotation[.....]
        field_info = pydantic_class.model_fields[field_name]
        if metadata := field_info.metadata:
            if any(
                isinstance(
                    m,
                    (
                        AfterValidator,
                        BeforeValidator,
                        PlainValidator,
                        WrapValidator,
                    ),
                )
                for m in metadata
            ):
                return True
    return False
model_validator_data_handler(raw_data, base_class, validation_info)
    Utility function to parse raw input data of varying types to a dict.
With the change to pydantic v2, validators which operate with "before" (or previously known as the "pre" parameter) are getting "Any" types of raw input instead of a "Dict[str, Any]" as before. Depending on the use-case, this can create conflicts after the migration and this function will be used as a helper function to handle different types of raw input data.
A code snippet to showcase how the behaviour changes. The "before" validator prints the type of the input:
class Base(BaseModel):
    a: int = 3
class MyClass(Base):
    @model_validator(mode="before")
    @classmethod
    def before_validator(cls, data: Any) -> Any:
        print(type(data))
        return {}
one = MyClass() # prints "<class 'dict'>"
MyClass.model_validate(one)  # prints NOTHING, it is already validated
MyClass.model_validate("asdf")  # prints "<class 'str'>", fails without the modified return.
MyClass.model_validate(RandomClass())  # prints "<class 'RandomClass'>", fails without the modified return.
MyClass.model_validate(Base())  # prints "<class 'Base'>", fails without the modified return.
MyClass.model_validate_json(json.dumps("aria"))  # prints "<class 'str'>", fails without the modified return.
MyClass.model_validate_json(json.dumps([1]))  # prints "<class 'list'>", fails without the modified return.
MyClass.model_validate_json(one.model_dump_json())  # prints "<class 'dict'>"
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| raw_data | Any | The raw data passed to the validator, can be "Any" type. | required | 
| base_class | Type[pydantic.main.BaseModel] | The class that the validator belongs to | required | 
| validation_info | ValidationInfo | Extra information about the validation process. | required | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | if the type of the data is not processable. | 
| ValueError | in case of an unknown validation mode. | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | A dictionary which will be passed to the eventual validator of pydantic. | 
Source code in zenml/utils/pydantic_utils.py
          def model_validator_data_handler(
    raw_data: Any,
    base_class: Type[BaseModel],
    validation_info: ValidationInfo,
) -> Dict[str, Any]:
    """Utility function to parse raw input data of varying types to a dict.
    With the change to pydantic v2, validators which operate with "before"
    (or previously known as the "pre" parameter) are getting "Any" types of raw
    input instead of a "Dict[str, Any]" as before. Depending on the use-case,
    this can create conflicts after the migration and this function will be
    used as a helper function to handle different types of raw input data.
    A code snippet to showcase how the behaviour changes. The "before" validator
    prints the type of the input:
        class Base(BaseModel):
            a: int = 3
        class MyClass(Base):
            @model_validator(mode="before")
            @classmethod
            def before_validator(cls, data: Any) -> Any:
                print(type(data))
                return {}
        one = MyClass() # prints "<class 'dict'>"
        MyClass.model_validate(one)  # prints NOTHING, it is already validated
        MyClass.model_validate("asdf")  # prints "<class 'str'>", fails without the modified return.
        MyClass.model_validate(RandomClass())  # prints "<class 'RandomClass'>", fails without the modified return.
        MyClass.model_validate(Base())  # prints "<class 'Base'>", fails without the modified return.
        MyClass.model_validate_json(json.dumps("aria"))  # prints "<class 'str'>", fails without the modified return.
        MyClass.model_validate_json(json.dumps([1]))  # prints "<class 'list'>", fails without the modified return.
        MyClass.model_validate_json(one.model_dump_json())  # prints "<class 'dict'>"
    Args:
        raw_data: The raw data passed to the validator, can be "Any" type.
        base_class: The class that the validator belongs to
        validation_info: Extra information about the validation process.
    Raises:
        TypeError: if the type of the data is not processable.
        ValueError: in case of an unknown validation mode.
    Returns:
        A dictionary which will be passed to the eventual validator of pydantic.
    """
    if validation_info.mode == "python":
        # This is mode is only active if people validate objects using pythonic
        # raw data such as MyClass(...) or MyClass.model_validate()
        if isinstance(raw_data, dict):
            # In most cases, this is the behaviour as the raw input is a dict
            return raw_data
        elif isinstance(raw_data, base_class):
            # In some cases, we pass the same object type to the validation
            # in such cases, it is critical we keep the original structure of
            # fields that are already set.
            return dict(raw_data)
        elif issubclass(base_class, raw_data.__class__):
            # There are a few occurrences where the annotation of the field is
            # denoted by a subclass, and we use the instance of its super class
            # as the raw input. In such cases we will use the same approach as
            # before, while raising a debug message.
            logger.debug(
                f"During the validation of a `{base_class}` object, an instance"
                f"of `{raw_data.__class__}` (super class of `{base_class}`) "
                f"has been passed as raw input. This might lead to unexpected "
                f"behaviour in case `{base_class}` have features which can not"
                f"be extracted from an instance of a `{raw_data.__class__}`."
            )
            return dict(raw_data)
        elif isinstance(raw_data, str):
            # If the raw input is a raw string, we can try to use the `json`
            # module to parse it. The resulting data needs to be a proper
            # dict for us to pass it to the validation process.
            try:
                json_data = json.loads(raw_data)
                if isinstance(json_data, dict):
                    return json_data
                else:
                    raise TypeError("The resulting json data is not a dict!")
            except (TypeError, JSONDecodeError) as e:
                raise TypeError(
                    "The raw json input string can not be converted to a "
                    f"dict: {e}"
                )
        else:
            raise TypeError(
                "Unsupported type of raw input data for the `python` validation"
                "mode of the pydantic class. Please consider changing the way "
                f"you are creating using the `{base_class}` or instead use"
                f"`{base_class}.model_validate_json()`."
            )
    elif validation_info.mode == "json":
        # This is mode is only active if people validate objects using json
        # input data such as MyClass.model_validate_json()
        if isinstance(raw_data, dict):
            return raw_data
        else:
            raise TypeError(
                f"The resulting JSON data {raw_data} is not a dict, therefore"
                f"can not be used by the validation process."
            )
    else:
        # Unknown validation mode
        raise ValueError(f"Unknown validation mode. {validation_info.mode}")
update_model(original, update, recursive=True, exclude_none=False)
    Updates a pydantic model.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| original | ~M | The model to update. | required | 
| update | Union[BaseModel, Dict[str, Any]] | The update values. | required | 
| recursive | bool | If  | True | 
| exclude_none | bool | If  | False | 
Returns:
| Type | Description | 
|---|---|
| ~M | The updated model. | 
Source code in zenml/utils/pydantic_utils.py
          def update_model(
    original: M,
    update: Union["BaseModel", Dict[str, Any]],
    recursive: bool = True,
    exclude_none: bool = False,
) -> M:
    """Updates a pydantic model.
    Args:
        original: The model to update.
        update: The update values.
        recursive: If `True`, dictionary values will be updated recursively.
        exclude_none: If `True`, `None` values in the update will be removed.
    Returns:
        The updated model.
    """
    if isinstance(update, Dict):
        if exclude_none:
            update_dict = dict_utils.remove_none_values(
                update, recursive=recursive
            )
        else:
            update_dict = update
    else:
        update_dict = update.model_dump(
            exclude_unset=True, exclude_none=exclude_none
        )
    original_dict = original.model_dump(exclude_unset=True)
    if recursive:
        values = dict_utils.recursive_update(original_dict, update_dict)
    else:
        values = {**original_dict, **update_dict}
    return original.__class__.model_validate(values)
validate_function_args(__func, __config, *args, **kwargs)
    Validates arguments passed to a function.
This function validates that all arguments to call the function exist and that the types match.
It raises a pydantic.ValidationError if the validation fails.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| __func | Callable[..., Any] | The function for which the arguments are passed. | required | 
| __config | Optional[pydantic.config.ConfigDict] | The pydantic config for the underlying model that is created to validate the types of the arguments. | required | 
| *args | Any | Function arguments. | () | 
| **kwargs | Any | Function keyword arguments. | {} | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, Any] | The validated arguments. | 
Source code in zenml/utils/pydantic_utils.py
          def validate_function_args(
    __func: Callable[..., Any],
    __config: Optional[ConfigDict],
    *args: Any,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Validates arguments passed to a function.
    This function validates that all arguments to call the function exist and
    that the types match.
    It raises a pydantic.ValidationError if the validation fails.
    Args:
        __func: The function for which the arguments are passed.
        __config: The pydantic config for the underlying model that is created
            to validate the types of the arguments.
        *args: Function arguments.
        **kwargs: Function keyword arguments.
    Returns:
        The validated arguments.
    """
    signature = inspect.signature(__func)
    validated_args = ()
    validated_kwargs = {}
    def f(*args: Any, **kwargs: Dict[Any, Any]) -> None:
        nonlocal validated_args
        nonlocal validated_kwargs
        validated_args = args
        validated_kwargs = kwargs
    # We create a dummy function with the original function signature to run
    # pydantic validation without actually running the function code
    f.__signature__ = signature  # type: ignore[attr-defined]
    f.__annotations__ = __func.__annotations__
    validated_function = validate_call(config=__config, validate_return=False)(
        f
    )
    # This raises a pydantic.ValidatonError in case the arguments are not valid
    validated_function(*args, **kwargs)
    return signature.bind(*validated_args, **validated_kwargs).arguments
        secret_utils
    Utility functions for secrets and secret references.
        
SecretReference            (tuple)
        
    Class representing a secret reference.
Attributes:
| Name | Type | Description | 
|---|---|---|
| name | str | The secret name. | 
| key | str | The secret key. | 
Source code in zenml/utils/secret_utils.py
          class SecretReference(NamedTuple):
    """Class representing a secret reference.
    Attributes:
        name: The secret name.
        key: The secret key.
    """
    name: str
    key: str
__getnewargs__(self)
  
      special
  
    Return self as a plain tuple. Used by copy and pickle.
Source code in zenml/utils/secret_utils.py
          def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, name, key)
  
      special
      staticmethod
  
    Create new instance of SecretReference(name, key)
__repr__(self)
  
      special
  
    Return a nicely formatted representation string
Source code in zenml/utils/secret_utils.py
          def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
ClearTextField(*args, **kwargs)
    Marks a pydantic field to prevent secret references.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Positional arguments which will be forwarded
to  | () | 
| **kwargs | Any | Keyword arguments which will be forwarded to
 | {} | 
Returns:
| Type | Description | 
|---|---|
| Any | Pydantic field info. | 
Source code in zenml/utils/secret_utils.py
          def ClearTextField(*args: Any, **kwargs: Any) -> Any:
    """Marks a pydantic field to prevent secret references.
    Args:
        *args: Positional arguments which will be forwarded
            to `pydantic.Field(...)`.
        **kwargs: Keyword arguments which will be forwarded to
            `pydantic.Field(...)`.
    Returns:
        Pydantic field info.
    """
    json_schema_extra = kwargs.get("json_schema_extra", {})
    json_schema_extra.update({PYDANTIC_CLEAR_TEXT_FIELD_MARKER: True})
    return Field(json_schema_extra=json_schema_extra, *args, **kwargs)  # type: ignore[pydantic-field]
SecretField(*args, **kwargs)
    Marks a pydantic field as something containing sensitive information.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Positional arguments which will be forwarded
to  | () | 
| **kwargs | Any | Keyword arguments which will be forwarded to
 | {} | 
Returns:
| Type | Description | 
|---|---|
| Any | Pydantic field info. | 
Source code in zenml/utils/secret_utils.py
          def SecretField(*args: Any, **kwargs: Any) -> Any:
    """Marks a pydantic field as something containing sensitive information.
    Args:
        *args: Positional arguments which will be forwarded
            to `pydantic.Field(...)`.
        **kwargs: Keyword arguments which will be forwarded to
            `pydantic.Field(...)`.
    Returns:
        Pydantic field info.
    """
    json_schema_extra = kwargs.get("json_schema_extra", {})
    json_schema_extra.update({PYDANTIC_SENSITIVE_FIELD_MARKER: True})
    return Field(json_schema_extra=json_schema_extra, *args, **kwargs)  # type: ignore[pydantic-field]
is_clear_text_field(field)
    Returns whether a pydantic field prevents secret references or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| field | FieldInfo | The field to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/utils/secret_utils.py
          def is_clear_text_field(field: "FieldInfo") -> bool:
    """Returns whether a pydantic field prevents secret references or not.
    Args:
        field: The field to check.
    Returns:
        `True` if the field prevents secret references, `False` otherwise.
    """
    if field.json_schema_extra is not None:
        if isinstance(field.json_schema_extra, dict):
            if marker := field.json_schema_extra.get(
                PYDANTIC_CLEAR_TEXT_FIELD_MARKER
            ):
                assert isinstance(marker, bool), (
                    f"The parameter `{PYDANTIC_CLEAR_TEXT_FIELD_MARKER}` in the "
                    f"field definition can only be a boolean value."
                )
                return marker
        else:
            logger.warning(
                f"The 'json_schema_extra' of the field '{field.title}' is "
                "not defined as a dict. This might lead to unexpected "
                "behaviour as we are checking it is a clear text field. "
                "Returning 'False' as default..."
            )
    return False
is_secret_field(field)
    Returns whether a pydantic field contains sensitive information or not.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| field | FieldInfo | The field to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/utils/secret_utils.py
          def is_secret_field(field: "FieldInfo") -> bool:
    """Returns whether a pydantic field contains sensitive information or not.
    Args:
        field: The field to check.
    Returns:
        `True` if the field contains sensitive information, `False` otherwise.
    """
    if field.json_schema_extra is not None:
        if isinstance(field.json_schema_extra, dict):
            if marker := field.json_schema_extra.get(
                PYDANTIC_SENSITIVE_FIELD_MARKER
            ):
                assert isinstance(marker, bool), (
                    f"The parameter `{PYDANTIC_SENSITIVE_FIELD_MARKER}` in the "
                    f"field definition can only be a boolean value."
                )
                return marker
        else:
            logger.warning(
                f"The 'json_schema_extra' of the field '{field.title}' is "
                "not defined as a dict. This might lead to unexpected "
                "behaviour as we are checking it is a secret text field. "
                "Returning 'False' as default..."
            )
    return False
is_secret_reference(value)
    Checks whether any value is a secret reference.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | Any | The value to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | 
 | 
Source code in zenml/utils/secret_utils.py
          def is_secret_reference(value: Any) -> bool:
    """Checks whether any value is a secret reference.
    Args:
        value: The value to check.
    Returns:
        `True` if the value is a secret reference, `False` otherwise.
    """
    if not isinstance(value, str):
        return False
    return bool(_secret_reference_expression.fullmatch(value))
parse_secret_reference(reference)
    Parses a secret reference.
This function assumes the input string is a valid secret reference and does not perform any additional checks. If you pass an invalid secret reference here, this will most likely crash.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| reference | str | The string representing a valid secret reference. | required | 
Returns:
| Type | Description | 
|---|---|
| SecretReference | The parsed secret reference. | 
Source code in zenml/utils/secret_utils.py
          def parse_secret_reference(reference: str) -> SecretReference:
    """Parses a secret reference.
    This function assumes the input string is a valid secret reference and
    **does not** perform any additional checks. If you pass an invalid secret
    reference here, this will most likely crash.
    Args:
        reference: The string representing a **valid** secret reference.
    Returns:
        The parsed secret reference.
    """
    reference = reference[2:]
    reference = reference[:-2]
    secret_name, secret_key = reference.split(".", 1)
    secret_name, secret_key = secret_name.strip(), secret_key.strip()
    return SecretReference(name=secret_name, key=secret_key)
        settings_utils
    Utility functions for ZenML settings.
get_flavor_setting_key(flavor)
    Gets the setting key for a flavor.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| flavor | Flavor | The flavor for which to get the key. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The setting key for the flavor. | 
Source code in zenml/utils/settings_utils.py
          def get_flavor_setting_key(flavor: "Flavor") -> str:
    """Gets the setting key for a flavor.
    Args:
        flavor: The flavor for which to get the key.
    Returns:
        The setting key for the flavor.
    """
    return f"{flavor.type}.{flavor.name}"
get_general_settings()
    Returns all general settings.
Returns:
| Type | Description | 
|---|---|
| Dict[str, Type[BaseSettings]] | Dictionary mapping general settings keys to their type. | 
Source code in zenml/utils/settings_utils.py
          def get_general_settings() -> Dict[str, Type["BaseSettings"]]:
    """Returns all general settings.
    Returns:
        Dictionary mapping general settings keys to their type.
    """
    from zenml.config import DockerSettings, ResourceSettings
    return {
        DOCKER_SETTINGS_KEY: DockerSettings,
        RESOURCE_SETTINGS_KEY: ResourceSettings,
    }
get_stack_component_for_settings_key(key, stack)
    Gets the stack component of a stack for a given settings key.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | The settings key for which to get the component. | required | 
| stack | Stack | The stack from which to get the component. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If the key is invalid or the stack does not contain a component of the correct flavor. | 
Returns:
| Type | Description | 
|---|---|
| StackComponent | The stack component. | 
Source code in zenml/utils/settings_utils.py
          def get_stack_component_for_settings_key(
    key: str, stack: "Stack"
) -> "StackComponent":
    """Gets the stack component of a stack for a given settings key.
    Args:
        key: The settings key for which to get the component.
        stack: The stack from which to get the component.
    Raises:
        ValueError: If the key is invalid or the stack does not contain a
            component of the correct flavor.
    Returns:
        The stack component.
    """
    if not is_stack_component_setting_key(key):
        raise ValueError(
            f"Settings key {key} does not refer to a stack component."
        )
    component_type, flavor = key.split(".", 1)
    stack_component = stack.components.get(StackComponentType(component_type))
    if not stack_component or stack_component.flavor != flavor:
        raise ValueError(
            f"Component of type {component_type} in stack {stack} is not "
            f"of the flavor {flavor} specified by the settings key {key}."
        )
    return stack_component
get_stack_component_setting_key(stack_component)
    Gets the setting key for a stack component.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| stack_component | StackComponent | The stack component for which to get the key. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The setting key for the stack component. | 
Source code in zenml/utils/settings_utils.py
          def get_stack_component_setting_key(stack_component: "StackComponent") -> str:
    """Gets the setting key for a stack component.
    Args:
        stack_component: The stack component for which to get the key.
    Returns:
        The setting key for the stack component.
    """
    return f"{stack_component.type}.{stack_component.flavor}"
is_general_setting_key(key)
    Checks whether the key refers to a general setting.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | The key to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | If the key refers to a general setting. | 
Source code in zenml/utils/settings_utils.py
          def is_general_setting_key(key: str) -> bool:
    """Checks whether the key refers to a general setting.
    Args:
        key: The key to check.
    Returns:
        If the key refers to a general setting.
    """
    return key in get_general_settings()
is_stack_component_setting_key(key)
    Checks whether a settings key refers to a stack component.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | The key to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | If the key refers to a stack component. | 
Source code in zenml/utils/settings_utils.py
          def is_stack_component_setting_key(key: str) -> bool:
    """Checks whether a settings key refers to a stack component.
    Args:
        key: The key to check.
    Returns:
        If the key refers to a stack component.
    """
    return bool(STACK_COMPONENT_REGEX.fullmatch(key))
is_valid_setting_key(key)
    Checks whether a settings key is valid.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| key | str | The key to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | If the key is valid. | 
Source code in zenml/utils/settings_utils.py
          def is_valid_setting_key(key: str) -> bool:
    """Checks whether a settings key is valid.
    Args:
        key: The key to check.
    Returns:
        If the key is valid.
    """
    return is_general_setting_key(key) or is_stack_component_setting_key(key)
validate_setting_keys(setting_keys)
    Validates settings keys.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| setting_keys | Sequence[str] | The keys to validate. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If any key is invalid. | 
Source code in zenml/utils/settings_utils.py
          def validate_setting_keys(setting_keys: Sequence[str]) -> None:
    """Validates settings keys.
    Args:
        setting_keys: The keys to validate.
    Raises:
        ValueError: If any key is invalid.
    """
    for key in setting_keys:
        if not is_valid_setting_key(key):
            raise ValueError(
                f"Invalid setting key `{key}`. Setting keys can either refer "
                "to general settings (available keys: "
                f"{set(get_general_settings())}) or stack component specific "
                "settings. Stack component specific keys are of the format "
                "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
            )
        singleton
    Utility class to turn classes into singleton classes.
        
SingletonMetaClass            (type)
        
    Singleton metaclass.
Use this metaclass to make any class into a singleton class:
class OneRing(metaclass=SingletonMetaClass):
    def __init__(self, owner):
        self._owner = owner
    @property
    def owner(self):
        return self._owner
the_one_ring = OneRing('Sauron')
the_lost_ring = OneRing('Frodo')
print(the_lost_ring.owner)  # Sauron
OneRing._clear() # ring destroyed
Source code in zenml/utils/singleton.py
          class SingletonMetaClass(type):
    """Singleton metaclass.
    Use this metaclass to make any class into a singleton class:
    ```python
    class OneRing(metaclass=SingletonMetaClass):
        def __init__(self, owner):
            self._owner = owner
        @property
        def owner(self):
            return self._owner
    the_one_ring = OneRing('Sauron')
    the_lost_ring = OneRing('Frodo')
    print(the_lost_ring.owner)  # Sauron
    OneRing._clear() # ring destroyed
    ```
    """
    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Initialize a singleton class.
        Args:
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, **kwargs)
        cls.__singleton_instance: Optional["SingletonMetaClass"] = None
    def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
        """Create or return the singleton instance.
        Args:
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        Returns:
            The singleton instance.
        """
        if not cls.__singleton_instance:
            cls.__singleton_instance = cast(
                "SingletonMetaClass", super().__call__(*args, **kwargs)
            )
        return cls.__singleton_instance
    def _clear(cls) -> None:
        """Clear the singleton instance."""
        cls.__singleton_instance = None
    def _exists(cls) -> bool:
        """Check if the singleton instance exists.
        Returns:
            `True` if the singleton instance exists, `False` otherwise.
        """
        return cls.__singleton_instance is not None
__call__(cls, *args, **kwargs)
  
      special
  
    Create or return the singleton instance.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Additional arguments. | () | 
| **kwargs | Any | Additional keyword arguments. | {} | 
Returns:
| Type | Description | 
|---|---|
| SingletonMetaClass | The singleton instance. | 
Source code in zenml/utils/singleton.py
          def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
    """Create or return the singleton instance.
    Args:
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    Returns:
        The singleton instance.
    """
    if not cls.__singleton_instance:
        cls.__singleton_instance = cast(
            "SingletonMetaClass", super().__call__(*args, **kwargs)
        )
    return cls.__singleton_instance
__init__(cls, *args, **kwargs)
  
      special
  
    Initialize a singleton class.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| *args | Any | Additional arguments. | () | 
| **kwargs | Any | Additional keyword arguments. | {} | 
Source code in zenml/utils/singleton.py
          def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Initialize a singleton class.
    Args:
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    cls.__singleton_instance: Optional["SingletonMetaClass"] = None
        source_code_utils
    Utilities for getting the source code of objects.
get_hashed_source_code(value)
    Returns a hash of the objects source code.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | Any | object to get source from. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Hash of source code. | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | If unable to compute the hash. | 
Source code in zenml/utils/source_code_utils.py
          def get_hashed_source_code(value: Any) -> str:
    """Returns a hash of the objects source code.
    Args:
        value: object to get source from.
    Returns:
        Hash of source code.
    Raises:
        TypeError: If unable to compute the hash.
    """
    try:
        source_code = get_source_code(value)
    except TypeError:
        raise TypeError(
            f"Unable to compute the hash of source code of object: {value}."
        )
    return hashlib.sha256(source_code.encode("utf-8")).hexdigest()
get_source_code(value)
    Returns the source code of an object.
If executing within a IPython kernel environment, then this monkey-patches
inspect module temporarily with a workaround to get source from the cell.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | Any | object to get source from. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Source code of object. | 
Source code in zenml/utils/source_code_utils.py
          def get_source_code(value: Any) -> str:
    """Returns the source code of an object.
    If executing within a IPython kernel environment, then this monkey-patches
    `inspect` module temporarily with a workaround to get source from the cell.
    Args:
        value: object to get source from.
    Returns:
        Source code of object.
    """
    if Environment.in_notebook():
        # Monkey patch inspect.getfile temporarily to make getsource work.
        # Source: https://stackoverflow.com/questions/51566497/
        def _new_getfile(
            object: Any,
            _old_getfile: Callable[
                [
                    Union[
                        ModuleType,
                        Type[Any],
                        MethodType,
                        FunctionType,
                        TracebackType,
                        FrameType,
                        CodeType,
                        Callable[..., Any],
                    ]
                ],
                str,
            ] = inspect.getfile,
        ) -> Any:
            if not inspect.isclass(object):
                return _old_getfile(object)
            # Lookup by parent module (as in current inspect)
            if hasattr(object, "__module__"):
                object_ = sys.modules.get(object.__module__)
                if hasattr(object_, "__file__"):
                    return object_.__file__  # type: ignore[union-attr]
            # If parent module is __main__, lookup by methods
            for _, member in inspect.getmembers(object):
                if (
                    inspect.isfunction(member)
                    and object.__qualname__ + "." + member.__name__
                    == member.__qualname__
                ):
                    return inspect.getfile(member)
            else:
                raise TypeError(f"Source for {object!r} not found.")
        # Monkey patch, compute source, then revert monkey patch.
        _old_getfile = inspect.getfile
        inspect.getfile = _new_getfile
        try:
            src = inspect.getsource(value)
        finally:
            inspect.getfile = _old_getfile
    else:
        # Use standard inspect if running outside a notebook
        src = inspect.getsource(value)
    return src
        source_utils
    Utilities for loading/resolving objects.
get_source_root()
    Get the source root.
The source root will be determined in the following order: - The manually specified custom source root if it was set. - The ZenML repository directory if one exists in the current working directory or any parent directories. - The parent directory of the main module file.
Returns:
| Type | Description | 
|---|---|
| str | The source root. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the main module file can't be found. | 
Source code in zenml/utils/source_utils.py
          def get_source_root() -> str:
    """Get the source root.
    The source root will be determined in the following order:
    - The manually specified custom source root if it was set.
    - The ZenML repository directory if one exists in the current working
      directory or any parent directories.
    - The parent directory of the main module file.
    Returns:
        The source root.
    Raises:
        RuntimeError: If the main module file can't be found.
    """
    if _CUSTOM_SOURCE_ROOT:
        logger.debug("Using custom source root: %s", _CUSTOM_SOURCE_ROOT)
        return _CUSTOM_SOURCE_ROOT
    from zenml.client import Client
    repo_root = Client.find_repository()
    if repo_root:
        logger.debug("Using repository root as source root: %s", repo_root)
        return str(repo_root.resolve())
    main_module = sys.modules.get("__main__")
    if main_module is None:
        raise RuntimeError(
            "Unable to determine source root because the main module could not "
            "be found."
        )
    if not hasattr(main_module, "__file__") or not main_module.__file__:
        raise RuntimeError(
            "Unable to determine source root because the main module does not "
            "have an associated file. This could be because you're running in "
            "an interactive Python environment. If you are trying to run from "
            "within a Jupyter notebook, please run `zenml init` from the root "
            "where your notebook is located and restart your notebook server.   "
        )
    path = Path(main_module.__file__).resolve().parent
    logger.debug("Using main module parent directory as source root: %s", path)
    return str(path)
get_source_type(module)
    Get the type of a source.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| module | module | The module for which to get the source type. | required | 
Returns:
| Type | Description | 
|---|---|
| SourceType | The source type. | 
Source code in zenml/utils/source_utils.py
          def get_source_type(module: ModuleType) -> SourceType:
    """Get the type of a source.
    Args:
        module: The module for which to get the source type.
    Returns:
        The source type.
    """
    try:
        file_path = inspect.getfile(module)
    except (TypeError, OSError):
        if module.__name__ == "__main__" and Environment.in_notebook():
            return SourceType.NOTEBOOK
        return SourceType.BUILTIN
    if is_internal_module(module_name=module.__name__):
        return SourceType.INTERNAL
    if is_distribution_package_file(
        file_path=file_path, module_name=module.__name__
    ):
        return SourceType.DISTRIBUTION_PACKAGE
    if is_standard_lib_file(file_path=file_path):
        return SourceType.BUILTIN
    # Make sure to check for distribution packages before this to catch the
    # case when a virtual environment is inside our source root
    if is_user_file(file_path=file_path):
        return SourceType.USER
    return SourceType.UNKNOWN
is_distribution_package_file(file_path, module_name)
    Checks if a file/module belongs to a distribution package.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | The file path to check. | required | 
| module_name | str | The module name. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the file/module belongs to a distribution package, False otherwise. | 
Source code in zenml/utils/source_utils.py
          def is_distribution_package_file(file_path: str, module_name: str) -> bool:
    """Checks if a file/module belongs to a distribution package.
    Args:
        file_path: The file path to check.
        module_name: The module name.
    Returns:
        True if the file/module belongs to a distribution package, False
        otherwise.
    """
    absolute_file_path = Path(file_path).resolve()
    for path in site.getsitepackages() + [site.getusersitepackages()]:
        if Path(path).resolve() in absolute_file_path.parents:
            return True
    # TODO: The previous check does not detect editable installs because
    # the site packages dir only contains a reference to the source files,
    # not the actual files. That means currently editable installs get a
    # source type UNKNOWN which might or might not lead to issues.
    return False
is_internal_module(module_name)
    Checks if a module is internal (=part of the zenml package).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| module_name | str | Name of the module to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the module is internal, False otherwise. | 
Source code in zenml/utils/source_utils.py
          def is_internal_module(module_name: str) -> bool:
    """Checks if a module is internal (=part of the zenml package).
    Args:
        module_name: Name of the module to check.
    Returns:
        True if the module is internal, False otherwise.
    """
    return module_name.split(".", maxsplit=1)[0] == "zenml"
is_standard_lib_file(file_path)
    Checks if a file belongs to the Python standard library.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | The file path to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the file belongs to the Python standard library, False otherwise. | 
Source code in zenml/utils/source_utils.py
          def is_standard_lib_file(file_path: str) -> bool:
    """Checks if a file belongs to the Python standard library.
    Args:
        file_path: The file path to check.
    Returns:
        True if the file belongs to the Python standard library, False
        otherwise.
    """
    stdlib_root = get_python_lib(standard_lib=True)
    logger.debug("Standard library root: %s", stdlib_root)
    return Path(stdlib_root).resolve() in Path(file_path).resolve().parents
is_user_file(file_path)
    Checks if a file is a user file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | The file path to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the file is a user file, False otherwise. | 
Source code in zenml/utils/source_utils.py
          def is_user_file(file_path: str) -> bool:
    """Checks if a file is a user file.
    Args:
        file_path: The file path to check.
    Returns:
        True if the file is a user file, False otherwise.
    """
    source_root = get_source_root()
    return Path(source_root) in Path(file_path).resolve().parents
load(source)
    Load a source or import path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | Union[zenml.config.source.Source, str] | The source to load. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | The loaded object. | 
Source code in zenml/utils/source_utils.py
          def load(source: Union[Source, str]) -> Any:
    """Load a source or import path.
    Args:
        source: The source to load.
    Returns:
        The loaded object.
    """
    if isinstance(source, str):
        source = Source.from_import_path(source)
    # The types of some objects don't exist in the `builtin` module
    # so we need to manually handle it here
    if source.import_path == NoneTypeSource.import_path:
        return NoneType
    elif source.import_path == FunctionTypeSource.import_path:
        return FunctionType
    elif source.import_path == BuiltinFunctionTypeSource.import_path:
        return BuiltinFunctionType
    import_root = None
    if source.type == SourceType.CODE_REPOSITORY:
        source = CodeRepositorySource.model_validate(dict(source))
        _warn_about_potential_source_loading_issues(source=source)
        import_root = get_source_root()
    elif source.type == SourceType.DISTRIBUTION_PACKAGE:
        source = DistributionPackageSource.model_validate(dict(source))
        if source.version:
            current_package_version = _get_package_version(
                package_name=source.package_name
            )
            if current_package_version != source.version:
                logger.warning(
                    "The currently installed version `%s` of package `%s` "
                    "does not match the source version `%s`. This might lead "
                    "to unexpected behavior when using the source object `%s`.",
                    current_package_version,
                    source.package_name,
                    source.version,
                    source.import_path,
                )
    elif source.type == SourceType.NOTEBOOK:
        if Environment.in_notebook():
            # If we're in a notebook, we don't need to do anything as the
            # loading from the __main__ module should work just fine.
            pass
        else:
            notebook_source = NotebookSource.model_validate(dict(source))
            return _try_to_load_notebook_source(notebook_source)
    elif source.type in {SourceType.USER, SourceType.UNKNOWN}:
        # Unknown source might also refer to a user file, include source
        # root in python path just to be sure
        import_root = get_source_root()
    module = _load_module(module_name=source.module, import_root=import_root)
    if source.attribute:
        obj = getattr(module, source.attribute)
    else:
        obj = module
    return obj
load_and_validate_class(source, expected_class)
    Loads a source class and validates its class.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | Union[str, zenml.config.source.Source] | The source. | required | 
| expected_class | Type[Any] | The class that the source should resolve to. | required | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | If the source does not resolve to the expected class. | 
Returns:
| Type | Description | 
|---|---|
| Type[Any] | The resolved source class. | 
Source code in zenml/utils/source_utils.py
          def load_and_validate_class(
    source: Union[str, Source], expected_class: Type[Any]
) -> Type[Any]:
    """Loads a source class and validates its class.
    Args:
        source: The source.
        expected_class: The class that the source should resolve to.
    Raises:
        TypeError: If the source does not resolve to the expected class.
    Returns:
        The resolved source class.
    """
    obj = load(source)
    if isinstance(obj, type) and issubclass(obj, expected_class):
        return obj
    else:
        raise TypeError(
            f"Error while loading `{source}`. Expected class "
            f"{expected_class.__name__}, got {obj} instead."
        )
prepend_python_path(path)
    Context manager to temporarily prepend a path to the python path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| path | str | Path that will be prepended to sys.path for the duration of the context manager. | required | 
Yields:
| Type | Description | 
|---|---|
| Iterator[NoneType] | None | 
Source code in zenml/utils/source_utils.py
          @contextlib.contextmanager
def prepend_python_path(path: str) -> Iterator[None]:
    """Context manager to temporarily prepend a path to the python path.
    Args:
        path: Path that will be prepended to sys.path for the duration of
            the context manager.
    Yields:
        None
    """
    try:
        sys.path.insert(0, path)
        yield
    finally:
        sys.path.remove(path)
resolve(obj, skip_validation=False)
    Resolve an object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Union[Type[Any], Callable[..., Any], module, function, builtin_function_or_method] | The object to resolve. | required | 
| skip_validation | bool | If True, the validation that the object exist in the module is skipped. | False | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the object can't be resolved. | 
Returns:
| Type | Description | 
|---|---|
| Source | The source of the resolved object. | 
Source code in zenml/utils/source_utils.py
          def resolve(
    obj: Union[
        Type[Any],
        Callable[..., Any],
        ModuleType,
        FunctionType,
        BuiltinFunctionType,
        NoneType,
    ],
    skip_validation: bool = False,
) -> Source:
    """Resolve an object.
    Args:
        obj: The object to resolve.
        skip_validation: If True, the validation that the object exist in the
            module is skipped.
    Raises:
        RuntimeError: If the object can't be resolved.
    Returns:
        The source of the resolved object.
    """
    # The types of some objects don't exist in the `builtin` module
    # so we need to manually handle it here
    if obj is NoneType:
        return NoneTypeSource
    elif obj is FunctionType:
        return FunctionTypeSource
    elif obj is BuiltinFunctionType:
        return BuiltinFunctionTypeSource
    elif source := getattr(obj, ZENML_SOURCE_ATTRIBUTE_NAME, None):
        assert isinstance(source, Source)
        return source
    elif isinstance(obj, ModuleType):
        module = obj
        attribute_name = None
    else:
        module = sys.modules[obj.__module__]
        attribute_name = obj.__name__  # type: ignore[union-attr]
    if (
        not (skip_validation or getattr(obj, "_DOCS_BUILDING_MODE", False))
        and attribute_name
        and getattr(module, attribute_name, None) is not obj
    ):
        raise RuntimeError(
            f"Unable to resolve object `{obj}`. For the resolving to work, the "
            "class or function must be defined as top-level code (= it must "
            "get defined when importing the module) and not inside a function/"
            f"if-condition. Please make sure that your `{module.__name__}` "
            f"module has a top-level attribute `{attribute_name}` that "
            "holds the object you want to resolve."
        )
    module_name = module.__name__
    if module_name == "__main__":
        module_name = _resolve_module(module)
    source_type = get_source_type(module=module)
    if source_type == SourceType.USER:
        from zenml.utils import code_repository_utils
        local_repo_context = (
            code_repository_utils.find_active_code_repository()
        )
        if local_repo_context and not local_repo_context.has_local_changes:
            module_name = _resolve_module(module)
            source_root = get_source_root()
            subdir = PurePath(source_root).relative_to(local_repo_context.root)
            return CodeRepositorySource(
                repository_id=local_repo_context.code_repository_id,
                commit=local_repo_context.current_commit,
                subdirectory=subdir.as_posix(),
                module=module_name,
                attribute=attribute_name,
                type=SourceType.CODE_REPOSITORY,
            )
        module_name = _resolve_module(module)
    elif source_type == SourceType.DISTRIBUTION_PACKAGE:
        package_name = _get_package_for_module(module_name=module_name)
        if package_name:
            package_version = _get_package_version(package_name=package_name)
            return DistributionPackageSource(
                module=module_name,
                attribute=attribute_name,
                package_name=package_name,
                version=package_version,
                type=source_type,
            )
        else:
            # Fallback to an unknown source if we can't find the package
            source_type = SourceType.UNKNOWN
    elif source_type == SourceType.NOTEBOOK:
        source = NotebookSource(
            module=module_name,
            attribute=attribute_name,
            type=source_type,
        )
        # Private attributes are ignored by pydantic if passed in the __init__
        # method, so we set this afterwards
        source._cell_code = notebook_utils.load_notebook_cell_code(obj)
        return source
    return Source(
        module=module_name, attribute=attribute_name, type=source_type
    )
set_custom_source_root(source_root)
    Sets a custom source root.
If set this has the highest priority and will always be used as the source root.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source_root | Optional[str] | The source root to use. | required | 
Source code in zenml/utils/source_utils.py
          def set_custom_source_root(source_root: Optional[str]) -> None:
    """Sets a custom source root.
    If set this has the highest priority and will always be used as the source
    root.
    Args:
        source_root: The source root to use.
    """
    logger.debug("Setting custom source root: %s", source_root)
    global _CUSTOM_SOURCE_ROOT
    _CUSTOM_SOURCE_ROOT = source_root
validate_source_class(source, expected_class)
    Validates that a source resolves to a certain class.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| source | Union[zenml.config.source.Source, str] | The source to validate. | required | 
| expected_class | Type[Any] | The class that the source should resolve to. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the source resolves to the expected class, False otherwise. | 
Source code in zenml/utils/source_utils.py
          def validate_source_class(
    source: Union[Source, str], expected_class: Type[Any]
) -> bool:
    """Validates that a source resolves to a certain class.
    Args:
        source: The source to validate.
        expected_class: The class that the source should resolve to.
    Returns:
        True if the source resolves to the expected class, False otherwise.
    """
    try:
        obj = load(source)
    except Exception:
        return False
    if isinstance(obj, type) and issubclass(obj, expected_class):
        return True
    else:
        return False
        string_utils
    Utils for strings.
b64_decode(input_)
    Returns a decoded string of the base 64 encoded input string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| input_ | str | Base64 encoded string. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Decoded string. | 
Source code in zenml/utils/string_utils.py
          def b64_decode(input_: str) -> str:
    """Returns a decoded string of the base 64 encoded input string.
    Args:
        input_: Base64 encoded string.
    Returns:
        Decoded string.
    """
    encoded_bytes = input_.encode()
    decoded_bytes = base64.b64decode(encoded_bytes)
    return decoded_bytes.decode()
b64_encode(input_)
    Returns a base 64 encoded string of the input string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| input_ | str | The input to encode. | required | 
Returns:
| Type | Description | 
|---|---|
| str | Base64 encoded string. | 
Source code in zenml/utils/string_utils.py
          def b64_encode(input_: str) -> str:
    """Returns a base 64 encoded string of the input string.
    Args:
        input_: The input to encode.
    Returns:
        Base64 encoded string.
    """
    input_bytes = input_.encode()
    encoded_bytes = base64.b64encode(input_bytes)
    return encoded_bytes.decode()
get_human_readable_filesize(bytes_)
    Convert a file size in bytes into a human-readable string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| bytes_ | int | The number of bytes to convert. | required | 
Returns:
| Type | Description | 
|---|---|
| str | A human-readable string. | 
Source code in zenml/utils/string_utils.py
          def get_human_readable_filesize(bytes_: int) -> str:
    """Convert a file size in bytes into a human-readable string.
    Args:
        bytes_: The number of bytes to convert.
    Returns:
        A human-readable string.
    """
    size = abs(float(bytes_))
    for unit in ["B", "KiB", "MiB", "GiB"]:
        if size < 1024.0 or unit == "GiB":
            break
        size /= 1024.0
    return f"{size:.2f} {unit}"
get_human_readable_time(seconds)
    Convert seconds into a human-readable string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| seconds | float | The number of seconds to convert. | required | 
Returns:
| Type | Description | 
|---|---|
| str | A human-readable string. | 
Source code in zenml/utils/string_utils.py
          def get_human_readable_time(seconds: float) -> str:
    """Convert seconds into a human-readable string.
    Args:
        seconds: The number of seconds to convert.
    Returns:
        A human-readable string.
    """
    prefix = "-" if seconds < 0 else ""
    seconds = abs(seconds)
    int_seconds = int(seconds)
    days, int_seconds = divmod(int_seconds, 86400)
    hours, int_seconds = divmod(int_seconds, 3600)
    minutes, int_seconds = divmod(int_seconds, 60)
    if days > 0:
        time_string = f"{days}d{hours}h{minutes}m{int_seconds}s"
    elif hours > 0:
        time_string = f"{hours}h{minutes}m{int_seconds}s"
    elif minutes > 0:
        time_string = f"{minutes}m{int_seconds}s"
    else:
        time_string = f"{seconds:.3f}s"
    return prefix + time_string
random_str(length)
    Generate a random human readable string of given length.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| length | int | Length of string | required | 
Returns:
| Type | Description | 
|---|---|
| str | Random human-readable string. | 
Source code in zenml/utils/string_utils.py
          def random_str(length: int) -> str:
    """Generate a random human readable string of given length.
    Args:
        length: Length of string
    Returns:
        Random human-readable string.
    """
    random.seed()
    return "".join(random.choices(string.ascii_letters, k=length))
validate_name(model)
    Validator to ensure that the given name has only allowed characters.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model | BaseModel | The model to validate. | required | 
Exceptions:
| Type | Description | 
|---|---|
| ValueError | If the name has invalid characters. | 
Source code in zenml/utils/string_utils.py
          def validate_name(model: BaseModel) -> None:
    """Validator to ensure that the given name has only allowed characters.
    Args:
        model: The model to validate.
    Raises:
        ValueError: If the name has invalid characters.
    """
    cls_name = model.__class__.__name__
    if cls_name.endswith("Base"):
        type_ = cls_name[:-4]
    elif cls_name.endswith("Request"):
        type_ = cls_name[:-7]
    else:
        type_ = cls_name
    if name := getattr(model, "name", None):
        diff = "".join(set(name).intersection(set(BANNED_NAME_CHARACTERS)))
        if diff:
            msg = (
                f"The name `{name}` of the `{type_}` contains "
                f"the following forbidden characters: `{diff}`."
            )
            raise ValueError(msg)
    else:
        raise ValueError(
            f"The class `{cls_name}` has no attribute `name` "
            "or it is set to `None`. Cannot validate the name."
        )
        terraform_utils
    Terraform utilities.
verify_terraform_installation()
    Verifies the Terraform installation.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If Terraform is not installed or ZenML was installed
without the  | 
Source code in zenml/utils/terraform_utils.py
          def verify_terraform_installation() -> None:
    """Verifies the Terraform installation.
    Raises:
        RuntimeError: If Terraform is not installed or ZenML was installed
            without the `terraform` extra.
    """
    try:
        import python_terraform
    except ImportError:
        raise RuntimeError(
            "Missing Terraform python library. This is probably because ZenML "
            "was installed without the optional `terraform` extra. To install "
            "the missing dependencies, run: \n\n"
            f'`pip install "zenml[terraform]=={zenml.__version__}"`.'
        )
    try:
        python_terraform.Terraform().cmd("-version")
    except FileNotFoundError:
        raise RuntimeError(
            "Missing Terraform installation. Check out "
            "https://developer.hashicorp.com/terraform/downloads for how to "
            "install it on your operating system."
        )
        typed_model
    Utility classes for adding type information to Pydantic models.
        
BaseTypedModel            (BaseModel)
        
    Typed Pydantic model base class.
Use this class as a base class instead of BaseModel to automatically
add a type literal attribute to the model that stores the name of the
class.
This can be useful when serializing models to JSON and then de-serializing them as part of a submodel union field, e.g.:
class BluePill(BaseTypedModel):
    ...
class RedPill(BaseTypedModel):
    ...
class TheMatrix(BaseTypedModel):
    choice: Union[BluePill, RedPill] = Field(..., discriminator='type')
matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = TheMatrix.model_validate(d)
assert isinstance(new_matrix.choice, RedPill)
It can also facilitate de-serializing objects when their type isn't known:
matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = BaseTypedModel.from_dict(d)
assert isinstance(new_matrix.choice, RedPill)
Source code in zenml/utils/typed_model.py
          class BaseTypedModel(BaseModel, metaclass=BaseTypedModelMeta):
    """Typed Pydantic model base class.
    Use this class as a base class instead of BaseModel to automatically
    add a `type` literal attribute to the model that stores the name of the
    class.
    This can be useful when serializing models to JSON and then de-serializing
    them as part of a submodel union field, e.g.:
    ```python
    class BluePill(BaseTypedModel):
        ...
    class RedPill(BaseTypedModel):
        ...
    class TheMatrix(BaseTypedModel):
        choice: Union[BluePill, RedPill] = Field(..., discriminator='type')
    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = TheMatrix.model_validate(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```
    It can also facilitate de-serializing objects when their type isn't known:
    ```python
    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = BaseTypedModel.from_dict(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```
    """
    @classmethod
    def from_dict(
        cls,
        model_dict: Dict[str, Any],
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON-able dict representation.
        Args:
            model_dict: the model attributes serialized as JSON-able dict.
        Returns:
            A BaseTypedModel created from the serialized representation.
        Raises:
            RuntimeError: if the model_dict contains an invalid type.
        """
        model_type = model_dict.get("type")
        if not model_type:
            raise RuntimeError(
                "`type` information is missing from the serialized model dict."
            )
        cls = source_utils.load(model_type)
        if not issubclass(cls, BaseTypedModel):
            raise RuntimeError(
                f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
            )
        return cls.model_validate(model_dict)
    @classmethod
    def from_json(
        cls,
        json_str: str,
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON representation.
        Args:
            json_str: the model attributes serialized as JSON.
        Returns:
            A BaseTypedModel created from the serialized representation.
        """
        model_dict = json.loads(json_str)
        return cls.from_dict(model_dict)
from_dict(model_dict)
  
      classmethod
  
    Instantiate a Pydantic model from a serialized JSON-able dict representation.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| model_dict | Dict[str, Any] | the model attributes serialized as JSON-able dict. | required | 
Returns:
| Type | Description | 
|---|---|
| BaseTypedModel | A BaseTypedModel created from the serialized representation. | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | if the model_dict contains an invalid type. | 
Source code in zenml/utils/typed_model.py
          @classmethod
def from_dict(
    cls,
    model_dict: Dict[str, Any],
) -> "BaseTypedModel":
    """Instantiate a Pydantic model from a serialized JSON-able dict representation.
    Args:
        model_dict: the model attributes serialized as JSON-able dict.
    Returns:
        A BaseTypedModel created from the serialized representation.
    Raises:
        RuntimeError: if the model_dict contains an invalid type.
    """
    model_type = model_dict.get("type")
    if not model_type:
        raise RuntimeError(
            "`type` information is missing from the serialized model dict."
        )
    cls = source_utils.load(model_type)
    if not issubclass(cls, BaseTypedModel):
        raise RuntimeError(
            f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
        )
    return cls.model_validate(model_dict)
from_json(json_str)
  
      classmethod
  
    Instantiate a Pydantic model from a serialized JSON representation.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| json_str | str | the model attributes serialized as JSON. | required | 
Returns:
| Type | Description | 
|---|---|
| BaseTypedModel | A BaseTypedModel created from the serialized representation. | 
Source code in zenml/utils/typed_model.py
          @classmethod
def from_json(
    cls,
    json_str: str,
) -> "BaseTypedModel":
    """Instantiate a Pydantic model from a serialized JSON representation.
    Args:
        json_str: the model attributes serialized as JSON.
    Returns:
        A BaseTypedModel created from the serialized representation.
    """
    model_dict = json.loads(json_str)
    return cls.from_dict(model_dict)
        
BaseTypedModelMeta            (ModelMetaclass)
        
    Metaclass responsible for adding type information to Pydantic models.
Source code in zenml/utils/typed_model.py
          class BaseTypedModelMeta(ModelMetaclass):
    """Metaclass responsible for adding type information to Pydantic models."""
    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseTypedModelMeta":
        """Creates a Pydantic BaseModel class.
        This includes a hidden attribute that reflects the full class
        identifier.
        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The class dictionary.
        Returns:
            A Pydantic BaseModel class that includes a hidden attribute that
            reflects the full class identifier.
        Raises:
            TypeError: If the class is not a Pydantic BaseModel class.
        """
        if "type" in dct:
            raise TypeError(
                "`type` is a reserved attribute name for BaseTypedModel "
                "subclasses"
            )
        type_name = f"{dct['__module__']}.{dct['__qualname__']}"
        type_ann = Literal[type_name]  # type: ignore[valid-type]
        type = Field(type_name)
        dct.setdefault("__annotations__", dict())["type"] = type_ann
        dct["type"] = type
        cls = cast(
            Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
        )
        return cls
__new__(mcs, name, bases, dct)
  
      special
      staticmethod
  
    Creates a Pydantic BaseModel class.
This includes a hidden attribute that reflects the full class identifier.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| name | str | The name of the class. | required | 
| bases | Tuple[Type[Any], ...] | The base classes of the class. | required | 
| dct | Dict[str, Any] | The class dictionary. | required | 
Returns:
| Type | Description | 
|---|---|
| BaseTypedModelMeta | A Pydantic BaseModel class that includes a hidden attribute that reflects the full class identifier. | 
Exceptions:
| Type | Description | 
|---|---|
| TypeError | If the class is not a Pydantic BaseModel class. | 
Source code in zenml/utils/typed_model.py
          def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseTypedModelMeta":
    """Creates a Pydantic BaseModel class.
    This includes a hidden attribute that reflects the full class
    identifier.
    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The class dictionary.
    Returns:
        A Pydantic BaseModel class that includes a hidden attribute that
        reflects the full class identifier.
    Raises:
        TypeError: If the class is not a Pydantic BaseModel class.
    """
    if "type" in dct:
        raise TypeError(
            "`type` is a reserved attribute name for BaseTypedModel "
            "subclasses"
        )
    type_name = f"{dct['__module__']}.{dct['__qualname__']}"
    type_ann = Literal[type_name]  # type: ignore[valid-type]
    type = Field(type_name)
    dct.setdefault("__annotations__", dict())["type"] = type_ann
    dct["type"] = type
    cls = cast(
        Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
    )
    return cls
        typing_utils
    Carried over version of some functions from the pydantic v1 typing module.
Check out the latest version here: https://github.com/pydantic/pydantic/blob/v1.10.14/pydantic/typing.py
all_literal_values(type_)
    Fetches the literal values defined in a type in a recursive manner.
This method is used to retrieve all Literal values as Literal can be
used recursively (see https://www.python.org/dev/peps/pep-0586)
e.g. Literal[Literal[Literal[1, 2, 3], "foo"], 5, None]
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| type_ | Type[Any] | type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Any, ...] | tuple of all the literal values defined in the type. | 
Source code in zenml/utils/typing_utils.py
          def all_literal_values(type_: Type[Any]) -> Tuple[Any, ...]:
    """Fetches the literal values defined in a type in a recursive manner.
    This method is used to retrieve all Literal values as Literal can be
    used recursively (see https://www.python.org/dev/peps/pep-0586)
    e.g. `Literal[Literal[Literal[1, 2, 3], "foo"], 5, None]`
    Args:
        type_: type to check.
    Returns:
        tuple of all the literal values defined in the type.
    """
    if not is_literal_type(type_):
        return (type_,)
    values = literal_values(type_)
    return tuple(x for value in values for x in all_literal_values(value))
get_args(tp)
    Get type arguments with all substitutions performed.
For unions, basic simplifications used by Union constructor are performed. Examples:: get_args(Dict[str, int]) == (str, int) get_args(int) == () get_args(Union[int, Union[T, int], str][int]) == (int, str) get_args(Union[int, Tuple[T, int]][str]) == (int, Tuple[str, int]) get_args(Callable[[], T][int]) == ([], int)
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tp | Type[Any] | the type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Any, ...] | Tuple of all the args. | 
Source code in zenml/utils/typing_utils.py
          def get_args(tp: Type[Any]) -> Tuple[Any, ...]:
    """Get type arguments with all substitutions performed.
    For unions, basic simplifications used by Union constructor are performed.
    Examples::
        get_args(Dict[str, int]) == (str, int)
        get_args(int) == ()
        get_args(Union[int, Union[T, int], str][int]) == (int, str)
        get_args(Union[int, Tuple[T, int]][str]) == (int, Tuple[str, int])
        get_args(Callable[[], T][int]) == ([], int)
    Args:
        tp: the type to check.
    Returns:
        Tuple of all the args.
    """
    if type(tp).__name__ in AnnotatedTypeNames:
        return tp.__args__ + tp.__metadata__  # type: ignore[no-any-return]
    # the fallback is needed for the same reasons as `get_origin` (see above)
    return (
        _typing_get_args(tp)
        or getattr(tp, "__args__", ())
        or _generic_get_args(tp)
    )
get_origin(tp)
    Fetches the origin of a given type.
We can't directly use typing.get_origin since we need a fallback to
support custom generic classes like ConstrainedList
It should be useless once https://github.com/cython/cython/issues/3537 is
solved and https://github.com/pydantic/pydantic/pull/1753 is merged.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tp | Type[Any] | type to check | required | 
Returns:
| Type | Description | 
|---|---|
| Optional[Type[Any]] | the origin type of the provided type. | 
Source code in zenml/utils/typing_utils.py
          def get_origin(tp: Type[Any]) -> Optional[Type[Any]]:
    """Fetches the origin of a given type.
    We can't directly use `typing.get_origin` since we need a fallback to
    support custom generic classes like `ConstrainedList`
    It should be useless once https://github.com/cython/cython/issues/3537 is
    solved and https://github.com/pydantic/pydantic/pull/1753 is merged.
    Args:
        tp: type to check
    Returns:
        the origin type of the provided type.
    """
    if type(tp).__name__ in AnnotatedTypeNames:
        return cast(Type[Any], Annotated)  # mypy complains about _SpecialForm
    return _typing_get_origin(tp) or getattr(tp, "__origin__", None)
is_literal_type(type_)
    Checks if the provided type is a literal type.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| type_ | Type[Any] | type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | boolean indicating whether the type is union type. | 
Source code in zenml/utils/typing_utils.py
          def is_literal_type(type_: Type[Any]) -> bool:
    """Checks if the provided type is a literal type.
    Args:
        type_: type to check.
    Returns:
        boolean indicating whether the type is union type.
    """
    return Literal is not None and get_origin(type_) in LITERAL_TYPES
is_none_type(type_)
    Checks if the provided type is a none type.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| type_ | Any | type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | boolean indicating whether the type is a none type. | 
Source code in zenml/utils/typing_utils.py
          def is_none_type(type_: Any) -> bool:
    """Checks if the provided type is a none type.
    Args:
        type_: type to check.
    Returns:
        boolean indicating whether the type is a none type.
    """
    return type_ in NONE_TYPES
is_optional(tp)
    Checks whether a given annotation is typing.Optional.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tp | Type[Any] | the type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | boolean indicating if the type is typing.Optional. | 
Source code in zenml/utils/typing_utils.py
          def is_optional(tp: Type[Any]) -> bool:
    """Checks whether a given annotation is typing.Optional.
    Args:
        tp: the type to check.
    Returns:
        boolean indicating if the type is typing.Optional.
    """
    return is_union(get_origin(tp)) and type(None) in get_args(tp)
is_union(type_)
    Checks if the provided type is a union type.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| type_ | Optional[Type[Any]] | type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | boolean indicating whether the type is union type. | 
Source code in zenml/utils/typing_utils.py
          def is_union(type_: Optional[Type[Any]]) -> bool:
    """Checks if the provided type is a union type.
    Args:
        type_: type to check.
    Returns:
        boolean indicating whether the type is union type.
    """
    import types
    return type_ is Union or type_ is types.UnionType  # type: ignore[comparison-overlap]
literal_values(type_)
    Fetches the literal values defined in a type.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| type_ | Type[Any] | type to check. | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[Any, ...] | tuple of the literal values. | 
Source code in zenml/utils/typing_utils.py
          def literal_values(type_: Type[Any]) -> Tuple[Any, ...]:
    """Fetches the literal values defined in a type.
    Args:
        type_: type to check.
    Returns:
        tuple of the literal values.
    """
    return get_args(type_)
        uuid_utils
    Utility functions for handling UUIDs.
generate_uuid_from_string(value)
    Deterministically generates a UUID from a string seed.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | str | The string from which to generate the UUID. | required | 
Returns:
| Type | Description | 
|---|---|
| UUID | The generated UUID. | 
Source code in zenml/utils/uuid_utils.py
          def generate_uuid_from_string(value: str) -> UUID:
    """Deterministically generates a UUID from a string seed.
    Args:
        value: The string from which to generate the UUID.
    Returns:
        The generated UUID.
    """
    hash_ = hashlib.md5()  # nosec
    hash_.update(value.encode("utf-8"))
    return UUID(hex=hash_.hexdigest(), version=4)
is_valid_uuid(value, version=4)
    Checks if a string is a valid UUID.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| value | Any | String to check. | required | 
| version | int | Version of UUID to check for. | 4 | 
Returns:
| Type | Description | 
|---|---|
| bool | True if string is a valid UUID, False otherwise. | 
Source code in zenml/utils/uuid_utils.py
          def is_valid_uuid(value: Any, version: int = 4) -> bool:
    """Checks if a string is a valid UUID.
    Args:
        value: String to check.
        version: Version of UUID to check for.
    Returns:
        True if string is a valid UUID, False otherwise.
    """
    if isinstance(value, UUID):
        return True
    if isinstance(value, str):
        try:
            UUID(value, version=version)
            return True
        except ValueError:
            return False
    return False
parse_name_or_uuid(name_or_id)
    Convert a "name or id" string value to a string or UUID.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| name_or_id | Optional[str] | Name or id to convert. | required | 
Returns:
| Type | Description | 
|---|---|
| Union[uuid.UUID, str] | A UUID if name_or_id is a UUID, string otherwise. | 
Source code in zenml/utils/uuid_utils.py
          def parse_name_or_uuid(
    name_or_id: Optional[str],
) -> Optional[Union[str, UUID]]:
    """Convert a "name or id" string value to a string or UUID.
    Args:
        name_or_id: Name or id to convert.
    Returns:
        A UUID if name_or_id is a UUID, string otherwise.
    """
    if name_or_id:
        try:
            return UUID(name_or_id)
        except ValueError:
            return name_or_id
    else:
        return name_or_id
        visualization_utils
    Utility functions for dashboard visualizations.
format_csv_visualization_as_html(csv_visualization, max_rows=10, max_cols=10)
    Formats a CSV visualization as an HTML table.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| csv_visualization | str | CSV visualization as a string. | required | 
| max_rows | int | Maximum number of rows to display. Remaining rows will be replaced by an ellipsis in the middle of the table. | 10 | 
| max_cols | int | Maximum number of columns to display. Remaining columns will be replaced by an ellipsis at the end of each row. | 10 | 
Returns:
| Type | Description | 
|---|---|
| str | HTML table as a string. | 
Source code in zenml/utils/visualization_utils.py
          def format_csv_visualization_as_html(
    csv_visualization: str, max_rows: int = 10, max_cols: int = 10
) -> str:
    """Formats a CSV visualization as an HTML table.
    Args:
        csv_visualization: CSV visualization as a string.
        max_rows: Maximum number of rows to display. Remaining rows will be
            replaced by an ellipsis in the middle of the table.
        max_cols: Maximum number of columns to display. Remaining columns will
            be replaced by an ellipsis at the end of each row.
    Returns:
        HTML table as a string.
    """
    rows = csv_visualization.splitlines()
    html = ""
    # If there are fewer rows than the maximum, print all rows
    if len(rows) <= max_rows:
        for row in rows:
            html += _format_csv_row_as_html(row, max_cols=max_cols)
    else:
        # Else, replace middle rows with ellipsis
        half_max_rows = max_rows // 2
        # Print first half of rows
        for row in rows[:half_max_rows]:
            html += _format_csv_row_as_html(row, max_cols=max_cols)
        # Print ellipsis
        if len(rows) > max_rows:
            html += "<tr><td>...</td></tr>"
        # Print last half of rows
        for row in rows[-half_max_rows:]:
            html += _format_csv_row_as_html(row, max_cols=max_cols)
    return "<table>" + html + "</table>"
visualize_artifact(artifact, title=None)
    Visualize an artifact in notebook environments.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| artifact | ArtifactVersionResponse | The artifact to visualize. | required | 
| title | Optional[str] | Optional title to show before the visualizations. | None | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If not in a notebook environment. | 
Source code in zenml/utils/visualization_utils.py
          def visualize_artifact(
    artifact: "ArtifactVersionResponse", title: Optional[str] = None
) -> None:
    """Visualize an artifact in notebook environments.
    Args:
        artifact: The artifact to visualize.
        title: Optional title to show before the visualizations.
    Raises:
        RuntimeError: If not in a notebook environment.
    """
    if not Environment.in_notebook():
        raise RuntimeError(
            "The `output.visualize()` method is only available in Jupyter "
            "notebooks. In all other runtime environments, please open "
            "your ZenML dashboard using `zenml up` and view the "
            "visualizations by clicking on the respective artifacts in the "
            "pipeline run DAG instead."
        )
    if not artifact.visualizations:
        return
    if title:
        display(Markdown(f"### {title}"))
    for i in range(len(artifact.visualizations)):
        visualization = load_artifact_visualization(artifact, index=i)
        if visualization.type == VisualizationType.IMAGE:
            display(Image(visualization.value))
        elif visualization.type == VisualizationType.HTML:
            display(HTML(visualization.value))
        elif visualization.type == VisualizationType.MARKDOWN:
            display(Markdown(visualization.value))
        elif visualization.type == VisualizationType.CSV:
            assert isinstance(visualization.value, str)
            table = format_csv_visualization_as_html(visualization.value)
            display(HTML(table))
        else:
            display(visualization.value)
        yaml_utils
    Utility functions to help with YAML files and data.
        
UUIDEncoder            (JSONEncoder)
        
    JSON encoder for UUID objects.
Source code in zenml/utils/yaml_utils.py
          class UUIDEncoder(json.JSONEncoder):
    """JSON encoder for UUID objects."""
    def default(self, obj: Any) -> Any:
        """Default UUID encoder for JSON.
        Args:
            obj: Object to encode.
        Returns:
            Encoded object.
        """
        if isinstance(obj, UUID):
            # if the obj is uuid, we simply return the value of uuid
            return obj.hex
        return json.JSONEncoder.default(self, obj)
default(self, obj)
    Default UUID encoder for JSON.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Any | Object to encode. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | Encoded object. | 
Source code in zenml/utils/yaml_utils.py
          def default(self, obj: Any) -> Any:
    """Default UUID encoder for JSON.
    Args:
        obj: Object to encode.
    Returns:
        Encoded object.
    """
    if isinstance(obj, UUID):
        # if the obj is uuid, we simply return the value of uuid
        return obj.hex
    return json.JSONEncoder.default(self, obj)
append_yaml(file_path, contents)
    Append contents to a YAML file at file_path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to YAML file. | required | 
| contents | Dict[Any, Any] | Contents of YAML file as dict. | required | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if directory does not exist. | 
Source code in zenml/utils/yaml_utils.py
          def append_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
    """Append contents to a YAML file at file_path.
    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict.
    Raises:
        FileNotFoundError: if directory does not exist.
    """
    file_contents = read_yaml(file_path) or {}
    file_contents.update(contents)
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(file_path, yaml.dump(file_contents))
comment_out_yaml(yaml_string)
    Comments out a yaml string.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| yaml_string | str | The yaml string to comment out. | required | 
Returns:
| Type | Description | 
|---|---|
| str | The commented out yaml string. | 
Source code in zenml/utils/yaml_utils.py
          def comment_out_yaml(yaml_string: str) -> str:
    """Comments out a yaml string.
    Args:
        yaml_string: The yaml string to comment out.
    Returns:
        The commented out yaml string.
    """
    lines = yaml_string.splitlines(keepends=True)
    lines = ["# " + line for line in lines]
    return "".join(lines)
is_json_serializable(obj)
    Checks whether an object is JSON serializable.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| obj | Any | The object to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the object is JSON serializable using pydantics encoder class. | 
Source code in zenml/utils/yaml_utils.py
          def is_json_serializable(obj: Any) -> bool:
    """Checks whether an object is JSON serializable.
    Args:
        obj: The object to check.
    Returns:
        Whether the object is JSON serializable using pydantics encoder class.
    """
    from zenml.utils.json_utils import pydantic_encoder
    try:
        json.dumps(obj, default=pydantic_encoder)
        return True
    except TypeError:
        return False
is_yaml(file_path)
    Returns True if file_path is YAML, else False.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to YAML file. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if is yaml, else False. | 
Source code in zenml/utils/yaml_utils.py
          def is_yaml(file_path: str) -> bool:
    """Returns True if file_path is YAML, else False.
    Args:
        file_path: Path to YAML file.
    Returns:
        True if is yaml, else False.
    """
    if file_path.endswith("yaml") or file_path.endswith("yml"):
        return True
    return False
read_json(file_path)
    Read JSON on file path and returns contents as dict.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to JSON file. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | Contents of the file in a dict. | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if file does not exist. | 
Source code in zenml/utils/yaml_utils.py
          def read_json(file_path: str) -> Any:
    """Read JSON on file path and returns contents as dict.
    Args:
        file_path: Path to JSON file.
    Returns:
        Contents of the file in a dict.
    Raises:
        FileNotFoundError: if file does not exist.
    """
    if fileio.exists(file_path):
        contents = io_utils.read_file_contents_as_string(file_path)
        return json.loads(contents)
    else:
        raise FileNotFoundError(f"{file_path} does not exist.")
read_yaml(file_path)
    Read YAML on file path and returns contents as dict.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to YAML file. | required | 
Returns:
| Type | Description | 
|---|---|
| Any | Contents of the file in a dict. | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if file does not exist. | 
Source code in zenml/utils/yaml_utils.py
          def read_yaml(file_path: str) -> Any:
    """Read YAML on file path and returns contents as dict.
    Args:
        file_path: Path to YAML file.
    Returns:
        Contents of the file in a dict.
    Raises:
        FileNotFoundError: if file does not exist.
    """
    if fileio.exists(file_path):
        contents = io_utils.read_file_contents_as_string(file_path)
        # TODO: [LOW] consider adding a default empty dict to be returned
        #   instead of None
        return yaml.safe_load(contents)
    else:
        raise FileNotFoundError(f"{file_path} does not exist.")
write_json(file_path, contents, encoder=None)
    Write contents as JSON format to file_path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to JSON file. | required | 
| contents | Any | Contents of JSON file. | required | 
| encoder | Optional[Type[json.encoder.JSONEncoder]] | Custom JSON encoder to use when saving json. | None | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if directory does not exist. | 
Source code in zenml/utils/yaml_utils.py
          def write_json(
    file_path: str,
    contents: Any,
    encoder: Optional[Type[json.JSONEncoder]] = None,
) -> None:
    """Write contents as JSON format to file_path.
    Args:
        file_path: Path to JSON file.
        contents: Contents of JSON file.
        encoder: Custom JSON encoder to use when saving json.
    Raises:
        FileNotFoundError: if directory does not exist.
    """
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            # Check if it is a local path, if it doesn't exist, raise Exception.
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(
        file_path,
        json.dumps(
            contents,
            cls=encoder,
        ),
    )
write_yaml(file_path, contents, sort_keys=True)
    Write contents as YAML format to file_path.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| file_path | str | Path to YAML file. | required | 
| contents | Union[Dict[Any, Any], List[Any]] | Contents of YAML file as dict or list. | required | 
| sort_keys | bool | If  | True | 
Exceptions:
| Type | Description | 
|---|---|
| FileNotFoundError | if directory does not exist. | 
Source code in zenml/utils/yaml_utils.py
          def write_yaml(
    file_path: str,
    contents: Union[Dict[Any, Any], List[Any]],
    sort_keys: bool = True,
) -> None:
    """Write contents as YAML format to file_path.
    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict or list.
        sort_keys: If `True`, keys are sorted alphabetically. If `False`,
            the order in which the keys were inserted into the dict will
            be preserved.
    Raises:
        FileNotFoundError: if directory does not exist.
    """
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(
        file_path, yaml.dump(contents, sort_keys=sort_keys)
    )