Materializers
zenml.materializers
special
Initialization of ZenML materializers.
Materializers are used to convert a ZenML artifact into a specific format. They
are most often used to handle the input or output of ZenML steps, and can be
extended by building on the `BaseMaterializer` class.
base_materializer
Metaclass implementation for registering ZenML BaseMaterializer subclasses.
BaseMaterializer
Base Materializer to realize artifact data.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data.

    The base implementations of `handle_input` and `handle_return` only check
    that the requested/returned type is one of `ASSOCIATED_TYPES` (or a
    subclass of one); concrete materializers add the actual read/write logic.
    """

    # Artifact classes this materializer is associated with. If left empty,
    # the metaclass falls back to `BaseArtifact`.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    # Python types (and, via `issubclass`, their subclasses) this materializer
    # can read and write. Must be non-empty for every subclass.
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to materialize.
        """
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check.

        Returns:
            Whether the materializer can read/write the given type.
        """
        return any(
            issubclass(data_type, associated_type)
            for associated_type in self.ASSOCIATED_TYPES
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        The base implementation only validates `data_type`.

        Args:
            data_type: What type the input should be materialized as.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        The base implementation only validates the type of `data`.

        Args:
            data: The object returned by the step function.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )
__init__(self, artifact)
special
Initializes a materializer with the given artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
BaseArtifact |
The artifact to materialize. |
required |
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Bind the artifact that this materializer reads from or writes to.

    Args:
        artifact: The artifact to materialize; stored as `self.artifact`.
    """
    self.artifact = artifact
handle_input(self, data_type)
Write logic here to handle input of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
What type the input should be materialized as. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the data is not of the correct type. |
Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Check that this materializer is able to load `data_type`.

    This base hook performs only the type check; concrete materializers
    extend it with the logic that loads the step input.

    Args:
        data_type: What type the input should be materialized as.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    if self._can_handle_type(data_type):
        return
    cls_name = self.__class__.__name__
    raise TypeError(
        f"Unable to handle type {data_type}. {cls_name} "
        f"can only read artifacts to the following types: "
        f"{self.ASSOCIATED_TYPES}."
    )
handle_return(self, data)
Write logic here to handle return of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
The object returned by the step function. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the data is not of the correct type. |
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    The base implementation only validates the type of `data`; concrete
    materializers extend it with the logic that writes the object.

    Args:
        data: The object returned by the step function.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to write {data_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )
BaseMaterializerMeta (type)
Metaclass responsible for registering different BaseMaterializer subclasses.
Materializers are used for reading/writing artifacts.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer subclasses.

    Materializers are used for reading/writing artifacts.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at the `MaterializerRegistry`.

        The class named `BaseMaterializer` is created without any checks.
        Every other class is fully validated first and only then registered
        with `default_materializer_registry` and `type_registry` for each of
        its `ASSOCIATED_TYPES`. Validating everything up front ensures that an
        invalid class never leaves partial registrations behind.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The newly created materializer class.

        Raises:
            MaterializerInterfaceError: If the class was improperly defined.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        if name == "BaseMaterializer":
            return cls

        # Local import; presumably avoids an import cycle with the artifacts
        # package -- TODO confirm.
        from zenml.artifacts.base_artifact import BaseArtifact

        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

        # Validate *all* associated types before registering any of them, so
        # a bad entry late in the tuple cannot leave earlier ones registered
        # to a class that was rejected.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

        # Without explicit artifact types, fall back to the generic base.
        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            type_registry.register_integration(associated_type, artifact_types)
        return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a Materializer class and registers it at the `MaterializerRegistry`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the class. |
required |
bases |
Tuple[Type[Any], ...] |
The base classes of the class. |
required |
dct |
Dict[str, Any] |
The dictionary of the class. |
required |
Returns:
Type | Description |
---|---|
BaseMaterializerMeta |
The newly created materializer class. |
Exceptions:
Type | Description |
---|---|
MaterializerInterfaceError |
If the class was improperly defined. |
Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at the `MaterializerRegistry`.

    The class named `BaseMaterializer` is created without any checks.
    Every other class is fully validated first and only then registered
    with `default_materializer_registry` and `type_registry` for each of
    its `ASSOCIATED_TYPES`. Validating everything up front ensures that an
    invalid class never leaves partial registrations behind.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The newly created materializer class.

    Raises:
        MaterializerInterfaceError: If the class was improperly defined.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    if name == "BaseMaterializer":
        return cls

    # Local import; presumably avoids an import cycle with the artifacts
    # package -- TODO confirm.
    from zenml.artifacts.base_artifact import BaseArtifact

    if not cls.ASSOCIATED_TYPES:
        raise MaterializerInterfaceError(
            f"Invalid materializer class '{name}'. When creating a "
            f"custom materializer, make sure to specify at least one "
            f"type in its ASSOCIATED_TYPES class variable.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )

    for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
        if not (
            inspect.isclass(artifact_type)
            and issubclass(artifact_type, BaseArtifact)
        ):
            raise MaterializerInterfaceError(
                f"Associated artifact type {artifact_type} for "
                f"materializer {name} is not a `BaseArtifact` "
                f"subclass.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

    # Validate *all* associated types before registering any of them, so a
    # bad entry late in the tuple cannot leave earlier ones registered to a
    # class that was rejected.
    for associated_type in cls.ASSOCIATED_TYPES:
        if not inspect.isclass(associated_type):
            raise MaterializerInterfaceError(
                f"Associated type {associated_type} for materializer "
                f"{name} is not a class.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

    # Without explicit artifact types, fall back to the generic base.
    artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
    for associated_type in cls.ASSOCIATED_TYPES:
        default_materializer_registry.register_materializer_type(
            associated_type, cls
        )
        type_registry.register_integration(associated_type, artifact_types)
    return cls
built_in_materializer
Implementation of ZenML's built-in materializers.
BuiltInContainerMaterializer (BaseMaterializer)
Handle built-in container types (dict, list, set, tuple).
Source code in zenml/materializers/built_in_materializer.py
class BuiltInContainerMaterializer(BaseMaterializer):
    """Handle built-in container types (dict, list, set, tuple)."""

    ASSOCIATED_TYPES = (dict, list, set, tuple)

    def __init__(self, artifact: "BaseArtifact"):
        """Define `self.data_path` and `self.metadata_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        # JSON-serializable containers are written to `data_path`; otherwise
        # `metadata_path` records where and as what type each element lives.
        self.data_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        self.metadata_path = os.path.join(
            self.artifact.uri, DEFAULT_METADATA_FILENAME
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads a materialized built-in container object.

        If the data was serialized to JSON, deserialize it.
        Otherwise, reconstruct all elements according to the metadata file:
            1. Resolve the data type using `find_type_by_str()`,
            2. Get the materializer via the `default_materializer_registry`,
            3. Initialize the materializer with a mock `DataArtifact`, whose
                `uri` attribute is overwritten to point to the desired path,
            4. Use `handle_input()` of that materializer to load the element.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.

        Raises:
            RuntimeError: If the data was not found.
        """
        super().handle_input(data_type)

        # If the data was not serialized, there must be metadata present.
        if not fileio.exists(self.data_path) and not fileio.exists(
            self.metadata_path
        ):
            raise RuntimeError(
                f"Materialization of type {data_type} failed. Expected either "
                f"{self.data_path} or {self.metadata_path} to exist."
            )

        # If the data was serialized as JSON, deserialize it.
        if fileio.exists(self.data_path):
            outputs = yaml_utils.read_json(self.data_path)

        # Otherwise, use the metadata to reconstruct the data as a list.
        else:
            metadata = yaml_utils.read_json(self.metadata_path)
            outputs = []
            for path_, type_str in zip(metadata["paths"], metadata["types"]):
                type_ = find_type_by_str(type_str)
                materializer_class = default_materializer_registry[type_]
                # Element materializers get a stand-in `DataArtifact` whose
                # `uri` points at the element's subdirectory.
                mock_artifact = DataArtifact()
                mock_artifact.uri = path_
                materializer = materializer_class(mock_artifact)
                element = materializer.handle_input(type_)
                outputs.append(element)

        # Cast the data to the correct type.
        if issubclass(data_type, dict) and not isinstance(outputs, dict):
            # Non-serializable dicts were stored as `[keys, values]`.
            keys, values = outputs
            return dict(zip(keys, values))
        if issubclass(data_type, tuple):
            return tuple(outputs)
        if issubclass(data_type, set):
            return set(outputs)
        return outputs

    def handle_return(self, data: Any) -> None:
        """Materialize a built-in container object.

        If the object can be serialized to JSON, serialize it.

        Otherwise, use the `default_materializer_registry` to find the correct
        materializer for each element and materialize each element into a
        subdirectory.

        Tuples and sets are cast to list before materialization.

        For non-serializable dicts, materialize keys/values as separate lists.

        Args:
            data: The built-in container object to materialize.

        Raises:
            Exception: If any exception occurs, it is raised after cleanup.
        """
        super().handle_return(data)

        # tuple and set: handle as list.
        if isinstance(data, (tuple, set)):
            data = list(data)

        # If the data is serializable, just write it into a single JSON file.
        if _is_serializable(data):
            yaml_utils.write_json(self.data_path, data)
            return

        # non-serializable dict: Handle as non-serializable list of lists.
        if isinstance(data, dict):
            data = [list(data.keys()), list(data.values())]

        # non-serializable list: Materialize each element into a subfolder.
        # Every subdirectory is recorded in `paths` right after it is
        # created, and all setup happens inside the `try`, so the cleanup
        # below also removes directories when a later setup step fails
        # (e.g. no materializer is registered for an element's type).
        paths, types, materializers = [], [], []
        try:
            for i, element in enumerate(data):
                element_path = os.path.join(self.artifact.uri, str(i))
                fileio.mkdir(element_path)
                paths.append(element_path)
                type_ = type(element)
                types.append(str(type_))
                materializer_class = default_materializer_registry[type_]
                mock_artifact = DataArtifact()
                mock_artifact.uri = element_path
                materializers.append(materializer_class(mock_artifact))

            # Write metadata as JSON.
            metadata = {"length": len(data), "paths": paths, "types": types}
            yaml_utils.write_json(self.metadata_path, metadata)

            # Materialize each element.
            for element, materializer in zip(data, materializers):
                materializer.handle_return(element)

        # If an error occurs, delete all created files and re-raise.
        except Exception:
            if fileio.exists(self.metadata_path):
                fileio.remove(self.metadata_path)
            for element_path in paths:
                fileio.rmtree(element_path)
            raise
__init__(self, artifact)
special
Define `self.data_path` and `self.metadata_path`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
BaseArtifact |
Artifact required by `BaseMaterializer.__init__()`. |
required |
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Store the artifact and derive the container's data/metadata paths.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    artifact_uri = self.artifact.uri
    self.data_path = os.path.join(artifact_uri, DEFAULT_FILENAME)
    self.metadata_path = os.path.join(artifact_uri, DEFAULT_METADATA_FILENAME)
handle_input(self, data_type)
Reads a materialized built-in container object.
If the data was serialized to JSON, deserialize it.
Otherwise, reconstruct all elements according to the metadata file:
1. Resolve the data type using `find_type_by_str()`,
2. Get the materializer via the `default_materializer_registry`,
3. Initialize the materializer with a mock `DataArtifact`, whose
`uri` attribute is overwritten to point to the desired path,
4. Use `handle_input()` of that materializer to load the element.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Any |
The data read. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the data was not found. |
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads a materialized built-in container object.

    If the data was serialized to JSON, deserialize it.
    Otherwise, reconstruct all elements according to the metadata file:
        1. Resolve the data type using `find_type_by_str()`,
        2. Get the materializer via the `default_materializer_registry`,
        3. Initialize the materializer with a mock `DataArtifact`, whose
            `uri` attribute is overwritten to point to the desired path,
        4. Use `handle_input()` of that materializer to load the element.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.

    Raises:
        RuntimeError: If the data was not found.
    """
    super().handle_input(data_type)

    # If the data was not serialized, there must be metadata present.
    if not fileio.exists(self.data_path) and not fileio.exists(
        self.metadata_path
    ):
        raise RuntimeError(
            f"Materialization of type {data_type} failed. Expected either "
            f"{self.data_path} or {self.metadata_path} to exist."
        )

    # If the data was serialized as JSON, deserialize it.
    if fileio.exists(self.data_path):
        outputs = yaml_utils.read_json(self.data_path)

    # Otherwise, use the metadata to reconstruct the data as a list.
    else:
        metadata = yaml_utils.read_json(self.metadata_path)
        outputs = []
        for path_, type_str in zip(metadata["paths"], metadata["types"]):
            type_ = find_type_by_str(type_str)
            materializer_class = default_materializer_registry[type_]
            # Element materializers get a stand-in `DataArtifact` whose
            # `uri` points at the element's subdirectory.
            mock_artifact = DataArtifact()
            mock_artifact.uri = path_
            materializer = materializer_class(mock_artifact)
            element = materializer.handle_input(type_)
            outputs.append(element)

    # Cast the data to the correct type.
    if issubclass(data_type, dict) and not isinstance(outputs, dict):
        # Non-serializable dicts were stored as `[keys, values]`.
        keys, values = outputs
        return dict(zip(keys, values))
    if issubclass(data_type, tuple):
        return tuple(outputs)
    if issubclass(data_type, set):
        return set(outputs)
    return outputs
handle_return(self, data)
Materialize a built-in container object.
If the object can be serialized to JSON, serialize it.
Otherwise, use the `default_materializer_registry` to find the correct
materializer for each element and materialize each element into a
subdirectory.
Tuples and sets are cast to list before materialization.
For non-serializable dicts, materialize keys/values as separate lists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
The built-in container object to materialize. |
required |
Exceptions:
Type | Description |
---|---|
Exception |
If any exception occurs, it is raised after cleanup. |
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Materialize a built-in container object.

    If the object can be serialized to JSON, serialize it.

    Otherwise, use the `default_materializer_registry` to find the correct
    materializer for each element and materialize each element into a
    subdirectory.

    Tuples and sets are cast to list before materialization.

    For non-serializable dicts, materialize keys/values as separate lists.

    Args:
        data: The built-in container object to materialize.

    Raises:
        Exception: If any exception occurs, it is raised after cleanup.
    """
    super().handle_return(data)

    # tuple and set: handle as list.
    if isinstance(data, (tuple, set)):
        data = list(data)

    # If the data is serializable, just write it into a single JSON file.
    if _is_serializable(data):
        yaml_utils.write_json(self.data_path, data)
        return

    # non-serializable dict: Handle as non-serializable list of lists.
    if isinstance(data, dict):
        data = [list(data.keys()), list(data.values())]

    # non-serializable list: Materialize each element into a subfolder.
    # Every subdirectory is recorded in `paths` right after it is created,
    # and all setup happens inside the `try`, so the cleanup below also
    # removes directories when a later setup step fails (e.g. no
    # materializer is registered for an element's type).
    paths, types, materializers = [], [], []
    try:
        for i, element in enumerate(data):
            element_path = os.path.join(self.artifact.uri, str(i))
            fileio.mkdir(element_path)
            paths.append(element_path)
            type_ = type(element)
            types.append(str(type_))
            materializer_class = default_materializer_registry[type_]
            mock_artifact = DataArtifact()
            mock_artifact.uri = element_path
            materializers.append(materializer_class(mock_artifact))

        # Write metadata as JSON.
        metadata = {"length": len(data), "paths": paths, "types": types}
        yaml_utils.write_json(self.metadata_path, metadata)

        # Materialize each element.
        for element, materializer in zip(data, materializers):
            materializer.handle_return(element)

    # If an error occurs, delete all created files and re-raise.
    except Exception:
        if fileio.exists(self.metadata_path):
            fileio.remove(self.metadata_path)
        for element_path in paths:
            fileio.rmtree(element_path)
        raise
BuiltInMaterializer (BaseMaterializer)
Handle JSON-serializable basic types (`bool`, `float`, `int`, `str`).
Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Handle JSON-serializable basic types (`bool`, `float`, `int`, `str`).

    Each value is stored as a single JSON document at `self.data_path`.
    """

    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact, DataAnalysisArtifact)
    ASSOCIATED_TYPES = BASIC_TYPES

    def __init__(self, artifact: "BaseArtifact"):
        """Store the artifact and derive the JSON file path `self.data_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        artifact_uri = self.artifact.uri
        self.data_path = os.path.join(artifact_uri, DEFAULT_FILENAME)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Load a primitive value back from its JSON file.

        A mismatch between the stored and the requested type is only logged
        at debug level; the loaded value is returned unchanged.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().handle_input(data_type)
        loaded = yaml_utils.read_json(self.data_path)
        loaded_type = type(loaded)
        if loaded_type != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {loaded} was type {loaded_type} but expected "
                f"{data_type}"
            )
        return loaded

    def handle_return(self, data: Any) -> None:
        """Write a primitive value to its JSON file.

        Args:
            data: The data to store.
        """
        super().handle_return(data)
        yaml_utils.write_json(self.data_path, data)
__init__(self, artifact)
special
Define `self.data_path`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
BaseArtifact |
Artifact required by `BaseMaterializer.__init__()`. |
required |
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Store the artifact and derive the JSON file path `self.data_path`.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    artifact_uri = self.artifact.uri
    self.data_path = os.path.join(artifact_uri, DEFAULT_FILENAME)
handle_input(self, data_type)
Reads basic primitive types from JSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Any |
The data read. |
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Load a primitive value back from its JSON file.

    A mismatch between the stored and the requested type is only logged at
    debug level; the loaded value is returned unchanged.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().handle_input(data_type)
    loaded = yaml_utils.read_json(self.data_path)
    loaded_type = type(loaded)
    if loaded_type != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {loaded} was type {loaded_type} but expected "
            f"{data_type}"
        )
    return loaded
handle_return(self, data)
Serialize a basic type to JSON.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
The data to store. |
required |
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Write a primitive value to its JSON file after the base type check.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    target_path = self.data_path
    yaml_utils.write_json(target_path, data)
BytesMaterializer (BaseMaterializer)
Handle `bytes` data type, which is not JSON serializable.
Source code in zenml/materializers/built_in_materializer.py
class BytesMaterializer(BaseMaterializer):
    """Handle `bytes` data type, which is not JSON serializable.

    The payload is written verbatim to a single binary file.
    """

    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact, DataAnalysisArtifact)
    ASSOCIATED_TYPES = (bytes,)

    def __init__(self, artifact: "BaseArtifact"):
        """Store the artifact and derive the binary file path `self.data_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        artifact_uri = self.artifact.uri
        self.data_path = os.path.join(artifact_uri, DEFAULT_BYTES_FILENAME)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Read the stored bytes back from the binary file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().handle_input(data_type)
        with fileio.open(self.data_path, "rb") as source:
            payload = source.read()
        return payload

    def handle_return(self, data: Any) -> None:
        """Write the bytes object verbatim to the binary file.

        Args:
            data: The data to store.
        """
        super().handle_return(data)
        with fileio.open(self.data_path, "wb") as sink:
            sink.write(data)
__init__(self, artifact)
special
Define `self.data_path`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
BaseArtifact |
Artifact required by `BaseMaterializer.__init__()`. |
required |
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Store the artifact and derive the binary file path `self.data_path`.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    artifact_uri = self.artifact.uri
    self.data_path = os.path.join(artifact_uri, DEFAULT_BYTES_FILENAME)
handle_input(self, data_type)
Reads a bytes object from file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Any |
The data read. |
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Read the stored bytes back from the binary file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().handle_input(data_type)
    with fileio.open(self.data_path, "rb") as source:
        payload = source.read()
    return payload
handle_return(self, data)
Save a bytes object to file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
The data to store. |
required |
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Write the bytes object verbatim to the binary file.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    target_path = self.data_path
    with fileio.open(target_path, "wb") as sink:
        sink.write(data)
find_type_by_str(type_str)
Get a Python type, given its string representation.
E.g., `"<class 'int'>"` should resolve to `int`.
Currently this is implemented by checking all artifact types registered in
the `default_materializer_registry`. This means only types in the registry
can be found. Any other types will cause a `RuntimeError`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
type_str |
str |
The string representation of a type. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the type could not be resolved. |
Returns:
Type | Description |
---|---|
Type[Any] |
The type whose string representation is `type_str`. |
Source code in zenml/materializers/built_in_materializer.py
def find_type_by_str(type_str: str) -> Type[Any]:
    """Get a Python type, given its string representation.

    E.g., "<class 'int'>" should resolve to `int`.

    Only types registered in the `default_materializer_registry` can be
    resolved; the lookup compares `type_str` against `str(t)` for every
    registered type `t`. Any other string causes a `RuntimeError`.

    Args:
        type_str: The string representation of a type.

    Raises:
        RuntimeError: If the type could not be resolved.

    Returns:
        The type whose string representation is `type_str`.
    """
    # TODO: how to handle subclasses of registered types?
    candidates = {
        str(registered): registered
        for registered in default_materializer_registry.materializer_types
    }
    if type_str not in candidates:
        raise RuntimeError(f"Cannot resolve type '{type_str}'.")
    return candidates[type_str]
default_materializer_registry
Implementation of a default materializer registry.
MaterializerRegistry
Matches a Python type to a default materializer.
Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a Python type to a default materializer."""

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps each registered Python type to its materializer class.
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = dict()

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Register `type_` for `key` unless `key` already has a materializer.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key in self.materializer_types:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )
            return
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Register `type_` for `key`, replacing any existing entry.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializer based on the key.

        An exact registration wins; otherwise the materializers of all
        registered superclasses of `key` are collected, and exactly one
        distinct materializer must remain.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered or the key has more than one superclass with
                different default materializers
        """
        registered = self.materializer_types
        if key in registered:
            return registered[key]

        candidates = {
            materializer
            for registered_type, materializer in registered.items()
            if issubclass(key, registered_type)
        }
        if len(candidates) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {candidates}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )
        if candidates:
            return candidates.pop()
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Return the live type-to-materializer mapping (not a copy).

        Returns:
            A dictionary of registered materializer types.
        """
        registered = self.materializer_types
        return registered

    def is_registered(self, key: Type[Any]) -> bool:
        """Check whether `key` or one of its superclasses is registered.

        Args:
            key: Indicates the type of object.

        Returns:
            True if a materializer is registered for the given type, False
            otherwise.
        """
        for registered_type in self.materializer_types:
            if issubclass(key, registered_type):
                return True
        return False
__getitem__(self, key)
special
Get a single materializer based on the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
Returns:
Type | Description |
---|---|
Type[BaseMaterializer] |
`BaseMaterializer` subclass that was registered for this key. |
Exceptions:
Type | Description |
---|---|
StepInterfaceError |
If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers |
Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializer based on the key.

    An exact registration wins; otherwise the materializers of all
    registered superclasses of `key` are collected, and exactly one
    distinct materializer must remain.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered or the key has more than one superclass with
            different default materializers
    """
    registered = self.materializer_types
    if key in registered:
        return registered[key]

    candidates = {
        materializer
        for registered_type, materializer in registered.items()
        if issubclass(key, registered_type)
    }
    if len(candidates) > 1:
        raise StepInterfaceError(
            f"Type {key} is subclassing more than one type, thus it "
            f"maps to multiple materializers within the materializer "
            f"registry: {candidates}. "
            f"Please specify which of these materializers you would "
            f"like to use explicitly in your step.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )
    if candidates:
        return candidates.pop()
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
    )
__init__(self)
special
Initialize the materializer registry.
Source code in zenml/materializers/default_materializer_registry.py
def __init__(self) -> None:
    """Create an empty materializer registry.

    The registry starts without any type-to-materializer mappings.
    """
    registry: Dict[Type[Any], Type["BaseMaterializer"]] = dict()
    self.materializer_types = registry
get_materializer_types(self)
Get all registered materializer types.
Returns:
Type | Description |
---|---|
Dict[Type[Any], Type[BaseMaterializer]] |
A dictionary of registered materializer types. |
Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Return the mapping from registered types to materializer classes.

    The registry's own dictionary is returned (not a copy), so any changes
    the caller makes to it are visible to the registry.

    Returns:
        A dictionary of registered materializer types.
    """
    registered = self.materializer_types
    return registered
is_registered(self, key)
Returns whether a materializer class is registered for the given type or one of its superclasses.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
Returns:
Type | Description |
---|---|
bool |
True if a materializer is registered for the given type, False otherwise. |
Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Check whether some registered type covers the given type.

    A type counts as registered if it is itself a registered key or a
    subclass of one.

    Args:
        key: Indicates the type of object.

    Returns:
        True if a materializer is registered for the given type, False
        otherwise.
    """
    for registered_type in self.materializer_types:
        if issubclass(key, registered_type):
            return True
    return False
register_and_overwrite_type(self, key, type_)
Registers a new materializer and also overwrites a default if set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register `type_` for `key`, replacing any materializer already set.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    registry = self.materializer_types
    registry[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)
Registers a new materializer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register `type_` for `key` unless `key` already has a materializer.

    The first registration for a key wins; later attempts are skipped and
    only logged at debug level.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    registry = self.materializer_types
    if key in registry:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{registry[key]}. Skipping registration of "
            f"{type_}."
        )
        return
    registry[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
numpy_materializer
Implementation of the ZenML NumPy materializer.
NumpyMaterializer (BaseMaterializer)
Materializer to read data to and from numpy.
Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from numpy."""

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
        """Load a numpy array stored as a single-column parquet table.

        The values are read from the parquet data file and reshaped using the
        dimensions recorded in the accompanying JSON shape file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The numpy array.
        """
        super().handle_input(data_type)
        shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
        data_path = os.path.join(self.artifact.uri, DATA_FILENAME)

        # The shape file maps dimension index -> size, written in dimension
        # order by `handle_return`.
        dims = tuple(yaml_utils.read_json(shape_path).values())

        with fileio.open(data_path, "rb") as f:
            table = pq.read_table(pa.input_stream(f))
            flat = getattr(table.to_pandas(), DATA_VAR).values
        return np.reshape(flat, dims)

    def handle_return(self, arr: "NDArray[Any]") -> None:
        """Persist a numpy array to the artifact store as a parquet file.

        The array's dimensions are written to a JSON shape file, and its
        values, flattened into a single column, to a parquet data file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        dims = {str(axis): size for axis, size in enumerate(arr.shape)}
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME), dims
        )
        table = pa.table({DATA_VAR: arr.flatten()})
        data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
        with fileio.open(data_path, "wb") as f:
            pq.write_table(table, pa.output_stream(f))
handle_input(self, data_type)
Reads numpy array from parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
NDArray[Any] |
The numpy array. |
Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
    """Load a numpy array stored as a single-column parquet table.

    The values are read from the parquet data file and reshaped using the
    dimensions recorded in the accompanying JSON shape file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The numpy array.
    """
    super().handle_input(data_type)
    shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)

    # The shape file maps dimension index -> size, written in dimension
    # order by `handle_return`.
    dims = tuple(yaml_utils.read_json(shape_path).values())

    with fileio.open(data_path, "rb") as f:
        table = pq.read_table(pa.input_stream(f))
        flat = getattr(table.to_pandas(), DATA_VAR).values
    return np.reshape(flat, dims)
handle_return(self, arr)
Writes a np.ndarray to the artifact store as a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
NDArray[Any] |
The numpy array to write. |
required |
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: "NDArray[Any]") -> None:
    """Persist a numpy array to the artifact store as a parquet file.

    The array's dimensions are written to a JSON shape file, and its values,
    flattened into a single column, to a parquet data file.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    dims = {str(axis): size for axis, size in enumerate(arr.shape)}
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME), dims
    )
    table = pa.table({DATA_VAR: arr.flatten()})
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
    with fileio.open(data_path, "wb") as f:
        pq.write_table(table, pa.output_stream(f))
pandas_materializer
Materializer for Pandas.
PandasMaterializer (BaseMaterializer)
Materializer to read data to and from pandas.
Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (pd.DataFrame, pd.Series)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(
        self, data_type: Type[Any]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Reads pd.DataFrame or pd.Series from a parquet file.

        The parquet file is copied from the artifact store into a local
        temporary directory and loaded with pandas. The temporary directory
        is removed afterwards, also when copying or loading fails.

        Args:
            data_type: The type of the data to read.

        Returns:
            The pandas dataframe or series.

        Raises:
            ValueError: If a `pd.Series` is requested but the stored table
                does not have exactly one column.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        # Work on a local copy; the context manager deletes the temporary
        # directory even if the copy or the read raises.
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
            fileio.copy(filepath, temp_file)
            df = pd.read_parquet(temp_file)

        if issubclass(data_type, pd.Series):
            # `handle_return` stores a series as a single-column frame, so
            # that one column is unwrapped here. Checked explicitly rather
            # than with `assert`, which is stripped under `python -O`.
            if len(df.columns) != 1:
                raise ValueError(
                    f"Expected a single column when reading a pandas "
                    f"Series, but found {len(df.columns)} columns."
                )
            df = df[df.columns[0]]
        return df

    def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the artifact store.

        The data is written as a parquet file into a local temporary
        directory and then copied to the artifact's URI. A series is first
        converted to a single-column dataframe whose column is named
        "series".

        Args:
            df: The pandas dataframe or series to write.
        """
        super().handle_return(df)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        # Stage the parquet file locally; the temporary directory is removed
        # even if serialization or the copy to the artifact store fails.
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
            df.to_parquet(temp_file, compression=COMPRESSION_TYPE)
            fileio.copy(temp_file, filepath)
handle_input(self, data_type)
Reads pd.DataFrame or pd.Series from a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Union[pandas.core.frame.DataFrame, pandas.core.series.Series] |
The pandas dataframe or series. |
Source code in zenml/materializers/pandas_materializer.py
def handle_input(
    self, data_type: Type[Any]
) -> Union[pd.DataFrame, pd.Series]:
    """Reads pd.DataFrame or pd.Series from a parquet file.

    The parquet file is copied from the artifact store into a local
    temporary directory and loaded with pandas. The temporary directory is
    removed afterwards, also when copying or loading fails.

    Args:
        data_type: The type of the data to read.

    Returns:
        The pandas dataframe or series.

    Raises:
        ValueError: If a `pd.Series` is requested but the stored table does
            not have exactly one column.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    # Work on a local copy; the context manager deletes the temporary
    # directory even if the copy or the read raises.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
        fileio.copy(filepath, temp_file)
        df = pd.read_parquet(temp_file)

    if issubclass(data_type, pd.Series):
        # `handle_return` stores a series as a single-column frame, so that
        # one column is unwrapped here. Checked explicitly rather than with
        # `assert`, which is stripped under `python -O`.
        if len(df.columns) != 1:
            raise ValueError(
                f"Expected a single column when reading a pandas "
                f"Series, but found {len(df.columns)} columns."
            )
        df = df[df.columns[0]]
    return df
handle_return(self, df)
Writes a pandas dataframe or series to the artifact store as a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
Union[pandas.core.frame.DataFrame, pandas.core.series.Series] |
The pandas dataframe or series to write. |
required |
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the artifact store.

    The data is written as a parquet file into a local temporary directory
    and then copied to the artifact's URI. A series is first converted to a
    single-column dataframe whose column is named "series".

    Args:
        df: The pandas dataframe or series to write.
    """
    super().handle_return(df)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    # Stage the parquet file locally; the temporary directory is removed even
    # if serialization or the copy to the artifact store fails.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
        df.to_parquet(temp_file, compression=COMPRESSION_TYPE)
        fileio.copy(temp_file, filepath)
service_materializer
Implementation of a materializer to read and write ZenML service instances.
ServiceMaterializer (BaseMaterializer)
Materializer to read/write service instances.
Source code in zenml/materializers/service_materializer.py
class ServiceMaterializer(BaseMaterializer):
    """Materializer that persists and restores ZenML service instances."""

    ASSOCIATED_TYPES = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPES = (ServiceArtifact,)

    def handle_input(self, data_type: Type[Any]) -> BaseService:
        """Rebuild a service instance from its stored JSON description.

        The artifact holds the serialized service configuration together
        with the last known status information; the service registry turns
        that JSON back into a service object.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        super().handle_input(data_type)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "r") as config_file:
            serialized = config_file.read()
            return ServiceRegistry().load_service_from_json(serialized)

    def handle_return(self, service: BaseService) -> None:
        """Serialize a ZenML service into the artifact as indented JSON.

        Both the configuration and the last known status of the service are
        part of the serialized form.

        Args:
            service: A ZenML service instance.
        """
        super().handle_return(service)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "w") as config_file:
            config_file.write(service.json(indent=4))
handle_input(self, data_type)
Creates and returns a service.
This service is instantiated from the serialized service configuration and last known status information saved as artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
BaseService |
A ZenML service instance. |
Source code in zenml/materializers/service_materializer.py
def handle_input(self, data_type: Type[Any]) -> BaseService:
    """Rebuild a service instance from its stored JSON description.

    The artifact holds the serialized service configuration together with
    the last known status information; the service registry turns that JSON
    back into a service object.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    super().handle_input(data_type)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "r") as config_file:
        serialized = config_file.read()
        return ServiceRegistry().load_service_from_json(serialized)
handle_return(self, service)
Writes a ZenML service.
The configuration and last known status of the input service instance are serialized and saved as an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
A ZenML service instance. |
required |
Source code in zenml/materializers/service_materializer.py
def handle_return(self, service: BaseService) -> None:
    """Serialize a ZenML service into the artifact as indented JSON.

    Both the configuration and the last known status of the service are
    part of the serialized form.

    Args:
        service: A ZenML service instance.
    """
    super().handle_return(service)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "w") as config_file:
        config_file.write(service.json(indent=4))