
S3

zenml.integrations.s3

Initialization of the S3 integration.

The S3 integration allows the use of cloud artifact stores and file operations on S3 buckets.
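
For instance, once the integration is installed (e.g. via `zenml integration install s3`) and an S3 artifact store is reachable, files on the bucket can be accessed through ZenML's filesystem utilities. A minimal sketch, assuming the integration is installed and the placeholder bucket is accessible with the configured credentials:

from zenml.io import fileio

# Placeholder bucket/path; assumes the S3 integration is installed and the
# bucket is reachable with the configured credentials.
with fileio.open("s3://my-bucket/demo/hello.txt", "wb") as f:
    f.write(b"hello from the S3 integration")

print(fileio.exists("s3://my-bucket/demo/hello.txt"))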

Attributes

S3 = 's3' module-attribute

S3_ARTIFACT_STORE_FLAVOR = 's3' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
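
As a rough sketch of what these generators produce for the S3 artifact store flavor (the exact URLs depend on the installed ZenML version):

from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

flavor = S3ArtifactStoreFlavor()
# e.g. https://docs.zenml.io/stack-components/artifact-stores/s3 when running
# the latest ZenML version (see generate_default_docs_url above).
print(flavor.generate_default_docs_url())
# Points at the integrations-s3 page of the SDK docs for the installed version.
print(flavor.generate_default_sdk_docs_url())
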
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
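
A small sketch of how to_model is typically used, assuming the S3 integration is installed so its flavor can be imported:

from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

flavor = S3ArtifactStoreFlavor()
# Build the request model that would be used to register this flavor.
model = flavor.to_model(integration="s3", is_custom=False)
print(model.name, model.type, model.source)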

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
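
A quick sketch of using these helpers to verify the S3 integration before relying on it (the exact requirement pins depend on the ZenML version):

from zenml.integrations.s3 import S3Integration

print(S3Integration.get_requirements())  # the pinned package requirements
if not S3Integration.check_installation():
    print("Missing packages - run `zenml integration install s3`.")
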
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

S3Integration

Bases: Integration

Definition of S3 integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the s3 integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/s3/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the s3 integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor

    return [S3ArtifactStoreFlavor]
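
A minimal sketch showing what the integration contributes:

from zenml.integrations.s3 import S3Integration

for flavor_class in S3Integration.flavors():
    flavor = flavor_class()
    # Expected: name "s3" for the artifact store component type.
    print(flavor.name, flavor.type)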

Modules

artifact_stores

Initialization of the S3 Artifact Store.

Classes
S3ArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for S3 based artifacts.

Initializes the artifact store.

Parameters:

Name Type Description Default
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initializes the artifact store.

    Args:
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self._boto3_bucket_holder = None

    # determine bucket versioning status
    versioning = self._boto3_bucket.Versioning()
    with self._shield_lack_of_versioning_permissions(
        "s3:GetBucketVersioning"
    ):
        if versioning.status == "Enabled":
            self.is_versioned = True
            logger.warning(
                f"The artifact store bucket `{self.config.bucket}` is versioned, "
                "this may slow down logging process significantly."
            )
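
A common way to get hold of a configured S3ArtifactStore instance is through the active stack. A sketch, assuming an S3 artifact store has been registered and is part of the active stack:

from zenml.client import Client

artifact_store = Client().active_stack.artifact_store
print(artifact_store.path)           # the configured s3:// root path
print(artifact_store.config.bucket)  # the bucket name of the artifact store
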
Attributes
config: S3ArtifactStoreConfig property

Get the config of this artifact store.

Returns:

Type Description
S3ArtifactStoreConfig

The config of this artifact store.

filesystem: ZenMLS3Filesystem property

The s3 filesystem to access this artifact store.

Returns:

Type Description
ZenMLS3Filesystem

The s3 filesystem.

Functions
cleanup() -> None

Close the filesystem.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def cleanup(self) -> None:
    """Close the filesystem."""
    if self._filesystem:
        self._filesystem.close()
copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None

Copy a file.

Parameters:

Name Type Description Default
src PathType

The path to copy from.

required
dst PathType

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(path: PathType) -> bool

Check whether a path exists.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials() -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]

Gets authentication credentials.

If an authentication secret is configured, the secret values are returned. Otherwise, we fall back to the plain text component attributes.

Returns:

Type Description
Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]

Tuple (key, secret, token, region) of credentials used to authenticate with the S3 filesystem.

Raises:

Type Description
RuntimeError

If the AWS connector behaves unexpectedly.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def get_credentials(
    self,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Gets authentication credentials.

    If an authentication secret is configured, the secret values are
    returned. Otherwise, we fall back to the plain text component
    attributes.

    Returns:
        Tuple (key, secret, token, region) of credentials used to
        authenticate with the S3 filesystem.

    Raises:
        RuntimeError: If the AWS connector behaves unexpectedly.
    """
    connector = self.get_connector()
    if connector:
        from botocore.client import BaseClient

        client = connector.connect()
        if not isinstance(client, BaseClient):
            raise RuntimeError(
                f"Expected a botocore.client.BaseClient while trying to "
                f"use the linked connector, but got {type(client)}."
            )
        credentials = client.credentials
        return (
            credentials.access_key,
            credentials.secret_key,
            credentials.token,
            client.meta.region_name,
        )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AWSSecretSchema
    )
    if secret:
        return (
            secret.aws_access_key_id,
            secret.aws_secret_access_key,
            secret.aws_session_token,
            None,
        )
    else:
        return self.config.key, self.config.secret, self.config.token, None
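
A sketch of inspecting the resolved credentials, assuming the active artifact store is an S3 artifact store:

from zenml.client import Client

store = Client().active_stack.artifact_store
# Resolution order: service connector, then authentication secret, then the
# plain-text config attributes.
key, secret, token, region = store.get_credentials()
print("credentials resolved explicitly:", key is not None)
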
glob(pattern: PathType) -> List[PathType]

Return all paths that match the given glob pattern.

The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern PathType

The glob pattern to match, see details above.

required

Returns:

Type Description
List[PathType]

A list of paths that match the given glob pattern.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]
isdir(path: PathType) -> bool

Check whether a path is a directory.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(path: PathType) -> List[PathType]

Return a list of files in a directory.

Parameters:

Name Type Description Default
path PathType

The path to list.

required

Returns:

Type Description
List[PathType]

A list of paths that are files in the given directory.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of paths that are files in the given directory.
    """
    # remove s3 prefix if given, so we can remove the directory later as
    # this method is expected to only return filenames
    path = convert_to_str(path)
    if path.startswith("s3://"):
        path = path[5:]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by the S3 filesystem.

        Args:
            file_dict: A file info dict returned by the S3 filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["Key"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
        # s3fs.listdir also returns the root directory, so we filter
        # it out here
        if _extract_basename(dict_)
    ]
makedirs(path: PathType) -> None

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path PathType

The path to create.

required
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(path: PathType) -> None

Create a directory at the given path.

Parameters:

Name Type Description Default
path PathType

The path to create.

required
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path)
open(path: PathType, mode: str = 'r') -> Any

Open a file at the given path.

Parameters:

Name Type Description Default
path PathType

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
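
A short sketch of round-tripping a binary file (placeholder path; assumes the active artifact store is an S3 artifact store):

from zenml.client import Client

store = Client().active_stack.artifact_store
path = "s3://my-bucket/artifacts/example.bin"
with store.open(path, "wb") as f:
    f.write(b"\x00\x01\x02")
with store.open(path, "rb") as f:
    print(len(f.read()))  # 3
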
remove(path: PathType) -> None

Remove the file at the given path.

Parameters:

Name Type Description Default
path PathType

The path of the file to remove.

required
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(src: PathType, dst: PathType, overwrite: bool = False) -> None

Rename source file to destination file.

Parameters:

Name Type Description Default
src PathType

The path of the file to rename.

required
dst PathType

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(path: PathType) -> None

Remove the given directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to remove.

required
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(path: PathType) -> int

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(path: PathType) -> Dict[str, Any]

Return stat info for the given path.

Parameters:

Name Type Description Default
path PathType

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary containing the stat info.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        A dictionary containing the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(top: PathType, topdown: bool = True, onerror: Optional[Callable[..., None]] = None) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top PathType

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., None]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[PathType, List[PathType], List[PathType]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory, and a list of files inside the current directory.

Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contains the path of the current
            directory, a list of directories inside the current directory
            and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    for directory, subdirectories, files in self.filesystem.walk(path=top):
        yield f"s3://{directory}", subdirectories, files
Modules
s3_artifact_store

Implementation of the S3 Artifact Store.

Classes
S3ArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for S3 based artifacts. The full documentation of this class is listed above under artifact_stores.

ZenMLS3Filesystem

Bases: S3FileSystem

Modified s3fs.S3FileSystem to disable caching.

The original s3fs.S3FileSystem caches all class instances based on the constructor input arguments and it never releases them. This is problematic in the context of the ZenML server, because the server is a long-running process that instantiates many S3 filesystems with different credentials, especially when the credentials are generated by service connectors.

The caching behavior of s3fs causes the server to slowly consume more and more memory over time until it crashes. This class disables the caching behavior of s3fs by setting the cachable attribute to False.

In addition to disabling instance caching, this class also provides a correct cleanup implementation by overriding the close_session method of the S3 aiobotocore client. The original implementation provided by s3fs caused memory leaks by creating a new event loop in the destructor instead of using the existing one.

A close method is also provided to allow for synchronous on-demand cleanup of the S3 client.

Functions
close_session(loop: Any, s3: Any) -> None staticmethod

Close the S3 client session.

Parameters:

Name Type Description Default
loop Any

The event loop to use for closing the session.

required
s3 Any

The S3 client to close.

required
Source code in src/zenml/integrations/s3/artifact_stores/s3_artifact_store.py
@staticmethod
def close_session(loop: Any, s3: Any) -> None:
    """Close the S3 client session.

    Args:
        loop: The event loop to use for closing the session.
        s3: The S3 client to close.
    """
    # IMPORTANT: This method is a copy of the original close_session method
    # from s3fs.S3FileSystem. The only difference is that it uses the
    # provided event loop instead of creating a new one.
    if loop is not None and loop.is_running():
        try:
            # NOTE: this is the line in the original method that causes
            # the memory leak
            # loop = asyncio.get_event_loop()
            loop.create_task(s3.__aexit__(None, None, None))
            return
        except RuntimeError:
            pass
        try:
            sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
            return
        except FSTimeoutError:
            pass
    try:
        # close the actual socket
        s3._client._endpoint.http_session._connector._close()
    except AttributeError:
        # but during shutdown, it may have gone
        pass
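
A sketch of releasing the underlying filesystem explicitly, e.g. in a long-running process (assumes an S3 artifact store in the active stack; the bucket is a placeholder):

from zenml.client import Client

store = Client().active_stack.artifact_store
store.exists("s3://my-bucket/")  # placeholder call that opens the filesystem
# cleanup() closes the cached ZenMLS3Filesystem and its S3 client session.
store.cleanup()
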
Functions

flavors

Amazon S3 integration flavors.

Classes
S3ArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration for the S3 Artifact Store.

All attributes of this class except path will be passed to the s3fs.S3FileSystem initialization. See the s3fs documentation for more information on how to use these configuration options to connect to any S3-compatible storage.

When you want to register an S3ArtifactStore from the CLI and need to pass client_kwargs, config_kwargs or s3_additional_kwargs, you should pass them as a JSON string:

zenml artifact-store register my_s3_store --flavor=s3 \
    --path=s3://my_bucket \
    --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
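
A sketch of what secret references look like in practice for this config; the secret name and keys below are placeholders:

from zenml.integrations.s3.flavors import S3ArtifactStoreConfig

# Sensitive string attributes such as `key` and `secret` can be given as
# secret references instead of plain-text values.
config = S3ArtifactStoreConfig(
    path="s3://my-bucket",
    key="{{s3_creds.aws_access_key_id}}",
    secret="{{s3_creds.aws_secret_access_key}}",
)
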
Attributes
bucket: str property

The bucket name of the artifact store.

Returns:

Type Description
str

The bucket name of the artifact store.

S3ArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Flavor of the S3 artifact store.

Attributes
config_class: Type[S3ArtifactStoreConfig] property

The config class of the flavor.

Returns:

Type Description
Type[S3ArtifactStoreConfig]

The config class of the flavor.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[S3ArtifactStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[S3ArtifactStore]

The implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.
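
The flavor's properties can be inspected directly once the integration is installed. The snippet below is a quick sketch, assuming the import path shown; accessing implementation_class imports the actual artifact store and therefore requires the S3 integration requirements (e.g. s3fs) to be installed.

from zenml.integrations.s3.flavors.s3_artifact_store_flavor import (
    S3ArtifactStoreFlavor,
)

flavor = S3ArtifactStoreFlavor()
print(flavor.name)                   # "s3"
print(flavor.config_class.__name__)  # "S3ArtifactStoreConfig"
print(flavor.implementation_class)   # the S3ArtifactStore class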

Modules
s3_artifact_store_flavor

Amazon S3 artifact store flavor.

Classes
S3ArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration for the S3 Artifact Store.

All attributes of this class except path are passed to the s3fs.S3FileSystem initialization. See the s3fs documentation for more information on how to use these configuration options to connect to any S3-compatible storage.

When you register an S3ArtifactStore from the CLI and need to pass client_kwargs, config_kwargs or s3_additional_kwargs, pass them as a JSON string:

zenml artifact-store register my_s3_store --flavor=s3 --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
bucket: str property

The bucket name of the artifact store.

Returns:

Type Description
str

The bucket name of the artifact store.

S3ArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Flavor of the S3 artifact store.

Attributes
config_class: Type[S3ArtifactStoreConfig] property

The config class of the flavor.

Returns:

Type Description
Type[S3ArtifactStoreConfig]

The config class of the flavor.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[S3ArtifactStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[S3ArtifactStore]

The implementation class for this flavor.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.


utils

Utility methods for S3.

Functions
split_s3_path(s3_path: str) -> Tuple[str, str]

Split S3 URI into bucket and key.

Parameters:

Name Type Description Default
s3_path str

S3 URI (e.g. "s3://bucket/path")

required

Returns:

Type Description
Tuple[str, str]

A tuple of bucket and key; for "s3://bucket/path/path2" it will return ("bucket", "path/path2").

Raises:

Type Description
ValueError

If the S3 URI is invalid.

Source code in src/zenml/integrations/s3/utils.py
def split_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split S3 URI into bucket and key.

    Args:
        s3_path: S3 URI (e.g. "s3://bucket/path")

    Returns:
        A tuple of bucket and key, for "s3://bucket/path/path2"
            it will return ("bucket","path/path2")

    Raises:
        ValueError: if the S3 URI is invalid
    """
    if not s3_path.startswith("s3://"):
        raise ValueError(
            f"Invalid S3 URI given: {s3_path}. It should start with `s3://`."
        )
    path_parts = s3_path.replace("s3://", "").split("/")
    bucket = path_parts.pop(0)
    key = "/".join(path_parts)
    return bucket, key
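
A short usage sketch (assuming the module is importable as zenml.integrations.s3.utils, matching the source path above):

from zenml.integrations.s3.utils import split_s3_path

bucket, key = split_s3_path("s3://my_bucket/pipelines/run_1/output")
print(bucket)  # my_bucket
print(key)     # pipelines/run_1/output

# URIs that do not start with `s3://` raise a ValueError.
try:
    split_s3_path("gs://other_bucket/path")
except ValueError as err:
    print(err)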