Materializers
zenml.materializers
special
Materializers are used to convert a ZenML artifact into a specific format. They
are most often used to handle the input or output of ZenML steps, and can be
extended by building on the BaseMaterializer
class.
base_materializer
BaseMaterializer
Base Materializer to realize artifact data.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data.

    Concrete materializers subclass this and override `handle_input` and
    `handle_return`. The implementations here contain no logic and return
    `None`.
    """

    # Artifact classes this materializer is associated with; empty on the base.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[List[Type["BaseArtifact"]]] = []
    # Python types this materializer handles; empty on the base.
    ASSOCIATED_TYPES: ClassVar[List[Type[Any]]] = []

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to store on the instance as
                `self.artifact`.
        """
        self.artifact = artifact

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            Any object that is to be passed into the relevant artifact in the
            step. The base implementation has no body and returns `None`.
        """
        # TODO [ENG-140]: Add type checking for materializer handle_input
        # Sketch of the intended check (an instance has no `__name__`, so the
        # class name has to be read through `self.__class__`):
        # if data_type not in self.ASSOCIATED_TYPES:
        #     raise ValueError(
        #         f"Data type {data_type} not supported by materializer "
        #         f"{self.__class__.__name__}. Supported types: "
        #         f"{self.ASSOCIATED_TYPES}"
        #     )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: The object to handle; going by this method's purpose it is
                the value returned by the step function.
        """
        # TODO [ENG-141]: Put proper type checking
        # Sketch of the intended check (this method receives `data`, not a
        # `data_type`, so the type is derived from the value):
        # if type(data) not in self.ASSOCIATED_TYPES:
        #     raise ValueError(
        #         f"Data type {type(data)} not supported by materializer "
        #         f"{self.__class__.__name__}. Supported types: "
        #         f"{self.ASSOCIATED_TYPES}"
        #     )
__init__(self, artifact)
special
Initializes a materializer with the given artifact.
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Initializes a materializer with the given artifact.

    Args:
        artifact: The artifact to store on the instance as `self.artifact`.
    """
    self.artifact = artifact
handle_input(self, data_type)
Write logic here to handle input of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
What type the input should be materialized as. |
required |
Returns:
Type | Description |
---|---|
Any |
Any object that is to be passed into the relevant artifact in the step. |
Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Write logic here to handle input of the step function.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        Any object that is to be passed into the relevant artifact in the
        step. This implementation has no body and returns `None`.
    """
    # TODO [ENG-140]: Add type checking for materializer handle_input
    # Sketch of the intended check (an instance has no `__name__`, so the
    # class name has to be read through `self.__class__`):
    # if data_type not in self.ASSOCIATED_TYPES:
    #     raise ValueError(
    #         f"Data type {data_type} not supported by materializer "
    #         f"{self.__class__.__name__}. Supported types: "
    #         f"{self.ASSOCIATED_TYPES}"
    #     )
handle_return(self, data)
Write logic here to handle return of the step function.
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    Args:
        data: The object to handle; going by this method's purpose it is
            the value returned by the step function.
    """
    # TODO [ENG-141]: Put proper type checking
    # Sketch of the intended check (this method receives `data`, not a
    # `data_type`, so the type is derived from the value):
    # if type(data) not in self.ASSOCIATED_TYPES:
    #     raise ValueError(
    #         f"Data type {type(data)} not supported by materializer "
    #         f"{self.__class__.__name__}. Supported types: "
    #         f"{self.ASSOCIATED_TYPES}"
    #     )
BaseMaterializerMeta (type)
Metaclass responsible for registering different BaseMaterializer subclasses for reading/writing artifacts.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass that registers `BaseMaterializer` subclasses.

    Every class created through this metaclass, except the one named
    `BaseMaterializer`, is registered for each of its `ASSOCIATED_TYPES` in
    `default_materializer_registry` and in `type_registry`.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            dct: Namespace (attributes) of the class being created.

        Returns:
            The newly created class.

        Raises:
            AssertionError: If a class other than `BaseMaterializer` has an
                empty `ASSOCIATED_TYPES`.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )

        # The base class itself is never registered.
        if name == "BaseMaterializer":
            return cls

        # Raised explicitly instead of via `assert`: assert statements are
        # removed when Python runs with -O, which would let a materializer
        # without types through and silently register nothing.
        if not cls.ASSOCIATED_TYPES:
            raise AssertionError(
                "You should specify a list of ASSOCIATED_TYPES when creating a "
                "Materializer!"
            )

        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            if cls.ASSOCIATED_ARTIFACT_TYPES:
                type_registry.register_integration(
                    associated_type, cls.ASSOCIATED_ARTIFACT_TYPES
                )
            else:
                # No artifact types declared: fall back to BaseArtifact.
                # NOTE(review): imported locally, presumably to avoid a
                # circular import - confirm before moving it to file level.
                from zenml.artifacts.base_artifact import BaseArtifact

                type_registry.register_integration(
                    associated_type, [BaseArtifact]
                )
        return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a Materializer class and registers it with the MaterializerRegistry.
Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it.

    Every class except the one named `BaseMaterializer` is registered for
    each of its `ASSOCIATED_TYPES` in `default_materializer_registry` and in
    `type_registry`.

    Args:
        name: Name of the class being created.
        bases: Base classes of the class being created.
        dct: Namespace (attributes) of the class being created.

    Returns:
        The newly created class.

    Raises:
        AssertionError: If a class other than `BaseMaterializer` has an
            empty `ASSOCIATED_TYPES`.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )

    # The base class itself is never registered.
    if name == "BaseMaterializer":
        return cls

    # Raised explicitly instead of via `assert`: assert statements are
    # removed when Python runs with -O, which would let a materializer
    # without types through and silently register nothing.
    if not cls.ASSOCIATED_TYPES:
        raise AssertionError(
            "You should specify a list of ASSOCIATED_TYPES when creating a "
            "Materializer!"
        )

    for associated_type in cls.ASSOCIATED_TYPES:
        default_materializer_registry.register_materializer_type(
            associated_type, cls
        )
        if cls.ASSOCIATED_ARTIFACT_TYPES:
            type_registry.register_integration(
                associated_type, cls.ASSOCIATED_ARTIFACT_TYPES
            )
        else:
            # No artifact types declared: fall back to BaseArtifact.
            # NOTE(review): imported locally, presumably to avoid a
            # circular import - confirm before moving it to file level.
            from zenml.artifacts.base_artifact import BaseArtifact

            type_registry.register_integration(
                associated_type, [BaseArtifact]
            )
    return cls
beam_materializer
BeamMaterializer (BaseMaterializer)
Materializer to read data to and from beam.
Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer to read data to and from beam.

    Reading and writing are not implemented yet (see the TODOs in the
    methods below).
    """

    ASSOCIATED_TYPES = [beam.PCollection]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Placeholder for reading the artifact as a beam compatible object.

        Currently only delegates to the parent `handle_input`; there is no
        `return` statement, so the method returns `None`.

        Args:
            data_type: What type the input should be materialized as.
        """
        # TODO [ENG-138]: Implement beam reading
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Applies a transform to the given beam pipeline and runs it.

        The intended `beam.io.WriteToParquet` step is still commented out,
        so nothing is written to `self.artifact.uri` here.

        Args:
            pipeline: A beam.pipeline object.
        """
        # TODO [ENG-139]: Implement beam writing
        super().handle_return(pipeline)
        # NOTE(review): `beam.ParDo()` is built without a DoFn argument;
        # Beam's ParDo presumably requires one, in which case this line
        # raises before the pipeline runs - confirm and replace it together
        # with the write step sketched below.
        pipeline | beam.ParDo()
        pipeline.run()
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
        # pipeline.run()
handle_input(self, data_type)
Reads all files inside the artifact directory and materializes them as a beam compatible output.
Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Placeholder for reading the artifact as a beam compatible object.

    Currently only delegates to the parent `handle_input`; there is no
    `return` statement, so the method returns `None`.

    Args:
        data_type: What type the input should be materialized as.
    """
    # TODO [ENG-138]: Implement beam reading
    super().handle_input(data_type)
handle_return(self, pipeline)
Appends a beam.io.WriteToParquet at the end of a beam pipeline and therefore persists the results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
Pipeline |
A beam.pipeline object. |
required |
Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Applies a transform to the given beam pipeline and runs it.

    The intended `beam.io.WriteToParquet` step is still commented out, so
    nothing is written to `self.artifact.uri` here.

    Args:
        pipeline: A beam.pipeline object.
    """
    # TODO [ENG-139]: Implement beam writing
    super().handle_return(pipeline)
    # NOTE(review): `beam.ParDo()` is built without a DoFn argument; Beam's
    # ParDo presumably requires one, in which case this line raises before
    # the pipeline runs - confirm and replace it together with the write
    # step sketched below.
    pipeline | beam.ParDo()
    pipeline.run()
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
    # pipeline.run()
built_in_materializer
BuiltInMaterializer (BaseMaterializer)
Read/Write JSON files.
Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Materializer that reads and writes built-in Python values as JSON."""

    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]
    ASSOCIATED_TYPES = [int, str, bytes, dict, float, list, tuple, bool]

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Loads the JSON file stored under the artifact URI.

        Args:
            data_type: The type the caller expects; a mismatch is only
                logged at debug level, the loaded value is returned as-is.

        Returns:
            Whatever `yaml_utils.read_json` produced for the file.
        """
        super().handle_input(data_type)
        contents = yaml_utils.read_json(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        )
        if type(contents) != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Writes the given value as JSON under the artifact URI.

        Args:
            data: The value handed to `yaml_utils.write_json`.
        """
        super().handle_return(data)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
        )
handle_input(self, data_type)
Reads basic primitive types from json.
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Loads the JSON file stored under the artifact URI.

    Args:
        data_type: The type the caller expects; a mismatch is only logged
            at debug level, the loaded value is returned as-is.

    Returns:
        Whatever `yaml_utils.read_json` produced for the file.
    """
    super().handle_input(data_type)
    contents = yaml_utils.read_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    )
    if type(contents) != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)
Handles basic built-in types and stores them as json
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Writes the given value as JSON under the artifact URI.

    Args:
        data: The value handed to `yaml_utils.write_json`.
    """
    super().handle_return(data)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
    )
default_materializer_registry
MaterializerRegistry
Matches a python type to a default materializer.
Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Registry mapping a Python type to its default materializer class."""

    def __init__(self) -> None:
        """Starts with an empty type-to-materializer mapping."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a materializer unless the key already has one.

        An existing registration is kept; the rejected attempt is only
        logged at debug level.

        Args:
            key: Indicates the type of an object.
            type_: A BaseMaterializer subclass.
        """
        if key in self.materializer_types:
            logger.debug(
                f"{key} already registered with "
                f"{self.materializer_types[key]}. Cannot register {type_}."
            )
            return

        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a materializer, replacing any existing entry for the key.

        Args:
            key: Indicates the type of an object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Looks up the materializer registered for a type.

        Args:
            key: Indicates the type of an object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            KeyError: If nothing is registered for the key.
        """
        if key not in self.materializer_types:
            raise KeyError(
                f"Type {key} does not have a default `Materializer`! Please "
                f"specify your own `Materializer`."
            )
        return self.materializer_types[key]

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Returns the registry's mapping itself (not a copy)."""
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Tells whether a materializer is registered for the given type."""
        return key in self.materializer_types
__getitem__(self, key)
special
Get a single materializer based on the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of an object. |
required |
Returns:
Type | Description |
---|---|
Type[BaseMaterializer] |
BaseMaterializer subclass that was registered for this key. |
Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Looks up the materializer registered for a type.

    Args:
        key: Indicates the type of an object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        KeyError: If nothing is registered for the key.
    """
    if key not in self.materializer_types:
        raise KeyError(
            f"Type {key} does not have a default `Materializer`! Please "
            f"specify your own `Materializer`."
        )
    return self.materializer_types[key]
get_materializer_types(self)
Get all registered materializer types.
Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(self) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Returns the registry's type-to-materializer mapping (not a copy)."""
    registered = self.materializer_types
    return registered
is_registered(self, key)
Returns whether a materializer class is registered for the given type.
Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Tells whether a materializer is registered for the given type.

    Args:
        key: Indicates the type of an object.

    Returns:
        True if the key is present in the registry's mapping, else False.
    """
    found = key in self.materializer_types
    return found
register_and_overwrite_type(self, key, type_)
Registers a new materializer and also overwrites a default if set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of an object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self,
    key: Type[Any],
    type_: Type["BaseMaterializer"],
) -> None:
    """Registers a materializer, replacing any existing entry for the key.

    Args:
        key: Indicates the type of an object.
        type_: A BaseMaterializer subclass.
    """
    # Unconditional assignment: an earlier registration is overwritten.
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)
Registers a new materializer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of an object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self,
    key: Type[Any],
    type_: Type["BaseMaterializer"],
) -> None:
    """Registers a materializer unless the key already has one.

    An existing registration is kept; the rejected attempt is only logged
    at debug level.

    Args:
        key: Indicates the type of an object.
        type_: A BaseMaterializer subclass.
    """
    if key in self.materializer_types:
        logger.debug(
            f"{key} already registered with "
            f"{self.materializer_types[key]}. Cannot register {type_}."
        )
        return

    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
numpy_materializer
NumpyMaterializer (BaseMaterializer)
Materializer to read data to and from numpy arrays.
Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read and write numpy arrays.

    The array is stored flattened in a parquet file (`DATA_FILENAME`), and
    its shape is stored separately as JSON (`SHAPE_FILENAME`).
    """

    ASSOCIATED_TYPES = [np.ndarray]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]

    def handle_input(self, data_type: Type[Any]) -> np.ndarray:
        """Reads a numpy array from the parquet file and restores its shape.

        Args:
            data_type: Forwarded to the parent `handle_input`.

        Returns:
            The stored values reshaped to the stored shape.
        """
        super().handle_input(data_type)

        shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
        data_path = os.path.join(self.artifact.uri, DATA_FILENAME)

        # The shape file maps an axis index to that axis' length.
        shape_dict = yaml_utils.read_json(shape_path)
        shape_tuple = tuple(shape_dict.values())

        with fileio.open(data_path, "rb") as f:
            data = pq.read_table(pa.input_stream(f))

        column = getattr(data.to_pandas(), DATA_VAR)
        return np.reshape(column.values, shape_tuple)

    def handle_return(self, arr: np.ndarray) -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)

        # Persist the shape first, keyed by the axis index as a string.
        shape_by_axis = {
            str(axis): length for axis, length in enumerate(arr.shape)
        }
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME), shape_by_axis
        )

        # The values go into a single-column table in flattened form.
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
        with fileio.open(data_path, "wb") as f:
            pq.write_table(pa_table, pa.output_stream(f))
handle_input(self, data_type)
Reads numpy array from parquet file.
Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> np.ndarray:
    """Reads a numpy array from the parquet file and restores its shape.

    Args:
        data_type: Forwarded to the parent `handle_input`.

    Returns:
        The stored values reshaped to the stored shape.
    """
    super().handle_input(data_type)

    shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)

    # The shape file's values are used, in order, as the target shape.
    shape_dict = yaml_utils.read_json(shape_path)
    shape_tuple = tuple(shape_dict.values())

    with fileio.open(data_path, "rb") as f:
        data = pq.read_table(pa.input_stream(f))

    column = getattr(data.to_pandas(), DATA_VAR)
    return np.reshape(column.values, shape_tuple)
handle_return(self, arr)
Writes a np.ndarray to the artifact store as a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
ndarray |
The numpy array to write. |
required |
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: np.ndarray) -> None:
    """Writes a np.ndarray to the artifact store as a parquet file.

    The shape is written separately as JSON, because the values are stored
    in flattened form.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)

    # Persist the shape first, keyed by the axis index as a string.
    shape_by_axis = {
        str(axis): length for axis, length in enumerate(arr.shape)
    }
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME), shape_by_axis
    )

    # The values go into a single-column table in flattened form.
    pa_table = pa.table({DATA_VAR: arr.flatten()})
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
    with fileio.open(data_path, "wb") as f:
        pq.write_table(pa_table, pa.output_stream(f))
pandas_materializer
PandasMaterializer (BaseMaterializer)
Materializer to read data to and from pandas.
Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read and write pandas DataFrames as parquet files."""

    ASSOCIATED_TYPES = [pd.DataFrame]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact, StatisticsArtifact, SchemaArtifact]

    def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
        """Reads a pd.DataFrame from the parquet file under the artifact URI.

        Args:
            data_type: Forwarded to the parent `handle_input`.

        Returns:
            The DataFrame loaded by `pd.read_parquet`.
        """
        super().handle_input(data_type)
        parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        return pd.read_parquet(parquet_path)

    def handle_return(self, df: pd.DataFrame) -> None:
        """Writes a pandas dataframe as parquet under the artifact URI.

        Args:
            df: The pandas dataframe to write.
        """
        super().handle_return(df)
        df.to_parquet(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME),
            compression=COMPRESSION_TYPE,
        )
handle_input(self, data_type)
Reads a pd.DataFrame from a parquet file.
Source code in zenml/materializers/pandas_materializer.py
def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
    """Reads a pd.DataFrame from the parquet file under the artifact URI.

    Args:
        data_type: Forwarded to the parent `handle_input`.

    Returns:
        The DataFrame loaded by `pd.read_parquet`.
    """
    super().handle_input(data_type)
    parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    return pd.read_parquet(parquet_path)
handle_return(self, df)
Writes a pandas dataframe to the specified filename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame |
The pandas dataframe to write. |
required |
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: pd.DataFrame) -> None:
    """Writes a pandas dataframe as parquet under the artifact URI.

    Args:
        df: The pandas dataframe to write.
    """
    super().handle_return(df)
    df.to_parquet(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME),
        compression=COMPRESSION_TYPE,
    )