S3

zenml.integrations.s3 special

Initialization of the S3 integration.

The S3 integration allows the use of cloud artifact stores and file operations on S3 buckets.
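
A minimal usage sketch (an assumption, not part of this reference: it presumes the integration is installed, the active stack uses an S3 artifact store, and the bucket and file names below are placeholders):

from zenml.client import Client

# Assumes the active stack's artifact store is an S3ArtifactStore.
store = Client().active_stack.artifact_store
with store.open("s3://my-bucket/example.txt", "wb") as f:
    f.write(b"hello")
assert store.exists("s3://my-bucket/example.txt")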

S3Integration (Integration)

Definition of S3 integration for ZenML.

Source code in zenml/integrations/s3/__init__.py
class S3Integration(Integration):
    """Definition of S3 integration for ZenML."""

    NAME = S3
    # boto3 isn't required for the filesystem to work, but it is required
    # for the AWS/S3 connector that can be used with the artifact store.
    # NOTE: to keep the dependency resolution for botocore consistent and fast
    # between s3fs and boto3, the boto3 upper version used here should be the
    # same as the one resolved by pip when installing boto3 without a
    # restriction alongside s3fs, e.g.:
    #
    #   pip install 's3fs>2022.3.0,<=2023.4.0' boto3
    #
    # The above command installs boto3==1.26.76, so we use the same version
    # here to avoid the dependency resolution overhead.
    REQUIREMENTS = [
        "s3fs>2022.3.0",
        "boto3",
        # The following dependencies are only required for the AWS connector.
        "aws-profile-manager",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the s3 integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

        return [S3ArtifactStoreFlavor]

flavors() classmethod

Declare the stack component flavors for the s3 integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/s3/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the s3 integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

    return [S3ArtifactStoreFlavor]

artifact_stores special

Initialization of the S3 Artifact Store.

s3_artifact_store

Implementation of the S3 Artifact Store.

S3ArtifactStore (BaseArtifactStore, AuthenticationMixin)

Artifact Store for S3 based artifacts.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
class S3ArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for S3 based artifacts."""

    _filesystem: Optional[ZenMLS3Filesystem] = None

    is_versioned: bool = False

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initializes the artifact store.

        Args:
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, **kwargs)
        self._boto3_bucket_holder = None

        # determine bucket versioning status
        versioning = self._boto3_bucket.Versioning()
        with self._shield_lack_of_versioning_permissions(
            "s3:GetBucketVersioning"
        ):
            if versioning.status == "Enabled":
                self.is_versioned = True
                logger.warning(
                    f"The artifact store bucket `{self.config.bucket}` is versioned, "
                    "this may slow down logging process significantly."
                )

    @property
    def config(self) -> S3ArtifactStoreConfig:
        """Get the config of this artifact store.

        Returns:
            The config of this artifact store.
        """
        return cast(S3ArtifactStoreConfig, self._config)

    def get_credentials(
        self,
    ) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
        """Gets authentication credentials.

        If an authentication secret is configured, the secret values are
        returned. Otherwise, we fall back to the plain text component
        attributes.

        Returns:
            Tuple (key, secret, token, region) of credentials used to
            authenticate with the S3 filesystem.

        Raises:
            RuntimeError: If the AWS connector behaves unexpectedly.
        """
        connector = self.get_connector()
        if connector:
            from botocore.client import BaseClient

            client = connector.connect()
            if not isinstance(client, BaseClient):
                raise RuntimeError(
                    f"Expected a botocore.client.BaseClient while trying to "
                    f"use the linked connector, but got {type(client)}."
                )
            credentials = client.credentials
            return (
                credentials.access_key,
                credentials.secret_key,
                credentials.token,
                client.meta.region_name,
            )

        secret = self.get_typed_authentication_secret(
            expected_schema_type=AWSSecretSchema
        )
        if secret:
            return (
                secret.aws_access_key_id,
                secret.aws_secret_access_key,
                secret.aws_session_token,
                None,
            )
        else:
            return self.config.key, self.config.secret, self.config.token, None

    @property
    def filesystem(self) -> ZenMLS3Filesystem:
        """The s3 filesystem to access this artifact store.

        Returns:
            The s3 filesystem.
        """
        # Refresh the credentials also if the connector has expired
        if self._filesystem and not self.connector_has_expired():
            return self._filesystem

        key, secret, token, region = self.get_credentials()

        # Use the region from the connector if available, otherwise some
        # remote workloads (e.g. Sagemaker) might not work correctly because
        # they look for the bucket in the wrong region
        client_kwargs = {}
        if region:
            client_kwargs["region_name"] = region
        if self.config.client_kwargs:
            client_kwargs.update(self.config.client_kwargs)

        self._filesystem = ZenMLS3Filesystem(
            key=key,
            secret=secret,
            token=token,
            client_kwargs=client_kwargs,
            config_kwargs=self.config.config_kwargs,
            s3_additional_kwargs=self.config.s3_additional_kwargs,
        )
        return self._filesystem

    def cleanup(self) -> None:
        """Close the filesystem."""
        if self._filesystem:
            self._filesystem.close()

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file')

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of paths that are files in the given directory.
        """
        # remove s3 prefix if given, so we can remove the directory later as
        # this method is expected to only return filenames
        path = convert_to_str(path)
        if path.startswith("s3://"):
            path = path[5:]

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a file info dict returned by the S3 filesystem.

            Args:
                file_dict: A file info dict returned by the S3 filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["Key"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
            # s3fs.listdir also returns the root directory, so we filter
            # it out here
            if _extract_basename(dict_)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path of the file to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            A dictionary containing the stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Get the size of a file in bytes.

        Args:
            path: The path to the file.

        Returns:
            The size of the file in bytes.
        """
        return self.filesystem.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contains the current
                directory path, a list of directories inside the current
                directory and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        for directory, subdirectories, files in self.filesystem.walk(path=top):
            yield f"s3://{directory}", subdirectories, files

    def _remove_previous_file_versions(self, path: PathType) -> None:
        """Keep only the latest file version in the given path.

        Method is useful for logs stored in versioned file systems
        like AWS S3.

        Args:
            path: The path to the file.
        """
        if self.is_versioned:
            if isinstance(path, bytes):
                path = path.decode()
            _, prefix = split_s3_path(path)
            with self._shield_lack_of_versioning_permissions(
                "s3:ListBucketVersions"
            ):
                for version in self._boto3_bucket.object_versions.filter(
                    Prefix=prefix
                ):
                    if not version.is_latest:
                        with self._shield_lack_of_versioning_permissions(
                            "s3:DeleteObjectVersion"
                        ):
                            version.delete()

    @property
    def _boto3_bucket(self) -> Any:
        """Get the boto3 bucket object.

        Returns:
            The boto3 bucket object.
        """
        if self._boto3_bucket_holder and not self.connector_has_expired():
            return self._boto3_bucket_holder

        key, secret, token, region = self.get_credentials()
        s3 = boto3.resource(
            "s3",
            aws_access_key_id=key,
            aws_secret_access_key=secret,
            aws_session_token=token,
            region_name=region,
        )
        self._boto3_bucket_holder = s3.Bucket(self.config.bucket)
        return self._boto3_bucket_holder

    @contextmanager
    def _shield_lack_of_versioning_permissions(
        self, auth_missing: str
    ) -> Generator[Any, None, None]:
        """Suppress ClientErrors caused by missing S3 versioning permissions.

        Args:
            auth_missing: The IAM permission expected to appear in the error.

        Yields:
            Control to the wrapped block; a `ClientError` naming the missing
            permission is logged as a warning and disables versioning support
            instead of propagating.
        """
        try:
            yield
        except ClientError as e:
            if "not authorized" in e.args[0] and auth_missing in e.args[0]:
                logger.warning(
                    "Your AWS Connector is lacking critical Versioning permissions. "
                    f"Please check that `{auth_missing}` is granted. "
                    "This is needed to remove previous versions of log files from your "
                    "Artifact Store bucket."
                )
                self.is_versioned = False
config: S3ArtifactStoreConfig property readonly

Get the config of this artifact store.

Returns:

Type Description
S3ArtifactStoreConfig

The config of this artifact store.

filesystem: ZenMLS3Filesystem property readonly

The s3 filesystem to access this artifact store.

Returns:

Type Description
ZenMLS3Filesystem

The s3 filesystem.

__init__(self, *args, **kwargs) special

Initializes the artifact store.

Parameters:

Name Type Description Default
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initializes the artifact store.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self._boto3_bucket_holder = None

    # determine bucket versioning status
    versioning = self._boto3_bucket.Versioning()
    with self._shield_lack_of_versioning_permissions(
        "s3:GetBucketVersioning"
    ):
        if versioning.status == "Enabled":
            self.is_versioned = True
            logger.warning(
                f"The artifact store bucket `{self.config.bucket}` is versioned, "
                "this may slow down logging process significantly."
            )
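
The versioning probe above can be reproduced with plain boto3; a sketch with a hypothetical bucket name:

import boto3

versioning = boto3.resource("s3").Bucket("my-bucket").Versioning()
# status is "Enabled", "Suspended", or None if versioning was never configured
print(versioning.status)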
cleanup(self)

Close the filesystem.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def cleanup(self) -> None:
    """Close the filesystem."""
    if self._filesystem:
        self._filesystem.close()
copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
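
Usage sketch (assuming `store` is an S3ArtifactStore; paths are hypothetical):

store.copyfile("s3://my-bucket/a.txt", "s3://my-bucket/b.txt")
# Repeating the copy to an existing destination raises FileExistsError unless forced:
store.copyfile("s3://my-bucket/a.txt", "s3://my-bucket/b.txt", overwrite=True)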
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials(self)

Gets authentication credentials.

If an authentication secret is configured, the secret values are returned. Otherwise, we fall back to the plain text component attributes.

Returns:

Type Description
Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]

Tuple (key, secret, token, region) of credentials used to authenticate with the S3 filesystem.

Exceptions:

Type Description
RuntimeError

If the AWS connector behaves unexpectedly.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def get_credentials(
    self,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Gets authentication credentials.

    If an authentication secret is configured, the secret values are
    returned. Otherwise, we fall back to the plain text component
    attributes.

    Returns:
        Tuple (key, secret, token, region) of credentials used to
        authenticate with the S3 filesystem.

    Raises:
        RuntimeError: If the AWS connector behaves unexpectedly.
    """
    connector = self.get_connector()
    if connector:
        from botocore.client import BaseClient

        client = connector.connect()
        if not isinstance(client, BaseClient):
            raise RuntimeError(
                f"Expected a botocore.client.BaseClient while trying to "
                f"use the linked connector, but got {type(client)}."
            )
        credentials = client.credentials
        return (
            credentials.access_key,
            credentials.secret_key,
            credentials.token,
            client.meta.region_name,
        )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AWSSecretSchema
    )
    if secret:
        return (
            secret.aws_access_key_id,
            secret.aws_secret_access_key,
            secret.aws_session_token,
            None,
        )
    else:
        return self.config.key, self.config.secret, self.config.token, None
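
Credentials are resolved in order: linked service connector first, then the authentication secret, then the plain-text config attributes. A minimal sketch (assuming `store` is an S3ArtifactStore):

key, secret, token, region = store.get_credentials()
# region is only populated when the credentials come from a service connector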
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file')

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that are files in the given directory.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of paths that are files in the given directory.
    """
    # remove s3 prefix if given, so we can remove the directory later as
    # this method is expected to only return filenames
    path = convert_to_str(path)
    if path.startswith("s3://"):
        path = path[5:]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by the S3 filesystem.

        Args:
            file_dict: A file info dict returned by the S3 filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["Key"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
        # s3fs.listdir also returns the root directory, so we filter
        # it out here
        if _extract_basename(dict_)
    ]
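
Usage sketch (hypothetical layout); only bare entry names are returned, without the s3:// prefix or the directory path:

names = store.listdir("s3://my-bucket/artifacts/run-1")
# e.g. ["metadata.json", "model.pkl"]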
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the file to remove.

required
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(self, path)

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary containing the stat info.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        A dictionary containing the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contains the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contains the current
            directory path, a list of directories inside the current
            directory and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    for directory, subdirectories, files in self.filesystem.walk(path=top):
        yield f"s3://{directory}", subdirectories, files
ZenMLS3Filesystem (S3FileSystem)

Modified s3fs.S3FileSystem to disable caching.

The original s3fs.S3FileSystem caches all class instances based on the constructor input arguments and it never releases them. This is problematic in the context of the ZenML server, because the server is a long-running process that instantiates many S3 filesystems with different credentials, especially when the credentials are generated by service connectors.

The caching behavior of s3fs causes the server to slowly consume more and more memory over time until it crashes. This class disables the caching behavior of s3fs by setting the cachable attribute to False.

In addition to disabling instance caching, this class also provides a correct cleanup implementation by overriding the close_session method of the S3 aiobotocore client. The original one provided by s3fs was causing memory leaks by creating a new event loop in the destructor instead of using the existing one.

A close method is also provided to allow for synchronous on-demand cleanup of the S3 client.

Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
class ZenMLS3Filesystem(s3fs.S3FileSystem):  # type: ignore[misc]
    """Modified s3fs.S3FileSystem to disable caching.

    The original s3fs.S3FileSystem caches all class instances based on the
    constructor input arguments and it never releases them. This is problematic
    in the context of the ZenML server, because the server is a long-running
    process that instantiates many S3 filesystems with different credentials,
    especially when the credentials are generated by service connectors.

    The caching behavior of s3fs causes the server to slowly consume more and
    more memory over time until it crashes. This class disables the caching
    behavior of s3fs by setting the `cachable` attribute to `False`.

    In addition to disabling instance caching, this class also provides a
    correct cleanup implementation by overriding the `close_session` method of
    the S3 aiobotocore client. The original one provided by s3fs was causing
    memory leaks by creating a new event loop in the destructor instead of
    using the existing one.

    A `close` method is also provided to allow for synchronous on-demand cleanup
    of the S3 client.
    """

    cachable = False

    async def _close(self) -> None:
        """Close the S3 client."""
        if self._s3creator is not None:  # type: ignore[has-type]
            await self._s3creator.__aexit__(None, None, None)  # type: ignore[has-type]
            self._s3creator = None
            self._s3 = None

    close = sync_wrapper(_close)

    @staticmethod
    def close_session(loop: Any, s3: Any) -> None:
        """Close the S3 client session.

        Args:
            loop: The event loop to use for closing the session.
            s3: The S3 client to close.
        """
        # IMPORTANT: This method is a copy of the original close_session method
        # from s3fs.S3FileSystem. The only difference is that it uses the
        # provided event loop instead of creating a new one.
        if loop is not None and loop.is_running():
            try:
                # NOTE: this is the line in the original method that causes
                # the memory leak
                # loop = asyncio.get_event_loop()
                loop.create_task(s3.__aexit__(None, None, None))
                return
            except RuntimeError:
                pass
            try:
                sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
                return
            except FSTimeoutError:
                pass
        try:
            # close the actual socket
            s3._client._endpoint.http_session._connector._close()
        except AttributeError:
            # but during shutdown, it may have gone
            pass
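
A sketch of the effect of cachable = False and the synchronous close method (credentials are placeholders):

fs_a = ZenMLS3Filesystem(key="...", secret="...")
fs_b = ZenMLS3Filesystem(key="...", secret="...")
# No instance caching: plain s3fs.S3FileSystem would return the same object here.
assert fs_a is not fs_b

# ... use fs_a for file operations ...
fs_a.close()  # synchronous, on-demand cleanup of the underlying S3 client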
close_session(loop, s3) staticmethod

Close the S3 client session.

Parameters:

Name Type Description Default
loop Any

The event loop to use for closing the session.

required
s3 Any

The S3 client to close.

required
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
@staticmethod
def close_session(loop: Any, s3: Any) -> None:
    """Close the S3 client session.

    Args:
        loop: The event loop to use for closing the session.
        s3: The S3 client to close.
    """
    # IMPORTANT: This method is a copy of the original close_session method
    # from s3fs.S3FileSystem. The only difference is that it uses the
    # provided event loop instead of creating a new one.
    if loop is not None and loop.is_running():
        try:
            # NOTE: this is the line in the original method that causes
            # the memory leak
            # loop = asyncio.get_event_loop()
            loop.create_task(s3.__aexit__(None, None, None))
            return
        except RuntimeError:
            pass
        try:
            sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
            return
        except FSTimeoutError:
            pass
    try:
        # close the actual socket
        s3._client._endpoint.http_session._connector._close()
    except AttributeError:
        # but during shutdown, it may have gone
        pass

flavors special

Amazon S3 integration flavors.

s3_artifact_store_flavor

Amazon S3 artifact store flavor.

S3ArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)

Configuration for the S3 Artifact Store.

All attributes of this class except path will be passed to the s3fs.S3FileSystem initialization. See the s3fs documentation (https://s3fs.readthedocs.io/en/latest/) for more information on how to use those configuration options to connect to any S3-compatible storage.

When you want to register an S3ArtifactStore from the CLI and need to pass client_kwargs, config_kwargs or s3_additional_kwargs, you should pass them as a JSON string:

zenml artifact-store register my_s3_store --flavor=s3 --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
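
Since every attribute except path is forwarded to s3fs.S3FileSystem, an equivalent direct construction looks roughly like this (credentials and endpoint are hypothetical):

import s3fs

fs = s3fs.S3FileSystem(
    key="my-access-key-id",
    secret="my-secret-access-key",
    client_kwargs={"endpoint_url": "http://my-s3-endpoint"},
)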
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration for the S3 Artifact Store.

    All attributes of this class except `path` will be passed to the
    `s3fs.S3FileSystem` initialization. See
    [here](https://s3fs.readthedocs.io/en/latest/) for more information on how
    to use those configuration options to connect to any S3-compatible storage.

    When you want to register an S3ArtifactStore from the CLI and need to pass
    `client_kwargs`, `config_kwargs` or `s3_additional_kwargs`, you should pass
    them as a json string:
    ```
    zenml artifact-store register my_s3_store --flavor=s3 \
    --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
    ```
    """

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}

    key: Optional[str] = SecretField(default=None)
    secret: Optional[str] = SecretField(default=None)
    token: Optional[str] = SecretField(default=None)
    client_kwargs: Optional[Dict[str, Any]] = None
    config_kwargs: Optional[Dict[str, Any]] = None
    s3_additional_kwargs: Optional[Dict[str, Any]] = None

    _bucket: Optional[str] = None

    @field_validator("client_kwargs")
    @classmethod
    def _validate_client_kwargs(
        cls, value: Optional[Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Validates the `client_kwargs` attribute.

        Args:
            value: The value to validate.

        Raises:
            ValueError: If the value is not a valid URL.

        Returns:
            The validated value.
        """
        if value is None:
            return value

        if "endpoint_url" in value and value["endpoint_url"]:
            url = value["endpoint_url"].rstrip("/")
            scheme = re.search("^([a-z0-9]+://)", url)
            if scheme is None or scheme.group() not in ("https://", "http://"):
                raise ValueError(
                    "Invalid URL for endpoint url: {url}. Should be in the form "
                    "https://hostname[:port] or http://hostname[:port]."
                )

            # When running inside a container, if the URL uses localhost, the
            # target service will not be available. We try to replace localhost
            # with one of the special Docker or K3D internal hostnames.
            value["endpoint_url"] = replace_localhost_with_internal_hostname(
                url
            )
        return value

    @property
    def bucket(self) -> str:
        """The bucket name of the artifact store.

        Returns:
            The bucket name of the artifact store.
        """
        if self._bucket is None:
            self._bucket, _ = split_s3_path(self.path)
        return self._bucket
bucket: str property readonly

The bucket name of the artifact store.

Returns:

Type Description
str

The bucket name of the artifact store.

model_post_init(self, context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Parameters:

Name Type Description Default
self BaseModel

The BaseModel instance.

required
context Any

The context.

required
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)
S3ArtifactStoreFlavor (BaseArtifactStoreFlavor)

Flavor of the S3 artifact store.

Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Flavor of the S3 artifact store."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return S3_ARTIFACT_STORE_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            resource_type="s3-bucket",
            resource_id_attr="path",
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A URL to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A URL to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A URL to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/aws.png"

    @property
    def config_class(self) -> Type[S3ArtifactStoreConfig]:
        """The config class of the flavor.

        Returns:
            The config class of the flavor.
        """
        return S3ArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["S3ArtifactStore"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class for this flavor.
        """
        from zenml.integrations.s3.artifact_stores import S3ArtifactStore

        return S3ArtifactStore
config_class: Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig] property readonly

The config class of the flavor.

Returns:

Type Description
Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig]

The config class of the flavor.

docs_url: Optional[str] property readonly

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[S3ArtifactStore] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[S3ArtifactStore]

The implementation class for this flavor.

logo_url: str property readonly

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

utils

Utility methods for S3.

split_s3_path(s3_path)

Split S3 URI into bucket and key.

Parameters:

Name Type Description Default
s3_path str

S3 URI (e.g. "s3://bucket/path")

required

Returns:

Type Description
Tuple[str, str]

A tuple of bucket and key; for "s3://bucket/path/path2" it will return ("bucket", "path/path2").

Exceptions:

Type Description
ValueError

if the S3 URI is invalid

Source code in zenml/integrations/s3/utils.py
def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split S3 URI into bucket and key.

    Args:
        s3_path: S3 URI (e.g. "s3://bucket/path")

    Returns:
        A tuple of bucket and key, for "s3://bucket/path/path2"
            it will return ("bucket","path/path2")

    Raises:
        ValueError: if the S3 URI is invalid
    """
    if not s3_path.startswith("s3://"):
        raise ValueError(
            f"Invalid S3 URI given: {s3_path}. "
            "It should start with `s3://`."
        )
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key
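
Usage sketch:

bucket, key = split_s3_path("s3://my-bucket/some/prefix/file.txt")
assert (bucket, key) == ("my-bucket", "some/prefix/file.txt")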