Materializers

zenml.materializers special

Materializers define how artifact data is written to and read back from the artifact store in a specific format. They are most often used to handle the inputs and outputs of ZenML steps, and can be extended by building on the BaseMaterializer class.
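
As a concrete illustration, here is a minimal sketch of a custom materializer for a hypothetical MyObject class. The type, the file name and the JSON layout are illustrative assumptions; only BaseMaterializer, ASSOCIATED_TYPES, handle_input and handle_return come from the API documented below.

import json
import os
from typing import Any, Type

from zenml.materializers.base_materializer import BaseMaterializer


class MyObject:
    """Hypothetical user-defined type."""

    def __init__(self, value: int):
        self.value = value


class MyObjectMaterializer(BaseMaterializer):
    """Reads/writes MyObject instances as a small JSON file."""

    ASSOCIATED_TYPES = [MyObject]
    # ASSOCIATED_ARTIFACT_TYPES is optional; the metaclass documented below
    # falls back to BaseArtifact when it is left empty.

    def handle_input(self, data_type: Type[Any]) -> MyObject:
        """Reads a MyObject back from the artifact directory."""
        super().handle_input(data_type)
        # Plain open() is used for brevity; the built-in materializers below
        # go through ZenML's fileio/yaml_utils helpers so that remote
        # artifact stores work as well.
        with open(os.path.join(self.artifact.uri, "data.json"), "r") as f:
            return MyObject(json.load(f)["value"])

    def handle_return(self, obj: MyObject) -> None:
        """Writes a MyObject into the artifact directory."""
        super().handle_return(obj)
        with open(os.path.join(self.artifact.uri, "data.json"), "w") as f:
            json.dump({"value": obj.value}, f)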

base_materializer

BaseMaterializer

Base Materializer to realize artifact data.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data."""

    ASSOCIATED_ARTIFACT_TYPES: ClassVar[List[Type["BaseArtifact"]]] = []
    ASSOCIATED_TYPES: ClassVar[List[Type[Any]]] = []

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact."""
        self.artifact = artifact

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.
        Returns:
            Any object that is to be passed into the relevant artifact in the
            step.
        """
        # TODO [ENG-140]: Add type checking for materializer handle_input
        # if data_type not in self.ASSOCIATED_TYPES:
        #     raise ValueError(
        #         f"Data type {data_type} not supported by materializer "
        #         f"{self.__class__.__name__}. Supported types: {self.ASSOCIATED_TYPES}"
        #     )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: Any object that is specified as an output artifact of the step.
        """
        # TODO [ENG-141]: Put proper type checking
        # if data_type not in self.ASSOCIATED_TYPES:
        #     raise ValueError(
        #         f"Data type {data_type} not supported by materializer "
        #         f"{self.__class__.__name__}. Supported types: "
        #         f"{self.ASSOCIATED_TYPES}"
        #     )
__init__(self, artifact) special

Initializes a materializer with the given artifact.

Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Initializes a materializer with the given artifact."""
    self.artifact = artifact
handle_input(self, data_type)

Write logic here to handle input of the step function.

Parameters:

    data_type (Type[Any], required): What type the input should be materialized as.

Returns:

    Any: Any object that is to be passed into the relevant artifact in the step.

Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Write logic here to handle input of the step function.

    Args:
        data_type: What type the input should be materialized as.
    Returns:
        Any object that is to be passed into the relevant artifact in the
        step.
    """
    # TODO [ENG-140]: Add type checking for materializer handle_input
    # if data_type not in self.ASSOCIATED_TYPES:
    #     raise ValueError(
    #         f"Data type {data_type} not supported by materializer "
    #         f"{self.__class__.__name__}. Supported types: {self.ASSOCIATED_TYPES}"
    #     )
handle_return(self, data)

Write logic here to handle return of the step function.

Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    Args:
        data: Any object that is specified as an output artifact of the step.
    """
    # TODO [ENG-141]: Put proper type checking
    # if data_type not in self.ASSOCIATED_TYPES:
    #     raise ValueError(
    #         f"Data type {data_type} not supported by materializer "
    #         f"{self.__class__.__name__}. Supported types: "
    #         f"{self.ASSOCIATED_TYPES}"
    #     )

BaseMaterializerMeta (type)

Metaclass responsible for registering different BaseMaterializer subclasses for reading/writing artifacts.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer
    subclasses for reading/writing artifacts."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at
        the `MaterializerRegistry`."""
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        if name != "BaseMaterializer":
            assert cls.ASSOCIATED_TYPES, (
                "You should specify a list of ASSOCIATED_TYPES when creating a "
                "Materializer!"
            )
            for associated_type in cls.ASSOCIATED_TYPES:
                default_materializer_registry.register_materializer_type(
                    associated_type, cls
                )

                if cls.ASSOCIATED_ARTIFACT_TYPES:
                    type_registry.register_integration(
                        associated_type, cls.ASSOCIATED_ARTIFACT_TYPES
                    )
                else:
                    from zenml.artifacts.base_artifact import BaseArtifact

                    type_registry.register_integration(
                        associated_type, [BaseArtifact]
                    )
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Materializer class and registers it at the MaterializerRegistry.

Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at
    the `MaterializerRegistry`."""
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    if name != "BaseMaterializer":
        assert cls.ASSOCIATED_TYPES, (
            "You should specify a list of ASSOCIATED_TYPES when creating a "
            "Materializer!"
        )
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )

            if cls.ASSOCIATED_ARTIFACT_TYPES:
                type_registry.register_integration(
                    associated_type, cls.ASSOCIATED_ARTIFACT_TYPES
                )
            else:
                from zenml.artifacts.base_artifact import BaseArtifact

                type_registry.register_integration(
                    associated_type, [BaseArtifact]
                )
    return cls
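
Because registration happens in this metaclass, simply defining a materializer subclass makes it discoverable; no explicit registration call is needed. A small check, reusing the hypothetical MyObject/MyObjectMaterializer from the sketch above and assuming the registry instance is importable from the module documented further below:

from zenml.materializers.default_materializer_registry import (
    default_materializer_registry,
)

# Defining MyObjectMaterializer already triggered BaseMaterializerMeta.__new__,
# which registered the class for every entry in its ASSOCIATED_TYPES.
assert default_materializer_registry.is_registered(MyObject)
assert default_materializer_registry[MyObject] is MyObjectMaterializer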

beam_materializer

BeamMaterializer (BaseMaterializer)

Materializer to read data to and from beam.

Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer to read data to and from beam."""

    ASSOCIATED_TYPES = [beam.PCollection]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads all files inside the artifact directory and materializes them
        as a beam compatible output."""
        # TODO [ENG-138]: Implement beam reading
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Appends a beam.io.WriteToParquet at the end of a beam pipeline
        and therefore persists the results.

        Args:
            pipeline: A beam.pipeline object.
        """
        # TODO [ENG-139]: Implement beam writing
        super().handle_return(pipeline)
        pipeline | beam.ParDo()
        pipeline.run()
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
        # pipeline.run()
handle_input(self, data_type)

Reads all files inside the artifact directory and materializes them as a beam compatible output.

Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads all files inside the artifact directory and materializes them
    as a beam compatible output."""
    # TODO [ENG-138]: Implement beam reading
    super().handle_input(data_type)
handle_return(self, pipeline)

Appends a beam.io.WriteToParquet transform at the end of a Beam pipeline, thereby persisting the results.

Parameters:

    pipeline (beam.Pipeline, required): A beam.Pipeline object.

Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Appends a beam.io.WriteToParquet at the end of a beam pipeline
    and therefore persists the results.

    Args:
        pipeline: A beam.pipeline object.
    """
    # TODO [ENG-139]: Implement beam writing
    super().handle_return(pipeline)
    pipeline | beam.ParDo()
    pipeline.run()
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
    # pipeline.run()
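
The Beam materializer is still a stub (see the ENG-138/ENG-139 TODOs), but the commented-out lines hint at the intended write path. A rough sketch of that intent using Apache Beam's built-in parquet IO, with a hypothetical output location rather than a real artifact URI:

import apache_beam as beam
import pyarrow as pa

output_prefix = "/tmp/beam_artifact/data"  # hypothetical artifact location
schema = pa.schema([("value", pa.int64())])

# Persist a PCollection of dicts as parquet files, roughly what the
# commented-out WriteToParquet call above would do with self.artifact.uri.
with beam.Pipeline() as pipeline:
    (
        pipeline
        | "Create" >> beam.Create([{"value": i} for i in range(5)])
        | "Write" >> beam.io.WriteToParquet(
            file_path_prefix=output_prefix, schema=schema
        )
    )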

built_in_materializer

BuiltInMaterializer (BaseMaterializer)

Read/Write JSON files.

Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Read/Write JSON files."""

    # TODO [LOW]: consider adding typing.Dict and typing.List
    # since these are the 'correct' way to annotate these types.

    ASSOCIATED_ARTIFACT_TYPES = [
        DataArtifact,
        DataAnalysisArtifact,
    ]
    ASSOCIATED_TYPES = [
        int,
        str,
        bytes,
        dict,
        float,
        list,
        tuple,
        bool,
    ]

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads basic primitive types from json."""
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(filepath)
        if type(contents) != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Handles basic built-in types and stores them as json"""
        super().handle_return(data)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        yaml_utils.write_json(filepath, data)
handle_input(self, data_type)

Reads basic primitive types from json.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads basic primitive types from json."""
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    contents = yaml_utils.read_json(filepath)
    if type(contents) != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)

Handles basic built-in types and stores them as JSON.

Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Handles basic built-in types and stores them as json"""
    super().handle_return(data)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    yaml_utils.write_json(filepath, data)
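
For built-in types, handle_return and handle_input amount to a JSON round trip. The sketch below reproduces that behaviour with the standard json module; the temporary directory and the data.json name are stand-ins for the artifact URI and DEFAULT_FILENAME, and the real code goes through ZenML's yaml_utils helpers instead.

import json
import os
import tempfile

artifact_uri = tempfile.mkdtemp()                    # stand-in for self.artifact.uri
filepath = os.path.join(artifact_uri, "data.json")   # stand-in for DEFAULT_FILENAME

data = {"accuracy": 0.93, "labels": ["cat", "dog"]}

# handle_return: serialize the built-in value as JSON.
with open(filepath, "w") as f:
    json.dump(data, f)

# handle_input: read it back. JSON turns tuples into lists, which is one
# reason the materializer only logs a debug message on a type mismatch
# (see the ENG-142 TODO above).
with open(filepath) as f:
    loaded = json.load(f)

assert loaded == data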

default_materializer_registry

MaterializerRegistry

Matches a python type to a default materializer.

Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a python type to a default materializer."""

    def __init__(self) -> None:
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        Args:
            key: Indicates the type of an object.
            type_: A BaseMaterializer subclass.
        """
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")

        else:
            logger.debug(
                f"{key} already registered with "
                f"{self.materializer_types[key]}. Cannot register {type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of an object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializers based on the key.

        Args:
            key: Indicates the type of an object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.
        """
        if key in self.materializer_types:
            return self.materializer_types[key]
        else:
            raise KeyError(
                f"Type {key} does not have a default `Materializer`! Please "
                f"specify your own `Materializer`."
            )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types."""
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns if a materializer class is registered for the given type."""
        return key in self.materializer_types
__getitem__(self, key) special

Get a single materializer based on the key.

Parameters:

    key (Type[Any], required): Indicates the type of an object.

Returns:

    Type[BaseMaterializer]: BaseMaterializer subclass that was registered for this key.

Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializers based on the key.

    Args:
        key: Indicates the type of an object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.
    """
    if key in self.materializer_types:
        return self.materializer_types[key]
    else:
        raise KeyError(
            f"Type {key} does not have a default `Materializer`! Please "
            f"specify your own `Materializer`."
        )
get_materializer_types(self)

Get all registered materializer types.

Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Get all registered materializer types."""
    return self.materializer_types
is_registered(self, key)

Returns whether a materializer class is registered for the given type.

Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Returns if a materializer class is registered for the given type."""
    return key in self.materializer_types
register_and_overwrite_type(self, key, type_)

Registers a new materializer and also overwrites a default if set.

Parameters:

    key (Type[Any], required): Indicates the type of an object.
    type_ (Type[BaseMaterializer], required): A BaseMaterializer subclass.

Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer and also overwrites a default if set.

    Args:
        key: Indicates the type of an object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)

Registers a new materializer.

Parameters:

    key (Type[Any], required): Indicates the type of an object.
    type_ (Type[BaseMaterializer], required): A BaseMaterializer subclass.

Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer.

    Args:
        key: Indicates the type of an object.
        type_: A BaseMaterializer subclass.
    """
    if key not in self.materializer_types:
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    else:
        logger.debug(
            f"{key} already registered with "
            f"{self.materializer_types[key]}. Cannot register {type_}."
        )
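
A short usage sketch of the registry; MyObject and MyObjectMaterializer are the hypothetical names from the earlier example, and only the registry methods documented above are ZenML API.

from zenml.materializers.default_materializer_registry import (
    default_materializer_registry,
)

# Look up the default materializer for a type, if one exists.
if default_materializer_registry.is_registered(MyObject):
    materializer_cls = default_materializer_registry[MyObject]

# Force MyObjectMaterializer to become the default for MyObject, even if
# another materializer was registered for that type first.
default_materializer_registry.register_and_overwrite_type(
    MyObject, MyObjectMaterializer
)

# Inspect everything that is currently registered.
print(default_materializer_registry.get_materializer_types())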

numpy_materializer

NumpyMaterializer (BaseMaterializer)

Materializer to read data to and from numpy.

Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from numpy."""

    ASSOCIATED_TYPES = [np.ndarray]
    ASSOCIATED_ARTIFACT_TYPES = [DataArtifact]

    def handle_input(self, data_type: Type[Any]) -> np.ndarray:
        """Reads numpy array from parquet file."""
        super().handle_input(data_type)
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        shape_tuple = tuple(shape_dict.values())
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
        vals = getattr(data.to_pandas(), DATA_VAR).values
        return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: np.ndarray) -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)

Reads numpy array from parquet file.

Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> np.ndarray:
    """Reads numpy array from parquet file."""
    super().handle_input(data_type)
    shape_dict = yaml_utils.read_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME)
    )
    shape_tuple = tuple(shape_dict.values())
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
    ) as f:
        input_stream = pa.input_stream(f)
        data = pq.read_table(input_stream)
    vals = getattr(data.to_pandas(), DATA_VAR).values
    return np.reshape(vals, shape_tuple)
handle_return(self, arr)

Writes a np.ndarray to the artifact store as a parquet file.

Parameters:

    arr (np.ndarray, required): The numpy array to write.

Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: np.ndarray) -> None:
    """Writes a np.ndarray to the artifact store as a parquet file.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME),
        {str(i): x for i, x in enumerate(arr.shape)},
    )
    pa_table = pa.table({DATA_VAR: arr.flatten()})
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
    ) as f:
        stream = pa.output_stream(f)
        pq.write_table(pa_table, stream)
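
At its core the numpy materializer is a flatten/reshape round trip through a one-column parquet table plus a JSON shape file. A local sketch with pyarrow; the data.parquet file name and the "data" column stand in for the DATA_FILENAME and DATA_VAR constants used above.

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

arr = np.arange(12).reshape(3, 4)

# handle_return: remember the shape, store the flattened values as parquet.
shape = arr.shape
pq.write_table(pa.table({"data": arr.flatten()}), "data.parquet")

# handle_input: read the single column back and restore the original shape.
table = pq.read_table("data.parquet")
restored = np.reshape(table.to_pandas()["data"].values, shape)

assert np.array_equal(restored, arr)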

pandas_materializer

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = [pd.DataFrame]
    ASSOCIATED_ARTIFACT_TYPES = [
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    ]

    def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
        """Reads pd.Dataframe from a parquet file."""
        super().handle_input(data_type)
        return pd.read_parquet(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        )

    def handle_return(self, df: pd.DataFrame) -> None:
        """Writes a pandas dataframe to the specified filename.

        Args:
            df: The pandas dataframe to write.
        """
        super().handle_return(df)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        df.to_parquet(filepath, compression=COMPRESSION_TYPE)
handle_input(self, data_type)

Reads a pd.DataFrame from a parquet file.

Source code in zenml/materializers/pandas_materializer.py
def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
    """Reads pd.Dataframe from a parquet file."""
    super().handle_input(data_type)
    return pd.read_parquet(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    )
handle_return(self, df)

Writes a pandas dataframe to the specified filename.

Parameters:

    df (pd.DataFrame, required): The pandas dataframe to write.

Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: pd.DataFrame) -> None:
    """Writes a pandas dataframe to the specified filename.

    Args:
        df: The pandas dataframe to write.
    """
    super().handle_return(df)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    df.to_parquet(filepath, compression=COMPRESSION_TYPE)
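
A minimal local round trip showing the same read/write path outside of a pipeline; the file name and compression codec are illustrative stand-ins for DEFAULT_FILENAME and COMPRESSION_TYPE.

import pandas as pd

df = pd.DataFrame({"feature": [1.0, 2.0, 3.0], "label": [0, 1, 0]})

# handle_return: persist the dataframe as a parquet file.
df.to_parquet("df.parquet", compression="gzip")

# handle_input: load it back.
restored = pd.read_parquet("df.parquet")
pd.testing.assert_frame_equal(df, restored)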