Artifact Stores
zenml.artifact_stores
special
ZenML's artifact-store stores artifacts in a file system.
In ZenML, the inputs and outputs which go through any step is treated as an
artifact and as its name suggests, an ArtifactStore
is a place where these
artifacts get stored.
Out of the box, ZenML comes with the BaseArtifactStore
and
LocalArtifactStore
implementations. While the BaseArtifactStore
establishes
an interface for people who want to extend it to their needs, the
LocalArtifactStore
is a simple implementation for a local setup.
Moreover, additional artifact stores can be found in specific integrations
modules, such as the GCPArtifactStore
in the gcp
integration and the
AzureArtifactStore
in the azure
integration.
base_artifact_store
The base interface to extend the ZenML artifact store.
BaseArtifactStore (StackComponent)
Base class for all ZenML artifact stores.
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent):
"""Base class for all ZenML artifact stores."""
@property
def config(self) -> BaseArtifactStoreConfig:
"""Returns the `BaseArtifactStoreConfig` config.
Returns:
The configuration.
"""
return cast(BaseArtifactStoreConfig, self._config)
@property
def path(self) -> str:
"""The path to the artifact store.
Returns:
The path.
"""
return self.config.path
# --- User interface ---
@abstractmethod
def open(self, name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
name: The path of the file to open.
mode: The mode to open the file.
Returns:
The file object.
"""
@abstractmethod
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
"""
@abstractmethod
def exists(self, path: PathType) -> bool:
"""Checks if a path exists.
Args:
path: The path to check.
Returns:
`True` if the path exists.
"""
@abstractmethod
def glob(self, pattern: PathType) -> List[PathType]:
"""Gets the paths that match a glob pattern.
Args:
pattern: The glob pattern.
Returns:
The list of paths that match the pattern.
"""
@abstractmethod
def isdir(self, path: PathType) -> bool:
"""Returns whether the given path points to a directory.
Args:
path: The path to check.
Returns:
`True` if the path points to a directory.
"""
@abstractmethod
def listdir(self, path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem.
Args:
path: The path to list.
Returns:
The list of files under the given path.
"""
@abstractmethod
def makedirs(self, path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: The path to create.
"""
@abstractmethod
def mkdir(self, path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: The path to create.
"""
@abstractmethod
def remove(self, path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to remove.
"""
@abstractmethod
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
"""
@abstractmethod
def rmtree(self, path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation.
Args:
path: The path to delete.
"""
@abstractmethod
def stat(self, path: PathType) -> Any:
"""Return the stat descriptor for a given file path.
Args:
path: The path to check.
Returns:
The stat descriptor.
"""
def size(self, path: PathType) -> Optional[int]:
"""Get the size of a file in bytes.
Args:
path: The path to the file.
Returns:
The size of the file in bytes or `None` if the artifact store
does not implement the `size` method.
"""
logger.warning(
"Cannot get size of file '%s' since the artifact store %s does not "
"implement the `size` method.",
path,
self.__class__.__name__,
)
return None
@abstractmethod
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: The path to walk.
topdown: Whether to walk the top-down or bottom-up.
onerror: The error handler.
Returns:
The iterator that walks the contents of the given directory.
"""
# --- Internal interface ---
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initiate the Pydantic object and register the corresponding filesystem.
Args:
*args: The positional arguments to pass to the Pydantic object.
**kwargs: The keyword arguments to pass to the Pydantic object.
"""
super(BaseArtifactStore, self).__init__(*args, **kwargs)
self._register()
def _register(self) -> None:
"""Create and register a filesystem within the filesystem registry."""
from zenml.io.filesystem import BaseFilesystem
from zenml.io.filesystem_registry import default_filesystem_registry
from zenml.io.local_filesystem import LocalFilesystem
# Local filesystem is always registered, no point in doing it again.
if isinstance(self, LocalFilesystem):
return
filesystem_class = type(
self.__class__.__name__,
(BaseFilesystem,),
{
"SUPPORTED_SCHEMES": self.config.SUPPORTED_SCHEMES,
"open": staticmethod(_sanitize_paths(self.open)),
"copyfile": staticmethod(_sanitize_paths(self.copyfile)),
"exists": staticmethod(_sanitize_paths(self.exists)),
"glob": staticmethod(_sanitize_paths(self.glob)),
"isdir": staticmethod(_sanitize_paths(self.isdir)),
"listdir": staticmethod(_sanitize_paths(self.listdir)),
"makedirs": staticmethod(_sanitize_paths(self.makedirs)),
"mkdir": staticmethod(_sanitize_paths(self.mkdir)),
"remove": staticmethod(_sanitize_paths(self.remove)),
"rename": staticmethod(_sanitize_paths(self.rename)),
"rmtree": staticmethod(_sanitize_paths(self.rmtree)),
"size": staticmethod(_sanitize_paths(self.size)),
"stat": staticmethod(_sanitize_paths(self.stat)),
"walk": staticmethod(_sanitize_paths(self.walk)),
},
)
default_filesystem_registry.register(filesystem_class)
config: BaseArtifactStoreConfig
property
readonly
Returns the BaseArtifactStoreConfig
config.
Returns:
Type | Description |
---|---|
BaseArtifactStoreConfig |
The configuration. |
path: str
property
readonly
The path to the artifact store.
Returns:
Type | Description |
---|---|
str |
The path. |
__init__(self, *args, **kwargs)
special
Initiate the Pydantic object and register the corresponding filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
The positional arguments to pass to the Pydantic object. |
() |
**kwargs |
Any |
The keyword arguments to pass to the Pydantic object. |
{} |
Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initiate the Pydantic object and register the corresponding filesystem.
Args:
*args: The positional arguments to pass to the Pydantic object.
**kwargs: The keyword arguments to pass to the Pydantic object.
"""
super(BaseArtifactStore, self).__init__(*args, **kwargs)
self._register()
copyfile(self, src, dst, overwrite=False)
Copy a file from the source to the destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The source path. |
required |
dst |
Union[bytes, str] |
The destination path. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file from the source to the destination.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
"""
exists(self, path)
Checks if a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def exists(self, path: PathType) -> bool:
"""Checks if a path exists.
Args:
path: The path to check.
Returns:
`True` if the path exists.
"""
glob(self, pattern)
Gets the paths that match a glob pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
The list of paths that match the pattern. |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def glob(self, pattern: PathType) -> List[PathType]:
"""Gets the paths that match a glob pattern.
Args:
pattern: The glob pattern.
Returns:
The list of paths that match the pattern.
"""
isdir(self, path)
Returns whether the given path points to a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
|
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def isdir(self, path: PathType) -> bool:
"""Returns whether the given path points to a directory.
Args:
path: The path to check.
Returns:
`True` if the path points to a directory.
"""
listdir(self, path)
Returns a list of files under a given directory in the filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
The list of files under the given path. |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def listdir(self, path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem.
Args:
path: The path to list.
Returns:
The list of files under the given path.
"""
makedirs(self, path)
Make a directory at the given path, recursively creating parents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def makedirs(self, path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents.
Args:
path: The path to create.
"""
mkdir(self, path)
Make a directory at the given path; parent directory must exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def mkdir(self, path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist.
Args:
path: The path to create.
"""
open(self, name, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Union[bytes, str] |
The path of the file to open. |
required |
mode |
str |
The mode to open the file. |
'r' |
Returns:
Type | Description |
---|---|
Any |
The file object. |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def open(self, name: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
name: The path of the file to open.
mode: The mode to open the file.
Returns:
The file object.
"""
remove(self, path)
Remove the file at the given path. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to remove. |
required |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def remove(self, path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation.
Args:
path: The path to remove.
"""
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The source path. |
required |
dst |
Union[bytes, str] |
The destination path. |
required |
overwrite |
bool |
Whether to overwrite the destination file if it exists. |
False |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The source path.
dst: The destination path.
overwrite: Whether to overwrite the destination file if it exists.
"""
rmtree(self, path)
Deletes dir recursively. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to delete. |
required |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rmtree(self, path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation.
Args:
path: The path to delete.
"""
size(self, path)
Get the size of a file in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
Optional[int] |
The size of the file in bytes or |
Source code in zenml/artifact_stores/base_artifact_store.py
def size(self, path: PathType) -> Optional[int]:
"""Get the size of a file in bytes.
Args:
path: The path to the file.
Returns:
The size of the file in bytes or `None` if the artifact store
does not implement the `size` method.
"""
logger.warning(
"Cannot get size of file '%s' since the artifact store %s does not "
"implement the `size` method.",
path,
self.__class__.__name__,
)
return None
stat(self, path)
Return the stat descriptor for a given file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
Any |
The stat descriptor. |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def stat(self, path: PathType) -> Any:
"""Return the stat descriptor for a given file path.
Args:
path: The path to check.
Returns:
The stat descriptor.
"""
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
The path to walk. |
required |
topdown |
bool |
Whether to walk the top-down or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
The error handler. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
The iterator that walks the contents of the given directory. |
Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: The path to walk.
topdown: Whether to walk the top-down or bottom-up.
onerror: The error handler.
Returns:
The iterator that walks the contents of the given directory.
"""
BaseArtifactStoreConfig (StackComponentConfig)
pydantic-model
Config class for BaseArtifactStore
.
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStoreConfig(StackComponentConfig):
"""Config class for `BaseArtifactStore`."""
path: str
SUPPORTED_SCHEMES: ClassVar[Set[str]]
@root_validator(skip_on_failure=True)
def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
"""Validator function for the Artifact Stores.
Checks whether supported schemes are defined and the given path is
supported.
Args:
values: The values to validate.
Returns:
The validated values.
Raises:
ArtifactStoreInterfaceError: If the scheme is not supported.
"""
try:
getattr(cls, "SUPPORTED_SCHEMES")
except AttributeError:
raise ArtifactStoreInterfaceError(
textwrap.dedent(
"""
When you are working with any classes which subclass from
zenml.artifact_store.BaseArtifactStore please make sure
that your class has a ClassVar named `SUPPORTED_SCHEMES`
which should hold a set of supported file schemes such
as {"s3://"} or {"gcs://"}.
Example:
class MyArtifactStoreConfig(BaseArtifactStoreConfig):
...
# Class Variables
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
...
"""
)
)
values["path"] = values["path"].strip("'\"`")
if not any(
values["path"].startswith(i) for i in cls.SUPPORTED_SCHEMES
):
raise ArtifactStoreInterfaceError(
f"The path: '{values['path']}' you defined for your "
f"artifact store is not supported by the implementation of "
f"{cls.schema()['title']}, because it does not start with "
f"one of its supported schemes: {cls.SUPPORTED_SCHEMES}."
)
return values
BaseArtifactStoreFlavor (Flavor)
Base class for artifact store flavors.
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStoreFlavor(Flavor):
"""Base class for artifact store flavors."""
@property
def type(self) -> StackComponentType:
"""Returns the flavor type.
Returns:
The flavor type.
"""
return StackComponentType.ARTIFACT_STORE
@property
def config_class(self) -> Type[StackComponentConfig]:
"""Config class for this flavor.
Returns:
The config class.
"""
return BaseArtifactStoreConfig
@property
@abstractmethod
def implementation_class(self) -> Type["BaseArtifactStore"]:
"""Implementation class.
Returns:
The implementation class.
"""
config_class: Type[zenml.stack.stack_component.StackComponentConfig]
property
readonly
Config class for this flavor.
Returns:
Type | Description |
---|---|
Type[zenml.stack.stack_component.StackComponentConfig] |
The config class. |
implementation_class: Type[BaseArtifactStore]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[BaseArtifactStore] |
The implementation class. |
type: StackComponentType
property
readonly
Returns the flavor type.
Returns:
Type | Description |
---|---|
StackComponentType |
The flavor type. |
base_artifact_store_logging_handler
Logging handler for artifact stores.
ArtifactStoreLoggingHandler (TimedRotatingFileHandler)
Handler for logging to artifact stores.
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
class ArtifactStoreLoggingHandler(TimedRotatingFileHandler):
"""Handler for logging to artifact stores."""
def __init__(self, logs_uri: str):
"""Initializes the handler.
Args:
logs_uri: URI of the logs file.
"""
self.logs_uri = logs_uri
self.max_messages = LOGS_HANDLER_MAX_MESSAGES
self.buffer = io.StringIO()
self.message_count = 0
self.last_upload_time = time.time()
self.local_temp_file: Optional[str] = None
# set local_logging_file to self.logs_uri if self.logs_uri is a local path
# otherwise, set local_logging_file to a temporary file
if is_remote(self.logs_uri):
# We log to a temporary file first, because
# TimedRotatingFileHandler does not support writing
# to a remote file, but still needs a file to get going
local_logging_file = f".zenml_tmp_logs_{int(time.time())}"
self.local_temp_file = local_logging_file
else:
local_logging_file = self.logs_uri
super().__init__(
local_logging_file,
when="s",
interval=LOGS_HANDLER_INTERVAL_SECONDS,
)
def emit(self, record: LogRecord) -> None:
"""Emits the log record.
Args:
record: Log record to emit.
"""
msg = self.format(record)
self.buffer.write(msg + "\n")
self.message_count += 1
current_time = time.time()
time_elapsed = current_time - self.last_upload_time
if (
self.message_count >= self.max_messages
or time_elapsed >= self.interval
):
self.flush()
def flush(self) -> None:
"""Flushes the buffer to the artifact store."""
try:
with fileio.open(self.logs_uri, mode="wb") as log_file:
log_file.write(self.buffer.getvalue().encode("utf-8"))
except (OSError, IOError) as e:
# This exception can be raised if there are issues with the underlying system calls,
# such as reaching the maximum number of open files, permission issues, file corruption,
# or other I/O errors
logger.error(f"Error while trying to write logs: {e}")
self.message_count = 0
self.last_upload_time = time.time()
def doRollover(self) -> None:
"""Flushes the buffer and performs a rollover."""
self.flush()
super().doRollover()
def close(self) -> None:
"""Tidy up any resources used by the handler."""
self.flush()
super().close()
if self.local_temp_file and fileio.exists(self.local_temp_file):
fileio.remove(self.local_temp_file)
__init__(self, logs_uri)
special
Initializes the handler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logs_uri |
str |
URI of the logs file. |
required |
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
def __init__(self, logs_uri: str):
"""Initializes the handler.
Args:
logs_uri: URI of the logs file.
"""
self.logs_uri = logs_uri
self.max_messages = LOGS_HANDLER_MAX_MESSAGES
self.buffer = io.StringIO()
self.message_count = 0
self.last_upload_time = time.time()
self.local_temp_file: Optional[str] = None
# set local_logging_file to self.logs_uri if self.logs_uri is a local path
# otherwise, set local_logging_file to a temporary file
if is_remote(self.logs_uri):
# We log to a temporary file first, because
# TimedRotatingFileHandler does not support writing
# to a remote file, but still needs a file to get going
local_logging_file = f".zenml_tmp_logs_{int(time.time())}"
self.local_temp_file = local_logging_file
else:
local_logging_file = self.logs_uri
super().__init__(
local_logging_file,
when="s",
interval=LOGS_HANDLER_INTERVAL_SECONDS,
)
close(self)
Tidy up any resources used by the handler.
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
def close(self) -> None:
"""Tidy up any resources used by the handler."""
self.flush()
super().close()
if self.local_temp_file and fileio.exists(self.local_temp_file):
fileio.remove(self.local_temp_file)
doRollover(self)
Flushes the buffer and performs a rollover.
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
def doRollover(self) -> None:
"""Flushes the buffer and performs a rollover."""
self.flush()
super().doRollover()
emit(self, record)
Emits the log record.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
record |
LogRecord |
Log record to emit. |
required |
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
def emit(self, record: LogRecord) -> None:
"""Emits the log record.
Args:
record: Log record to emit.
"""
msg = self.format(record)
self.buffer.write(msg + "\n")
self.message_count += 1
current_time = time.time()
time_elapsed = current_time - self.last_upload_time
if (
self.message_count >= self.max_messages
or time_elapsed >= self.interval
):
self.flush()
flush(self)
Flushes the buffer to the artifact store.
Source code in zenml/artifact_stores/base_artifact_store_logging_handler.py
def flush(self) -> None:
"""Flushes the buffer to the artifact store."""
try:
with fileio.open(self.logs_uri, mode="wb") as log_file:
log_file.write(self.buffer.getvalue().encode("utf-8"))
except (OSError, IOError) as e:
# This exception can be raised if there are issues with the underlying system calls,
# such as reaching the maximum number of open files, permission issues, file corruption,
# or other I/O errors
logger.error(f"Error while trying to write logs: {e}")
self.message_count = 0
self.last_upload_time = time.time()
local_artifact_store
The local artifact store is a local implementation of the artifact store.
In ZenML, the inputs and outputs which go through any step is treated as an
artifact and as its name suggests, an ArtifactStore
is a place where these
artifacts get stored.
LocalArtifactStore (LocalFilesystem, BaseArtifactStore)
Artifact Store for local artifacts.
All methods are inherited from the default LocalFilesystem
.
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(LocalFilesystem, BaseArtifactStore):
"""Artifact Store for local artifacts.
All methods are inherited from the default `LocalFilesystem`.
"""
_path: Optional[str] = None
@staticmethod
def get_default_local_path(id_: "UUID") -> str:
"""Returns the default local path for a local artifact store.
Args:
id_: The id of the local artifact store.
Returns:
str: The default local path.
"""
return os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
@property
def path(self) -> str:
"""Returns the path to the local artifact store.
If the user has not defined a path in the config, this will create a
sub-folder in the global config directory.
Returns:
The path to the local artifact store.
"""
if self._path:
return self._path
if self.config.path:
self._path = self.config.path
else:
self._path = self.get_default_local_path(self.id)
io_utils.create_dir_recursive_if_not_exists(self._path)
return self._path
@property
def local_path(self) -> Optional[str]:
"""Returns the local path of the artifact store.
Returns:
The local path of the artifact store.
"""
return self.path
local_path: Optional[str]
property
readonly
Returns the local path of the artifact store.
Returns:
Type | Description |
---|---|
Optional[str] |
The local path of the artifact store. |
path: str
property
readonly
Returns the path to the local artifact store.
If the user has not defined a path in the config, this will create a sub-folder in the global config directory.
Returns:
Type | Description |
---|---|
str |
The path to the local artifact store. |
get_default_local_path(id_)
staticmethod
Returns the default local path for a local artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
UUID |
The id of the local artifact store. |
required |
Returns:
Type | Description |
---|---|
str |
The default local path. |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def get_default_local_path(id_: "UUID") -> str:
"""Returns the default local path for a local artifact store.
Args:
id_: The id of the local artifact store.
Returns:
str: The default local path.
"""
return os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
LocalArtifactStoreConfig (BaseArtifactStoreConfig)
pydantic-model
Config class for the local artifact store.
Attributes:
Name | Type | Description |
---|---|---|
path |
str |
The path to the local artifact store. |
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStoreConfig(BaseArtifactStoreConfig):
"""Config class for the local artifact store.
Attributes:
path: The path to the local artifact store.
"""
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}
path: str = ""
@validator("path")
def ensure_path_local(cls, path: str) -> str:
"""Pydantic validator which ensures that the given path is a local path.
Args:
path: The path to validate.
Returns:
str: The validated (local) path.
Raises:
ArtifactStoreInterfaceError: If the given path is not a local path.
"""
remote_prefixes = ["gs://", "hdfs://", "s3://", "az://", "abfs://"]
if any(path.startswith(prefix) for prefix in remote_prefixes):
raise ArtifactStoreInterfaceError(
f"The path '{path}' you defined for your local artifact store "
f"starts with a remote prefix."
)
return path
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
This designation is used to determine if the stack component can be
shared with other users or if it is only usable on the local host.
Returns:
True if this config is for a local component, False otherwise.
"""
return True
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
ensure_path_local(path)
classmethod
Pydantic validator which ensures that the given path is a local path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to validate. |
required |
Returns:
Type | Description |
---|---|
str |
The validated (local) path. |
Exceptions:
Type | Description |
---|---|
ArtifactStoreInterfaceError |
If the given path is not a local path. |
Source code in zenml/artifact_stores/local_artifact_store.py
@validator("path")
def ensure_path_local(cls, path: str) -> str:
"""Pydantic validator which ensures that the given path is a local path.
Args:
path: The path to validate.
Returns:
str: The validated (local) path.
Raises:
ArtifactStoreInterfaceError: If the given path is not a local path.
"""
remote_prefixes = ["gs://", "hdfs://", "s3://", "az://", "abfs://"]
if any(path.startswith(prefix) for prefix in remote_prefixes):
raise ArtifactStoreInterfaceError(
f"The path '{path}' you defined for your local artifact store "
f"starts with a remote prefix."
)
return path
LocalArtifactStoreFlavor (BaseArtifactStoreFlavor)
Class for the LocalArtifactStoreFlavor
.
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStoreFlavor(BaseArtifactStoreFlavor):
"""Class for the `LocalArtifactStoreFlavor`."""
@property
def name(self) -> str:
"""Returns the name of the artifact store flavor.
Returns:
str: The name of the artifact store flavor.
"""
return "local"
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/local.svg"
@property
def config_class(self) -> Type[LocalArtifactStoreConfig]:
"""Config class for this flavor.
Returns:
The config class.
"""
return LocalArtifactStoreConfig
@property
def implementation_class(self) -> Type[LocalArtifactStore]:
"""Implementation class.
Returns:
The implementation class.
"""
return LocalArtifactStore
config_class: Type[zenml.artifact_stores.local_artifact_store.LocalArtifactStoreConfig]
property
readonly
Config class for this flavor.
Returns:
Type | Description |
---|---|
Type[zenml.artifact_stores.local_artifact_store.LocalArtifactStoreConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[zenml.artifact_stores.local_artifact_store.LocalArtifactStore]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[zenml.artifact_stores.local_artifact_store.LocalArtifactStore] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Returns the name of the artifact store flavor.
Returns:
Type | Description |
---|---|
str |
The name of the artifact store flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
step_logging_utils
Utilities for step logs.
get_step_logging_handler(logs_uri)
Sets up a logging handler for the artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
logs_uri |
str |
The URI of the output artifact. |
required |
Returns:
Type | Description |
---|---|
ArtifactStoreLoggingHandler |
The logging handler. |
Source code in zenml/artifact_stores/step_logging_utils.py
def get_step_logging_handler(logs_uri: str) -> ArtifactStoreLoggingHandler:
"""Sets up a logging handler for the artifact store.
Args:
logs_uri: The URI of the output artifact.
Returns:
The logging handler.
"""
log_format = "%(asctime)s - %(message)s"
date_format = "%Y-%m-%dT%H:%M:%S" # ISO 8601 format
formatter = logging.Formatter(log_format, datefmt=date_format)
handler = ArtifactStoreLoggingHandler(logs_uri)
handler.setFormatter(formatter)
return handler
prepare_logs_uri(artifact_store, step_name, log_key=None)
Generates and prepares a URI for the log file for a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_store |
BaseArtifactStore |
The artifact store on which the artifact will be stored. |
required |
step_name |
str |
Name of the step. |
required |
log_key |
Optional[str] |
The unique identification key of the log file. |
None |
Returns:
Type | Description |
---|---|
str |
The URI of the logs file. |
Source code in zenml/artifact_stores/step_logging_utils.py
def prepare_logs_uri(
artifact_store: "BaseArtifactStore",
step_name: str,
log_key: Optional[str] = None,
) -> str:
"""Generates and prepares a URI for the log file for a step.
Args:
artifact_store: The artifact store on which the artifact will be stored.
step_name: Name of the step.
log_key: The unique identification key of the log file.
Returns:
The URI of the logs file.
"""
if log_key is None:
log_key = str(uuid4())
logs_base_uri = os.path.join(
artifact_store.path,
step_name,
"logs",
)
# Create the dir
if not fileio.exists(logs_base_uri):
fileio.makedirs(logs_base_uri)
# Delete the file if it already exists
logs_uri = os.path.join(logs_base_uri, f"{log_key}.log")
if fileio.exists(logs_uri):
logger.warning(
f"Logs file {logs_uri} already exists! Removing old log file..."
)
fileio.remove(logs_uri)
return logs_uri