Artifact Stores
zenml.artifact_stores
special
In ZenML, the inputs and outputs which go through any step is treated as an
artifact and as its name suggests, an ArtifactStore
is a place where these
artifacts get stored.
Out of the box, ZenML comes with the BaseArtifactStore
and
LocalArtifactStore
implementations. While the BaseArtifactStore
establishes
an interface for people who want to extend it to their needs, the
LocalArtifactStore
is a simple implementation for a local setup.
Moreover, additional artifact stores can be found in specific integrations
modules, such as the GCPArtifactStore
in the gcp
integration and the
AzureArtifactStore
in the azure
integration.
base_artifact_store
BaseArtifactStore (StackComponent)
pydantic-model
Base class for all ZenML artifact stores.
Attributes:
Name | Type | Description |
---|---|---|
path |
str |
The root path of the artifact store. |
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent):
"""Base class for all ZenML artifact stores.
Attributes:
path: The root path of the artifact store.
"""
path: str
# Class Configuration
TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
SUPPORTED_SCHEMES: ClassVar[Set[str]]
# --- User interface ---
@abstractmethod
def open(self, name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path."""
@abstractmethod
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination."""
@abstractmethod
def exists(self, path: PathType) -> bool:
"""Returns `True` if the given path exists."""
@abstractmethod
def glob(self, pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern."""
@abstractmethod
def isdir(self, path: PathType) -> bool:
"""Returns whether the given path points to a directory."""
@abstractmethod
def listdir(self, path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem."""
@abstractmethod
def makedirs(self, path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
@abstractmethod
def mkdir(self, path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
@abstractmethod
def remove(self, path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation."""
@abstractmethod
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file."""
@abstractmethod
def rmtree(self, path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation."""
@abstractmethod
def stat(self, path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
@abstractmethod
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory."""
# --- Internal interface ---
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initiate the Pydantic object and register the corresponding
filesystem."""
super(BaseArtifactStore, self).__init__(*args, **kwargs)
self._register()
@root_validator(skip_on_failure=True)
def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
"""Validator function for the Artifact Stores. Checks whether
supported schemes are defined and the given path is supported"""
try:
getattr(cls, "SUPPORTED_SCHEMES")
except AttributeError:
raise ArtifactStoreInterfaceError(
"When you are working with any classes which subclass from "
"'zenml.artifact_store.BaseArtifactStore' please make sure "
"that your class has a ClassVar named `SUPPORTED_SCHEMES` "
"which should hold a set of supported file schemes such "
"as {'s3://'} or {'gcs://'}. \n"
+ textwrap.dedent(
"""
When you are working with any classes which subclass from
zenml.artifact_store.BaseArtifactStore please make sure
that your class has a ClassVar named `SUPPORTED_SCHEMES`
which should hold a set of supported file schemes such
as {"s3://"} or {"gcs://"}.
Example:
class S3ArtifactStore(StackComponent):
...
# Class Variables
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
...
"""
)
)
if not any(values["path"].startswith(i) for i in cls.SUPPORTED_SCHEMES):
raise ArtifactStoreInterfaceError(
f"The path: '{values['path']}' you defined for your "
f"artifact store is not supported by the implementation of "
f"{cls.schema()['title']}, because it does not start with "
f"one of its supported schemes: {cls.SUPPORTED_SCHEMES}."
)
return values
def _register(self, priority: int = 5) -> None:
"""Create and register a filesystem within the TFX registry"""
from tfx.dsl.io.filesystem import Filesystem
from tfx.dsl.io.filesystem_registry import DEFAULT_FILESYSTEM_REGISTRY
filesystem_class = type(
self.__class__.__name__,
(Filesystem,),
{
"SUPPORTED_SCHEMES": self.SUPPORTED_SCHEMES,
"open": staticmethod(_catch_not_found_error(self.open)),
"copy": staticmethod(_catch_not_found_error(self.copyfile)),
"exists": staticmethod(self.exists),
"glob": staticmethod(self.glob),
"isdir": staticmethod(self.isdir),
"listdir": staticmethod(_catch_not_found_error(self.listdir)),
"makedirs": staticmethod(self.makedirs),
"mkdir": staticmethod(_catch_not_found_error(self.mkdir)),
"remove": staticmethod(_catch_not_found_error(self.remove)),
"rename": staticmethod(_catch_not_found_error(self.rename)),
"rmtree": staticmethod(_catch_not_found_error(self.rmtree)),
"stat": staticmethod(_catch_not_found_error(self.stat)),
"walk": staticmethod(_catch_not_found_error(self.walk)),
},
)
DEFAULT_FILESYSTEM_REGISTRY.register(
filesystem_class, priority=priority
)
__init__(self, *args, **kwargs)
special
Initiate the Pydantic object and register the corresponding filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initiate the Pydantic object and register the corresponding
filesystem."""
super(BaseArtifactStore, self).__init__(*args, **kwargs)
self._register()
copyfile(self, src, dst, overwrite=False)
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination."""
exists(self, path)
Returns True
if the given path exists.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def exists(self, path: PathType) -> bool:
"""Returns `True` if the given path exists."""
glob(self, pattern)
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def glob(self, pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern."""
isdir(self, path)
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def isdir(self, path: PathType) -> bool:
"""Returns whether the given path points to a directory."""
listdir(self, path)
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def listdir(self, path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem."""
makedirs(self, path)
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def makedirs(self, path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
mkdir(self, path)
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def mkdir(self, path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
open(self, name, mode='r')
Open a file at the given path.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def open(self, name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path."""
remove(self, path)
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def remove(self, path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation."""
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file."""
rmtree(self, path)
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rmtree(self, path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation."""
stat(self, path)
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def stat(self, path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory."""
local_artifact_store
LocalArtifactStore (BaseArtifactStore)
pydantic-model
Artifact Store for local artifacts.
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(BaseArtifactStore):
"""Artifact Store for local artifacts."""
# Class Configuration
FLAVOR: ClassVar[str] = "local"
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}
@property
def local_path(self) -> str:
"""Path to the local directory where the artifacts are stored."""
return self.path
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path."""
return open(name, mode=mode)
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Copy a file from the source to the destination."""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination file {str(dst)} already exists and argument "
f"`overwrite` is false."
)
shutil.copyfile(src, dst) # type: ignore[type-var, arg-type]
@staticmethod
def exists(path: PathType) -> bool:
"""Returns `True` if the given path exists."""
return os.path.exists(path)
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern."""
return glob.glob(pattern) # type: ignore[type-var]
@staticmethod
def isdir(path: PathType) -> bool:
"""Returns whether the given path points to a directory."""
return os.path.isdir(path)
@staticmethod
def listdir(path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem."""
return os.listdir(path) # type:ignore[return-value]
@staticmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
os.makedirs(path, exist_ok=True)
@staticmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
os.mkdir(path)
@staticmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation."""
os.remove(path)
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True`
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination path {str(dst)} already exists and argument "
f"`overwrite` is false."
)
os.rename(src, dst)
@staticmethod
def rmtree(path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation."""
shutil.rmtree(path)
@staticmethod
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
return os.stat(path)
@staticmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the
current directory path, a list of directories inside the
current directory and a list of files inside the current
directory.
"""
yield from os.walk(top, topdown=topdown, onerror=onerror) # type: ignore[type-var, misc]
@validator("path")
def ensure_path_local(cls, path: str) -> str:
"""Pydantic validator which ensures that the given path is a local
path"""
remote_prefixes = ["gs://", "hdfs://", "s3://", "az://", "abfs://"]
if any(path.startswith(prefix) for prefix in remote_prefixes):
raise ArtifactStoreInterfaceError(
f"The path:{path} you defined for your local artifact store "
f"start with one of the remote prefixes."
)
return path
local_path: str
property
readonly
Path to the local directory where the artifacts are stored.
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Copy a file from the source to the destination."""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination file {str(dst)} already exists and argument "
f"`overwrite` is false."
)
shutil.copyfile(src, dst) # type: ignore[type-var, arg-type]
ensure_path_local(path)
classmethod
Pydantic validator which ensures that the given path is a local path
Source code in zenml/artifact_stores/local_artifact_store.py
@validator("path")
def ensure_path_local(cls, path: str) -> str:
"""Pydantic validator which ensures that the given path is a local
path"""
remote_prefixes = ["gs://", "hdfs://", "s3://", "az://", "abfs://"]
if any(path.startswith(prefix) for prefix in remote_prefixes):
raise ArtifactStoreInterfaceError(
f"The path:{path} you defined for your local artifact store "
f"start with one of the remote prefixes."
)
return path
exists(path)
staticmethod
Returns True
if the given path exists.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
"""Returns `True` if the given path exists."""
return os.path.exists(path)
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
"""Return the paths that match a glob pattern."""
return glob.glob(pattern) # type: ignore[type-var]
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
"""Returns whether the given path points to a directory."""
return os.path.isdir(path)
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem."""
return os.listdir(path) # type:ignore[return-value]
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
os.makedirs(path, exist_ok=True)
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
os.mkdir(path)
open(name, mode='r')
staticmethod
Open a file at the given path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path."""
return open(name, mode=mode)
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation."""
os.remove(path)
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True`
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination path {str(dst)} already exists and argument "
f"`overwrite` is false."
)
os.rename(src, dst)
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation."""
shutil.rmtree(path)
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
return os.stat(path)
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the
current directory path, a list of directories inside the
current directory and a list of files inside the current
directory.
"""
yield from os.walk(top, topdown=topdown, onerror=onerror) # type: ignore[type-var, misc]