Io
zenml.io
special
The io module handles file operations for the ZenML package.
It offers a standard interface for reading, writing and manipulating files and
directories. It is heavily influenced and inspired by the io module of tfx.
fileio
Functionality for reading, writing and managing files.
convert_to_str(path)
Converts a "PathType" to a str using UTF-8.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to convert. |
required |
Returns:
| Type | Description |
|---|---|
str |
The path as a string. |
Source code in zenml/io/fileio.py
def convert_to_str(path: "PathType") -> str:
    """Return `path` as text, decoding byte paths as UTF-8.

    Args:
        path: The path to convert; a `str` is returned unchanged, anything
            else is decoded with UTF-8.

    Returns:
        The path as a string.
    """
    return path if isinstance(path, str) else path.decode("utf-8")
copy(src, dst, overwrite=False)
Copy a file from the source to the destination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
PathType |
The path of the file to copy. |
required |
dst |
PathType |
The path to copy the source file to. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
| Type | Description |
|---|---|
FileExistsError |
If a file already exists at the destination and `overwrite` is not set to `True`. |
Source code in zenml/io/fileio.py
def copy(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
    """Copy a file from the source to the destination.

    When both paths resolve to the same filesystem, the copy is delegated to
    that filesystem's `copyfile`. Otherwise the source is streamed to the
    destination in chunks, so large files are never held in memory as a whole.

    Args:
        src: The path of the file to copy.
        dst: The path to copy the source file to.
        overwrite: Whether to overwrite the destination file if it exists.

    Raises:
        FileExistsError: If a file already exists at the destination and
            `overwrite` is not set to `True`.
    """
    # Local import keeps this block self-contained regardless of the
    # module-level imports.
    import shutil

    src_fs = _get_filesystem(src)
    dst_fs = _get_filesystem(dst)
    if src_fs is dst_fs:
        src_fs.copyfile(src, dst, overwrite=overwrite)
        return

    if not overwrite and exists(dst):
        raise FileExistsError(
            f"Destination file '{convert_to_str(dst)}' already exists "
            f"and `overwrite` is false."
        )

    # The source is opened first, so a missing source fails before the
    # destination is created or truncated.
    with open(src, mode="rb") as src_file:
        with open(dst, mode="wb") as dst_file:
            shutil.copyfileobj(src_file, dst_file)
exists(path)
Check whether a given path exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
`True` if the given path exists, `False` otherwise. |
Source code in zenml/io/fileio.py
def exists(path: "PathType") -> bool:
    """Report whether anything exists at the given path.

    The check is performed by the filesystem that `_get_filesystem` returns
    for `path`.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path exists, `False` otherwise.
    """
    filesystem = _get_filesystem(path)
    return filesystem.exists(path)
glob(pattern)
Find all files matching the given pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern |
PathType |
The pattern to match. |
required |
Returns:
| Type | Description |
|---|---|
List[PathType] |
A list of paths matching the pattern. |
Source code in zenml/io/fileio.py
def glob(pattern: "PathType") -> List["PathType"]:
    """Collect every path that matches the given pattern.

    The matching is performed by the filesystem that `_get_filesystem`
    returns for `pattern`.

    Args:
        pattern: The pattern to match.

    Returns:
        A list of paths matching the pattern.
    """
    filesystem = _get_filesystem(pattern)
    return filesystem.glob(pattern)
isdir(path)
Check whether the given path is a directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
`True` if the given path is a directory, `False` otherwise. |
Source code in zenml/io/fileio.py
def isdir(path: "PathType") -> bool:
    """Report whether the given path points to a directory.

    The check is performed by the filesystem that `_get_filesystem` returns
    for `path`.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path is a directory, `False` otherwise.
    """
    filesystem = _get_filesystem(path)
    return filesystem.isdir(path)
listdir(path, only_file_names=True)
Lists all files in a directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
str |
The path to the directory. |
required |
only_file_names |
bool |
If True, only return the file names, not the full path. |
True |
Returns:
| Type | Description |
|---|---|
List[str] |
A list of files in the directory. |
Source code in zenml/io/fileio.py
def listdir(path: str, only_file_names: bool = True) -> List[str]:
    """List the entries of a directory as strings.

    Args:
        path: The path to the directory.
        only_file_names: If True, only return the file names, not the full
            path. If False, each name is joined onto `path`.

    Returns:
        A list of files in the directory. An empty list is returned (and a
        debug message logged) if listing raises an `IOError`.
    """
    try:
        entries = _get_filesystem(path).listdir(path)
        names = [convert_to_str(entry) for entry in entries]
        if only_file_names:
            return names
        return [os.path.join(path, name) for name in names]
    except IOError:
        # Best effort: an unreadable or missing directory yields no entries.
        logger.debug(f"Dir {path} not found.")
        return []
makedirs(path)
Make a directory at the given path, recursively creating parents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the directory. |
required |
Source code in zenml/io/fileio.py
def makedirs(path: "PathType") -> None:
    """Create a directory at the given path, including missing parents.

    The work is done by the filesystem that `_get_filesystem` returns for
    `path`.

    Args:
        path: The path to the directory.
    """
    filesystem = _get_filesystem(path)
    filesystem.makedirs(path)
mkdir(path)
Make a directory at the given path; parent directory must exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the directory. |
required |
Source code in zenml/io/fileio.py
def mkdir(path: "PathType") -> None:
    """Create a single directory at the given path.

    The parent directory must already exist. The work is done by the
    filesystem that `_get_filesystem` returns for `path`.

    Args:
        path: The path to the directory.
    """
    filesystem = _get_filesystem(path)
    filesystem.mkdir(path)
open(path, mode='r')
Opens a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the file. |
required |
mode |
str |
The mode to open the file in. |
'r' |
Returns:
| Type | Description |
|---|---|
Any |
The opened file. |
Source code in zenml/io/fileio.py
def open(path: "PathType", mode: str = "r") -> Any:  # noqa
    """Open the file at the given path.

    The file is opened by the filesystem that `_get_filesystem` returns for
    `path`. Note that this function shadows the built-in `open` inside this
    module.

    Args:
        path: The path to the file.
        mode: The mode to open the file in.

    Returns:
        The opened file.
    """
    filesystem = _get_filesystem(path)
    return filesystem.open(path, mode=mode)
remove(path)
Remove the file at the given path. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the file to remove. |
required |
Exceptions:
| Type | Description |
|---|---|
FileNotFoundError |
If the file does not exist. |
Source code in zenml/io/fileio.py
def remove(path: "PathType") -> None:
    """Delete the file at the given path. Dangerous operation.

    Args:
        path: The path to the file to remove.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    if exists(path):
        _get_filesystem(path).remove(path)
        return
    raise FileNotFoundError(f"{convert_to_str(path)} does not exist!")
rename(src, dst, overwrite=False)
Rename a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
PathType |
The path of the file to rename. |
required |
dst |
PathType |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
| Type | Description |
|---|---|
NotImplementedError |
If the source and destination file systems are not the same. |
Source code in zenml/io/fileio.py
def rename(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
    """Rename a file within a single filesystem.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        NotImplementedError: If the source and destination file systems are not
            the same.
    """
    source_fs = _get_filesystem(src)
    target_fs = _get_filesystem(dst)

    # Renaming across different filesystem plugins is not supported.
    if source_fs is not target_fs:
        raise NotImplementedError(
            f"Renaming from {convert_to_str(src)} to {convert_to_str(dst)} "
            f"using different file systems plugins is currently not supported."
        )

    source_fs.rename(src, dst, overwrite=overwrite)
rmtree(dir_path)
Deletes a directory recursively. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dir_path |
str |
The path to the directory to delete. |
required |
Exceptions:
| Type | Description |
|---|---|
TypeError |
If the path is not pointing to a directory. |
Source code in zenml/io/fileio.py
def rmtree(dir_path: str) -> None:
    """Recursively delete a directory. Dangerous operation.

    Args:
        dir_path: The path to the directory to delete.

    Raises:
        TypeError: If the path is not pointing to a directory.
    """
    if isdir(dir_path):
        _get_filesystem(dir_path).rmtree(dir_path)
    else:
        raise TypeError(f"Path '{dir_path}' is not a directory.")
size(path)
Get the size of a file or directory in bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
Optional[int] |
The size of the file or directory in bytes or `None` if the responsible file system does not implement the `size` method. |
Source code in zenml/io/fileio.py
def size(path: "PathType") -> Optional[int]:
    """Get the size of a file or directory in bytes.

    Args:
        path: The path to the file or directory. Both `str` and `bytes`
            paths are supported.

    Returns:
        The size of the file or directory in bytes or `None` if the responsible
        file system does not implement the `size` method. A path that does
        not exist has size 0.
    """
    file_system = _get_filesystem(path)

    # A filesystem that still exposes the base class `size` has not
    # implemented it, so no meaningful value can be returned.
    if file_system.size == BaseFilesystem.size:
        logger.warning(
            "Cannot get size of file or directory '%s' since the responsible "
            "file system `%s` does not implement the `size` method.",
            path,
            file_system.__name__,
        )
        return None

    # If the path does not exist, return 0.
    if not exists(path):
        return 0

    # If the path is a file, return its size.
    if not file_system.isdir(path):
        return file_system.size(path)

    # If the path is a directory, recursively sum the sizes of everything in
    # it. Paths are decoded with `convert_to_str` rather than `str()`, because
    # `str(b"dir")` yields the literal "b'dir'" and would break the join.
    directory = convert_to_str(path)
    child_sizes = (
        size(os.path.join(directory, convert_to_str(child)))
        for child in file_system.listdir(path)
    )
    return sum(
        child_size for child_size in child_sizes if child_size is not None
    )
stat(path)
Get the stat descriptor for a given file path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
Any |
The stat descriptor. |
Source code in zenml/io/fileio.py
def stat(path: "PathType") -> Any:
    """Fetch the stat descriptor for the given file path.

    The descriptor is produced by the filesystem that `_get_filesystem`
    returns for `path`.

    Args:
        path: The path to the file.

    Returns:
        The stat descriptor.
    """
    filesystem = _get_filesystem(path)
    return filesystem.stat(path)
walk(top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
top |
PathType |
The path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
| Type | Description |
|---|---|
Iterable[Tuple[PathType, List[PathType], List[PathType]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/fileio.py
def walk(
    top: "PathType",
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple["PathType", List["PathType"], List["PathType"]]]:
    """Walk the contents of the given directory.

    The traversal is performed by the filesystem that `_get_filesystem`
    returns for `top`; all arguments are forwarded to it unchanged.

    Args:
        top: The path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contains the path of the current
        directory, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    filesystem = _get_filesystem(top)
    return filesystem.walk(top, topdown=topdown, onerror=onerror)
filesystem
Defines the filesystem abstraction of ZenML.
BaseFilesystem (ABC)
Abstract Filesystem base class.
Design inspired by the Filesystem abstraction in TFX:
https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py
Source code in zenml/io/filesystem.py
class BaseFilesystem(ABC):
    """Abstract Filesystem base class.

    Every operation is declared as a static method, so implementations are
    used through the class itself rather than through instances. All
    operations except `size` are abstract and must be provided by concrete
    subclasses.

    Design inspired by the `Filesystem` abstraction in TFX:
    https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py
    """

    # Path schemes handled by a concrete filesystem; declared here without a
    # value, so each subclass is expected to define it.
    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    @staticmethod
    @abstractmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Opens a file.

        Abstract: concrete filesystems must implement this.

        Args:
            name: The path to the file.
            mode: The mode to open the file in.

        Returns:
            The opened file.
        """

    @staticmethod
    @abstractmethod
    def copyfile(
        src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Abstract: concrete filesystems must implement this.

        Args:
            src: The path of the file to copy.
            dst: The path to copy the source file to.
            overwrite: Whether to overwrite the destination file if it exists.

        Raises:
            FileExistsError: If a file already exists at the destination and
                `overwrite` is not set to `True`.
        """

    @staticmethod
    @abstractmethod
    def exists(path: PathType) -> bool:
        """Check whether a given path exists.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to check.

        Returns:
            `True` if the given path exists, `False` otherwise.
        """

    @staticmethod
    @abstractmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Find all files matching the given pattern.

        Abstract: concrete filesystems must implement this.

        Args:
            pattern: The pattern to match.

        Returns:
            A list of paths matching the pattern.
        """

    @staticmethod
    @abstractmethod
    def isdir(path: PathType) -> bool:
        """Check whether the given path is a directory.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to check.

        Returns:
            `True` if the given path is a directory, `False` otherwise.
        """

    @staticmethod
    @abstractmethod
    def listdir(path: PathType) -> List[PathType]:
        """Lists all files in a directory.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to the directory.

        Returns:
            A list of files in the directory.
        """

    @staticmethod
    @abstractmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        Abstract: concrete filesystems must implement this.

        Args:
            path: Path to the directory.
        """

    @staticmethod
    @abstractmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Abstract: concrete filesystems must implement this.

        Args:
            path: Path to the directory.
        """

    @staticmethod
    @abstractmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to the file to remove.

        Raises:
            FileNotFoundError: If the file does not exist.
        """

    @staticmethod
    @abstractmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename a file.

        Abstract: concrete filesystems must implement this.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """

    @staticmethod
    @abstractmethod
    def rmtree(path: PathType) -> None:
        """Deletes a directory recursively. Dangerous operation.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to the directory to delete.
        """

    @staticmethod
    @abstractmethod
    def stat(path: PathType) -> Any:
        """Get the stat descriptor for a given file path.

        Abstract: concrete filesystems must implement this.

        Args:
            path: The path to the file.

        Returns:
            The stat descriptor.
        """

    @staticmethod
    def size(path: PathType) -> int:
        """Get the size of a file in bytes.

        To be implemented by subclasses but not abstract for backwards
        compatibility. This base implementation ignores `path` and returns
        the placeholder value -1.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return -1

    @staticmethod
    @abstractmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Abstract: concrete filesystems must implement this.

        Args:
            top: The path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contains the path of the
            current directory, a list of directories inside the current
            directory and a list of files inside the current directory.
        """
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
Union[bytes, str] |
The path of the file to copy. |
required |
dst |
Union[bytes, str] |
The path to copy the source file to. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
| Type | Description |
|---|---|
FileExistsError |
If a file already exists at the destination and `overwrite` is not set to `True`. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def copyfile(
    src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file from the source to the destination.

    Abstract: concrete filesystems must implement this.

    Args:
        src: The path of the file to copy.
        dst: The path to copy the source file to.
        overwrite: Whether to overwrite the destination file if it exists.

    Raises:
        FileExistsError: If a file already exists at the destination and
            `overwrite` is not set to `True`.
    """
exists(path)
staticmethod
Check whether a given path exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
`True` if the given path exists, `False` otherwise. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def exists(path: PathType) -> bool:
    """Check whether a given path exists.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path exists, `False` otherwise.
    """
glob(pattern)
staticmethod
Find all files matching the given pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern |
Union[bytes, str] |
The pattern to match. |
required |
Returns:
| Type | Description |
|---|---|
List[Union[bytes, str]] |
A list of paths matching the pattern. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def glob(pattern: PathType) -> List[PathType]:
    """Find all files matching the given pattern.

    Abstract: concrete filesystems must implement this.

    Args:
        pattern: The pattern to match.

    Returns:
        A list of paths matching the pattern.
    """
isdir(path)
staticmethod
Check whether the given path is a directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
`True` if the given path is a directory, `False` otherwise. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def isdir(path: PathType) -> bool:
    """Check whether the given path is a directory.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path is a directory, `False` otherwise.
    """
listdir(path)
staticmethod
Lists all files in a directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Returns:
| Type | Description |
|---|---|
List[Union[bytes, str]] |
A list of files in the directory. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def listdir(path: PathType) -> List[PathType]:
    """Lists all files in a directory.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to the directory.

    Returns:
        A list of files in the directory.
    """
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
Path to the directory. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def makedirs(path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Abstract: concrete filesystems must implement this.

    Args:
        path: Path to the directory.
    """
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
Path to the directory. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def mkdir(path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Abstract: concrete filesystems must implement this.

    Args:
        path: Path to the directory.
    """
open(name, mode='r')
staticmethod
Opens a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
Union[bytes, str] |
The path to the file. |
required |
mode |
str |
The mode to open the file in. |
'r' |
Returns:
| Type | Description |
|---|---|
Any |
The opened file. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Opens a file.

    Abstract: concrete filesystems must implement this.

    Args:
        name: The path to the file.
        mode: The mode to open the file in.

    Returns:
        The opened file.
    """
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file to remove. |
required |
Exceptions:
| Type | Description |
|---|---|
FileNotFoundError |
If the file does not exist. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to the file to remove.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
rename(src, dst, overwrite=False)
staticmethod
Rename a file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
| Type | Description |
|---|---|
FileExistsError |
If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename a file.

    Abstract: concrete filesystems must implement this.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
rmtree(path)
staticmethod
Deletes a directory recursively. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory to delete. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rmtree(path: PathType) -> None:
    """Deletes a directory recursively. Dangerous operation.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to the directory to delete.
    """
size(path)
staticmethod
Get the size of a file in bytes.
To be implemented by subclasses but not abstract for backwards compatibility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
int |
The size of the file in bytes. |
Source code in zenml/io/filesystem.py
@staticmethod
def size(path: PathType) -> int:
    """Get the size of a file in bytes.

    To be implemented by subclasses but not abstract for backwards
    compatibility. This base implementation ignores `path` and returns the
    placeholder value -1.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return -1
stat(path)
staticmethod
Get the stat descriptor for a given file path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
Any |
The stat descriptor. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def stat(path: PathType) -> Any:
    """Get the stat descriptor for a given file path.

    Abstract: concrete filesystems must implement this.

    Args:
        path: The path to the file.

    Returns:
        The stat descriptor.
    """
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
top |
Union[bytes, str] |
The path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
| Type | Description |
|---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Abstract: concrete filesystems must implement this.

    Args:
        top: The path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contains the path of the
        current directory, a list of directories inside the current
        directory and a list of files inside the current directory.
    """
filesystem_registry
Filesystem registry managing filesystem plugins.
FileIORegistry
Registry of pluggable filesystem implementations.
Source code in zenml/io/filesystem_registry.py
class FileIORegistry:
    """Registry of pluggable filesystem implementations.

    Filesystem classes are stored per scheme prefix (for example `"s3://"`,
    or the empty string for paths without a scheme). Registering a class for
    a scheme that is already taken replaces the previous entry.
    """

    def __init__(self) -> None:
        """Initialize the registry."""
        # Maps a scheme prefix to the filesystem class registered for it.
        self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
        # Serializes concurrent calls to `register`.
        self._registration_lock = Lock()

    def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
        """Register a filesystem implementation.

        The class is registered for every entry of its `SUPPORTED_SCHEMES`.

        Args:
            filesystem_cls: Subclass of `zenml.io.filesystem.BaseFilesystem`.
        """
        with self._registration_lock:
            for scheme in filesystem_cls.SUPPORTED_SCHEMES:
                current_preferred = self._filesystems.get(scheme)
                if current_preferred is not None:
                    logger.debug(
                        "Overwriting previously registered filesystem for "
                        "scheme `%s`. Old class: %s, new class: %s",
                        scheme,
                        current_preferred.__name__,
                        filesystem_cls.__name__,
                    )
                self._filesystems[scheme] = filesystem_cls

    def get_filesystem_for_scheme(
        self, scheme: "PathType"
    ) -> Type["BaseFilesystem"]:
        """Get filesystem plugin for given scheme string.

        Args:
            scheme: The scheme to get the filesystem for. A `bytes` scheme is
                decoded as UTF-8 before the lookup.

        Returns:
            The filesystem plugin for the given scheme.

        Raises:
            ValueError: If no filesystem plugin is registered for the given
                scheme.
        """
        if isinstance(scheme, bytes):
            scheme = scheme.decode("utf-8")
        if scheme not in self._filesystems:
            raise ValueError(
                f"No file systems were found for the scheme: "
                f"{scheme}. Please make sure that you are using "
                f"the right path and the all the necessary "
                f"integrations are properly installed."
            )
        return self._filesystems[scheme]

    def get_filesystem_for_path(
        self, path: "PathType"
    ) -> Type["BaseFilesystem"]:
        """Get filesystem plugin for given path.

        Args:
            path: The path to get the filesystem for.

        Returns:
            The filesystem plugin for the given path.

        Raises:
            ValueError: If `path` is neither `str` nor `bytes`, or if no
                filesystem plugin is registered for the given path.
        """
        # Assume local path by default, but extract filesystem prefix if
        # available.
        if isinstance(path, str):
            path_bytes = path.encode("utf-8")
        elif isinstance(path, bytes):
            path_bytes = path
        else:
            # `"%r" % path` would treat a tuple `path` as the argument list
            # and raise TypeError; an f-string formats any value safely.
            raise ValueError(f"Invalid path type: {path!r}.")
        result = re.match(b"^([a-z0-9]+://)", path_bytes)
        if result:
            scheme = result.group(1).decode("utf-8")
        else:
            # No scheme prefix: use the filesystem registered for "".
            scheme = ""
        return self.get_filesystem_for_scheme(scheme)
__init__(self)
special
Initialize the registry.
Source code in zenml/io/filesystem_registry.py
def __init__(self) -> None:
    """Initialize the registry with no filesystems registered."""
    # Maps a scheme prefix to the filesystem class registered for it.
    self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
    # Lock held while filesystems are being registered.
    self._registration_lock = Lock()
get_filesystem_for_path(self, path)
Get filesystem plugin for given path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
PathType |
The path to get the filesystem for. |
required |
Returns:
| Type | Description |
|---|---|
Type[BaseFilesystem] |
The filesystem plugin for the given path. |
Exceptions:
| Type | Description |
|---|---|
ValueError |
If no filesystem plugin is registered for the given path. |
Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_path(
    self, path: "PathType"
) -> Type["BaseFilesystem"]:
    """Get filesystem plugin for given path.

    Args:
        path: The path to get the filesystem for.

    Returns:
        The filesystem plugin for the given path.

    Raises:
        ValueError: If `path` is neither `str` nor `bytes`, or if no
            filesystem plugin is registered for the given path.
    """
    # Assume local path by default, but extract filesystem prefix if available.
    if isinstance(path, str):
        path_bytes = path.encode("utf-8")
    elif isinstance(path, bytes):
        path_bytes = path
    else:
        # `"%r" % path` would treat a tuple `path` as the argument list and
        # raise TypeError; an f-string formats any value safely.
        raise ValueError(f"Invalid path type: {path!r}.")
    result = re.match(b"^([a-z0-9]+://)", path_bytes)
    if result:
        scheme = result.group(1).decode("utf-8")
    else:
        # No scheme prefix: use the filesystem registered for "".
        scheme = ""
    return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_scheme(self, scheme)
Get filesystem plugin for given scheme string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
scheme |
PathType |
The scheme to get the filesystem for. |
required |
Returns:
| Type | Description |
|---|---|
Type[BaseFilesystem] |
The filesystem plugin for the given scheme. |
Exceptions:
| Type | Description |
|---|---|
ValueError |
If no filesystem plugin is registered for the given scheme. |
Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_scheme(
    self, scheme: "PathType"
) -> Type["BaseFilesystem"]:
    """Look up the filesystem plugin registered for a scheme.

    Args:
        scheme: The scheme to get the filesystem for. A `bytes` scheme is
            decoded as UTF-8 before the lookup.

    Returns:
        The filesystem plugin for the given scheme.

    Raises:
        ValueError: If no filesystem plugin is registered for the given
            scheme.
    """
    if isinstance(scheme, bytes):
        scheme = scheme.decode("utf-8")

    registered = self._filesystems
    if scheme in registered:
        return registered[scheme]

    raise ValueError(
        f"No file systems were found for the scheme: "
        f"{scheme}. Please make sure that you are using "
        f"the right path and the all the necessary "
        f"integrations are properly installed."
    )
register(self, filesystem_cls)
Register a filesystem implementation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
filesystem_cls |
Type[BaseFilesystem] |
Subclass of `zenml.io.filesystem.BaseFilesystem`. |
required |
Source code in zenml/io/filesystem_registry.py
def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
    """Add a filesystem implementation to the registry.

    The class is stored under every entry of its `SUPPORTED_SCHEMES`. If a
    scheme already has a registered class, the replacement is logged at
    debug level and the new class takes its place.

    Args:
        filesystem_cls: Subclass of `zenml.io.filesystem.BaseFilesystem`.
    """
    with self._registration_lock:
        registered = self._filesystems
        for scheme in filesystem_cls.SUPPORTED_SCHEMES:
            previous_cls = registered.get(scheme)
            if previous_cls is not None:
                logger.debug(
                    "Overwriting previously registered filesystem for "
                    "scheme `%s`. Old class: %s, new class: %s",
                    scheme,
                    previous_cls.__name__,
                    filesystem_cls.__name__,
                )
            registered[scheme] = filesystem_cls
local_filesystem
Local filesystem using Python's built-in modules (os, shutil, glob).
LocalFilesystem (BaseFilesystem)
Filesystem that uses local file operations.
Implementation inspired by TFX: https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py
Source code in zenml/io/local_filesystem.py
class LocalFilesystem(BaseFilesystem):
    """Filesystem that uses local file operations.

    Every method is a static wrapper around the matching `os`, `shutil` or
    `glob` function.

    Implementation inspired by TFX:
    https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py
    """

    # The only supported scheme is the empty string.
    # NOTE(review): presumably this matches paths without a scheme prefix;
    # confirm against the filesystem registry lookup.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            name: The path to the file.
            mode: The mode to open the file.

        Returns:
            Any: The file object.
        """
        # Inside the method body `open` resolves to the builtin, not to
        # this static method.
        return open(name, mode=mode)

    @staticmethod
    def copyfile(
        src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The source path.
            dst: The destination path.
            overwrite: Whether to overwrite the destination file if it exists.

        Raises:
            FileExistsError: If the destination file exists and overwrite is
                False.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists.

        Args:
            path: The path to check.

        Returns:
            bool: Whether the path exists.
        """
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern.

        Args:
            pattern: The glob pattern.

        Returns:
            List[PathType]: The paths that match the glob pattern.
        """
        # Inside the method body `glob` resolves to the module, not to this
        # static method.
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory.

        Args:
            path: The path to check.

        Returns:
            bool: Whether the path points to a directory.
        """
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem.

        Args:
            path: The path to the directory.

        Returns:
            List[PathType]: The list of files under the given directory.
        """
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        An already existing directory is not an error (`exist_ok=True`).

        Args:
            path: The path to the directory.
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Args:
            path: The path to the directory.
        """
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Args:
            path: The path to the file.
        """
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If the destination file exists and overwrite is
                False.
        """
        if not overwrite:
            if os.path.exists(dst):
                raise FileExistsError(
                    f"Destination path {str(dst)} already exists and "
                    f"argument `overwrite` is false."
                )
            os.rename(src, dst)
            return
        # `os.rename` always fails on Windows when the destination exists,
        # which would break the `overwrite=True` contract there.
        # `os.replace` overwrites an existing file on every platform.
        os.replace(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation.

        Args:
            path: The path to the directory.
        """
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path.

        Args:
            path: The path to the file.

        Returns:
            Any: The stat descriptor for the file.
        """
        return os.stat(path)

    @staticmethod
    def size(path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return os.path.getsize(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Yields:
            An Iterable of Tuples, each of which contain the path of the
            current directory path, a list of directories inside the
            current directory and a list of files inside the current
            directory.
        """
        yield from os.walk(  # type: ignore[type-var, misc]
            top, topdown=topdown, onerror=onerror
        )
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
Union[bytes, str] |
The source path. |
required |
dst |
Union[bytes, str] |
The destination path. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
| Type | Description |
|---|---|
FileExistsError |
If the destination file exists and overwrite is False. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def copyfile(
    src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy the file at `src` to `dst`.

    Args:
        src: The source path.
        dst: The destination path.
        overwrite: Whether an existing destination may be replaced.

    Raises:
        FileExistsError: If `dst` already exists and `overwrite` is False.
    """
    # The existence check is skipped entirely when overwriting is allowed.
    if overwrite or not os.path.exists(dst):
        shutil.copyfile(src, dst)
        return
    raise FileExistsError(
        f"Destination file {str(dst)} already exists and argument "
        f"`overwrite` is false."
    )
exists(path)
staticmethod
Returns True if the given path exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
Whether the path exists. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def exists(path: PathType) -> bool:
    """Check whether something exists at `path`.

    Args:
        path: The path to check.

    Returns:
        bool: The result of `os.path.exists` for the path.
    """
    path_is_present = os.path.exists(path)
    return path_is_present
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern. |
required |
Returns:
| Type | Description |
|---|---|
List[PathType] |
The paths that match the glob pattern. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Expand a glob pattern into the list of matching paths.

    Args:
        pattern: The glob pattern.

    Returns:
        List[PathType]: The paths that match the glob pattern.
    """
    # As a class member, `glob` here resolves to the imported module and
    # not to this static method.
    matching_paths = glob.glob(pattern)  # type: ignore[type-var]
    return matching_paths
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
| Type | Description |
|---|---|
bool |
Whether the path points to a directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Check whether `path` points to a directory.

    Args:
        path: The path to check.

    Returns:
        bool: The result of `os.path.isdir` for the path.
    """
    is_directory = os.path.isdir(path)
    return is_directory
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Returns:
| Type | Description |
|---|---|
List[PathType] |
The list of files under the given directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """List the entries of a directory.

    Args:
        path: The path to the directory.

    Returns:
        List[PathType]: The entry names returned by `os.listdir`.
    """
    entries = os.listdir(path)
    return entries  # type:ignore[return-value]
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Create a directory together with any missing parent directories.

    A directory that already exists is left as is (`exist_ok=True`).

    Args:
        path: The path to the directory.
    """
    tolerate_existing = True
    os.makedirs(path, exist_ok=tolerate_existing)
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Create a single directory at `path`.

    Only the final path component is created, so the parent directory
    must already exist.

    Args:
        path: The path to the directory.
    """
    # Non-recursive on purpose; no parents are created.
    os.mkdir(path)
    return None
open(name, mode='r')
staticmethod
Open a file at the given path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name |
Union[bytes, str] |
The path to the file. |
required |
mode |
str |
The mode to open the file. |
'r' |
Returns:
| Type | Description |
|---|---|
Any |
The file object. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open the file at `name` with the given mode.

    Args:
        name: The path to the file.
        mode: The mode to open the file.

    Returns:
        Any: The file object.
    """
    # As a class member, `open` here resolves to the builtin and not to
    # this static method.
    file_object = open(name, mode=mode)
    return file_object
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def remove(path: PathType) -> None:
    """Delete the file at `path`. Dangerous operation.

    Args:
        path: The path to the file.
    """
    # Direct delegation to `os.remove`; nothing is checked beforehand.
    os.remove(path)
    return None
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True`. |
False |
Exceptions:
| Type | Description |
|---|---|
FileExistsError |
If the destination file exists and overwrite is False. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True`

    Raises:
        FileExistsError: If the destination file exists and overwrite is
            False.
    """
    if not overwrite:
        if os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        os.rename(src, dst)
        return
    # `os.rename` always fails on Windows when the destination exists,
    # which would break the `overwrite=True` contract there. `os.replace`
    # overwrites an existing file on every platform.
    os.replace(src, dst)
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Delete a directory and everything below it. Dangerous operation.

    Args:
        path: The path to the directory.
    """
    # Direct delegation to `shutil.rmtree`; no confirmation or safety net.
    shutil.rmtree(path)
    return None
size(path)
staticmethod
Get the size of a file in bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
int |
The size of the file in bytes. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def size(path: PathType) -> int:
    """Return the size of the file at `path`.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    size_in_bytes = os.path.getsize(path)
    return size_in_bytes
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
| Type | Description |
|---|---|
Any |
The stat descriptor for the file. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def stat(path: PathType) -> Any:
    """Fetch the stat descriptor of the given path.

    Args:
        path: The path to the file.

    Returns:
        Any: The result of `os.stat` for the path.
    """
    stat_result = os.stat(path)
    return stat_result
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Yields:
| Type | Description |
|---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Walk the directory tree rooted at `top`.

    This is a generator that re-yields everything `os.walk` produces.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Yields:
        Tuples of the current directory path, the list of directories
        inside it and the list of files inside it.
    """
    directory_walker = os.walk(  # type: ignore[type-var]
        top, topdown=topdown, onerror=onerror
    )
    yield from directory_walker  # type: ignore[misc]