Skip to content

Materializers

Initialization of ZenML materializers.

Materializers are used to convert a ZenML artifact into a specific format. They are most often used to handle the input or output of ZenML steps, and can be extended by building on the BaseMaterializer class.

BuiltInContainerMaterializer

Bases: BaseMaterializer

Handle built-in container types (dict, list, set, tuple).

Source code in src/zenml/materializers/built_in_materializer.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
class BuiltInContainerMaterializer(BaseMaterializer):
    """Handle built-in container types (dict, list, set, tuple).

    JSON-serializable containers are written to a single JSON file.
    Containers with non-serializable elements are stored element by element,
    each in its own subdirectory, alongside a metadata file describing them.
    """

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        dict,
        list,
        set,
        tuple,
    )

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Define `self.data_path` and `self.metadata_path`.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        self.data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        self.metadata_path = os.path.join(self.uri, DEFAULT_METADATA_FILENAME)

    def load(self, data_type: Type[Any]) -> Any:
        """Reads a materialized built-in container object.

        If the data was serialized to JSON, deserialize it.

        Otherwise, reconstruct all elements according to the metadata file.
        Two metadata layouts are understood:
            * a dict with `paths` and `types` (zenml <= 0.37.0), where the
              type is resolved with `find_type_by_str()` and the materializer
              is looked up in the materializer registry,
            * a list of entries with `path`, `type` and `materializer`
              (zenml > 0.37.0), where both the type and the materializer
              class are loaded via `source_utils.load()`.
        In both cases a materializer is initialized for the element path and
        its `load()` is used to load the element.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.

        Raises:
            RuntimeError: If the data was not found or the metadata has an
                unknown format.
        """
        # If the data was serialized as JSON, deserialize it.
        if self.artifact_store.exists(self.data_path):
            outputs = yaml_utils.read_json(self.data_path)

        # Otherwise, use the metadata to reconstruct the data as a list.
        elif self.artifact_store.exists(self.metadata_path):
            metadata = yaml_utils.read_json(self.metadata_path)
            outputs = []

            # Backwards compatibility for zenml <= 0.37.0
            if isinstance(metadata, dict):
                for path_, type_str in zip(
                    metadata["paths"], metadata["types"]
                ):
                    type_ = find_type_by_str(type_str)
                    materializer_class = materializer_registry[type_]
                    materializer = materializer_class(uri=path_)
                    element = materializer.load(type_)
                    outputs.append(element)

            # New format for zenml > 0.37.0
            elif isinstance(metadata, list):
                for entry in metadata:
                    path_ = entry["path"]
                    type_ = source_utils.load(entry["type"])
                    materializer_class = source_utils.load(
                        entry["materializer"]
                    )
                    materializer = materializer_class(uri=path_)
                    element = materializer.load(type_)
                    outputs.append(element)

            else:
                raise RuntimeError(f"Unknown metadata format: {metadata}.")

        # Neither the JSON file nor the metadata file exists.
        else:
            raise RuntimeError(
                f"Materialization of type {data_type} failed. Expected either "
                f"{self.data_path} or {self.metadata_path} to exist."
            )

        # Cast the data to the correct type.
        if issubclass(data_type, dict) and not isinstance(outputs, dict):
            # Non-serializable dicts were stored as [keys, values].
            keys, values = outputs
            return data_type(zip(keys, values))
        if issubclass(data_type, tuple) and not isinstance(outputs, tuple):
            return data_type(outputs)
        if issubclass(data_type, set) and not isinstance(outputs, set):
            return data_type(outputs)
        return outputs

    def save(self, data: Any) -> None:
        """Materialize a built-in container object.

        If the object can be serialized to JSON, serialize it.

        Otherwise, use the materializer registry to find the correct
        materializer for each element and materialize each element into a
        subdirectory.

        Tuples and sets are cast to list before materialization.

        For non-serializable dicts, materialize keys/values as separate lists.

        Args:
            data: The built-in container object to materialize.

        Raises:
            Exception: If any exception occurs, it is raised after cleanup.
        """
        # tuple and set: handle as list.
        if isinstance(data, (tuple, set)):
            data = list(data)

        # If the data is serializable, just write it into a single JSON file.
        if _is_serializable(data):
            yaml_utils.write_json(
                self.data_path,
                data,
                ensure_ascii=not ZENML_MATERIALIZER_ALLOW_NON_ASCII_JSON_DUMPS,
            )
            return

        # non-serializable dict: Handle as non-serializable list of lists.
        if isinstance(data, dict):
            data = [list(data.keys()), list(data.values())]

        # non-serializable list: Materialize each element into a subfolder.
        # Get path, type, and corresponding materializer for each element.
        metadata: List[Dict[str, str]] = []
        materializers: List[BaseMaterializer] = []
        try:
            for i, element in enumerate(data):
                element_path = os.path.join(self.uri, str(i))
                self.artifact_store.mkdir(element_path)
                type_ = type(element)
                materializer_class = materializer_registry[type_]
                materializer = materializer_class(uri=element_path)
                materializers.append(materializer)
                metadata.append(
                    {
                        "path": element_path,
                        "type": source_utils.resolve(type_).import_path,
                        "materializer": source_utils.resolve(
                            materializer_class
                        ).import_path,
                    }
                )
            # Write metadata as JSON.
            yaml_utils.write_json(self.metadata_path, metadata)
            # Materialize each element.
            for element, materializer in zip(data, materializers):
                materializer.validate_save_type_compatibility(type(element))
                materializer.save(element)
        # If an error occurs, delete all created files.
        except Exception:
            # Delete metadata
            if self.artifact_store.exists(self.metadata_path):
                self.artifact_store.remove(self.metadata_path)
            # Delete the directories of all elements recorded so far.
            for entry in metadata:
                self.artifact_store.rmtree(entry["path"])
            # Bare raise re-raises the active exception unchanged.
            raise

    # save dict type objects to JSON file with JSON visualization type
    def save_visualizations(self, data: Any) -> Dict[str, "VisualizationType"]:
        """Save visualizations for the given data.

        Args:
            data: The data to save visualizations for.

        Returns:
            A dictionary of visualization URIs and their types. It is empty
            when the data is not JSON-serializable.
        """
        # dict/list type objects are always saved as JSON files
        # doesn't work for non-serializable types as they
        # are saved as list of lists in different files
        if _is_serializable(data):
            # Backslashes in the path are replaced by forward slashes.
            return {self.data_path.replace("\\", "/"): VisualizationType.JSON}
        return {}

    def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given built-in container object.

        Args:
            data: The built-in container object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary: the `length` of the
            object if it defines `__len__`, otherwise nothing.
        """
        if hasattr(data, "__len__"):
            return {"length": len(data)}
        return {}

__init__(uri, artifact_store=None)

Define self.data_path and self.metadata_path.

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in src/zenml/materializers/built_in_materializer.py
273
274
275
276
277
278
279
280
281
282
283
284
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initialize the base materializer and derive both storage paths.

    Sets `self.data_path` and `self.metadata_path`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    # Both files live directly under the artifact URI.
    artifact_root = self.uri
    self.metadata_path = os.path.join(
        artifact_root, DEFAULT_METADATA_FILENAME
    )
    self.data_path = os.path.join(artifact_root, DEFAULT_FILENAME)

extract_metadata(data)

Extract metadata from the given built-in container object.

Parameters:

Name Type Description Default
data Any

The built-in container object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/materializers/built_in_materializer.py
450
451
452
453
454
455
456
457
458
459
460
461
def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
    """Report the size of the given built-in container object.

    Args:
        data: The built-in container object to extract metadata from.

    Returns:
        A dictionary with a single `length` entry if `data` defines
        `__len__`, otherwise an empty dictionary.
    """
    # Objects without a length contribute no metadata.
    if not hasattr(data, "__len__"):
        return {}
    return {"length": len(data)}

load(data_type)

Reads a materialized built-in container object.

If the data was serialized to JSON, deserialize it.

Otherwise, reconstruct all elements according to the metadata file: 1. Resolve the data type using find_type_by_str(), 2. Get the materializer via the default_materializer_registry, 3. Initialize the materializer with the desired path, 4. Use load() of that materializer to load the element.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Raises:

Type Description
RuntimeError

If the data was not found.

Source code in src/zenml/materializers/built_in_materializer.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def load(self, data_type: Type[Any]) -> Any:
    """Reads a materialized built-in container object.

    If the data was serialized to JSON, deserialize it.

    Otherwise, reconstruct all elements according to the metadata file.
    Two metadata layouts are understood:
        * a dict with `paths` and `types` (zenml <= 0.37.0), where the type
          is resolved with `find_type_by_str()` and the materializer is
          looked up in the materializer registry,
        * a list of entries with `path`, `type` and `materializer`
          (zenml > 0.37.0), where both the type and the materializer class
          are loaded via `source_utils.load()`.
    In both cases a materializer is initialized for the element path and its
    `load()` is used to load the element.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.

    Raises:
        RuntimeError: If the data was not found or the metadata has an
            unknown format.
    """
    # If the data was serialized as JSON, deserialize it.
    if self.artifact_store.exists(self.data_path):
        outputs = yaml_utils.read_json(self.data_path)

    # Otherwise, use the metadata to reconstruct the data as a list.
    elif self.artifact_store.exists(self.metadata_path):
        metadata = yaml_utils.read_json(self.metadata_path)
        outputs = []

        # Backwards compatibility for zenml <= 0.37.0
        if isinstance(metadata, dict):
            for path_, type_str in zip(
                metadata["paths"], metadata["types"]
            ):
                type_ = find_type_by_str(type_str)
                materializer_class = materializer_registry[type_]
                materializer = materializer_class(uri=path_)
                element = materializer.load(type_)
                outputs.append(element)

        # New format for zenml > 0.37.0
        elif isinstance(metadata, list):
            for entry in metadata:
                path_ = entry["path"]
                type_ = source_utils.load(entry["type"])
                materializer_class = source_utils.load(
                    entry["materializer"]
                )
                materializer = materializer_class(uri=path_)
                element = materializer.load(type_)
                outputs.append(element)

        else:
            raise RuntimeError(f"Unknown metadata format: {metadata}.")

    # Neither the JSON file nor the metadata file exists.
    else:
        raise RuntimeError(
            f"Materialization of type {data_type} failed. Expected either "
            f"{self.data_path} or {self.metadata_path} to exist."
        )

    # Cast the data to the correct type.
    if issubclass(data_type, dict) and not isinstance(outputs, dict):
        # Non-serializable dicts were stored as [keys, values].
        keys, values = outputs
        return data_type(zip(keys, values))
    if issubclass(data_type, tuple) and not isinstance(outputs, tuple):
        return data_type(outputs)
    if issubclass(data_type, set) and not isinstance(outputs, set):
        return data_type(outputs)
    return outputs

save(data)

Materialize a built-in container object.

If the object can be serialized to JSON, serialize it.

Otherwise, use the default_materializer_registry to find the correct materializer for each element and materialize each element into a subdirectory.

Tuples and sets are cast to list before materialization.

For non-serializable dicts, materialize keys/values as separate lists.

Parameters:

Name Type Description Default
data Any

The built-in container object to materialize.

required

Raises:

Type Description
Exception

If any exception occurs, it is raised after cleanup.

Source code in src/zenml/materializers/built_in_materializer.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def save(self, data: Any) -> None:
    """Write a built-in container object to the artifact store.

    JSON-serializable objects are written to a single JSON file.

    Everything else is stored element by element: the materializer registry
    supplies a materializer for each element, and each element is
    materialized into its own numbered subdirectory. A metadata file
    records the path, type, and materializer of every element.

    Tuples and sets are cast to list before materialization.

    For non-serializable dicts, materialize keys/values as separate lists.

    Args:
        data: The built-in container object to materialize.

    Raises:
        Exception: If any exception occurs, it is raised after cleanup.
    """
    # Sets and tuples share the list code path.
    if isinstance(data, (tuple, set)):
        data = list(data)

    # JSON-friendly payloads end up in one file, and we are done.
    if _is_serializable(data):
        yaml_utils.write_json(
            self.data_path,
            data,
            ensure_ascii=not ZENML_MATERIALIZER_ALLOW_NON_ASCII_JSON_DUMPS,
        )
        return

    # A non-serializable dict becomes a two-element list: keys and values.
    if isinstance(data, dict):
        data = [list(data.keys()), list(data.values())]

    # Collect one metadata record and one materializer per element.
    metadata: List[Dict[str, str]] = []
    materializers: List[BaseMaterializer] = []
    try:
        for index, item in enumerate(data):
            item_dir = os.path.join(self.uri, str(index))
            self.artifact_store.mkdir(item_dir)
            item_type = type(item)
            item_materializer_class = materializer_registry[item_type]
            materializers.append(item_materializer_class(uri=item_dir))
            type_source = source_utils.resolve(item_type).import_path
            materializer_source = source_utils.resolve(
                item_materializer_class
            ).import_path
            metadata.append(
                {
                    "path": item_dir,
                    "type": type_source,
                    "materializer": materializer_source,
                }
            )

        # The metadata file is written before any element is saved.
        yaml_utils.write_json(self.metadata_path, metadata)

        for item, item_materializer in zip(data, materializers):
            item_materializer.validate_save_type_compatibility(type(item))
            item_materializer.save(item)
    except Exception as e:
        # Roll back: drop the metadata file and every recorded element dir.
        if self.artifact_store.exists(self.metadata_path):
            self.artifact_store.remove(self.metadata_path)
        for record in metadata:
            self.artifact_store.rmtree(record["path"])
        raise e

save_visualizations(data)

Save visualizations for the given data.

Parameters:

Name Type Description Default
data Any

The data to save visualizations for.

required

Returns:

Type Description
Dict[str, VisualizationType]

A dictionary of visualization URIs and their types.

Source code in src/zenml/materializers/built_in_materializer.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
def save_visualizations(self, data: Any) -> Dict[str, "VisualizationType"]:
    """Point to the JSON file as a visualization, if there is one.

    Args:
        data: The data to save visualizations for.

    Returns:
        A dictionary of visualization URIs and their types. It is empty
        when the data is not JSON-serializable.
    """
    # Non-serializable objects are stored as lists of lists spread over
    # several files, so there is no single JSON file to visualize.
    if not _is_serializable(data):
        return {}

    # Serializable dict/list objects are always saved as one JSON file.
    # Backslashes in its path are replaced by forward slashes.
    visualization_uri = self.data_path.replace("\\", "/")
    return {visualization_uri: VisualizationType.JSON}

BuiltInMaterializer

Bases: BaseMaterializer

Handle JSON-serializable basic types (bool, float, int, str).

Source code in src/zenml/materializers/built_in_materializer.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class BuiltInMaterializer(BaseMaterializer):
    """Handle JSON-serializable basic types (`bool`, `float`, `int`, `str`)."""

    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = BASIC_TYPES

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Initialize the base materializer and define `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        # The value is stored in one file directly under the artifact URI.
        artifact_root = self.uri
        self.data_path = os.path.join(artifact_root, DEFAULT_FILENAME)

    def load(
        self, data_type: Union[Type[bool], Type[float], Type[int], Type[str]]
    ) -> Any:
        """Read a basic primitive value back from its JSON file.

        A type mismatch between the stored value and `data_type` is only
        logged at debug level; the value is returned as read.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        contents = yaml_utils.read_json(self.data_path)
        if type(contents) is data_type:
            return contents

        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
        return contents

    def save(self, data: Union[bool, float, int, str]) -> None:
        """Write a basic value to its JSON file.

        Args:
            data: The data to store.
        """
        ascii_only = not ZENML_MATERIALIZER_ALLOW_NON_ASCII_JSON_DUMPS
        yaml_utils.write_json(self.data_path, data, ensure_ascii=ascii_only)

    def extract_metadata(
        self, data: Union[bool, float, int, str]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given basic-type value.

        Args:
            data: The basic-type value to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        # Strings are skipped because they can be arbitrarily long.
        if not isinstance(data, (bool, float, int)):
            return {}

        # For booleans and numbers, add the string representation.
        return {"string_representation": str(data)}

__init__(uri, artifact_store=None)

Define self.data_path.

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in src/zenml/materializers/built_in_materializer.py
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initialize the base materializer and define `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    # The value is stored in one file directly under the artifact URI.
    artifact_root = self.uri
    self.data_path = os.path.join(artifact_root, DEFAULT_FILENAME)

extract_metadata(data)

Extract metadata from the given basic-type value (bool, float, int, or str).

Parameters:

Name Type Description Default
data Union[bool, float, int, str]

The basic-type value to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/materializers/built_in_materializer.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def extract_metadata(
    self, data: Union[bool, float, int, str]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given basic-type value.

    Args:
        data: The basic-type value to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    # Strings are skipped because they can be arbitrarily long.
    if not isinstance(data, (bool, float, int)):
        return {}

    # For booleans and numbers, add the string representation.
    return {"string_representation": str(data)}

load(data_type)

Reads basic primitive types from JSON.

Parameters:

Name Type Description Default
data_type Union[Type[bool], Type[float], Type[int], Type[str]]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in src/zenml/materializers/built_in_materializer.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def load(
    self, data_type: Union[Type[bool], Type[float], Type[int], Type[str]]
) -> Any:
    """Read a basic primitive value back from its JSON file.

    A type mismatch between the stored value and `data_type` is only logged
    at debug level; the value is returned as read.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    contents = yaml_utils.read_json(self.data_path)
    if type(contents) is data_type:
        return contents

    # TODO [ENG-142]: Raise error or try to coerce
    logger.debug(
        f"Contents {contents} was type {type(contents)} but expected "
        f"{data_type}"
    )
    return contents

save(data)

Serialize a basic type to JSON.

Parameters:

Name Type Description Default
data Union[bool, float, int, str]

The data to store.

required
Source code in src/zenml/materializers/built_in_materializer.py
 98
 99
100
101
102
103
104
105
106
107
108
def save(self, data: Union[bool, float, int, str]) -> None:
    """Write a basic value to its JSON file.

    Args:
        data: The data to store.
    """
    ascii_only = not ZENML_MATERIALIZER_ALLOW_NON_ASCII_JSON_DUMPS
    yaml_utils.write_json(self.data_path, data, ensure_ascii=ascii_only)

BytesMaterializer

Bases: BaseMaterializer

Handle bytes data type, which is not JSON serializable.

Source code in src/zenml/materializers/built_in_materializer.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class BytesMaterializer(BaseMaterializer):
    """Handle `bytes` data type, which is not JSON serializable."""

    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (bytes,)

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Initialize the base materializer and define `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        # The bytes are stored in one file directly under the artifact URI.
        artifact_root = self.uri
        self.data_path = os.path.join(artifact_root, DEFAULT_BYTES_FILENAME)

    def load(self, data_type: Type[Any]) -> Any:
        """Read the whole data file in binary mode.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        source = self.artifact_store.open(self.data_path, "rb")
        with source as file_:
            payload = file_.read()
        return payload

    def save(self, data: Any) -> None:
        """Write a bytes object to the data file in binary mode.

        Args:
            data: The data to store.
        """
        sink = self.artifact_store.open(self.data_path, "wb")
        with sink as file_:
            file_.write(data)

__init__(uri, artifact_store=None)

Define self.data_path.

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in src/zenml/materializers/built_in_materializer.py
135
136
137
138
139
140
141
142
143
144
145
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initialize the base materializer and define `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    # The bytes are stored in one file directly under the artifact URI.
    artifact_root = self.uri
    self.data_path = os.path.join(artifact_root, DEFAULT_BYTES_FILENAME)

load(data_type)

Reads a bytes object from file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in src/zenml/materializers/built_in_materializer.py
147
148
149
150
151
152
153
154
155
156
157
def load(self, data_type: Type[Any]) -> Any:
    """Read the whole data file in binary mode.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    source = self.artifact_store.open(self.data_path, "rb")
    with source as file_:
        payload = file_.read()
    return payload

save(data)

Save a bytes object to file.

Parameters:

Name Type Description Default
data Any

The data to store.

required
Source code in src/zenml/materializers/built_in_materializer.py
159
160
161
162
163
164
165
166
def save(self, data: Any) -> None:
    """Write a bytes object to the data file in binary mode.

    Args:
        data: The data to store.
    """
    sink = self.artifact_store.open(self.data_path, "wb")
    with sink as file_:
        file_.write(data)

CloudpickleMaterializer

Bases: BaseMaterializer

Materializer using cloudpickle.

This materializer can materialize (almost) any object, but does so in a non-reproducible way since artifacts cannot be loaded from other Python versions. It is recommended to use this materializer only as a last resort.

That is also why it has SKIP_REGISTRATION set to True and is currently only used as a fallback materializer inside the materializer registry.

Source code in src/zenml/materializers/cloudpickle_materializer.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class CloudpickleMaterializer(BaseMaterializer):
    """Materializer using cloudpickle.

    This materializer can materialize (almost) any object, but does so in a
    non-reproducible way since artifacts cannot be loaded from other Python
    versions. It is recommended to use this materializer only as a last resort.

    That is also why it has `SKIP_REGISTRATION` set to True and is currently
    only used as a fallback materializer inside the materializer registry.
    """

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (object,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
    SKIP_REGISTRATION: ClassVar[bool] = True

    def load(self, data_type: Type[Any]) -> Any:
        """Reads an artifact from a cloudpickle file.

        A warning is logged (but loading still proceeds) when the Python
        version recorded at save time differs from the running one.

        Args:
            data_type: The data type of the artifact. Not consulted here; the
                unpickled object is returned as-is.

        Returns:
            The loaded artifact data.
        """
        # validate python version
        source_python_version = self._load_python_version()
        current_python_version = Environment().python_version()
        if source_python_version != current_python_version:
            logger.warning(
                f"Your artifact was materialized under Python version "
                f"'{source_python_version}' but you are currently using "
                f"'{current_python_version}'. This might cause unexpected "
                "behavior since pickle is not reproducible across Python "
                "versions. Attempting to load anyway..."
            )

        # load data
        # NOTE(security): unpickling can execute arbitrary code, so this must
        # only ever be pointed at artifacts from a trusted source.
        filepath = os.path.join(self.uri, DEFAULT_FILENAME)
        with self.artifact_store.open(filepath, "rb") as fid:
            data = cloudpickle.load(fid)
        return data

    def _load_python_version(self) -> str:
        """Loads the Python version that was used to materialize the artifact.

        Returns:
            The Python version that was used to materialize the artifact, or
            `"unknown"` if no version file exists next to the artifact.
        """
        filepath = os.path.join(self.uri, DEFAULT_PYTHON_VERSION_FILENAME)
        # Go through the artifact store instead of `os.path.exists`: the
        # latter only sees the local filesystem, so the version file would
        # never be found when the URI points at a non-local store.
        if not self.artifact_store.exists(filepath):
            return "unknown"
        with self.artifact_store.open(filepath, "r") as f:
            # Strip so stray surrounding whitespace cannot trigger a
            # spurious version-mismatch warning in `load`.
            return f.read().strip()

    def save(self, data: Any) -> None:
        """Saves an artifact to a cloudpickle file.

        The current Python version is stored alongside the pickle so that
        `load` can warn about version mismatches.

        Args:
            data: The data to save.
        """
        # Log a warning if this materializer was not explicitly specified for
        # the given data type.
        if type(self) is CloudpickleMaterializer:
            logger.warning(
                f"No materializer is registered for type `{type(data)}`, so "
                "the default Pickle materializer was used. Pickle is not "
                "production ready and should only be used for prototyping as "
                "the artifacts cannot be loaded when running with a different "
                "Python version. Please consider implementing a custom "
                f"materializer for type `{type(data)}` according to the "
                "instructions at https://docs.zenml.io/how-to/handle-data-artifacts/handle-custom-data-types"
            )

        # save python version for validation on loading
        self._save_python_version()

        # save data
        filepath = os.path.join(self.uri, DEFAULT_FILENAME)
        with self.artifact_store.open(filepath, "wb") as fid:
            cloudpickle.dump(data, fid)

    def _save_python_version(self) -> None:
        """Saves the Python version used to materialize the artifact."""
        filepath = os.path.join(self.uri, DEFAULT_PYTHON_VERSION_FILENAME)
        current_python_version = Environment().python_version()
        # Write through the same artifact store that holds the pickle so the
        # version file and the data always end up side by side.
        with self.artifact_store.open(filepath, "w") as f:
            f.write(current_python_version)

load(data_type)

Reads an artifact from a cloudpickle file.

Parameters:

Name Type Description Default
data_type Type[Any]

The data type of the artifact.

required

Returns:

Type Description
Any

The loaded artifact data.

Source code in src/zenml/materializers/cloudpickle_materializer.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load(self, data_type: Type[Any]) -> Any:
    """Unpickle the artifact stored under this materializer's URI.

    Before loading, the Python version recorded for the artifact is compared
    with the running interpreter's version; a mismatch only logs a warning.

    Args:
        data_type: The data type of the artifact.

    Returns:
        The object produced by `cloudpickle.load`.
    """
    # Version check: warn, but never refuse to load.
    recorded_version = self._load_python_version()
    running_version = Environment().python_version()
    if recorded_version != running_version:
        mismatch_notice = (
            f"Your artifact was materialized under Python version "
            f"'{recorded_version}' but you are currently using "
            f"'{running_version}'. This might cause unexpected "
            "behavior since pickle is not reproducible across Python "
            "versions. Attempting to load anyway..."
        )
        logger.warning(mismatch_notice)

    # Read the pickle through the artifact store.
    pickle_path = os.path.join(self.uri, DEFAULT_FILENAME)
    with self.artifact_store.open(pickle_path, "rb") as stream:
        return cloudpickle.load(stream)

save(data)

Saves an artifact to a cloudpickle file.

Parameters:

Name Type Description Default
data Any

The data to save.

required
Source code in src/zenml/materializers/cloudpickle_materializer.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def save(self, data: Any) -> None:
    """Pickle the given object into this materializer's URI.

    The current Python version is recorded first (via
    `_save_python_version`), then the object is dumped with cloudpickle.

    Args:
        data: The data to save.
    """
    # Only the base class itself counts as the implicit fallback; subclasses
    # were chosen on purpose and therefore do not trigger the warning.
    used_as_fallback = type(self) is CloudpickleMaterializer
    if used_as_fallback:
        unregistered_type = type(data)
        logger.warning(
            f"No materializer is registered for type `{unregistered_type}`, so "
            "the default Pickle materializer was used. Pickle is not "
            "production ready and should only be used for prototyping as "
            "the artifacts cannot be loaded when running with a different "
            "Python version. Please consider implementing a custom "
            f"materializer for type `{unregistered_type}` according to the "
            "instructions at https://docs.zenml.io/how-to/handle-data-artifacts/handle-custom-data-types"
        )

    # Record the interpreter version so loading can validate it later.
    self._save_python_version()

    # Dump the object through the artifact store.
    pickle_path = os.path.join(self.uri, DEFAULT_FILENAME)
    with self.artifact_store.open(pickle_path, "wb") as stream:
        cloudpickle.dump(data, stream)

PydanticMaterializer

Bases: BaseMaterializer

Handle Pydantic BaseModel objects.

Source code in src/zenml/materializers/pydantic_materializer.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class PydanticMaterializer(BaseMaterializer):
    """Handle Pydantic BaseModel objects."""

    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (BaseModel,)

    def load(self, data_type: Type[BaseModel]) -> Any:
        """Reads BaseModel from JSON.

        Args:
            data_type: The model class used to validate the stored JSON.

        Returns:
            The result of `data_type.model_validate_json` on the file
            contents.
        """
        data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(data_path)
        return data_type.model_validate_json(contents)

    def save(self, data: BaseModel) -> None:
        """Serialize a BaseModel to JSON.

        Args:
            data: The model whose `model_dump_json()` output is stored.
        """
        data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        yaml_utils.write_json(data_path, data.model_dump_json())

    def extract_metadata(self, data: BaseModel) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given BaseModel object.

        Args:
            data: The BaseModel object to extract metadata from.

        Returns:
            A dictionary with a single `"schema"` entry holding the model's
            JSON schema.
        """
        # `model_json_schema()` is the Pydantic v2 name for the deprecated
        # `schema()`; the rest of this class already uses the v2 API.
        return {"schema": data.model_json_schema()}

extract_metadata(data)

Extract metadata from the given BaseModel object.

Parameters:

Name Type Description Default
data BaseModel

The BaseModel object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/materializers/pydantic_materializer.py
59
60
61
62
63
64
65
66
67
68
def extract_metadata(self, data: BaseModel) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given BaseModel object.

    Args:
        data: The BaseModel object to extract metadata from.

    Returns:
        A dictionary with a single `"schema"` entry holding the model's JSON
        schema.
    """
    # `model_json_schema()` is the Pydantic v2 name for the deprecated
    # `schema()`; it avoids the deprecation warning raised by the old name.
    return {"schema": data.model_json_schema()}

load(data_type)

Reads BaseModel from JSON.

Parameters:

Name Type Description Default
data_type Type[BaseModel]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in src/zenml/materializers/pydantic_materializer.py
37
38
39
40
41
42
43
44
45
46
47
48
def load(self, data_type: Type[BaseModel]) -> Any:
    """Rebuild a model instance from the JSON file under the artifact URI.

    Args:
        data_type: The model class used to validate the stored JSON.

    Returns:
        The result of `data_type.model_validate_json` on the file contents.
    """
    json_file = os.path.join(self.uri, DEFAULT_FILENAME)
    serialized_model = yaml_utils.read_json(json_file)
    model = data_type.model_validate_json(serialized_model)
    return model

save(data)

Serialize a BaseModel to JSON.

Parameters:

Name Type Description Default
data BaseModel

The data to store.

required
Source code in src/zenml/materializers/pydantic_materializer.py
50
51
52
53
54
55
56
57
def save(self, data: BaseModel) -> None:
    """Store the model's JSON dump in a file under the artifact URI.

    Args:
        data: The model whose `model_dump_json()` output is written.
    """
    json_file = os.path.join(self.uri, DEFAULT_FILENAME)
    serialized_model = data.model_dump_json()
    yaml_utils.write_json(json_file, serialized_model)

ServiceMaterializer

Bases: BaseMaterializer

Materializer to read/write service instances.

Source code in src/zenml/materializers/service_materializer.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class ServiceMaterializer(BaseMaterializer):
    """Materializer that stores a service's UUID and reloads the service by it."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.SERVICE

    def load(self, data_type: Type[Any]) -> BaseService:
        """Look up the stored service ID and rebuild the service from it.

        The artifact file contains the service's UUID as text. That ID is
        resolved through `Client().get_service`, and the returned model is
        turned into a service via `BaseDeploymentService.from_model`.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        id_file = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
        with self.artifact_store.open(id_file, "r") as stream:
            raw_id = stream.read()
        service_id = raw_id.strip()

        client = Client()
        service_model = client.get_service(
            name_id_or_prefix=uuid.UUID(service_id)
        )
        return BaseDeploymentService.from_model(service_model)

    def save(self, service: BaseService) -> None:
        """Persist the UUID of the given service as text.

        Args:
            service: A ZenML service instance.
        """
        id_file = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
        with self.artifact_store.open(id_file, "w") as stream:
            stream.write(str(service.uuid))

    def extract_metadata(
        self, service: BaseService
    ) -> Dict[str, "MetadataType"]:
        """Expose the service's prediction URL as metadata, if it has one.

        Args:
            service: The service to extract metadata from.

        Returns:
            `{"uri": Uri(...)}` when the prediction URL is truthy, otherwise
            an empty dictionary.
        """
        from zenml.metadata.metadata_types import Uri

        prediction_url = service.get_prediction_url()
        if not prediction_url:
            return {}
        return {"uri": Uri(prediction_url)}

extract_metadata(service)

Extract metadata from the given service.

Parameters:

Name Type Description Default
service BaseService

The service to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/materializers/service_materializer.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def extract_metadata(
    self, service: BaseService
) -> Dict[str, "MetadataType"]:
    """Expose the service's prediction URL as metadata, if it has one.

    Args:
        service: The service to extract metadata from.

    Returns:
        `{"uri": Uri(...)}` when the prediction URL is truthy, otherwise an
        empty dictionary.
    """
    from zenml.metadata.metadata_types import Uri

    prediction_url = service.get_prediction_url()
    if not prediction_url:
        return {}
    return {"uri": Uri(prediction_url)}

load(data_type)

Creates and returns a service.

The artifact stores the UUID of the service. On load, this ID is looked up through the ZenML client and the service is instantiated from the returned service model.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
BaseService

A ZenML service instance.

Source code in src/zenml/materializers/service_materializer.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def load(self, data_type: Type[Any]) -> BaseService:
    """Look up the stored service ID and rebuild the service from it.

    The artifact file contains the service's UUID as text. That ID is
    resolved through `Client().get_service`, and the returned model is turned
    into a service via `BaseDeploymentService.from_model`.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    id_file = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
    with self.artifact_store.open(id_file, "r") as stream:
        raw_id = stream.read()
    service_id = raw_id.strip()

    client = Client()
    service_model = client.get_service(
        name_id_or_prefix=uuid.UUID(service_id)
    )
    return BaseDeploymentService.from_model(service_model)

save(service)

Writes a ZenML service.

The UUID of the input service instance is written as text and saved as an artifact.

Parameters:

Name Type Description Default
service BaseService

A ZenML service instance.

required
Source code in src/zenml/materializers/service_materializer.py
56
57
58
59
60
61
62
63
64
65
66
67
def save(self, service: BaseService) -> None:
    """Persist the UUID of the given service as text.

    Only `service.uuid` is written to the artifact file.

    Args:
        service: A ZenML service instance.
    """
    id_file = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
    with self.artifact_store.open(id_file, "w") as stream:
        stream.write(str(service.uuid))

StructuredStringMaterializer

Bases: BaseMaterializer

Materializer for structured strings (CSV, HTML, Markdown or JSON).

Source code in src/zenml/materializers/structured_string_materializer.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class StructuredStringMaterializer(BaseMaterializer):
    """Materializer for CSV, HTML, Markdown and JSON strings."""

    ASSOCIATED_TYPES = (CSVString, HTMLString, MarkdownString, JSONString)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA_ANALYSIS

    def load(self, data_type: Type[STRUCTURED_STRINGS]) -> STRUCTURED_STRINGS:
        """Read the stored text and wrap it in the requested string type.

        Args:
            data_type: The type of the data to read.

        Returns:
            An instance of `data_type` built from the file content.
        """
        source_path = self._get_filepath(data_type)
        with self.artifact_store.open(source_path, "r") as stream:
            return data_type(stream.read())

    def save(self, data: STRUCTURED_STRINGS) -> None:
        """Write the string to the file that matches its type.

        Args:
            data: The structured string to write.
        """
        target_path = self._get_filepath(type(data))
        with self.artifact_store.open(target_path, "w") as stream:
            stream.write(data)

    def save_visualizations(
        self, data: STRUCTURED_STRINGS
    ) -> Dict[str, VisualizationType]:
        """Report the stored file as the visualization for the given data.

        Args:
            data: The data to save visualizations for.

        Returns:
            A single-entry dictionary mapping the file path (with backslashes
            replaced by forward slashes) to its visualization type.
        """
        string_type = type(data)
        normalized_path = self._get_filepath(string_type).replace("\\", "/")
        return {normalized_path: self._get_visualization_type(string_type)}

    def _get_filepath(self, data_type: Type[STRUCTURED_STRINGS]) -> str:
        """Get the file path for the given data type.

        Args:
            data_type: The type of the data.

        Returns:
            The file path for the given data type.

        Raises:
            ValueError: If the data type is not supported.
        """
        # Checked in this order; the first matching base class wins.
        filenames_by_type = (
            (CSVString, CSV_FILENAME),
            (HTMLString, HTML_FILENAME),
            (MarkdownString, MARKDOWN_FILENAME),
            (JSONString, JSON_FILENAME),
        )
        for string_type, filename in filenames_by_type:
            if issubclass(data_type, string_type):
                return os.path.join(self.uri, filename)
        raise ValueError(
            f"Data type {data_type} is not supported by this materializer."
        )

    def _get_visualization_type(
        self, data_type: Type[STRUCTURED_STRINGS]
    ) -> VisualizationType:
        """Get the visualization type for the given data type.

        Args:
            data_type: The type of the data.

        Returns:
            The visualization type for the given data type.

        Raises:
            ValueError: If the data type is not supported.
        """
        # Checked in this order; the first matching base class wins.
        visualizations_by_type = (
            (CSVString, VisualizationType.CSV),
            (HTMLString, VisualizationType.HTML),
            (MarkdownString, VisualizationType.MARKDOWN),
            (JSONString, VisualizationType.JSON),
        )
        for string_type, visualization_type in visualizations_by_type:
            if issubclass(data_type, string_type):
                return visualization_type
        raise ValueError(
            f"Data type {data_type} is not supported by this materializer."
        )

load(data_type)

Loads the data from the HTML or Markdown file.

Parameters:

Name Type Description Default
data_type Type[STRUCTURED_STRINGS]

The type of the data to read.

required

Returns:

Type Description
STRUCTURED_STRINGS

The loaded data.

Source code in src/zenml/materializers/structured_string_materializer.py
41
42
43
44
45
46
47
48
49
50
51
def load(self, data_type: Type[STRUCTURED_STRINGS]) -> STRUCTURED_STRINGS:
    """Read the stored text and wrap it in the requested string type.

    Args:
        data_type: The type of the data to read.

    Returns:
        An instance of `data_type` built from the file content.
    """
    source_path = self._get_filepath(data_type)
    with self.artifact_store.open(source_path, "r") as stream:
        return data_type(stream.read())

save(data)

Save data as an HTML or Markdown file.

Parameters:

Name Type Description Default
data STRUCTURED_STRINGS

The data to save as an HTML or Markdown file.

required
Source code in src/zenml/materializers/structured_string_materializer.py
53
54
55
56
57
58
59
60
61
62
def save(self, data: STRUCTURED_STRINGS) -> None:
    """Write the string to the file that matches its type.

    Args:
        data: The structured string to write.
    """
    target_path = self._get_filepath(type(data))
    with self.artifact_store.open(target_path, "w") as stream:
        stream.write(data)

save_visualizations(data)

Save visualizations for the given data.

Parameters:

Name Type Description Default
data STRUCTURED_STRINGS

The data to save visualizations for.

required

Returns:

Type Description
Dict[str, VisualizationType]

A dictionary of visualization URIs and their types.

Source code in src/zenml/materializers/structured_string_materializer.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def save_visualizations(
    self, data: STRUCTURED_STRINGS
) -> Dict[str, VisualizationType]:
    """Report the stored file as the visualization for the given data.

    Args:
        data: The data to save visualizations for.

    Returns:
        A single-entry dictionary mapping the file path (with backslashes
        replaced by forward slashes) to its visualization type.
    """
    string_type = type(data)
    normalized_path = self._get_filepath(string_type).replace("\\", "/")
    return {normalized_path: self._get_visualization_type(string_type)}

UUIDMaterializer

Bases: BaseMaterializer

Materializer to handle UUID objects.

Source code in src/zenml/materializers/uuid_materializer.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class UUIDMaterializer(BaseMaterializer):
    """Materializer that stores `uuid.UUID` objects as text."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (uuid.UUID,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Initialize the materializer and set `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        data_file = os.path.join(self.uri, DEFAULT_FILENAME)
        self.data_path = data_file

    def load(self, _: Type[uuid.UUID]) -> uuid.UUID:
        """Parse the UUID stored at `self.data_path`.

        Args:
            _: The type of the data to be loaded.

        Returns:
            The UUID built from the whitespace-stripped file content.
        """
        with self.artifact_store.open(self.data_path, "r") as stream:
            raw_text = stream.read()
        return uuid.UUID(raw_text.strip())

    def save(self, data: uuid.UUID) -> None:
        """Write the UUID's string form to `self.data_path`.

        Args:
            data: The UUID to be saved.
        """
        with self.artifact_store.open(self.data_path, "w") as stream:
            stream.write(str(data))

    def extract_metadata(self, data: uuid.UUID) -> Dict[str, MetadataType]:
        """Describe the UUID by its string form.

        Args:
            data: The UUID to extract metadata from.

        Returns:
            A dictionary with the single key `"string_representation"`.
        """
        metadata: Dict[str, MetadataType] = {}
        metadata["string_representation"] = str(data)
        return metadata

__init__(uri, artifact_store=None)

Define self.data_path.

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in src/zenml/materializers/uuid_materializer.py
34
35
36
37
38
39
40
41
42
43
44
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initialize the materializer and set `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    data_file = os.path.join(self.uri, DEFAULT_FILENAME)
    self.data_path = data_file

extract_metadata(data)

Extract metadata from the UUID.

Parameters:

Name Type Description Default
data UUID

The UUID to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata extracted from the UUID.

Source code in src/zenml/materializers/uuid_materializer.py
68
69
70
71
72
73
74
75
76
77
78
79
def extract_metadata(self, data: uuid.UUID) -> Dict[str, MetadataType]:
    """Describe the UUID by its string form.

    Args:
        data: The UUID to extract metadata from.

    Returns:
        A dictionary with the single key `"string_representation"`.
    """
    metadata: Dict[str, MetadataType] = {}
    metadata["string_representation"] = str(data)
    return metadata

load(_)

Read UUID from artifact store.

Parameters:

Name Type Description Default
_ Type[UUID]

The type of the data to be loaded.

required

Returns:

Type Description
UUID

The loaded UUID.

Source code in src/zenml/materializers/uuid_materializer.py
46
47
48
49
50
51
52
53
54
55
56
57
def load(self, _: Type[uuid.UUID]) -> uuid.UUID:
    """Parse the UUID stored at `self.data_path`.

    Args:
        _: The type of the data to be loaded.

    Returns:
        The UUID built from the whitespace-stripped file content.
    """
    with self.artifact_store.open(self.data_path, "r") as stream:
        raw_text = stream.read()
    return uuid.UUID(raw_text.strip())

save(data)

Write UUID to artifact store.

Parameters:

Name Type Description Default
data UUID

The UUID to be saved.

required
Source code in src/zenml/materializers/uuid_materializer.py
59
60
61
62
63
64
65
66
def save(self, data: uuid.UUID) -> None:
    """Write the UUID's string form to `self.data_path`.

    Args:
        data: The UUID to be saved.
    """
    destination = self.data_path
    with self.artifact_store.open(destination, "w") as stream:
        stream.write(str(data))