Skip to content

Polars

zenml.integrations.polars special

Initialization of the Polars integration.

PolarsIntegration (Integration)

Definition of Polars integration for ZenML.

Source code in zenml/integrations/polars/__init__.py
class PolarsIntegration(Integration):
    """Registers the Polars integration with ZenML.

    Declares the pinned requirements for the integration and hooks the
    Polars materializers into ZenML on activation.
    """

    # Integration identity and pip requirements.
    NAME = POLARS
    REQUIREMENTS = [
        "polars>=0.19.5",
        "pyarrow>=12.0.0",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the integration by importing its materializers."""
        from zenml.integrations.polars import materializers  # noqa

activate() classmethod

Activates the integration.

Source code in zenml/integrations/polars/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the integration by importing its materializers."""
    # Importing the module is what registers the Polars materializers.
    from zenml.integrations.polars import materializers  # noqa

materializers special

Initialization for the Polars materializers.

dataframe_materializer

Polars materializer.

PolarsMaterializer (BaseMaterializer)

Materializer to read/write Polars dataframes.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
class PolarsMaterializer(BaseMaterializer):
    """Materializer to read/write Polars dataframes and series.

    Data is stored as a single Parquet file (``dataframe.parquet``) in the
    artifact store. A schema-metadata flag records whether the original
    object was a ``pl.Series`` so it can be restored with the right type.
    """

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pl.DataFrame,
        pl.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> Any:
        """Reads and returns Polars data after copying it to a temporary path.

        Args:
            data_type: The type of the data to read.

        Returns:
            A Polars data frame or series.
        """
        # Use a context manager so the temporary directory is removed even
        # if reading fails part-way through (the original manual rmtree was
        # skipped on exceptions).
        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy from artifact store to the local temporary directory.
            io_utils.copy_dir(self.uri, temp_dir)

            # Load the Parquet file written by `save`. Forward slashes keep
            # the path valid on Windows as well.
            table = pq.read_table(
                os.path.join(temp_dir, "dataframe.parquet").replace("\\", "/")
            )

            # If the data was originally a pl.Series, extract the single
            # column so `pl.from_arrow` returns a series, not a data frame.
            if (
                table.schema.metadata
                and b"zenml_is_pl_series" in table.schema.metadata
            ):
                isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
                isinstance_series = bool.from_bytes(isinstance_bytes, "big")
                if isinstance_series:
                    table = table.column(0)

            # Convert the table (or single column) back to a Polars object.
            return pl.from_arrow(table)

    def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
        """Writes Polars data to the artifact store.

        Args:
            data: The data to write.

        Raises:
            TypeError: If the data is not of type pl.DataFrame or pl.Series.
        """
        # Data type check.
        if not isinstance(data, self.ASSOCIATED_TYPES):
            raise TypeError(
                f"Expected data of type {self.ASSOCIATED_TYPES}, "
                f"got {type(data)}"
            )

        # Convert the data to an Apache Arrow table.
        if isinstance(data, pl.DataFrame):
            table = data.to_arrow()
        else:
            # Wrap the single pl.Series array in a one-column table so it
            # round-trips through Parquet.
            array = data.to_arrow()
            table = pa.Table.from_arrays([array], names=[data.name])

        # Record whether the data was a pl.Series, so that `load` can
        # convert it back to the appropriate type.
        isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
        table = table.replace_schema_metadata(
            {b"zenml_is_pl_series": isinstance_bytes}
        )

        # Use a context manager so the temporary directory is removed even
        # if writing or uploading fails.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Write the table to a local Parquet file, then copy the
            # directory to the artifact store.
            path = os.path.join(temp_dir, "dataframe.parquet").replace(
                "\\", "/"
            )
            pq.write_table(table, path)  # pyarrow's default (snappy) compression
            io_utils.copy_dir(temp_dir, self.uri)
load(self, data_type)

Reads and returns Polars data after copying it to temporary path.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

A Polars data frame or series.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads and returns Polars data after copying it to a temporary path.

    Args:
        data_type: The type of the data to read.

    Returns:
        A Polars data frame or series.
    """
    # Use a context manager so the temporary directory is removed even if
    # reading fails part-way through (the original manual rmtree was
    # skipped on exceptions).
    with tempfile.TemporaryDirectory() as temp_dir:
        # Copy from artifact store to the local temporary directory.
        io_utils.copy_dir(self.uri, temp_dir)

        # Load the Parquet file written by `save`. Forward slashes keep the
        # path valid on Windows as well.
        table = pq.read_table(
            os.path.join(temp_dir, "dataframe.parquet").replace("\\", "/")
        )

        # If the data was originally a pl.Series, extract the single column
        # so `pl.from_arrow` returns a series, not a data frame.
        if (
            table.schema.metadata
            and b"zenml_is_pl_series" in table.schema.metadata
        ):
            isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
            isinstance_series = bool.from_bytes(isinstance_bytes, "big")
            if isinstance_series:
                table = table.column(0)

        # Convert the table (or single column) back to a Polars object.
        return pl.from_arrow(table)
save(self, data)

Writes Polars data to the artifact store.

Parameters:

Name Type Description Default
data Union[polars.DataFrame, polars.Series]

The data to write.

required

Exceptions:

Type Description
TypeError

If the data is not of type pl.DataFrame or pl.Series.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
    """Writes Polars data to the artifact store.

    Args:
        data: The data to write.

    Raises:
        TypeError: If the data is not of type pl.DataFrame or pl.Series.
    """
    # Data type check.
    if not isinstance(data, self.ASSOCIATED_TYPES):
        raise TypeError(
            f"Expected data of type {self.ASSOCIATED_TYPES}, "
            f"got {type(data)}"
        )

    # Convert the data to an Apache Arrow table.
    if isinstance(data, pl.DataFrame):
        table = data.to_arrow()
    else:
        # Wrap the single pl.Series array in a one-column table so it
        # round-trips through Parquet.
        array = data.to_arrow()
        table = pa.Table.from_arrays([array], names=[data.name])

    # Record whether the data was a pl.Series, so that `load` can convert
    # it back to the appropriate type.
    isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
    table = table.replace_schema_metadata(
        {b"zenml_is_pl_series": isinstance_bytes}
    )

    # Use a context manager so the temporary directory is removed even if
    # writing or uploading fails (the original manual rmtree was skipped
    # on exceptions).
    with tempfile.TemporaryDirectory() as temp_dir:
        # Write the table to a local Parquet file, then copy the directory
        # to the artifact store.
        path = os.path.join(temp_dir, "dataframe.parquet").replace(
            "\\", "/"
        )
        pq.write_table(table, path)  # pyarrow's default (snappy) compression
        io_utils.copy_dir(temp_dir, self.uri)