Io
zenml.io
special
The io
module handles file operations for the ZenML package.
It offers a standard interface for reading, writing and manipulating files and
directories. It is heavily influenced and inspired by the io
module of tfx
.
fileio
Functionality for reading, writing and managing files.
convert_to_str(path)
Converts a "PathType" to a str using UTF-8.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to convert. |
required |
Returns:
Type | Description |
---|---|
str |
The path as a string. |
Source code in zenml/io/fileio.py
def convert_to_str(path: "PathType") -> str:
"""Converts a "PathType" to a str using UTF-8.
Args:
path: The path to convert.
Returns:
The path as a string.
"""
if isinstance(path, str):
return path
else:
return path.decode("utf-8")
copy(src, dst, overwrite=False)
Copy a file from the source to the destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
PathType |
The path of the file to copy. |
required |
dst |
PathType |
The path to copy the source file to. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination and
|
Source code in zenml/io/fileio.py
def copy(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
"""Copy a file from the source to the destination.
Args:
src: The path of the file to copy.
dst: The path to copy the source file to.
overwrite: Whether to overwrite the destination file if it exists.
Raises:
FileExistsError: If a file already exists at the destination and
`overwrite` is not set to `True`.
"""
src_fs = _get_filesystem(src)
dst_fs = _get_filesystem(dst)
if src_fs is dst_fs:
src_fs.copyfile(src, dst, overwrite=overwrite)
else:
if not overwrite and exists(dst):
raise FileExistsError(
f"Destination file '{convert_to_str(dst)}' already exists "
f"and `overwrite` is false."
)
with open(src, mode="rb") as f:
contents = f.read()
with open(dst, mode="wb") as f:
f.write(contents)
exists(path)
Check whether a given path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/io/fileio.py
def exists(path: "PathType") -> bool:
"""Check whether a given path exists.
Args:
path: The path to check.
Returns:
`True` if the given path exists, `False` otherwise.
"""
return _get_filesystem(path).exists(path)
glob(pattern)
Find all files matching the given pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
PathType |
The pattern to match. |
required |
Returns:
Type | Description |
---|---|
List[PathType] |
A list of paths matching the pattern. |
Source code in zenml/io/fileio.py
def glob(pattern: "PathType") -> List["PathType"]:
"""Find all files matching the given pattern.
Args:
pattern: The pattern to match.
Returns:
A list of paths matching the pattern.
"""
return _get_filesystem(pattern).glob(pattern)
isdir(path)
Check whether the given path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/io/fileio.py
def isdir(path: "PathType") -> bool:
"""Check whether the given path is a directory.
Args:
path: The path to check.
Returns:
`True` if the given path is a directory, `False` otherwise.
"""
return _get_filesystem(path).isdir(path)
listdir(path, only_file_names=True)
Lists all files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the directory. |
required |
only_file_names |
bool |
If True, only return the file names, not the full path. |
True |
Returns:
Type | Description |
---|---|
List[str] |
A list of files in the directory. |
Source code in zenml/io/fileio.py
def listdir(path: str, only_file_names: bool = True) -> List[str]:
"""Lists all files in a directory.
Args:
path: The path to the directory.
only_file_names: If True, only return the file names, not the full path.
Returns:
A list of files in the directory.
"""
try:
return [
os.path.join(path, convert_to_str(f))
if not only_file_names
else convert_to_str(f)
for f in _get_filesystem(path).listdir(path)
]
except IOError:
logger.debug(f"Dir {path} not found.")
return []
makedirs(path)
Make a directory at the given path, recursively creating parents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the directory. |
required |
Source code in zenml/io/fileio.py
def makedirs(path: "PathType") -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: The path to the directory.
"""
_get_filesystem(path).makedirs(path)
mkdir(path)
Make a directory at the given path; parent directory must exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the directory. |
required |
Source code in zenml/io/fileio.py
def mkdir(path: "PathType") -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: The path to the directory.
"""
_get_filesystem(path).mkdir(path)
open(path, mode='r')
Opens a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the file. |
required |
mode |
str |
The mode to open the file in. |
'r' |
Returns:
Type | Description |
---|---|
Any |
The opened file. |
Source code in zenml/io/fileio.py
def open(path: "PathType", mode: str = "r") -> Any: # noqa
"""Opens a file.
Args:
path: The path to the file.
mode: The mode to open the file in.
Returns:
The opened file.
"""
return _get_filesystem(path).open(path, mode=mode)
remove(path)
Remove the file at the given path. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the file to remove. |
required |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the file does not exist. |
Source code in zenml/io/fileio.py
def remove(path: "PathType") -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to the file to remove.
Raises:
FileNotFoundError: If the file does not exist.
"""
if not exists(path):
raise FileNotFoundError(f"{convert_to_str(path)} does not exist!")
_get_filesystem(path).remove(path)
rename(src, dst, overwrite=False)
Rename a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
PathType |
The path of the file to rename. |
required |
dst |
PathType |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the source and destination file systems are not the same. |
Source code in zenml/io/fileio.py
def rename(src: "PathType", dst: "PathType", overwrite: bool = False) -> None:
"""Rename a file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
NotImplementedError: If the source and destination file systems are not
the same.
"""
src_fs = _get_filesystem(src)
dst_fs = _get_filesystem(dst)
if src_fs is dst_fs:
src_fs.rename(src, dst, overwrite=overwrite)
else:
raise NotImplementedError(
f"Renaming from {convert_to_str(src)} to {convert_to_str(dst)} "
f"using different file systems plugins is currently not supported."
)
rmtree(dir_path)
Deletes a directory recursively. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
The path to the directory to delete. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the path is not pointing to a directory. |
Source code in zenml/io/fileio.py
def rmtree(dir_path: str) -> None:
"""Deletes a directory recursively. Dangerous operation.
Args:
dir_path: The path to the directory to delete.
Raises:
TypeError: If the path is not pointing to a directory.
"""
if not isdir(dir_path):
raise TypeError(f"Path '{dir_path}' is not a directory.")
_get_filesystem(dir_path).rmtree(dir_path)
size(path)
Get the size of a file or directory in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
Optional[int] |
The size of the file or directory in bytes or |
Source code in zenml/io/fileio.py
def size(path: "PathType") -> Optional[int]:
"""Get the size of a file or directory in bytes.
Args:
path: The path to the file.
Returns:
The size of the file or directory in bytes or `None` if the responsible
file system does not implement the `size` method.
"""
file_system = _get_filesystem(path)
# If the file system does not implement the `size` method, return `None`.
if file_system.size == BaseFilesystem.size:
logger.warning(
"Cannot get size of file or directory '%s' since the responsible "
"file system `%s` does not implement the `size` method.",
path,
file_system.__name__,
)
return None
# If the path does not exist, return 0.
if not exists(path):
return 0
# If the path is a file, return its size.
if not file_system.isdir(path):
return file_system.size(path)
# If the path is a directory, recursively sum the sizes of everything in it.
files = file_system.listdir(path)
file_sizes = [size(os.path.join(str(path), str(file))) for file in files]
return sum(
[file_size for file_size in file_sizes if file_size is not None]
)
stat(path)
Get the stat descriptor for a given file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
Any |
The stat descriptor. |
Source code in zenml/io/fileio.py
def stat(path: "PathType") -> Any:
"""Get the stat descriptor for a given file path.
Args:
path: The path to the file.
Returns:
The stat descriptor.
"""
return _get_filesystem(path).stat(path)
walk(top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
PathType |
The path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[PathType, List[PathType], List[PathType]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/fileio.py
def walk(
top: "PathType",
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple["PathType", List["PathType"], List["PathType"]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: The path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
return _get_filesystem(top).walk(top, topdown=topdown, onerror=onerror)
filesystem
Defines the filesystem abstraction of ZenML.
BaseFilesystem (ABC)
Abstract Filesystem base class.
Design inspired by the Filesystem
abstraction in TFX:
https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py
Source code in zenml/io/filesystem.py
class BaseFilesystem(ABC):
"""Abstract Filesystem base class.
Design inspired by the `Filesystem` abstraction in TFX:
https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/filesystem.py
"""
SUPPORTED_SCHEMES: ClassVar[Set[str]]
@staticmethod
@abstractmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Opens a file.
Args:
name: The path to the file.
mode: The mode to open the file in.
Returns:
The opened file.
"""
@staticmethod
@abstractmethod
def copyfile(
src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The path of the file to copy.
dst: The path to copy the source file to.
overwrite: Whether to overwrite the destination file if it exists.
Raises:
FileExistsError: If a file already exists at the destination and
`overwrite` is not set to `True`.
"""
@staticmethod
@abstractmethod
def exists(path: PathType) -> bool:
"""Check whether a given path exists.
Args:
path: The path to check.
Returns:
`True` if the given path exists, `False` otherwise.
"""
@staticmethod
@abstractmethod
def glob(pattern: PathType) -> List[PathType]:
"""Find all files matching the given pattern.
Args:
pattern: The pattern to match.
Returns:
A list of paths matching the pattern.
"""
@staticmethod
@abstractmethod
def isdir(path: PathType) -> bool:
"""Check whether the given path is a directory.
Args:
path: The path to check.
Returns:
`True` if the given path is a directory, `False` otherwise.
"""
@staticmethod
@abstractmethod
def listdir(path: PathType) -> List[PathType]:
"""Lists all files in a directory.
Args:
path: The path to the directory.
Returns:
A list of files in the directory.
"""
@staticmethod
@abstractmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: Path to the directory.
"""
@staticmethod
@abstractmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: Path to the directory.
"""
@staticmethod
@abstractmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to the file to remove.
Raises:
FileNotFoundError: If the file does not exist.
"""
@staticmethod
@abstractmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename a file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
@staticmethod
@abstractmethod
def rmtree(path: PathType) -> None:
"""Deletes a directory recursively. Dangerous operation.
Args:
path: The path to the directory to delete.
"""
@staticmethod
@abstractmethod
def stat(path: PathType) -> Any:
"""Get the stat descriptor for a given file path.
Args:
path: The path to the file.
Returns:
The stat descriptor.
"""
@staticmethod
def size(path: PathType) -> int:
"""Get the size of a file in bytes.
To be implemented by subclasses but not abstract for backwards
compatibility.
Args:
path: The path to the file.
Returns:
The size of the file in bytes.
"""
return -1
@staticmethod
@abstractmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: The path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to copy. |
required |
dst |
Union[bytes, str] |
The path to copy the source file to. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination and
|
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def copyfile(
src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The path of the file to copy.
dst: The path to copy the source file to.
overwrite: Whether to overwrite the destination file if it exists.
Raises:
FileExistsError: If a file already exists at the destination and
`overwrite` is not set to `True`.
"""
exists(path)
staticmethod
Check whether a given path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def exists(path: PathType) -> bool:
"""Check whether a given path exists.
Args:
path: The path to check.
Returns:
`True` if the given path exists, `False` otherwise.
"""
glob(pattern)
staticmethod
Find all files matching the given pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The pattern to match. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths matching the pattern. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def glob(pattern: PathType) -> List[PathType]:
"""Find all files matching the given pattern.
Args:
pattern: The pattern to match.
Returns:
A list of paths matching the pattern.
"""
isdir(path)
staticmethod
Check whether the given path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def isdir(path: PathType) -> bool:
"""Check whether the given path is a directory.
Args:
path: The path to check.
Returns:
`True` if the given path is a directory, `False` otherwise.
"""
listdir(path)
staticmethod
Lists all files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of files in the directory. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def listdir(path: PathType) -> List[PathType]:
"""Lists all files in a directory.
Args:
path: The path to the directory.
Returns:
A list of files in the directory.
"""
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path to the directory. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: Path to the directory.
"""
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path to the directory. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: Path to the directory.
"""
open(name, mode='r')
staticmethod
Opens a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Union[bytes, str] |
The path to the file. |
required |
mode |
str |
The mode to open the file in. |
'r' |
Returns:
Type | Description |
---|---|
Any |
The opened file. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Opens a file.
Args:
name: The path to the file.
mode: The mode to open the file in.
Returns:
The opened file.
"""
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file to remove. |
required |
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the file does not exist. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to the file to remove.
Raises:
FileNotFoundError: If the file does not exist.
"""
rename(src, dst, overwrite=False)
staticmethod
Rename a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename a file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
rmtree(path)
staticmethod
Deletes a directory recursively. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory to delete. |
required |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def rmtree(path: PathType) -> None:
"""Deletes a directory recursively. Dangerous operation.
Args:
path: The path to the directory to delete.
"""
size(path)
staticmethod
Get the size of a file in bytes.
To be implemented by subclasses but not abstract for backwards compatibility.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
int |
The size of the file in bytes. |
Source code in zenml/io/filesystem.py
@staticmethod
def size(path: PathType) -> int:
"""Get the size of a file in bytes.
To be implemented by subclasses but not abstract for backwards
compatibility.
Args:
path: The path to the file.
Returns:
The size of the file in bytes.
"""
return -1
stat(path)
staticmethod
Get the stat descriptor for a given file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
Any |
The stat descriptor. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def stat(path: PathType) -> Any:
"""Get the stat descriptor for a given file path.
Args:
path: The path to the file.
Returns:
The stat descriptor.
"""
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
The path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/filesystem.py
@staticmethod
@abstractmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: The path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
filesystem_registry
Filesystem registry managing filesystem plugins.
FileIORegistry
Registry of pluggable filesystem implementations.
Source code in zenml/io/filesystem_registry.py
class FileIORegistry:
"""Registry of pluggable filesystem implementations."""
def __init__(self) -> None:
"""Initialize the registry."""
self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
self._registration_lock = Lock()
def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
"""Register a filesystem implementation.
Args:
filesystem_cls: Subclass of `zenml.io.filesystem.Filesystem`.
"""
with self._registration_lock:
for scheme in filesystem_cls.SUPPORTED_SCHEMES:
current_preferred = self._filesystems.get(scheme)
if current_preferred is not None:
logger.debug(
"Overwriting previously registered filesystem for "
"scheme `%s`. Old class: %s, new class: %s",
scheme,
current_preferred.__name__,
filesystem_cls.__name__,
)
self._filesystems[scheme] = filesystem_cls
def get_filesystem_for_scheme(
self, scheme: "PathType"
) -> Type["BaseFilesystem"]:
"""Get filesystem plugin for given scheme string.
Args:
scheme: The scheme to get the filesystem for.
Returns:
The filesystem plugin for the given scheme.
Raises:
ValueError: If no filesystem plugin is registered for the given
scheme.
"""
if isinstance(scheme, bytes):
scheme = scheme.decode("utf-8")
if scheme not in self._filesystems:
raise ValueError(
f"No file systems were found for the scheme: "
f"{scheme}. Please make sure that you are using "
f"the right path and the all the necessary "
f"integrations are properly installed."
)
return self._filesystems[scheme]
def get_filesystem_for_path(
self, path: "PathType"
) -> Type["BaseFilesystem"]:
"""Get filesystem plugin for given path.
Args:
path: The path to get the filesystem for.
Returns:
The filesystem plugin for the given path.
Raises:
ValueError: If no filesystem plugin is registered for the given
path.
"""
# Assume local path by default, but extract filesystem prefix if available.
if isinstance(path, str):
path_bytes = path.encode("utf-8")
elif isinstance(path, bytes):
path_bytes = path
else:
raise ValueError("Invalid path type: %r." % path)
result = re.match(b"^([a-z0-9]+://)", path_bytes)
if result:
scheme = result.group(1).decode("utf-8")
else:
scheme = ""
return self.get_filesystem_for_scheme(scheme)
__init__(self)
special
Initialize the registry.
Source code in zenml/io/filesystem_registry.py
def __init__(self) -> None:
"""Initialize the registry."""
self._filesystems: Dict["PathType", Type["BaseFilesystem"]] = {}
self._registration_lock = Lock()
get_filesystem_for_path(self, path)
Get filesystem plugin for given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
PathType |
The path to get the filesystem for. |
required |
Returns:
Type | Description |
---|---|
Type[BaseFilesystem] |
The filesystem plugin for the given path. |
Exceptions:
Type | Description |
---|---|
ValueError |
If no filesystem plugin is registered for the given path. |
Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_path(
self, path: "PathType"
) -> Type["BaseFilesystem"]:
"""Get filesystem plugin for given path.
Args:
path: The path to get the filesystem for.
Returns:
The filesystem plugin for the given path.
Raises:
ValueError: If no filesystem plugin is registered for the given
path.
"""
# Assume local path by default, but extract filesystem prefix if available.
if isinstance(path, str):
path_bytes = path.encode("utf-8")
elif isinstance(path, bytes):
path_bytes = path
else:
raise ValueError("Invalid path type: %r." % path)
result = re.match(b"^([a-z0-9]+://)", path_bytes)
if result:
scheme = result.group(1).decode("utf-8")
else:
scheme = ""
return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_scheme(self, scheme)
Get filesystem plugin for given scheme string.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
scheme |
PathType |
The scheme to get the filesystem for. |
required |
Returns:
Type | Description |
---|---|
Type[BaseFilesystem] |
The filesystem plugin for the given scheme. |
Exceptions:
Type | Description |
---|---|
ValueError |
If no filesystem plugin is registered for the given scheme. |
Source code in zenml/io/filesystem_registry.py
def get_filesystem_for_scheme(
self, scheme: "PathType"
) -> Type["BaseFilesystem"]:
"""Get filesystem plugin for given scheme string.
Args:
scheme: The scheme to get the filesystem for.
Returns:
The filesystem plugin for the given scheme.
Raises:
ValueError: If no filesystem plugin is registered for the given
scheme.
"""
if isinstance(scheme, bytes):
scheme = scheme.decode("utf-8")
if scheme not in self._filesystems:
raise ValueError(
f"No file systems were found for the scheme: "
f"{scheme}. Please make sure that you are using "
f"the right path and the all the necessary "
f"integrations are properly installed."
)
return self._filesystems[scheme]
register(self, filesystem_cls)
Register a filesystem implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filesystem_cls |
Type[BaseFilesystem] |
Subclass of |
required |
Source code in zenml/io/filesystem_registry.py
def register(self, filesystem_cls: Type["BaseFilesystem"]) -> None:
"""Register a filesystem implementation.
Args:
filesystem_cls: Subclass of `zenml.io.filesystem.Filesystem`.
"""
with self._registration_lock:
for scheme in filesystem_cls.SUPPORTED_SCHEMES:
current_preferred = self._filesystems.get(scheme)
if current_preferred is not None:
logger.debug(
"Overwriting previously registered filesystem for "
"scheme `%s`. Old class: %s, new class: %s",
scheme,
current_preferred.__name__,
filesystem_cls.__name__,
)
self._filesystems[scheme] = filesystem_cls
local_filesystem
Local filesystem using Python's built-in modules (os
, shutil
, glob
).
LocalFilesystem (BaseFilesystem)
Filesystem that uses local file operations.
Implementation inspired by TFX: https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py
Source code in zenml/io/local_filesystem.py
class LocalFilesystem(BaseFilesystem):
"""Filesystem that uses local file operations.
Implementation inspired by TFX:
https://github.com/tensorflow/tfx/blob/master/tfx/dsl/io/plugins/local.py
"""
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
name: The path to the file.
mode: The mode to open the file.
Returns:
Any: The file object.
"""
encoding = "utf-8" if "b" not in mode else None
return open(name, mode=mode, encoding=encoding)
@staticmethod
def copyfile(
src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
Raises:
FileExistsError: If the destination file exists and overwrite is
False.
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination file {str(dst)} already exists and argument "
f"`overwrite` is false."
)
shutil.copyfile(src, dst)
@staticmethod
def exists(path: PathType) -> bool:
"""Returns `True` if the given path exists.
Args:
path: The path to check.
Returns:
bool: Whether the path exists.
"""
return os.path.exists(path)
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern.
Args:
pattern: The glob pattern.
Returns:
List[PathType]: The paths that match the glob pattern.
"""
return glob.glob(pattern) # type: ignore[type-var]
@staticmethod
def isdir(path: PathType) -> bool:
"""Returns whether the given path points to a directory.
Args:
path: The path to check.
Returns:
bool: Whether the path points to a directory.
"""
return os.path.isdir(path)
@staticmethod
def listdir(path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem.
Args:
path: The path to the directory.
Returns:
List[PathType]: The list of files under the given directory.
"""
return os.listdir(path) # type:ignore[return-value]
@staticmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: The path to the directory.
"""
os.makedirs(path, exist_ok=True)
@staticmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: The path to the directory.
"""
os.mkdir(path)
@staticmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to the file.
"""
os.remove(path)
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True`
Raises:
FileExistsError: If the destination file exists and overwrite is
False.
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination path {str(dst)} already exists and argument "
f"`overwrite` is false."
)
os.rename(src, dst)
@staticmethod
def rmtree(path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation.
Args:
path: The path to the directory.
"""
shutil.rmtree(path)
@staticmethod
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path.
Args:
path: The path to the file.
Returns:
Any: The stat descriptor for the file.
"""
return os.stat(path)
@staticmethod
def size(path: PathType) -> int:
"""Get the size of a file in bytes.
Args:
path: The path to the file.
Returns:
The size of the file in bytes.
"""
return os.path.getsize(path)
@staticmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Yields:
An Iterable of Tuples, each of which contain the path of the
current directory path, a list of directories inside the
current directory and a list of files inside the current
directory.
"""
yield from os.walk( # type: ignore[type-var, misc]
top, topdown=topdown, onerror=onerror
)
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The source path. |
required |
dst |
Union[bytes, str] |
The destination path. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If the destination file exists and overwrite is False. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def copyfile(
src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
Raises:
FileExistsError: If the destination file exists and overwrite is
False.
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination file {str(dst)} already exists and argument "
f"`overwrite` is false."
)
shutil.copyfile(src, dst)
exists(path)
staticmethod
Returns True
if the given path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the path exists. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def exists(path: PathType) -> bool:
"""Returns `True` if the given path exists.
Args:
path: The path to check.
Returns:
bool: Whether the path exists.
"""
return os.path.exists(path)
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern. |
required |
Returns:
Type | Description |
---|---|
List[PathType] |
The paths that match the glob pattern. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern.
Args:
pattern: The glob pattern.
Returns:
List[PathType]: The paths that match the glob pattern.
"""
return glob.glob(pattern) # type: ignore[type-var]
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the path points to a directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def isdir(path: PathType) -> bool:
"""Returns whether the given path points to a directory.
Args:
path: The path to check.
Returns:
bool: Whether the path points to a directory.
"""
return os.path.isdir(path)
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Returns:
Type | Description |
---|---|
List[PathType] |
The list of files under the given directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem.
Args:
path: The path to the directory.
Returns:
List[PathType]: The list of files under the given directory.
"""
return os.listdir(path) # type:ignore[return-value]
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: The path to the directory.
"""
os.makedirs(path, exist_ok=True)
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: The path to the directory.
"""
os.mkdir(path)
open(name, mode='r')
staticmethod
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Union[bytes, str] |
The path to the file. |
required |
mode |
str |
The mode to open the file. |
'r' |
Returns:
Type | Description |
---|---|
Any |
The file object. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
name: The path to the file.
mode: The mode to open the file.
Returns:
Any: The file object.
"""
encoding = "utf-8" if "b" not in mode else None
return open(name, mode=mode, encoding=encoding)
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to the file.
"""
os.remove(path)
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If the destination file exists and overwrite is False. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True`
Raises:
FileExistsError: If the destination file exists and overwrite is
False.
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination path {str(dst)} already exists and argument "
f"`overwrite` is false."
)
os.rename(src, dst)
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the directory. |
required |
Source code in zenml/io/local_filesystem.py
@staticmethod
def rmtree(path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation.
Args:
path: The path to the directory.
"""
shutil.rmtree(path)
size(path)
staticmethod
Get the size of a file in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
int |
The size of the file in bytes. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def size(path: PathType) -> int:
"""Get the size of a file in bytes.
Args:
path: The path to the file.
Returns:
The size of the file in bytes.
"""
return os.path.getsize(path)
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
Any |
The stat descriptor for the file. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path.
Args:
path: The path to the file.
Returns:
Any: The stat descriptor for the file.
"""
return os.stat(path)
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/local_filesystem.py
@staticmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Yields:
An Iterable of Tuples, each of which contain the path of the
current directory path, a list of directories inside the
current directory and a list of files inside the current
directory.
"""
yield from os.walk( # type: ignore[type-var, misc]
top, topdown=topdown, onerror=onerror
)