Artifact Stores
zenml.artifact_stores
special
Artifact Stores
In ZenML, the inputs and outputs which go through any step are treated as
artifacts and, as its name suggests, an ArtifactStore
is a place where these
artifacts get stored.
Out of the box, ZenML comes with the BaseArtifactStore
and
LocalArtifactStore
implementations. While the BaseArtifactStore
establishes
an interface for people who want to extend it to their needs, the
LocalArtifactStore
is a simple implementation for a local setup.
Moreover, additional artifact stores can be found in specific integration
modules, such as the GCPArtifactStore
in the gcp
integration and the
AzureArtifactStore
in the azure
integration.
base_artifact_store
BaseArtifactStore (StackComponent, ABC)
pydantic-model
Base class for all ZenML artifact stores.
Attributes:
Name | Type | Description |
---|---|---|
path |
str |
The root path of the artifact store. |
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent, ABC):
    """Base class for all ZenML artifact stores.

    The filesystem operations are declared here as static methods that raise
    `NotImplementedError`; concrete stores are expected to override them.
    Constructing an instance wraps those operations into a filesystem class
    and registers it with the TFX filesystem registry.

    Attributes:
        path: The root path of the artifact store.
    """

    path: str

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
    FLAVOR: ClassVar[str]
    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initiate the Pydantic object and register the corresponding
        filesystem."""
        super(BaseArtifactStore, self).__init__(*args, **kwargs)
        self._register()

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path."""
        raise NotImplementedError()

    @staticmethod
    def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Copy a file from the source to the destination."""
        raise NotImplementedError()

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists."""
        raise NotImplementedError()

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern."""
        raise NotImplementedError()

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory."""
        raise NotImplementedError()

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem."""
        raise NotImplementedError()

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents."""
        raise NotImplementedError()

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist."""
        raise NotImplementedError()

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation."""
        raise NotImplementedError()

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file."""
        raise NotImplementedError()

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation."""
        raise NotImplementedError()

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path."""
        raise NotImplementedError()

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory."""
        raise NotImplementedError()

    # `skip_on_failure=True`: if field validation already failed (for example
    # `path` was not supplied), `values` has no "path" entry. Skipping this
    # validator lets pydantic report the field error instead of a KeyError
    # escaping from the lookup below.
    @root_validator(skip_on_failure=True)
    def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
        """Validate the class configuration and the configured path.

        Args:
            values: The validated field values of the model.

        Returns:
            The unmodified `values`.

        Raises:
            ArtifactStoreInterfaceError: If the class does not define
                `SUPPORTED_SCHEMES`, or if `path` does not start with any of
                the supported schemes.
        """
        if not hasattr(cls, "SUPPORTED_SCHEMES"):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    zenml.artifact_stores.BaseArtifactStore please make sure that your class
                    has a ClassVar named `SUPPORTED_SCHEMES` which should hold a set of
                    supported file schemes such as {"s3://"} or {"gcs://"}.
                    Example:
                    class S3ArtifactStore(StackComponent):
                        ...
                        # Class Variables
                        ...
                        SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
                        ...
                    """
                )
            )

        path = values["path"]
        # `str.startswith` accepts a tuple and is true if any prefix matches.
        if not path.startswith(tuple(cls.SUPPORTED_SCHEMES)):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    f"""
                    The path: "{path}" you defined for your artifact
                    store is not supported by the implementation of
                    {cls.schema()["title"]}, because it does not start with
                    one of its supported schemes: {cls.SUPPORTED_SCHEMES}.
                    """
                )
            )
        return values

    def _register(self, priority: int = 5) -> None:
        """Create and register a filesystem within the TFX registry.

        A new `Filesystem` subclass named after this class is built from the
        store's static methods and registered for `SUPPORTED_SCHEMES`.

        Args:
            priority: Priority passed to the registry's `register` call.
        """
        # Imported lazily inside the method, as in the original code.
        from tfx.dsl.io.filesystem import Filesystem
        from tfx.dsl.io.filesystem_registry import DEFAULT_FILESYSTEM_REGISTRY

        filesystem_class = type(
            self.__class__.__name__,
            (Filesystem,),
            {
                "SUPPORTED_SCHEMES": self.SUPPORTED_SCHEMES,
                "open": staticmethod(_catch_not_found_error(self.open)),
                # The TFX filesystem calls this operation `copy`.
                "copy": staticmethod(_catch_not_found_error(self.copyfile)),
                "exists": staticmethod(self.exists),
                "glob": staticmethod(self.glob),
                "isdir": staticmethod(self.isdir),
                "listdir": staticmethod(_catch_not_found_error(self.listdir)),
                "makedirs": staticmethod(self.makedirs),
                "mkdir": staticmethod(_catch_not_found_error(self.mkdir)),
                "remove": staticmethod(_catch_not_found_error(self.remove)),
                "rename": staticmethod(_catch_not_found_error(self.rename)),
                "rmtree": staticmethod(_catch_not_found_error(self.rmtree)),
                "stat": staticmethod(_catch_not_found_error(self.stat)),
                "walk": staticmethod(_catch_not_found_error(self.walk)),
            },
        )
        DEFAULT_FILESYSTEM_REGISTRY.register(
            filesystem_class, priority=priority
        )
__init__(self, *args, **kwargs)
special
Initiate the Pydantic object and register the corresponding filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Build the Pydantic model, then register the store's filesystem.

    Args:
        *args: Positional arguments forwarded to the parent initializer.
        **kwargs: Keyword arguments forwarded to the parent initializer.
    """
    super().__init__(*args, **kwargs)
    # Registration happens after the parent initializer has run.
    self._register()
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy a file from `src` to `dst`.

    Args:
        src: Path of the file to copy.
        dst: Path the file is copied to.
        overwrite: Whether an existing destination may be overwritten.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
exists(path)
staticmethod
Returns True
if the given path exists.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
    """Report whether `path` exists.

    Args:
        path: The path to check.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths matching a glob pattern.

    Args:
        pattern: The glob pattern to match.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Report whether `path` points to a directory.

    Args:
        path: The path to check.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """List the files under the given directory.

    Args:
        path: The directory to list.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Create a directory at `path`, recursively creating its parents.

    Args:
        path: The directory to create.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Create a directory at `path`; the parent directory must exist.

    Args:
        path: The directory to create.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
open(name, mode='r')
staticmethod
Open a file at the given path.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open the file at the given path.

    Args:
        name: Path of the file to open.
        mode: Mode in which the file is opened; defaults to `"r"`.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
    """Remove the file at `path`. Dangerous operation.

    Args:
        path: Path of the file to remove.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename the source file to the destination file.

    Args:
        src: Path of the file to rename.
        dst: Path to rename the source file to.
        overwrite: Whether an existing destination may be overwritten.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Delete the directory at `path` recursively. Dangerous operation.

    Args:
        path: The directory to delete.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
    """Return the stat descriptor for the given file path.

    Args:
        path: Path of the file to inspect.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Source code in zenml/artifact_stores/base_artifact_store.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator walking the contents of the given directory.

    Args:
        top: Path of the directory to walk.
        topdown: Whether to walk directories top-down or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Raises:
        NotImplementedError: Always; the base class has no implementation.
    """
    raise NotImplementedError
local_artifact_store
LocalArtifactStore (BaseArtifactStore)
pydantic-model
Artifact Store for local artifacts.
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(BaseArtifactStore):
    """Artifact Store for local artifacts.

    Every filesystem operation is delegated to the standard library
    (`os`, `shutil`, `glob` and the built-in `open`).
    """

    # Class Configuration
    FLAVOR: ClassVar[str] = "local"
    # Every string starts with "", so the scheme check of the base class
    # accepts any path; remote paths are rejected by `ensure_path_local`.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path."""
        return open(name, mode=mode)

    @staticmethod
    def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The path of the file to copy.
            dst: The path to copy the source file to.
            overwrite: Whether an existing destination may be overwritten.

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists."""
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern."""
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory."""
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem."""
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents."""
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist."""
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation."""
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if overwrite:
            # `os.rename` raises on Windows when the destination exists;
            # `os.replace` overwrites it on every platform.
            os.replace(src, dst)
            return
        if os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        os.rename(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation."""
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path."""
        return os.stat(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contain the path of the
            current directory path, a list of directories inside the
            current directory and a list of files inside the current
            directory.
        """
        yield from os.walk(top, topdown=topdown, onerror=onerror)  # type: ignore[type-var, misc]

    @validator("path")
    def ensure_path_local(cls, path: str) -> str:
        """Reject paths that start with a known remote prefix.

        Args:
            path: The configured root path of the artifact store.

        Returns:
            The unmodified `path`.

        Raises:
            ArtifactStoreInterfaceError: If `path` starts with one of the
                remote prefixes listed below.
        """
        remote_prefixes = ("gs://", "hdfs://", "s3://", "az://", "abfs://")
        # `str.startswith` accepts a tuple and is true if any prefix matches.
        if path.startswith(remote_prefixes):
            raise ArtifactStoreInterfaceError(
                f"The path:{path} you defined for your local artifact store "
                f"start with one of the remote prefixes."
            )
        return path
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy the file at `src` to `dst`.

    Args:
        src: Path of the file to copy.
        dst: Path the file is copied to.
        overwrite: Whether an existing destination may be overwritten.

    Raises:
        FileExistsError: If `dst` exists and `overwrite` is false.
    """
    # The existence check is only evaluated when overwriting is not allowed.
    if overwrite or not os.path.exists(dst):
        shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]
        return
    raise FileExistsError(
        f"Destination file {str(dst)} already exists and argument "
        f"`overwrite` is false."
    )
exists(path)
staticmethod
Returns True
if the given path exists.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
    """Report whether `path` exists, as determined by `os.path.exists`.

    Args:
        path: The path to check.

    Returns:
        `True` if the path exists, `False` otherwise.
    """
    path_found = os.path.exists(path)
    return path_found
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths matching a glob pattern, via `glob.glob`.

    Args:
        pattern: The glob pattern to match.

    Returns:
        The list of matching paths.
    """
    matching_paths = glob.glob(pattern)  # type: ignore[type-var]
    return matching_paths
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Report whether `path` points to a directory, via `os.path.isdir`.

    Args:
        path: The path to check.

    Returns:
        `True` if the path is a directory, `False` otherwise.
    """
    is_directory = os.path.isdir(path)
    return is_directory
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """List the entries of the given directory, via `os.listdir`.

    Args:
        path: The directory to list.

    Returns:
        The names of the entries in the directory.
    """
    entries = os.listdir(path)
    return entries  # type:ignore[return-value]
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Create a directory at `path`, recursively creating its parents.

    An already existing directory is not an error (`exist_ok=True`).

    Args:
        path: The directory to create.
    """
    os.makedirs(path, exist_ok=True)
    return None
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Create a directory at `path` via `os.mkdir`; the parent must exist.

    Args:
        path: The directory to create.
    """
    os.mkdir(path)
    return None
open(name, mode='r')
staticmethod
Open a file at the given path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open the file at the given path.

    Args:
        name: Path of the file to open.
        mode: Mode in which the file is opened; defaults to `"r"`.

    Returns:
        The opened file object.
    """
    file_handle = open(name, mode=mode)
    return file_handle
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
    """Remove the file at `path` via `os.remove`. Dangerous operation.

    Args:
        path: Path of the file to remove.
    """
    os.remove(path)
    return None
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` |
False |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True`

    Raises:
        FileExistsError: If `dst` exists and `overwrite` is false.
    """
    if overwrite:
        # `os.rename` raises on Windows when the destination exists;
        # `os.replace` overwrites it on every platform.
        os.replace(src, dst)
        return
    if os.path.exists(dst):
        raise FileExistsError(
            f"Destination path {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    os.rename(src, dst)
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Delete the directory at `path` recursively via `shutil.rmtree`.

    Dangerous operation.

    Args:
        path: The directory to delete.
    """
    shutil.rmtree(path)
    return None
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
    """Return the stat descriptor of the given file path, via `os.stat`.

    Args:
        path: Path of the file to inspect.

    Returns:
        The result of `os.stat` for the path.
    """
    descriptor = os.stat(path)
    return descriptor
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Walk the contents of the given directory using `os.walk`.

    Args:
        top: Path of the directory to walk.
        topdown: Whether to walk directories top-down or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each holding the path of the current
        directory, a list of the directories inside it and a list of the
        files inside it.
    """
    directory_tree = os.walk(top, topdown=topdown, onerror=onerror)
    yield from directory_tree  # type: ignore[type-var, misc]