Skip to content

Pandas

zenml.integrations.pandas special

Initialization of the Pandas integration.

PandasIntegration (Integration)

Definition of Pandas integration for ZenML.

Source code in zenml/integrations/pandas/__init__.py
class PandasIntegration(Integration):
    """ZenML integration descriptor for Pandas.

    Declares the integration name and the pip requirement it needs, and
    exposes `activate` to load the Pandas materializers.
    """

    NAME = PANDAS
    REQUIREMENTS = ["pandas>=2.0.0"]

    @classmethod
    def activate(cls) -> None:
        """Activate the integration by importing its materializers package.

        The import is performed purely for its side effects; nothing from
        the imported package is used here.
        """
        import zenml.integrations.pandas.materializers  # noqa

activate() classmethod

Activates the integration.

Source code in zenml/integrations/pandas/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the integration by importing its materializers package.

    The import is performed purely for its side effects; nothing from the
    imported package is used here.
    """
    import zenml.integrations.pandas.materializers  # noqa

materializers special

Initialization of the Pandas materializer.

pandas_materializer

Materializer for Pandas.

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas.

    Data is written as a `.parquet` file when `pyarrow` can be imported and
    as a `.csv` file otherwise. A `pd.Series` is stored as a single-column
    frame and converted back to a series on load.
    """

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pd.DataFrame,
        pd.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Detect `pyarrow` and define the parquet and csv file paths.

        Sets `self.pyarrow_exists`, `self.parquet_path` and `self.csv_path`.
        A warning is logged when `pyarrow` cannot be imported.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        try:
            import pyarrow  # type: ignore # noqa

            self.pyarrow_exists = True
        except ImportError:
            self.pyarrow_exists = False
            logger.warning(
                "By default, the `PandasMaterializer` stores data as a "
                "`.csv` file. If you want to store data more efficiently, "
                "you can install `pyarrow` by running "
                "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
                "to automatically store the data as a `.parquet` file instead."
            )
        finally:
            # Both paths are always defined: `load` checks for the parquet
            # file first and falls back to the csv file.
            self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
            self.csv_path = os.path.join(self.uri, CSV_FILENAME)

    def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
        """Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

        The parquet file is preferred when it exists in the artifact store;
        otherwise the csv file is read.

        Args:
            data_type: The type of the data to read.

        Raises:
            ImportError: If the artifact is stored as a `.parquet` file but
                `pyarrow` is not installed.

        Returns:
            The pandas dataframe or series.
        """
        if self.artifact_store.exists(self.parquet_path):
            if self.pyarrow_exists:
                with self.artifact_store.open(
                    self.parquet_path, mode="rb"
                ) as f:
                    df = pd.read_parquet(f)
            else:
                raise ImportError(
                    "You have an old version of a `PandasMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` "
                    "for reading, You can install `pyarrow` by running "
                    "'`pip install pyarrow fastparquet`'."
                )
        else:
            with self.artifact_store.open(self.csv_path, mode="rb") as f:
                df = pd.read_csv(f, index_col=0, parse_dates=True)

        # validate the type of the data.
        def is_dataframe_or_series(
            df: Union[pd.DataFrame, pd.Series],
        ) -> Union[pd.DataFrame, pd.Series]:
            """Converts the loaded frame to a series if one was requested.

            Args:
                df: The loaded data.

            Returns:
                The single column as a `pd.Series` if `data_type` is a
                series type, otherwise the data unchanged.
            """
            if issubclass(data_type, pd.Series):
                # Taking the first column if its a series as the assumption
                # is that there will only be one
                # NOTE: this check is an `assert`, so it is skipped when
                # Python runs with assertions disabled (`-O`).
                assert len(df.columns) == 1
                df = df[df.columns[0]]
                return df
            else:
                return df

        return is_dataframe_or_series(df)

    def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the artifact store.

        The data is written to the parquet path when `pyarrow` is available
        and to the csv path otherwise.

        Args:
            df: The pandas dataframe or series to write.
        """
        if isinstance(df, pd.Series):
            # A series is stored as a one-column frame named "series".
            df = df.to_frame(name="series")

        if self.pyarrow_exists:
            with self.artifact_store.open(self.parquet_path, mode="wb") as f:
                df.to_parquet(f, compression=COMPRESSION_TYPE)
        else:
            with self.artifact_store.open(self.csv_path, mode="wb") as f:
                df.to_csv(f, index=True)

    def save_visualizations(
        self, df: Union[pd.DataFrame, pd.Series]
    ) -> Dict[str, VisualizationType]:
        """Save visualizations of the given pandas dataframe or series.

        Writes the output of `df.describe()` as `describe.csv` under the
        artifact URI.

        Args:
            df: The pandas dataframe or series to visualize.

        Returns:
            A dictionary of visualization URIs and their types.
        """
        describe_uri = os.path.join(self.uri, "describe.csv")
        # Use forward slashes in the returned URI regardless of platform.
        describe_uri = describe_uri.replace("\\", "/")
        with self.artifact_store.open(describe_uri, mode="wb") as f:
            df.describe().to_csv(f)
        return {describe_uri: VisualizationType.CSV}

    def extract_metadata(
        self, df: Union[pd.DataFrame, pd.Series]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given pandas dataframe or series.

        The metadata always contains `shape` and `dtype`. The `mean`, `std`,
        `min` and `max` statistics are added for numeric data only: for a
        series when its dtype is numeric, for a dataframe per numeric column.

        Args:
            df: The pandas dataframe or series to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

        if isinstance(df, pd.Series):
            pandas_metadata["dtype"] = DType(df.dtype.type)
            # Statistics are only meaningful for numeric series; string,
            # datetime or timedelta series would otherwise raise here.
            if pd.api.types.is_numeric_dtype(df.dtype):
                # `float()` accepts both numpy scalars and plain Python
                # floats, so no `.item()` call is needed on the result.
                pandas_metadata["mean"] = float(df.mean())
                pandas_metadata["std"] = float(df.std())
                pandas_metadata["min"] = float(df.min())
                pandas_metadata["max"] = float(df.max())

        else:
            pandas_metadata["dtype"] = {
                str(key): DType(value.type) for key, value in df.dtypes.items()
            }
            for stat_name, stat in {
                "mean": df.mean,
                "std": df.std,
                "min": df.min,
                "max": df.max,
            }.items():
                pandas_metadata[stat_name] = {
                    str(key): float(value)
                    for key, value in stat(numeric_only=True).to_dict().items()
                }

        return pandas_metadata
__init__(self, uri, artifact_store=None) special

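Detect whether pyarrow is installed and define the `.parquet` and `.csv` file paths (self.parquet_path and self.csv_path) under the artifact URI.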

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[zenml.artifact_stores.base_artifact_store.BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Set up the storage paths and record whether `pyarrow` is importable.

    Defines `self.parquet_path`, `self.csv_path` and `self.pyarrow_exists`.
    When `pyarrow` cannot be imported, a warning explains that data will be
    stored as `.csv` instead of `.parquet`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)

    # The file locations do not depend on the outcome of the import probe.
    self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
    self.csv_path = os.path.join(self.uri, CSV_FILENAME)

    try:
        import pyarrow  # type: ignore # noqa
    except ImportError:
        self.pyarrow_exists = False
        logger.warning(
            "By default, the `PandasMaterializer` stores data as a "
            "`.csv` file. If you want to store data more efficiently, "
            "you can install `pyarrow` by running "
            "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
            "to automatically store the data as a `.parquet` file instead."
        )
    else:
        self.pyarrow_exists = True
extract_metadata(self, df)

Extract metadata from the given pandas dataframe or series.

Parameters:

Name Type Description Default
df Union[pandas.DataFrame, pandas.Series]

The pandas dataframe or series to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def extract_metadata(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given pandas dataframe or series.

    The metadata always contains `shape` and `dtype`. The `mean`, `std`,
    `min` and `max` statistics are added for numeric data only: for a
    series when its dtype is numeric, for a dataframe per numeric column.

    Args:
        df: The pandas dataframe or series to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

    if isinstance(df, pd.Series):
        pandas_metadata["dtype"] = DType(df.dtype.type)
        # Statistics are only meaningful for numeric series; string,
        # datetime or timedelta series would otherwise raise here.
        if pd.api.types.is_numeric_dtype(df.dtype):
            # `float()` accepts both numpy scalars and plain Python floats,
            # so no `.item()` call is needed on the result.
            pandas_metadata["mean"] = float(df.mean())
            pandas_metadata["std"] = float(df.std())
            pandas_metadata["min"] = float(df.min())
            pandas_metadata["max"] = float(df.max())

    else:
        pandas_metadata["dtype"] = {
            str(key): DType(value.type) for key, value in df.dtypes.items()
        }
        for stat_name, stat in {
            "mean": df.mean,
            "std": df.std,
            "min": df.min,
            "max": df.max,
        }.items():
            # `numeric_only=True` restricts each statistic to numeric columns.
            pandas_metadata[stat_name] = {
                str(key): float(value)
                for key, value in stat(numeric_only=True).to_dict().items()
            }

    return pandas_metadata
load(self, data_type)

Reads pd.DataFrame or pd.Series from a .parquet or .csv file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Exceptions:

Type Description
ImportError

If the artifact is stored as a `.parquet` file but pyarrow is not installed.

Returns:

Type Description
Union[pandas.DataFrame, pandas.Series]

The pandas dataframe or series.

Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
    """Load a `pd.DataFrame` or `pd.Series` from the artifact store.

    The `.parquet` file is used when it exists in the artifact store;
    otherwise the `.csv` file is read. When a series type is requested, the
    single stored column is returned as a series.

    Args:
        data_type: The type of the data to read.

    Raises:
        ImportError: If a `.parquet` file exists but `pyarrow` is not
            installed.

    Returns:
        The pandas dataframe or series.
    """
    store = self.artifact_store

    if not store.exists(self.parquet_path):
        # No parquet artifact: fall back to the csv file.
        with store.open(self.csv_path, mode="rb") as csv_file:
            loaded = pd.read_csv(csv_file, index_col=0, parse_dates=True)
    elif not self.pyarrow_exists:
        raise ImportError(
            "You have an old version of a `PandasMaterializer` "
            "data artifact stored in the artifact store "
            "as a `.parquet` file, which requires `pyarrow` "
            "for reading, You can install `pyarrow` by running "
            "'`pip install pyarrow fastparquet`'."
        )
    else:
        with store.open(self.parquet_path, mode="rb") as parquet_file:
            loaded = pd.read_parquet(parquet_file)

    if not issubclass(data_type, pd.Series):
        return loaded

    # A series is expected to have been stored as exactly one column.
    assert len(loaded.columns) == 1
    return loaded[loaded.columns[0]]
save(self, df)

Writes a pandas dataframe or series to the artifact store, as a `.parquet` file if pyarrow is installed and as a `.csv` file otherwise.

Parameters:

Name Type Description Default
df Union[pandas.DataFrame, pandas.Series]

The pandas dataframe or series to write.

required
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Persist a pandas dataframe or series in the artifact store.

    A series is first converted to a one-column frame named "series". The
    data goes to the parquet path when `pyarrow` is available and to the
    csv path otherwise.

    Args:
        df: The pandas dataframe or series to write.
    """
    frame = df.to_frame(name="series") if isinstance(df, pd.Series) else df

    if not self.pyarrow_exists:
        with self.artifact_store.open(self.csv_path, mode="wb") as csv_file:
            frame.to_csv(csv_file, index=True)
        return

    with self.artifact_store.open(
        self.parquet_path, mode="wb"
    ) as parquet_file:
        frame.to_parquet(parquet_file, compression=COMPRESSION_TYPE)
save_visualizations(self, df)

Save visualizations of the given pandas dataframe or series.

Parameters:

Name Type Description Default
df Union[pandas.DataFrame, pandas.Series]

The pandas dataframe or series to visualize.

required

Returns:

Type Description
Dict[str, zenml.enums.VisualizationType]

A dictionary of visualization URIs and their types.

Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def save_visualizations(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, VisualizationType]:
    """Write a `describe()` summary of the data as a CSV visualization.

    The summary is stored as `describe.csv` under the artifact URI.

    Args:
        df: The pandas dataframe or series to visualize.

    Returns:
        A dictionary of visualization URIs and their types.
    """
    joined_path = os.path.join(self.uri, "describe.csv")
    # Backslashes are replaced so the returned URI uses forward slashes.
    describe_uri = joined_path.replace("\\", "/")

    with self.artifact_store.open(describe_uri, mode="wb") as summary_file:
        summary = df.describe()
        summary.to_csv(summary_file)

    visualizations = {describe_uri: VisualizationType.CSV}
    return visualizations