Polars

zenml.integrations.polars (special)

Initialization of the Polars integration.
        
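Before diving into the API, here is a minimal sketch of how the integration is typically used: once it is installed and activated, Polars objects returned from steps are handled by the materializer documented below. The sketch assumes a recent ZenML release where `step` and `pipeline` are importable from the top-level `zenml` package; the step and pipeline names are illustrative.

```python
import polars as pl
from zenml import pipeline, step


@step
def produce_frame() -> pl.DataFrame:
    # Returned Polars objects are stored via PolarsMaterializer.
    return pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})


@step
def consume_frame(df: pl.DataFrame) -> None:
    # The frame is rehydrated from the artifact store on load.
    print(df.describe())


@pipeline
def polars_pipeline() -> None:
    consume_frame(produce_frame())


if __name__ == "__main__":
    polars_pipeline()
```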
PolarsIntegration (Integration)

Definition of Polars integration for ZenML.

Source code in zenml/integrations/polars/__init__.py
class PolarsIntegration(Integration):
    """Definition of Polars integration for ZenML."""

    NAME = POLARS
    REQUIREMENTS = [
        "polars>=0.19.5",
        "pyarrow>=12.0.0",
    ]
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["pyarrow"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.polars import materializers  # noqa
activate() (classmethod)

Activates the integration.

Source code in zenml/integrations/polars/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.polars import materializers  # noqa
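Note that activation works purely by import side effect: importing `zenml.integrations.polars.materializers` registers `PolarsMaterializer` with ZenML's materializer registry. Normally this happens for you (e.g. after `zenml integration install polars`), but it can also be triggered by hand. A small sketch, assuming the `check_installation` classmethod inherited from the base `Integration` class:

```python
from zenml.integrations.polars import PolarsIntegration

# check_installation() comes from the Integration base class and verifies
# that the declared requirements (polars, pyarrow) are importable.
if PolarsIntegration.check_installation():
    PolarsIntegration.activate()  # imports and registers the materializers
```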
materializers (special)

Initialization for the Polars materializers.

dataframe_materializer

Polars materializer.
        
PolarsMaterializer (BaseMaterializer)

Materializer to read/write Polars dataframes.

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
class PolarsMaterializer(BaseMaterializer):
    """Materializer to read/write Polars dataframes."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pl.DataFrame,
        pl.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> Any:
        """Reads and returns Polars data after copying it to a temporary path.

        Args:
            data_type: The type of the data to read.

        Returns:
            A Polars data frame or series.
        """
        # Create a temporary directory to store the data
        temp_dir = tempfile.TemporaryDirectory()

        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.uri, temp_dir.name)

        # Load the data from the temporary directory
        table = pq.read_table(
            os.path.join(temp_dir.name, "dataframe.parquet").replace("\\", "/")
        )

        # If the data is of type pl.Series, convert it back to a pyarrow array
        # instead of a table.
        if (
            table.schema.metadata
            and b"zenml_is_pl_series" in table.schema.metadata
        ):
            isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
            isinstance_series = bool.from_bytes(isinstance_bytes, "big")
            if isinstance_series:
                table = table.column(0)

        # Convert the table to a Polars data frame or series
        data = pl.from_arrow(table)

        # Cleanup and return
        fileio.rmtree(temp_dir.name)
        return data

    def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
        """Writes Polars data to the artifact store.

        Args:
            data: The data to write.

        Raises:
            TypeError: If the data is not of type pl.DataFrame or pl.Series.
        """
        # Data type check
        if not isinstance(data, self.ASSOCIATED_TYPES):
            raise TypeError(
                f"Expected data of type {self.ASSOCIATED_TYPES}, "
                f"got {type(data)}"
            )

        # Convert the data to an Apache Arrow Table
        if isinstance(data, pl.DataFrame):
            table = data.to_arrow()
        else:
            # Construct a PyArrow Table with schema from the individual
            # pl.Series array if it is a single pl.Series.
            array = data.to_arrow()
            table = pa.Table.from_arrays([array], names=[data.name])

        # Register whether data is of type pl.Series, so that the read step
        # can convert it back appropriately.
        isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
        table = table.replace_schema_metadata(
            {b"zenml_is_pl_series": isinstance_bytes}
        )

        # Create a temporary directory to store the data
        temp_dir = tempfile.TemporaryDirectory()

        # Write the table to a Parquet file
        path = os.path.join(temp_dir.name, "dataframe.parquet").replace(
            "\\", "/"
        )
        pq.write_table(table, path)  # Uses snappy compression by default

        # Copy the Parquet file to the artifact store
        io_utils.copy_dir(temp_dir.name, self.uri)

        # Remove the temporary directory
        fileio.rmtree(temp_dir.name)
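The `pl.Series` round trip is the subtle part of this class: Parquet only stores tables, so a lone series is wrapped in a one-column table and flagged via a custom schema metadata key, which `load` checks in order to unwrap it again. The self-contained sketch below reproduces that mechanic outside the artifact store; the file path and series name are illustrative.

```python
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq

series = pl.Series("temperature", [21.5, 22.0, 19.8])

# Save path: wrap the series in a one-column table and flag it in metadata.
table = pa.Table.from_arrays([series.to_arrow()], names=[series.name])
table = table.replace_schema_metadata(
    {b"zenml_is_pl_series": True.to_bytes(1, "big")}
)
pq.write_table(table, "series.parquet")

# Load path: read the flag back, take the single column, rebuild the series.
loaded = pq.read_table("series.parquet")
meta = loaded.schema.metadata or {}
is_series = bool.from_bytes(meta.get(b"zenml_is_pl_series", b"\x00"), "big")
data = pl.from_arrow(loaded.column(0) if is_series else loaded)
assert isinstance(data, pl.Series)
```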
load(self, data_type)

Reads and returns Polars data after copying it to a temporary path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data_type | Type[Any] | The type of the data to read. | required |

Returns:

| Type | Description |
|---|---|
| Any | A Polars data frame or series. |

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads and returns Polars data after copying it to a temporary path.

    Args:
        data_type: The type of the data to read.

    Returns:
        A Polars data frame or series.
    """
    # Create a temporary directory to store the data
    temp_dir = tempfile.TemporaryDirectory()

    # Copy from artifact store to temporary directory
    io_utils.copy_dir(self.uri, temp_dir.name)

    # Load the data from the temporary directory
    table = pq.read_table(
        os.path.join(temp_dir.name, "dataframe.parquet").replace("\\", "/")
    )

    # If the data is of type pl.Series, convert it back to a pyarrow array
    # instead of a table.
    if (
        table.schema.metadata
        and b"zenml_is_pl_series" in table.schema.metadata
    ):
        isinstance_bytes = table.schema.metadata[b"zenml_is_pl_series"]
        isinstance_series = bool.from_bytes(isinstance_bytes, "big")
        if isinstance_series:
            table = table.column(0)

    # Convert the table to a Polars data frame or series
    data = pl.from_arrow(table)

    # Cleanup and return
    fileio.rmtree(temp_dir.name)
    return data
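The branch on the metadata flag above matters because `pl.from_arrow` is polymorphic: given a `pa.Table` it returns a `pl.DataFrame`, while given a column (`pa.Array` or `pa.ChunkedArray`) it returns a `pl.Series`. A quick illustration:

```python
import polars as pl
import pyarrow as pa

table = pa.table({"x": [1, 2, 3]})

print(type(pl.from_arrow(table)))            # pl.DataFrame
print(type(pl.from_arrow(table.column(0))))  # pl.Series
```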
save(self, data)

Writes Polars data to the artifact store.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| data | Union[polars.DataFrame, polars.Series] | The data to write. | required |

Exceptions:

| Type | Description |
|---|---|
| TypeError | If the data is not of type pl.DataFrame or pl.Series. |

Source code in zenml/integrations/polars/materializers/dataframe_materializer.py
def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
    """Writes Polars data to the artifact store.

    Args:
        data: The data to write.

    Raises:
        TypeError: If the data is not of type pl.DataFrame or pl.Series.
    """
    # Data type check
    if not isinstance(data, self.ASSOCIATED_TYPES):
        raise TypeError(
            f"Expected data of type {self.ASSOCIATED_TYPES}, "
            f"got {type(data)}"
        )

    # Convert the data to an Apache Arrow Table
    if isinstance(data, pl.DataFrame):
        table = data.to_arrow()
    else:
        # Construct a PyArrow Table with schema from the individual
        # pl.Series array if it is a single pl.Series.
        array = data.to_arrow()
        table = pa.Table.from_arrays([array], names=[data.name])

    # Register whether data is of type pl.Series, so that the read step
    # can convert it back appropriately.
    isinstance_bytes = isinstance(data, pl.Series).to_bytes(1, "big")
    table = table.replace_schema_metadata(
        {b"zenml_is_pl_series": isinstance_bytes}
    )

    # Create a temporary directory to store the data
    temp_dir = tempfile.TemporaryDirectory()

    # Write the table to a Parquet file
    path = os.path.join(temp_dir.name, "dataframe.parquet").replace(
        "\\", "/"
    )
    pq.write_table(table, path)  # Uses snappy compression by default

    # Copy the Parquet file to the artifact store
    io_utils.copy_dir(temp_dir.name, self.uri)

    # Remove the temporary directory
    fileio.rmtree(temp_dir.name)
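For local experimentation, the materializer can also be exercised directly against a plain filesystem path. This is a minimal sketch assuming `BaseMaterializer` takes the artifact URI as its first constructor argument (as in current ZenML releases); directory names are illustrative.

```python
import os
import tempfile

import polars as pl
from zenml.integrations.polars.materializers import PolarsMaterializer

df = pl.DataFrame({"id": [1, 2], "value": [0.5, 1.5]})

with tempfile.TemporaryDirectory() as artifact_dir:
    uri = os.path.join(artifact_dir, "my_artifact")
    os.makedirs(uri, exist_ok=True)

    materializer = PolarsMaterializer(uri)
    materializer.save(df)           # writes dataframe.parquet into uri
    restored = materializer.load(pl.DataFrame)

    assert restored.equals(df)      # use frame_equal() on older Polars
```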