Polars

zenml.integrations.polars special

Initialization of the Polars integration.

PolarsIntegration (Integration)

Definition of Polars integration for ZenML.

Source code in zenml/integrations/polars/__init__.py
class PolarsIntegration(Integration):
    """Definition of Polars integration for ZenML."""

    NAME = POLARS
    REQUIREMENTS = [
        "polars>=0.19.5",
        "pyarrow>=12.0.0"
    ]
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["pyarrow"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.polars import materializers  # noqa
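
In normal use, ZenML activates installed integrations automatically once their requirements are present (for example after running the CLI command zenml integration install polars), but activation can also be triggered explicitly. A minimal sketch, assuming the requirements listed above are installed:

import polars as pl  # noqa: F401  (must be importable for activation to succeed)

from zenml.integrations.polars import PolarsIntegration

# Importing the materializers module registers PolarsMaterializer with
# ZenML's materializer registry, so pl.DataFrame and pl.Series step
# outputs are handled without further configuration.
PolarsIntegration.activate()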

activate() classmethod

Activates the integration.

Source code in zenml/integrations/polars/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.polars import materializers  # noqa

materializers special

Initialization for the Polars materializers.

dataframe_materializer

Polars materializer.

PolarsMaterializer (BaseMaterializer)

Materializer to read/write Polars dataframes.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
class PolarsMaterializer(BaseMaterializer):
    """Materializer to read/write Polars dataframes."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pl.DataFrame,
        pl.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> Any:
        """Reads and returns Polars data after copying it to temporary path.

        Args:
            data_type: The type of the data to read.

        Returns:
            A Polars data frame or series.
        """
        with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
            io_utils.copy_dir(self.uri, temp_dir)

            # Load the data from the temporary directory
            table = pq.read_table(
                os.path.join(temp_dir, "dataframe.parquet").replace("\\", "/")
            )

            # If the artifact was saved from a pl.Series, extract the single
            # column as a pyarrow array instead of keeping the full table.
            if (
                table.schema.metadata
                and b"zenml_is_pl_series" in table.schema.metadata
            ):
                isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
                isinstance_series = bool.from_bytes(isinstance_bytes, "big")
                if isinstance_series:
                    table = table.column(0)

            # Convert the table to a Polars data frame or series
            data = pl.from_arrow(table)

            return data

    def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
        """Writes Polars data to the artifact store.

        Args:
            data: The data to write.

        Raises:
            TypeError: If the data is not of type pl.DataFrame or pl.Series.
        """
        # Data type check
        if not isinstance(data, self.ASSOCIATED_TYPES):
            raise TypeError(
                f"Expected data of type {self.ASSOCIATED_TYPES}, "
                f"got {type(data)}"
            )

        # Convert the data to an Apache Arrow Table
        if isinstance(data, pl.DataFrame):
            table = data.to_arrow()
        else:
            # Wrap the Arrow array of the single pl.Series in a
            # one-column PyArrow Table.
            array = data.to_arrow()
            table = pa.Table.from_arrays([array], names=[data.name])

        # Record in the schema metadata whether the data is a pl.Series,
        # so that load() can convert it back appropriately.
        isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
        table = table.replace_schema_metadata(
            {b"zenml_is_pl_series": isinstance_bytes}
        )

        with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
            # Write the table to a Parquet file
            path = os.path.join(temp_dir, "dataframe.parquet").replace(
                "\\", "/"
            )
            pq.write_table(table, path)  # Uses lz4 compression by default
            io_utils.copy_dir(temp_dir, self.uri)
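
Because the materializer is registered for pl.DataFrame and pl.Series, steps can return either type directly. A minimal sketch of a pipeline whose artifacts are handled by PolarsMaterializer (the step and pipeline names are illustrative):

import polars as pl

from zenml import pipeline, step

@step
def make_frame() -> pl.DataFrame:
    # The returned frame is written to the artifact store as
    # dataframe.parquet by PolarsMaterializer.save().
    return pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

@step
def count_rows(df: pl.DataFrame) -> int:
    # Reading the input artifact goes through PolarsMaterializer.load().
    return df.height

@pipeline
def polars_pipeline():
    count_rows(make_frame())
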
load(self, data_type)

Reads and returns Polars data after copying it to a temporary path.

Parameters:

Name       Type       Description                    Default
data_type  Type[Any]  The type of the data to read.  required

Returns:

Type  Description
Any   A Polars data frame or series.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads and returns Polars data after copying it to temporary path.

    Args:
        data_type: The type of the data to read.

    Returns:
        A Polars data frame or series.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        io_utils.copy_dir(self.uri, temp_dir)

        # Load the data from the temporary directory
        table = pq.read_table(
            os.path.join(temp_dir, "dataframe.parquet").replace("\\", "/")
        )

        # If the artifact was saved from a pl.Series, extract the single
        # column as a pyarrow array instead of keeping the full table.
        if (
            table.schema.metadata
            and b"zenml_is_pl_series" in table.schema.metadata
        ):
            isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
            isinstance_series = bool.from_bytes(isinstance_bytes, "big")
            if isinstance_series:
                table = table.column(0)

        # Convert the table to a Polars data frame or series
        data = pl.from_arrow(table)

        return data
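
The zenml_is_pl_series flag read here is a single byte that save() stores in the Arrow schema metadata. A standalone sketch of the round-trip, independent of the materializer and purely for illustration:

import polars as pl
import pyarrow as pa

series = pl.Series("a", [1, 2, 3])
table = pa.Table.from_arrays([series.to_arrow()], names=[series.name])

# save() encodes isinstance(data, pl.Series) as one big-endian byte ...
table = table.replace_schema_metadata(
    {b"zenml_is_pl_series": True.to_bytes(1, "big")}  # b"\x01"
)

# ... and load() decodes it to decide whether pl.from_arrow() receives the
# single column (yielding a pl.Series) or the whole table (a pl.DataFrame).
if bool.from_bytes(table.schema.metadata[b"zenml_is_pl_series"], "big"):
    data = pl.from_arrow(table.column(0))
else:
    data = pl.from_arrow(table)

assert isinstance(data, pl.Series)
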
save(self, data)

Writes Polars data to the artifact store.

Parameters:

Name  Type                                     Description         Default
data  Union[polars.DataFrame, polars.Series]  The data to write.  required

Exceptions:

Type       Description
TypeError  If the data is not of type pl.DataFrame or pl.Series.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
    """Writes Polars data to the artifact store.

    Args:
        data: The data to write.

    Raises:
        TypeError: If the data is not of type pl.DataFrame or pl.Series.
    """
    # Data type check
    if not isinstance(data, self.ASSOCIATED_TYPES):
        raise TypeError(
            f"Expected data of type {self.ASSOCIATED_TYPES}, "
            f"got {type(data)}"
        )

    # Convert the data to an Apache Arrow Table
    if isinstance(data, pl.DataFrame):
        table = data.to_arrow()
    else:
        # Wrap the Arrow array of the single pl.Series in a
        # one-column PyArrow Table.
        array = data.to_arrow()
        table = pa.Table.from_arrays([array], names=[data.name])

    # Record in the schema metadata whether the data is a pl.Series,
    # so that load() can convert it back appropriately.
    isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
    table = table.replace_schema_metadata(
        {b"zenml_is_pl_series": isinstance_bytes}
    )

    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        # Write the table to a Parquet file
        path = os.path.join(temp_dir, "dataframe.parquet").replace(
            "\\", "/"
        )
        pq.write_table(table, path)  # Uses lz4 compression by default
        io_utils.copy_dir(temp_dir, self.uri)
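
Outside of ZenML, the artifact written by save() is an ordinary Parquet file and can be inspected with pyarrow directly. A sketch assuming a local artifact store; the path below is illustrative:

import os

import polars as pl
import pyarrow.parquet as pq

uri = "/tmp/zenml_artifact"  # illustrative: URI of a materialized artifact
table = pq.read_table(os.path.join(uri, "dataframe.parquet"))

# The schema metadata carries the pl.Series marker written by save();
# b"\x00" indicates the artifact was a pl.DataFrame.
print(table.schema.metadata.get(b"zenml_is_pl_series"))
print(pl.from_arrow(table))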