Skip to content

Io

zenml.io special

The io module handles file operations for the ZenML package.

It offers a standard interface for reading, writing and manipulating files and directories. It is heavily influenced and inspired by the io module of tfx.

fileio

Functionality for reading, writing and managing files.

convert_to_str(path)

Converts a "PathType" to a str using UTF-8.

Parameters:

Name Type Description Default
path PathType

The path to convert.

required

Returns:

Type Description
str

The path as a string.

Source code in zenml/io/fileio.py
def convert_to_str(path: "PathType") -> str:
    """Converts a "PathType" to a str using UTF-8.

    Args:
        path: The path to convert.

    Returns:
        The path as a string.
    """
    if isinstance(path, str):
        return path
    else:
        return path.decode("utf-8")

copy(src, dst, overwrite=False)

Copy a file from the source to the destination.

Parameters:

Name Type Description Default
src PathType

The path of the file to copy.

required
dst PathType

The path to copy the source file to.

required
overwrite bool

Whether to overwrite the destination file if it exists.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/io/fileio.py
def copy(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
    """Copy a file from the source to the destination.

    Args:
        src: The path of the file to copy.
        dst: The path to copy the source file to.
        overwrite: Whether to overwrite the destination file if it exists.

    Raises:
        FileExistsError: If a file already exists at the destination and
            `overwrite` is not set to `True`.
    """
    src_fs = _get_filesystem(src)
    dst_fs = _get_filesystem(dst)
    if src_fs is dst_fs:
        src_fs.copyfile(src, dst, overwrite=overwrite)
    else:
        if not overwrite and exists(dst):
            raise FileExistsError(
                f"Destination file '{convert_to_str(dst)}' already exists "
                f"and `overwrite` is false."
            )
        with open(src, mode="rb") as f:
            contents = f.read()

        with open(dst, mode="wb") as f:
            f.write(contents)

exists(path)

Check whether a given path exists.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the given path exists, False otherwise.

Source code in zenml/io/fileio.py
def exists(path: "PathType") -> bool:
    """Check whether a given path exists.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path exists, `False` otherwise.
    """
    return _get_filesystem(path).exists(path)

glob(pattern)

Find all files matching the given pattern.

Parameters:

Name Type Description Default
pattern PathType

The pattern to match.

required

Returns:

Type Description
List[PathType]

A list of paths matching the pattern.

Source code in zenml/io/fileio.py
def glob(pattern: "PathType") -> List["PathType"]:
    """Find all files matching the given pattern.

    Args:
        pattern: The pattern to match.

    Returns:
        A list of paths matching the pattern.
    """
    return _get_filesystem(pattern).glob(pattern)

isdir(path)

Check whether the given path is a directory.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the given path is a directory, False otherwise.

Source code in zenml/io/fileio.py
def isdir(path: "PathType") -> bool:
    """Check whether the given path is a directory.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path is a directory, `False` otherwise.
    """
    return _get_filesystem(path).isdir(path)

listdir(path, only_file_names=True)

Lists all files in a directory.

Parameters:

Name Type Description Default
path str

The path to the directory.

required
only_file_names bool

If True, only return the file names, not the full path.

True

Returns:

Type Description
List[str]

A list of files in the directory.

Source code in zenml/io/fileio.py
def listdir(path: str, only_file_names: bool = True) -> List[str]:
    """Lists all files in a directory.

    Args:
        path: The path to the directory.
        only_file_names: If True, only return the file names, not the full path.

    Returns:
        A list of files in the directory.
    """
    try:
        return [
            os.path.join(path, convert_to_str(f))
            if not only_file_names
            else convert_to_str(f)
            for f in _get_filesystem(path).listdir(path)
        ]
    except IOError:
        logger.debug(f"Dir {path} not found.")
        return []

makedirs(path)

Make a directory at the given path, recursively creating parents.

Parameters:

Name Type Description Default
path PathType

The path to the directory.

required
Source code in zenml/io/fileio.py
def makedirs(path: "PathType") -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: The path to the directory.
    """
    _get_filesystem(path).makedirs(path)

mkdir(path)

Make a directory at the given path; parent directory must exist.

Parameters:

Name Type Description Default
path PathType

The path to the directory.

required
Source code in zenml/io/fileio.py
def mkdir(path: "PathType") -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: The path to the directory.
    """
    _get_filesystem(path).mkdir(path)

open(path, mode='r')

Opens a file.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required
mode str

The mode to open the file in.

'r'

Returns:

Type Description
Any

The opened file.

Source code in zenml/io/fileio.py
def open(path: "PathType", mode: str = "r") -> Any:  # noqa
    """Opens a file.

    Args:
        path: The path to the file.
        mode: The mode to open the file in.

    Returns:
        The opened file.
    """
    return _get_filesystem(path).open(path, mode=mode)

remove(path)

Remove the file at the given path. Dangerous operation.

Parameters:

Name Type Description Default
path PathType

The path to the file to remove.

required

Exceptions:

Type Description
FileNotFoundError

If the file does not exist.

Source code in zenml/io/fileio.py
def remove(path: "PathType") -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: The path to the file to remove.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    if not exists(path):
        raise FileNotFoundError(f"{convert_to_str(path)} does not exist!")
    _get_filesystem(path).remove(path)

rename(src, dst, overwrite=False)

Rename a file.

Parameters:

Name Type Description Default
src PathType

The path of the file to rename.

required
dst PathType

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
NotImplementedError

If the source and destination file systems are not the same.

Source code in zenml/io/fileio.py
def rename(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
    """Rename a file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        NotImplementedError: If the source and destination file systems are not
            the same.
    """
    src_fs = _get_filesystem(src)
    dst_fs = _get_filesystem(dst)
    if src_fs is dst_fs:
        src_fs.rename(src, dst, overwrite=overwrite)
    else:
        raise NotImplementedError(
            f"Renaming from {convert_to_str(src)} to {convert_to_str(dst)} "
            f"using different file systems plugins is currently not supported."
        )

rmtree(dir_path)

Deletes a directory recursively. Dangerous operation.

Parameters:

Name Type Description Default
dir_path str

The path to the directory to delete.

required

Exceptions:

Type Description
TypeError

If the path is not pointing to a directory.

Source code in zenml/io/fileio.py
def rmtree(dir_path: str) -> None:
    """Deletes a directory recursively. Dangerous operation.

    Args:
        dir_path: The path to the directory to delete.

    Raises:
        TypeError: If the path is not pointing to a directory.
    """
    if not isdir(dir_path):
        raise TypeError(f"Path '{dir_path}' is not a directory.")

    _get_filesystem(dir_path).rmtree(dir_path)

size(path)

Get the size of a file or directory in bytes.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
Optional[int]

The size of the file or directory in bytes or None if the responsible file system does not implement the size method.

Source code in zenml/io/fileio.py
def size(path: "PathType") -> Optional[int]:
    """Get the size of a file or directory in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file or directory in bytes or `None` if the responsible
        file system does not implement the `size` method.
    """
    file_system = _get_filesystem(path)

    # If the file system does not implement the `size` method, return `None`.
    if file_system.size == BaseFilesystem.size:
        logger.warning(
            "Cannot get size of file or directory '%s' since the responsible "
            "file system `%s` does not implement the `size` method.",
            path,
            file_system.__name__,
        )
        return None

    # If the path does not exist, return 0.
    if not exists(path):
        return 0

    # If the path is a file, return its size.
    if not file_system.isdir(path):
        return file_system.size(path)

    # If the path is a directory, recursively sum the sizes of everything in it.
    files = file_system.listdir(path)
    file_sizes = [size(os.path.join(str(path), str(file))) for file in files]
    return sum(
        [file_size for file_size in file_sizes if file_size is not None]
    )

stat(path)

Get the stat descriptor for a given file path.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
Any

The stat descriptor.

Source code in zenml/io/fileio.py
def stat(path: "PathType") -> Any:
    """Get the stat descriptor for a given file path.

    Args:
        path: The path to the file.

    Returns:
        The stat descriptor.
    """
    return _get_filesystem(path).stat(path)

walk(top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top PathType

The path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Returns:

Type Description
Iterable[Tuple[PathType, List[PathType], List[PathType]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/io/fileio.py
def walk(
    top: "PathType",
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple["PathType", List["PathType"], List["PathType"]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: The path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    return _get_filesystem(top).walk(top, topdown=topdown, onerror=onerror)

filesystem

Defines the filesystem abstraction of ZenML.

BaseFilesystem (ABC)

Abstract Filesystem base class.

Design inspired by the Filesystem abstraction in TFX: https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py

Source code in zenml/io/filesystem.py
class BaseFilesystem(ABC):
    """Abstract Filesystem base class.

    Design inspired by the `Filesystem` abstraction in TFX:
    https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py
    """

    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    @staticmethod
    @abstractmethod
    def open(path: PathType, mode: str = "r") -> Any:
        """Opens a file.

        Args:
            path: The path to the file.
            mode: The mode to open the file in.

        Returns:
            The opened file.
        """

    @staticmethod
    @abstractmethod
    def copyfile(
        src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The path of the file to copy.
            dst: The path to copy the source file to.
            overwrite: Whether to overwrite the destination file if it exists.

        Raises:
            FileExistsError: If a file already exists at the destination and
                `overwrite` is not set to `True`.
        """

    @staticmethod
    @abstractmethod
    def exists(path: PathType) -> bool:
        """Check whether a given path exists.

        Args:
            path: The path to check.

        Returns:
            `True` if the given path exists, `False` otherwise.
        """

    @staticmethod
    @abstractmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Find all files matching the given pattern.

        Args:
            pattern: The pattern to match.

        Returns:
            A list of paths matching the pattern.
        """

    @staticmethod
    @abstractmethod
    def isdir(path: PathType) -> bool:
        """Check whether the given path is a directory.

        Args:
            path: The path to check.

        Returns:
            `True` if the given path is a directory, `False` otherwise.
        """

    @staticmethod
    @abstractmethod
    def listdir(path: PathType) -> List[PathType]:
        """Lists all files in a directory.

        Args:
            path: The path to the directory.

        Returns:
            A list of files in the directory.
        """

    @staticmethod
    @abstractmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        Args:
            path: Path to the directory.
        """

    @staticmethod
    @abstractmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Args:
            path: Path to the directory.
        """

    @staticmethod
    @abstractmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Args:
            path: The path to the file to remove.

        Raises:
            FileNotFoundError: If the file does not exist.
        """

    @staticmethod
    @abstractmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename a file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """

    @staticmethod
    @abstractmethod
    def rmtree(path: PathType) -> None:
        """Deletes a directory recursively. Dangerous operation.

        Args:
            path: The path to the directory to delete.
        """

    @staticmethod
    @abstractmethod
    def stat(path: PathType) -> Any:
        """Get the stat descriptor for a given file path.

        Args:
            path: The path to the file.

        Returns:
            The stat descriptor.
        """

    @staticmethod
    def size(path: PathType) -> int:
        """Get the size of a file in bytes.

        To be implemented by subclasses but not abstract for backwards
        compatibility.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return -1

    @staticmethod
    @abstractmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: The path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
copyfile(src, dst, overwrite=False) staticmethod

Copy a file from the source to the destination.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to copy.

required
dst Union[bytes, str]

The path to copy the source file to.

required
overwrite bool

Whether to overwrite the destination file if it exists.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def copyfile(
    src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file from the source to the destination.

    Args:
        src: The path of the file to copy.
        dst: The path to copy the source file to.
        overwrite: Whether to overwrite the destination file if it exists.

    Raises:
        FileExistsError: If a file already exists at the destination and
            `overwrite` is not set to `True`.
    """
exists(path) staticmethod

Check whether a given path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the given path exists, False otherwise.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def exists(path: PathType) -> bool:
    """Check whether a given path exists.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path exists, `False` otherwise.
    """
glob(pattern) staticmethod

Find all files matching the given pattern.

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The pattern to match.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths matching the pattern.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def glob(pattern: PathType) -> List[PathType]:
    """Find all files matching the given pattern.

    Args:
        pattern: The pattern to match.

    Returns:
        A list of paths matching the pattern.
    """
isdir(path) staticmethod

Check whether the given path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the given path is a directory, False otherwise.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def isdir(path: PathType) -> bool:
    """Check whether the given path is a directory.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path is a directory, `False` otherwise.
    """
listdir(path) staticmethod

Lists all files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of files in the directory.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def listdir(path: PathType) -> List[PathType]:
    """Lists all files in a directory.

    Args:
        path: The path to the directory.

    Returns:
        A list of files in the directory.
    """
makedirs(path) staticmethod

Make a directory at the given path, recursively creating parents.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path to the directory.

required
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def makedirs(path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: Path to the directory.
    """
mkdir(path) staticmethod

Make a directory at the given path; parent directory must exist.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path to the directory.

required
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def mkdir(path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: Path to the directory.
    """
open(path, mode='r') staticmethod

Opens a file.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required
mode str

The mode to open the file in.

'r'

Returns:

Type Description
Any

The opened file.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def open(path: PathType, mode: str = "r") -> Any:
    """Opens a file.

    Args:
        path: The path to the file.
        mode: The mode to open the file in.

    Returns:
        The opened file.
    """
remove(path) staticmethod

Remove the file at the given path. Dangerous operation.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file to remove.

required

Exceptions:

Type Description
FileNotFoundError

If the file does not exist.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: The path to the file to remove.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
rename(src, dst, overwrite=False) staticmethod

Rename a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename a file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
rmtree(path) staticmethod

Deletes a directory recursively. Dangerous operation.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory to delete.

required
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rmtree(path: PathType) -> None:
    """Deletes a directory recursively. Dangerous operation.

    Args:
        path: The path to the directory to delete.
    """
size(path) staticmethod

Get the size of a file in bytes.

To be implemented by subclasses but not abstract for backwards compatibility.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in zenml/io/filesystem.py
@staticmethod
def size(path: PathType) -> int:
    """Get the size of a file in bytes.

    To be implemented by subclasses but not abstract for backwards
    compatibility.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return -1
stat(path) staticmethod

Get the stat descriptor for a given file path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
Any

The stat descriptor.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def stat(path: PathType) -> Any:
    """Get the stat descriptor for a given file path.

    Args:
        path: The path to the file.

    Returns:
        The stat descriptor.
    """
walk(top, topdown=True, onerror=None) staticmethod

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

The path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Returns:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: The path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """

filesystem_registry

Filesystem registry managing filesystem plugins.

FileIORegistry

Registry of pluggable filesystem implementations.

Source code in zenml/io/filesystem_registry.py
class FileIORegistry:
    """Registry of pluggable filesystem implementations."""

    def __init__(self) -> None:
        """Initialize the registry."""
        self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
        self._registration_lock = Lock()

    def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
        """Register a filesystem implementation.

        Args:
            filesystem_cls: Subclass of `zenml.io.filesystem.Filesystem`.
        """
        with self._registration_lock:
            for scheme in filesystem_cls.SUPPORTED_SCHEMES:
                current_preferred = self._filesystems.get(scheme)
                if current_preferred is not None:
                    logger.debug(
                        "Overwriting previously registered filesystem for "
                        "scheme `%s`. Old class: %s, new class: %s",
                        scheme,
                        current_preferred.__name__,
                        filesystem_cls.__name__,
                    )

                self._filesystems[scheme] = filesystem_cls

    def get_filesystem_for_scheme(
        self, scheme: "PathType"
    ) -> Type["BaseFilesystem"]:
        """Get filesystem plugin for given scheme string.

        Args:
            scheme: The scheme to get the filesystem for.

        Returns:
            The filesystem plugin for the given scheme.

        Raises:
            ValueError: If no filesystem plugin is registered for the given
                scheme.
        """
        if isinstance(scheme, bytes):
            scheme = scheme.decode("utf-8")
        if scheme not in self._filesystems:
            raise ValueError(
                f"No file systems were found for the scheme: "
                f"{scheme}. Please make sure that you are using "
                f"the right path and the all the necessary "
                f"integrations are properly installed."
            )
        return self._filesystems[scheme]

    def get_filesystem_for_path(
        self, path: "PathType"
    ) -> Type["BaseFilesystem"]:
        """Get filesystem plugin for given path.

        Args:
            path: The path to get the filesystem for.

        Returns:
            The filesystem plugin for the given path.

        Raises:
            ValueError: If no filesystem plugin is registered for the given
                path.
        """
        # Assume local path by default, but extract filesystem prefix if available.
        if isinstance(path, str):
            path_bytes = path.encode("utf-8")
        elif isinstance(path, bytes):
            path_bytes = path
        else:
            raise ValueError("Invalid path type: %r." % path)
        result = re.match(b"^([a-z0-9]+://)", path_bytes)
        if result:
            scheme = result.group(1).decode("utf-8")
        else:
            scheme = ""
        return self.get_filesystem_for_scheme(scheme)
__init__(self) special

Initialize the registry.

Source code in zenml/io/filesystem_registry.py
def __init__(self) -> None:
    """Initialize the registry."""
    self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
    self._registration_lock = Lock()
get_filesystem_for_path(self, path)

Get filesystem plugin for given path.

Parameters:

Name Type Description Default
path PathType

The path to get the filesystem for.

required

Returns:

Type Description
Type[BaseFilesystem]

The filesystem plugin for the given path.

Exceptions:

Type Description
ValueError

If no filesystem plugin is registered for the given path.

Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_path(
    self, path: "PathType"
) -> Type["BaseFilesystem"]:
    """Get filesystem plugin for given path.

    Args:
        path: The path to get the filesystem for.

    Returns:
        The filesystem plugin for the given path.

    Raises:
        ValueError: If no filesystem plugin is registered for the given
            path.
    """
    # Assume local path by default, but extract filesystem prefix if available.
    if isinstance(path, str):
        path_bytes = path.encode("utf-8")
    elif isinstance(path, bytes):
        path_bytes = path
    else:
        raise ValueError("Invalid path type: %r." % path)
    result = re.match(b"^([a-z0-9]+://)", path_bytes)
    if result:
        scheme = result.group(1).decode("utf-8")
    else:
        scheme = ""
    return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_scheme(self, scheme)

Get filesystem plugin for given scheme string.

Parameters:

Name Type Description Default
scheme PathType

The scheme to get the filesystem for.

required

Returns:

Type Description
Type[BaseFilesystem]

The filesystem plugin for the given scheme.

Exceptions:

Type Description
ValueError

If no filesystem plugin is registered for the given scheme.

Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_scheme(
    self, scheme: "PathType"
) -> Type["BaseFilesystem"]:
    """Get filesystem plugin for given scheme string.

    Args:
        scheme: The scheme to get the filesystem for.

    Returns:
        The filesystem plugin for the given scheme.

    Raises:
        ValueError: If no filesystem plugin is registered for the given
            scheme.
    """
    if isinstance(scheme, bytes):
        scheme = scheme.decode("utf-8")
    if scheme not in self._filesystems:
        raise ValueError(
            f"No file systems were found for the scheme: "
            f"{scheme}. Please make sure that you are using "
            f"the right path and the all the necessary "
            f"integrations are properly installed."
        )
    return self._filesystems[scheme]
register(self, filesystem_cls)

Register a filesystem implementation.

Parameters:

Name Type Description Default
filesystem_cls Type[BaseFilesystem]

Subclass of zenml.io.filesystem.Filesystem.

required
Source code in zenml/io/filesystem_registry.py
def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
    """Register a filesystem implementation.

    Args:
        filesystem_cls: Subclass of `zenml.io.filesystem.Filesystem`.
    """
    with self._registration_lock:
        for scheme in filesystem_cls.SUPPORTED_SCHEMES:
            current_preferred = self._filesystems.get(scheme)
            if current_preferred is not None:
                logger.debug(
                    "Overwriting previously registered filesystem for "
                    "scheme `%s`. Old class: %s, new class: %s",
                    scheme,
                    current_preferred.__name__,
                    filesystem_cls.__name__,
                )

            self._filesystems[scheme] = filesystem_cls

local_filesystem

Local filesystem using Python's built-in modules (os, shutil, glob).

LocalFilesystem (BaseFilesystem)

Filesystem that uses local file operations.

Implementation inspired by TFX: https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py

Source code in zenml/io/local_filesystem.py
class LocalFilesystem(BaseFilesystem):
    """Filesystem that uses local file operations.

    Implementation inspired by TFX:
    https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py
    """

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @staticmethod
    def open(path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: The path to the file.
            mode: The mode to open the file.

        Returns:
            Any: The file object.
        """
        encoding = "utf-8" if "b" not in mode else None
        return open(path, mode=mode, encoding=encoding)

    @staticmethod
    def copyfile(
        src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The source path.
            dst: The destination path.
            overwrite: Whether to overwrite the destination file if it exists.

        Raises:
            FileExistsError: If the destination file exists and overwrite is
                False.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists.

        Args:
            path: The path to check.

        Returns:
            bool: Whether the path exists.
        """
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern.

        Args:
            pattern: The glob pattern.

        Returns:
            List[PathType]: The paths that match the glob pattern.
        """
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory.

        Args:
            path: The path to check.

        Returns:
            bool: Whether the path points to a directory.
        """
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem.

        Args:
            path: The path to the directory.

        Returns:
            List[PathType]: The list of files under the given directory.
        """
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        Args:
            path: The path to the directory.
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Args:
            path: The path to the directory.
        """
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Args:
            path: The path to the file.
        """
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If the destination file exists and overwrite is
                False.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        os.rename(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation.

        Args:
            path: The path to the directory.
        """
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path.

        Args:
            path: The path to the file.

        Returns:
            Any: The stat descriptor for the file.
        """
        return os.stat(path)

    @staticmethod
    def size(path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return os.path.getsize(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Yields:
            An Iterable of Tuples, each of which contain the path of the
            current directory path, a list of directories inside the
            current directory and a list of files inside the current
            directory.
        """
        yield from os.walk(  # type: ignore[type-var, misc]
            top, topdown=topdown, onerror=onerror
        )
copyfile(src, dst, overwrite=False) staticmethod

Copy a file from the source to the destination.

Parameters:

Name Type Description Default
src Union[bytes, str]

The source path.

required
dst Union[bytes, str]

The destination path.

required
overwrite bool

Whether to overwrite the destination file if it exists.

False

Exceptions:

Type Description
FileExistsError

If the destination file exists and overwrite is False.

Source code in zenml/io/local_filesystem.py
@staticmethod
def copyfile(
    src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file from the source to the destination.

    Args:
        src: The source path.
        dst: The destination path.
        overwrite: Whether to overwrite the destination file if it exists.

    Raises:
        FileExistsError: If the destination file exists and overwrite is
            False.
    """
    if not overwrite and os.path.exists(dst):
        raise FileExistsError(
            f"Destination file {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    shutil.copyfile(src, dst)
exists(path) staticmethod

Returns True if the given path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

Whether the path exists.

Source code in zenml/io/local_filesystem.py
@staticmethod
def exists(path: PathType) -> bool:
    """Returns `True` if the given path exists.

    Args:
        path: The path to check.

    Returns:
        bool: Whether the path exists.
    """
    return os.path.exists(path)
glob(pattern) staticmethod

Return the paths that match a glob pattern.

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern.

required

Returns:

Type Description
List[PathType]

The paths that match the glob pattern.

Source code in zenml/io/local_filesystem.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths that match a glob pattern.

    Args:
        pattern: The glob pattern.

    Returns:
        List[PathType]: The paths that match the glob pattern.
    """
    return glob.glob(pattern)  # type: ignore[type-var]
isdir(path) staticmethod

Returns whether the given path points to a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

Whether the path points to a directory.

Source code in zenml/io/local_filesystem.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Returns whether the given path points to a directory.

    Args:
        path: The path to check.

    Returns:
        bool: Whether the path points to a directory.
    """
    return os.path.isdir(path)
listdir(path) staticmethod

Returns a list of files under a given directory in the filesystem.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory.

required

Returns:

Type Description
List[PathType]

The list of files under the given directory.

Source code in zenml/io/local_filesystem.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """Returns a list of files under a given directory in the filesystem.

    Args:
        path: The path to the directory.

    Returns:
        List[PathType]: The list of files under the given directory.
    """
    return os.listdir(path)  # type:ignore[return-value]
makedirs(path) staticmethod

Make a directory at the given path, recursively creating parents.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory.

required
Source code in zenml/io/local_filesystem.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: The path to the directory.
    """
    os.makedirs(path, exist_ok=True)
mkdir(path) staticmethod

Make a directory at the given path; parent directory must exist.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory.

required
Source code in zenml/io/local_filesystem.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: The path to the directory.
    """
    os.mkdir(path)
open(path, mode='r') staticmethod

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required
mode str

The mode to open the file.

'r'

Returns:

Type Description
Any

The file object.

Source code in zenml/io/local_filesystem.py
@staticmethod
def open(path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: The path to the file.
        mode: The mode to open the file.

    Returns:
        Any: The file object.
    """
    encoding = "utf-8" if "b" not in mode else None
    return open(path, mode=mode, encoding=encoding)
remove(path) staticmethod

Remove the file at the given path. Dangerous operation.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required
Source code in zenml/io/local_filesystem.py
@staticmethod
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: The path to the file.
    """
    os.remove(path)
rename(src, dst, overwrite=False) staticmethod

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True

False

Exceptions:

Type Description
FileExistsError

If the destination file exists and overwrite is False.

Source code in zenml/io/local_filesystem.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True`

    Raises:
        FileExistsError: If the destination file exists and overwrite is
            False.
    """
    if not overwrite and os.path.exists(dst):
        raise FileExistsError(
            f"Destination path {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    os.rename(src, dst)
rmtree(path) staticmethod

Deletes dir recursively. Dangerous operation.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the directory.

required
Source code in zenml/io/local_filesystem.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Deletes dir recursively. Dangerous operation.

    Args:
        path: The path to the directory.
    """
    shutil.rmtree(path)
size(path) staticmethod

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in zenml/io/local_filesystem.py
@staticmethod
def size(path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return os.path.getsize(path)
stat(path) staticmethod

Return the stat descriptor for a given file path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
Any

The stat descriptor for the file.

Source code in zenml/io/local_filesystem.py
@staticmethod
def stat(path: PathType) -> Any:
    """Return the stat descriptor for a given file path.

    Args:
        path: The path to the file.

    Returns:
        Any: The stat descriptor for the file.
    """
    return os.stat(path)
walk(top, topdown=True, onerror=None) staticmethod

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/io/local_filesystem.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Yields:
        An Iterable of Tuples, each of which contain the path of the
        current directory path, a list of directories inside the
        current directory and a list of files inside the current
        directory.
    """
    yield from os.walk(  # type: ignore[type-var, misc]
        top, topdown=topdown, onerror=onerror
    )