S3
zenml.integrations.s3
special
Initialization of the S3 integration.
The S3 integration allows the use of cloud artifact stores and file operations on S3 buckets.
S3Integration (Integration)
Definition of S3 integration for ZenML.
Source code in zenml/integrations/s3/__init__.py
class S3Integration(Integration):
    """Definition of S3 integration for ZenML."""

    NAME = S3

    # boto3 isn't required for the filesystem to work, but it is required
    # for the AWS/S3 connector that can be used with the artifact store.
    # NOTE: to keep the dependency resolution for botocore consistent and fast
    # between s3fs and boto3, the boto3 upper version used here should be the
    # same as the one resolved by pip when installing boto3 without a
    # restriction alongside s3fs, e.g.:
    #
    #    pip install 's3fs>2022.3.0,<=2023.4.0' boto3
    #
    # The above command installs boto3==1.26.76, so we use the same version
    # here to avoid the dependency resolution overhead.
    #
    # NOTE(review): the entries below currently carry no upper version bound
    # for either s3fs or boto3, so the pinning guidance above looks stale --
    # confirm whether the pins or the comment should be restored/removed.
    REQUIREMENTS = [
        "s3fs>2022.3.0",
        "boto3",
        # The following dependencies are only required for the AWS connector.
        "aws-profile-manager",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the s3 integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported inside the method rather than at module import time.
        from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

        return [S3ArtifactStoreFlavor]
flavors()
classmethod
Declare the stack component flavors for the s3 integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/s3/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors provided by the s3 integration.

    Returns:
        A list containing the S3 artifact store flavor class.
    """
    # Imported inside the method rather than at module import time.
    from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

    s3_flavors: List[Type[Flavor]] = [S3ArtifactStoreFlavor]
    return s3_flavors
artifact_stores
special
Initialization of the S3 Artifact Store.
s3_artifact_store
Implementation of the S3 Artifact Store.
S3ArtifactStore (BaseArtifactStore, AuthenticationMixin)
Artifact Store for S3 based artifacts.
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
class S3ArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for S3 based artifacts."""

    # Lazily created filesystem; see the `filesystem` property.
    _filesystem: Optional[ZenMLS3Filesystem] = None

    # Set to True in `__init__` when the bucket reports versioning as enabled,
    # and reset to False when a required versioning permission is missing.
    is_versioned: bool = False

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initializes the artifact store.

        Args:
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._boto3_bucket_holder = None

        # determine bucket versioning status
        versioning = self._boto3_bucket.Versioning()
        with self._shield_lack_of_versioning_permissions(
            "s3:GetBucketVersioning"
        ):
            if versioning.status == "Enabled":
                self.is_versioned = True
                logger.warning(
                    f"The artifact store bucket `{self.config.bucket}` is versioned, "
                    "this may slow down logging process significantly."
                )

    @property
    def config(self) -> S3ArtifactStoreConfig:
        """Get the config of this artifact store.

        Returns:
            The config of this artifact store.
        """
        return cast(S3ArtifactStoreConfig, self._config)

    def get_credentials(
        self,
    ) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
        """Gets authentication credentials.

        If a connector is linked, its client credentials and region are
        returned. Otherwise, if an authentication secret is configured, the
        secret values are returned. Otherwise, we fall back to the plain text
        component attributes.

        Returns:
            Tuple (key, secret, token, region) of credentials used to
            authenticate with the S3 filesystem.

        Raises:
            RuntimeError: If the AWS connector behaves unexpectedly.
        """
        connector = self.get_connector()
        if connector:
            from botocore.client import BaseClient

            client = connector.connect()
            if not isinstance(client, BaseClient):
                raise RuntimeError(
                    f"Expected a botocore.client.BaseClient while trying to "
                    f"use the linked connector, but got {type(client)}."
                )
            credentials = client.credentials
            return (
                credentials.access_key,
                credentials.secret_key,
                credentials.token,
                client.meta.region_name,
            )

        secret = self.get_typed_authentication_secret(
            expected_schema_type=AWSSecretSchema
        )
        if secret:
            return (
                secret.aws_access_key_id,
                secret.aws_secret_access_key,
                secret.aws_session_token,
                None,
            )
        else:
            return self.config.key, self.config.secret, self.config.token, None

    @property
    def filesystem(self) -> ZenMLS3Filesystem:
        """The s3 filesystem to access this artifact store.

        Returns:
            The s3 filesystem.
        """
        # Refresh the credentials also if the connector has expired
        if self._filesystem and not self.connector_has_expired():
            return self._filesystem

        key, secret, token, region = self.get_credentials()

        # Use the region from the connector if available, otherwise some
        # remote workloads (e.g. Sagemaker) might not work correctly because
        # they look for the bucket in the wrong region
        client_kwargs = {}
        if region:
            client_kwargs["region_name"] = region
        if self.config.client_kwargs:
            # Explicitly configured client kwargs take precedence.
            client_kwargs.update(self.config.client_kwargs)

        self._filesystem = ZenMLS3Filesystem(
            key=key,
            secret=secret,
            token=token,
            client_kwargs=client_kwargs,
            config_kwargs=self.config.config_kwargs,
            s3_additional_kwargs=self.config.s3_additional_kwargs,
        )
        return self._filesystem

    def cleanup(self) -> None:
        """Close the filesystem."""
        if self._filesystem:
            self._filesystem.close()

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of paths that are files in the given directory.
        """
        # remove s3 prefix if given, so we can remove the directory later as
        # this method is expected to only return filenames
        path = convert_to_str(path)
        if path.startswith("s3://"):
            path = path[5:]

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a file info dict returned by the S3 filesystem.

            Args:
                file_dict: A file info dict returned by the S3 filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["Key"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        # Compute each basename exactly once instead of once for the filter
        # and once more for the result.
        basenames = (
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        )
        # s3fs.listdir also returns the root directory, so we filter
        # it out here (its basename is empty)
        return [base_name for base_name in basenames if base_name]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path of the file to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            A dictionary containing the stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return self.filesystem.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        for directory, subdirectories, files in self.filesystem.walk(path=top):
            yield f"s3://{directory}", subdirectories, files

    def _remove_previous_file_versions(self, path: PathType) -> None:
        """Keep only the latest file version in the given path.

        Method is useful for logs stored in versioned file systems
        like AWS S3. It does nothing when `is_versioned` is False.

        Args:
            path: The path to the file.
        """
        if not self.is_versioned:
            return

        if isinstance(path, bytes):
            path = path.decode()
        _, prefix = split_s3_path(path)

        with self._shield_lack_of_versioning_permissions(
            "s3:ListBucketVersions"
        ):
            for version in self._boto3_bucket.object_versions.filter(
                Prefix=prefix
            ):
                if version.is_latest:
                    continue
                with self._shield_lack_of_versioning_permissions(
                    "s3:DeleteObjectVersion"
                ):
                    version.delete()
                if not self.is_versioned:
                    # The shield just found the delete permission missing and
                    # switched versioning handling off. Further deletes would
                    # fail the same way and repeat the warning, so stop here.
                    break

    @property
    def _boto3_bucket(self) -> Any:
        """Get the boto3 bucket object.

        The bucket object is cached and rebuilt with fresh credentials when
        the connector has expired.

        Returns:
            The boto3 bucket object.
        """
        if self._boto3_bucket_holder and not self.connector_has_expired():
            return self._boto3_bucket_holder

        key, secret, token, region = self.get_credentials()
        s3 = boto3.resource(
            "s3",
            aws_access_key_id=key,
            aws_secret_access_key=secret,
            aws_session_token=token,
            region_name=region,
        )
        self._boto3_bucket_holder = s3.Bucket(self.config.bucket)
        return self._boto3_bucket_holder

    @contextmanager
    def _shield_lack_of_versioning_permissions(
        self, auth_missing: str
    ) -> Generator[Any, None, None]:
        """Absorb `ClientError`s raised by versioning-related bucket calls.

        If the error message reports that `auth_missing` is not authorized, a
        warning is logged and `is_versioned` is set to False. Any other
        `ClientError` is still suppressed, as before, but is now logged
        instead of being dropped silently.

        Args:
            auth_missing: Name of the permission the wrapped call needs,
                e.g. "s3:GetBucketVersioning".

        Yields:
            Nothing; control is handed to the wrapped block.
        """
        try:
            yield
        except ClientError as e:
            message = str(e)
            if "not authorized" in message and auth_missing in message:
                logger.warning(
                    "Your AWS Connector is lacking critical Versioning permissions. "
                    f"Please check that `{auth_missing}` is granted. "
                    "This is needed to remove previous versions of log files from your "
                    "Artifact Store bucket."
                )
                self.is_versioned = False
            else:
                # Keep the best-effort behavior (no exception propagates), but
                # leave a trace so unrelated failures are not invisible.
                logger.warning(
                    "Ignoring unexpected AWS client error raised by an "
                    f"operation that requires `{auth_missing}`: {message}"
                )
config: S3ArtifactStoreConfig
property
readonly
Get the config of this artifact store.
Returns:
Type | Description |
---|---|
S3ArtifactStoreConfig |
The config of this artifact store. |
filesystem: ZenMLS3Filesystem
property
readonly
The s3 filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
ZenMLS3Filesystem |
The s3 filesystem. |
__init__(self, *args, **kwargs)
special
Initializes the artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Additional positional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Set up the artifact store and probe the bucket's versioning status.

    Args:
        *args: Positional arguments forwarded to the parent initializer.
        **kwargs: Keyword arguments forwarded to the parent initializer.
    """
    super().__init__(*args, **kwargs)
    self._boto3_bucket_holder = None

    # Ask the bucket whether versioning is switched on. The status lookup
    # happens inside the shield so a missing permission is tolerated.
    bucket_versioning = self._boto3_bucket.Versioning()
    required_permission = "s3:GetBucketVersioning"
    with self._shield_lack_of_versioning_permissions(required_permission):
        versioning_enabled = bucket_versioning.status == "Enabled"
        if versioning_enabled:
            self.is_versioned = True
            logger.warning(
                f"The artifact store bucket `{self.config.bucket}` is versioned, "
                "this may slow down logging process significantly."
            )
cleanup(self)
Close the filesystem.
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def cleanup(self) -> None:
    """Close the cached filesystem, if there is one."""
    current_fs = self._filesystem
    if not current_fs:
        # Nothing was ever opened, so there is nothing to close.
        return
    current_fs.close()
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path to copy from. |
required |
dst |
Union[bytes, str] |
The path to copy to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to `True`. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy the file at `src` to `dst`.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    # The existence check is only performed when overwriting is not allowed.
    destination_taken = (not overwrite) and self.filesystem.exists(dst)
    if destination_taken:
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    fs = self.filesystem
    fs.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path exists, False otherwise. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Report whether the given path exists on the S3 filesystem.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    path_exists = self.filesystem.exists(path=path)
    return path_exists  # type: ignore[no-any-return]
get_credentials(self)
Gets authentication credentials.
If an authentication secret is configured, the secret values are returned. Otherwise, we fall back to the plain text component attributes.
Returns:
Type | Description |
---|---|
Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] |
Tuple (key, secret, token, region) of credentials used to authenticate with the S3 filesystem. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the AWS connector behaves unexpectedly. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def get_credentials(
    self,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Resolve the credentials used to talk to S3.

    Sources are tried in this order: a linked connector, a configured
    authentication secret, and finally the plain text component attributes.

    Returns:
        Tuple (key, secret, token, region) of credentials used to
        authenticate with the S3 filesystem. The region is only set when
        the credentials come from a connector.

    Raises:
        RuntimeError: If the AWS connector behaves unexpectedly.
    """
    connector = self.get_connector()
    if connector:
        from botocore.client import BaseClient

        client = connector.connect()
        if isinstance(client, BaseClient):
            connector_credentials = client.credentials
            return (
                connector_credentials.access_key,
                connector_credentials.secret_key,
                connector_credentials.token,
                client.meta.region_name,
            )
        raise RuntimeError(
            f"Expected a botocore.client.BaseClient while trying to "
            f"use the linked connector, but got {type(client)}."
        )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AWSSecretSchema
    )
    if not secret:
        # No secret configured: use the values stored on the config.
        config = self.config
        return config.key, self.config.secret, self.config.token, None

    return (
        secret.aws_access_key_id,
        secret.aws_secret_access_key,
        secret.aws_session_token,
        None,
    )
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include: - '*' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '**' as the full name of a path component to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern to match, see details above. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that match the given glob pattern. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern, each prefixed
        with "s3://".
    """
    matches = []
    for match in self.filesystem.glob(path=pattern):
        matches.append(f"s3://{match}")
    return matches
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path is a directory, False otherwise. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Report whether the given path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    is_directory = self.filesystem.isdir(path=path)
    return is_directory  # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that are files in the given directory. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of paths that are files in the given directory.
    """
    # remove s3 prefix if given, so we can remove the directory later as
    # this method is expected to only return filenames
    path = convert_to_str(path)
    if path.startswith("s3://"):
        path = path[5:]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by the S3 filesystem.

        Args:
            file_dict: A file info dict returned by the S3 filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["Key"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    # Compute each basename exactly once instead of once for the filter and
    # once more for the result.
    basenames = (
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    )
    # s3fs.listdir also returns the root directory, so we filter
    # it out here (its basename is empty)
    return [base_name for base_name in basenames if base_name]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path, including missing parents.

    The call passes `exist_ok=True` to the underlying filesystem.

    Args:
        path: The path to create.
    """
    fs = self.filesystem
    fs.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    fs = self.filesystem
    fs.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path of the file to open. |
required |
mode |
str |
Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. |
'r' |
Returns:
Type | Description |
---|---|
Any |
A file-like object. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open the file at the given path through the S3 filesystem.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    fs = self.filesystem
    file_handle = fs.open(path=path, mode=mode)
    return file_handle
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the file to remove. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def remove(self, path: PathType) -> None:
    """Delete the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    fs = self.filesystem
    fs.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to `True`. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename the file at `src` to `dst`.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    # The existence check is only performed when overwriting is not allowed.
    destination_taken = (not overwrite) and self.filesystem.exists(dst)
    if destination_taken:
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    fs = self.filesystem
    fs.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to remove. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Recursively delete the given directory.

    Args:
        path: The path of the directory to remove.
    """
    fs = self.filesystem
    fs.delete(path=path, recursive=True)
size(self, path)
Get the size of a file in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
int |
The size of the file in bytes. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def size(self, path: PathType) -> int:
    """Return the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    size_in_bytes = self.filesystem.size(path=path)
    return size_in_bytes  # type: ignore[no-any-return]
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to get stat info for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary containing the stat info. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return the stat info reported by the S3 filesystem for a path.

    Args:
        path: The path to get stat info for.

    Returns:
        A dictionary containing the stat info.
    """
    stat_info = self.filesystem.stat(path=path)
    return stat_info  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Unused argument to conform to interface. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Unused argument to conform to interface. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Walk the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        Tuples made of the current directory path (prefixed with "s3://"),
        a list of directories inside the current directory and a list of
        files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    for entry in self.filesystem.walk(path=top):
        directory, subdirectories, files = entry
        yield f"s3://{directory}", subdirectories, files
ZenMLS3Filesystem (S3FileSystem)
Modified s3fs.S3FileSystem to disable caching.
The original s3fs.S3FileSystem caches all class instances based on the constructor input arguments and it never releases them. This is problematic in the context of the ZenML server, because the server is a long-running process that instantiates many S3 filesystems with different credentials, especially when the credentials are generated by service connectors.
The caching behavior of s3fs causes the server to slowly consume more and
more memory over time until it crashes. This class disables the caching
behavior of s3fs by setting the `cachable` attribute to `False`.
In addition to disabling instance caching, this class also provides a
correct cleanup implementation by overriding the `close_session` method that
closes the S3 aiobotocore client. The original one provided by s3fs was causing
memory leaks by creating a new event loop in the destructor instead of
using the existing one.
A `close` method is also provided to allow for synchronous on-demand cleanup
of the S3 client.
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
class ZenMLS3Filesystem(s3fs.S3FileSystem):  # type: ignore[misc]
    """Modified s3fs.S3FileSystem to disable caching.

    The original s3fs.S3FileSystem caches all class instances based on the
    constructor input arguments and it never releases them. This is problematic
    in the context of the ZenML server, because the server is a long-running
    process that instantiates many S3 filesystems with different credentials,
    especially when the credentials are generated by service connectors.

    The caching behavior of s3fs causes the server to slowly consume more and
    more memory over time until it crashes. This class disables the caching
    behavior of s3fs by setting the `cachable` attribute to `False`.

    In addition to disabling instance caching, this class also provides a
    correct cleanup implementation by overriding the `close_session` method
    that closes the S3 aiobotocore client. The original one provided by s3fs
    was causing memory leaks by creating a new event loop in the destructor
    instead of using the existing one.

    A `close` method is also provided to allow for synchronous on-demand cleanup
    of the S3 client.
    """

    cachable = False

    async def _close(self) -> None:
        """Close the S3 client."""
        if self._s3creator is None:  # type: ignore[has-type]
            # No client was created, so there is nothing to close.
            return
        await self._s3creator.__aexit__(None, None, None)  # type: ignore[has-type]
        self._s3creator = None
        self._s3 = None

    # Synchronous counterpart of `_close`.
    close = sync_wrapper(_close)

    @staticmethod
    def close_session(loop: Any, s3: Any) -> None:
        """Close the S3 client session.

        Args:
            loop: The event loop to use for closing the session.
            s3: The S3 client to close.
        """
        # IMPORTANT: This method is a copy of the original close_session method
        # from s3fs.S3FileSystem. The only difference is that it uses the
        # provided event loop instead of creating a new one.
        loop_is_running = loop is not None and loop.is_running()
        if loop_is_running:
            try:
                # NOTE: this is the line in the original method that causes
                # the memory leak
                # loop = asyncio.get_event_loop()
                loop.create_task(s3.__aexit__(None, None, None))
                return
            except RuntimeError:
                pass
            try:
                sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
                return
            except FSTimeoutError:
                pass
        try:
            # close the actual socket
            http_connector = s3._client._endpoint.http_session._connector
            http_connector._close()
        except AttributeError:
            # but during shutdown, it may have gone
            pass
close_session(loop, s3)
staticmethod
Close the S3 client session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
loop |
Any |
The event loop to use for closing the session. |
required |
s3 |
Any |
The S3 client to close. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
@staticmethod
def close_session(loop: Any, s3: Any) -> None:
    """Close the S3 client session.

    Args:
        loop: The event loop to use for closing the session.
        s3: The S3 client to close.
    """
    # IMPORTANT: This method is a copy of the original close_session method
    # from s3fs.S3FileSystem. The only difference is that it uses the
    # provided event loop instead of creating a new one.
    loop_is_running = loop is not None and loop.is_running()
    if loop_is_running:
        try:
            # NOTE: this is the line in the original method that causes
            # the memory leak
            # loop = asyncio.get_event_loop()
            loop.create_task(s3.__aexit__(None, None, None))
            return
        except RuntimeError:
            pass
        try:
            sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
            return
        except FSTimeoutError:
            pass
    try:
        # close the actual socket
        http_connector = s3._client._endpoint.http_session._connector
        http_connector._close()
    except AttributeError:
        # but during shutdown, it may have gone
        pass
flavors
special
Amazon S3 integration flavors.
s3_artifact_store_flavor
Amazon S3 artifact store flavor.
S3ArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)
Configuration for the S3 Artifact Store.
All attributes of this class except path
will be passed to the
s3fs.S3FileSystem
initialization. See
here for more information on how
to use those configuration options to connect to any S3-compatible storage.
When you want to register an S3ArtifactStore from the CLI and need to pass
client_kwargs
, config_kwargs
or s3_additional_kwargs
, you should pass
them as a json string:
zenml artifact-store register my_s3_store --flavor=s3 --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration for the S3 Artifact Store.

    All attributes of this class except `path` will be passed to the
    `s3fs.S3FileSystem` initialization. See
    [here](https://s3fs.readthedocs.io/en/latest/) for more information on how
    to use those configuration options to connect to any S3-compatible storage.

    When you want to register an S3ArtifactStore from the CLI and need to pass
    `client_kwargs`, `config_kwargs` or `s3_additional_kwargs`, you should pass
    them as a json string:
    ```
    zenml artifact-store register my_s3_store --flavor=s3 \
    --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
    ```
    """

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}

    # Credentials, declared as secret fields.
    key: Optional[str] = SecretField(default=None)
    secret: Optional[str] = SecretField(default=None)
    token: Optional[str] = SecretField(default=None)

    # Free-form keyword arguments forwarded to the filesystem.
    client_kwargs: Optional[Dict[str, Any]] = None
    config_kwargs: Optional[Dict[str, Any]] = None
    s3_additional_kwargs: Optional[Dict[str, Any]] = None

    # Lazily computed cache for the `bucket` property.
    _bucket: Optional[str] = None

    @field_validator("client_kwargs")
    @classmethod
    def _validate_client_kwargs(
        cls, value: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Validates the `client_kwargs` attribute.

        If a non-empty `endpoint_url` entry is present, its trailing slashes
        are stripped, its scheme is checked to be `http://` or `https://`,
        and the result of `replace_localhost_with_internal_hostname` is
        stored back under `endpoint_url`.

        Args:
            value: The value to validate.

        Raises:
            ValueError: If the value is not a valid URL.

        Returns:
            The validated value.
        """
        if value is None:
            return value

        if "endpoint_url" in value and value["endpoint_url"]:
            url = value["endpoint_url"].rstrip("/")
            scheme = re.search("^([a-z0-9]+://)", url)
            if scheme is None or scheme.group() not in ("https://", "http://"):
                # The message must be an f-string so that the offending URL
                # is actually reported instead of the literal text "{url}".
                raise ValueError(
                    f"Invalid URL for endpoint url: {url}. Should be in the form "
                    "https://hostname[:port] or http://hostname[:port]."
                )

            # When running inside a container, if the URL uses localhost, the
            # target service will not be available. We try to replace localhost
            # with one of the special Docker or K3D internal hostnames.
            value["endpoint_url"] = replace_localhost_with_internal_hostname(
                url
            )
        return value

    @property
    def bucket(self) -> str:
        """The bucket name of the artifact store.

        The name is extracted from `path` with `split_s3_path` on first
        access and cached in `_bucket` afterwards.

        Returns:
            The bucket name of the artifact store.
        """
        if self._bucket is None:
            self._bucket, _ = split_s3_path(self.path)
        return self._bucket
bucket: str
property
readonly
The bucket name of the artifact store.
Returns:
Type | Description |
---|---|
str |
The bucket name of the artifact store. |
model_post_init(self, context, /)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
context |
Any |
The context. |
required |
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    Written to behave like a `BaseModel` method. The `context` argument is
    accepted because pydantic-core supplies it when invoking this function;
    it is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private storage has already been set up.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    # Resolve each declared private attribute's default, in declaration order.
    resolved = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Keep only the attributes that actually define a default.
    private_values = {
        attr_name: value
        for attr_name, value in resolved
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)
S3ArtifactStoreFlavor (BaseArtifactStoreFlavor)
Flavor of the S3 artifact store.
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Flavor definition for the S3 artifact store."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The flavor name constant for the S3 artifact store.
        """
        flavor_name = S3_ARTIFACT_STORE_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements for compatible service connectors.

        These requirements are used to filter the service connector types
        that can be used together with this flavor.

        Returns:
            The requirements a service connector must satisfy to be
            compatible with this flavor.
        """
        requirements = ServiceConnectorRequirements(
            resource_type="s3-bucket",
            resource_id_attr="path",
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the documentation page of this flavor.

        Returns:
            The default generated docs URL.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation page of this flavor.

        Returns:
            The default generated SDK docs URL.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Link to the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/aws.png"
        return logo

    @property
    def config_class(self) -> Type[S3ArtifactStoreConfig]:
        """Configuration class associated with this flavor.

        Returns:
            The `S3ArtifactStoreConfig` class.
        """
        return S3ArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["S3ArtifactStore"]:
        """Implementation class associated with this flavor.

        Returns:
            The `S3ArtifactStore` class.
        """
        # Imported at call time rather than at module import time.
        from zenml.integrations.s3.artifact_stores import S3ArtifactStore

        return S3ArtifactStore
config_class: Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig]
property
readonly
The config class of the flavor.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig] |
The config class of the flavor. |
docs_url: Optional[str]
property
readonly
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[S3ArtifactStore]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[S3ArtifactStore] |
The implementation class for this flavor. |
logo_url: str
property
readonly
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
utils
Utility methods for S3.
split_s3_path(s3_path)
Split S3 URI into bucket and key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
s3_path |
str |
S3 URI (e.g. "s3://bucket/path") |
required |
Returns:
Type | Description |
---|---|
Tuple[str, str] |
A tuple of bucket and key, for "s3://bucket/path/path2" it will return ("bucket","path/path2") |
Exceptions:
Type | Description |
---|---|
ValueError |
if the S3 URI is invalid |
Source code in zenml/integrations/s3/utils.py
def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split S3 URI into bucket and key.

    Only the leading `s3://` scheme is removed; any further occurrence of
    `s3://` inside the key is preserved as-is.

    Args:
        s3_path: S3 URI (e.g. "s3://bucket/path")

    Returns:
        A tuple of bucket and key, for "s3://bucket/path/path2"
        it will return ("bucket","path/path2"). The key is an empty string
        when the URI contains nothing after the bucket name.

    Raises:
        ValueError: if the S3 URI is invalid
    """
    scheme = "s3://"
    if not s3_path.startswith(scheme):
        raise ValueError(
            f"Invalid S3 URI given: {s3_path}. It should start with `s3://`."
        )

    # Strip the scheme by position rather than with `str.replace`, which
    # would also delete any "s3://" substring that appears inside the key.
    bucket, _, key = s3_path[len(scheme):].partition("/")
    return bucket, key