Pandas
zenml.integrations.pandas
special
Initialization of the Pandas integration.
PandasIntegration (Integration)
Definition of Pandas integration for ZenML.
Source code in zenml/integrations/pandas/__init__.py
class PandasIntegration(Integration):
    """Definition of Pandas integration for ZenML."""

    NAME = PANDAS
    REQUIREMENTS = ["pandas>=2.0.0"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.pandas import materializers  # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/pandas/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.pandas import materializers  # noqa
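Activation normally happens implicitly, for example after installing the integration's requirements with the `zenml integration install pandas` CLI command, but it can also be triggered by hand. A minimal sketch, assuming `zenml` and `pandas` are installed:

```python
from zenml.integrations.pandas import PandasIntegration

# Importing the materializers module (which activate() does internally)
# registers PandasMaterializer with ZenML's materializer registry.
PandasIntegration.activate()
```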
materializers
special
Initialization of the Pandas materializer.
pandas_materializer
Materializer for Pandas.
PandasMaterializer (BaseMaterializer)
Materializer to read data to and from pandas.
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pd.DataFrame,
        pd.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def __init__(
        self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
    ):
        """Define `self.parquet_path` and `self.csv_path`.

        Args:
            uri: The URI where the artifact data is stored.
            artifact_store: The artifact store where the artifact data is stored.
        """
        super().__init__(uri, artifact_store)
        try:
            import pyarrow  # type: ignore # noqa

            self.pyarrow_exists = True
        except ImportError:
            self.pyarrow_exists = False
            logger.warning(
                "By default, the `PandasMaterializer` stores data as a "
                "`.csv` file. If you want to store data more efficiently, "
                "you can install `pyarrow` by running "
                "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
                "to automatically store the data as a `.parquet` file instead."
            )
        finally:
            self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
            self.csv_path = os.path.join(self.uri, CSV_FILENAME)

    def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
        """Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

        Args:
            data_type: The type of the data to read.

        Raises:
            ImportError: If pyarrow or fastparquet is not installed.

        Returns:
            The pandas dataframe or series.
        """
        if self.artifact_store.exists(self.parquet_path):
            if self.pyarrow_exists:
                with self.artifact_store.open(
                    self.parquet_path, mode="rb"
                ) as f:
                    df = pd.read_parquet(f)
            else:
                raise ImportError(
                    "You have an old version of a `PandasMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` "
                    "for reading. You can install `pyarrow` by running "
                    "'`pip install pyarrow fastparquet`'."
                )
        else:
            with self.artifact_store.open(self.csv_path, mode="rb") as f:
                df = pd.read_csv(f, index_col=0, parse_dates=True)

        # Validate the type of the data.
        def is_dataframe_or_series(
            df: Union[pd.DataFrame, pd.Series],
        ) -> Union[pd.DataFrame, pd.Series]:
            """Checks if the data is a `pd.DataFrame` or `pd.Series`.

            Args:
                df: The data to check.

            Returns:
                The data if it is a `pd.DataFrame` or `pd.Series`.
            """
            if issubclass(data_type, pd.Series):
                # Take the first column if it's a series, as the assumption
                # is that there will only be one.
                assert len(df.columns) == 1
                df = df[df.columns[0]]
                return df
            else:
                return df

        return is_dataframe_or_series(df)

    def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename.

        Args:
            df: The pandas dataframe or series to write.
        """
        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        if self.pyarrow_exists:
            with self.artifact_store.open(self.parquet_path, mode="wb") as f:
                df.to_parquet(f, compression=COMPRESSION_TYPE)
        else:
            with self.artifact_store.open(self.csv_path, mode="wb") as f:
                df.to_csv(f, index=True)

    def save_visualizations(
        self, df: Union[pd.DataFrame, pd.Series]
    ) -> Dict[str, VisualizationType]:
        """Save visualizations of the given pandas dataframe or series.

        Args:
            df: The pandas dataframe or series to visualize.

        Returns:
            A dictionary of visualization URIs and their types.
        """
        describe_uri = os.path.join(self.uri, "describe.csv")
        describe_uri = describe_uri.replace("\\", "/")
        with self.artifact_store.open(describe_uri, mode="wb") as f:
            df.describe().to_csv(f)
        return {describe_uri: VisualizationType.CSV}

    def extract_metadata(
        self, df: Union[pd.DataFrame, pd.Series]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given pandas dataframe or series.

        Args:
            df: The pandas dataframe or series to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

        if isinstance(df, pd.Series):
            pandas_metadata["dtype"] = DType(df.dtype.type)
            pandas_metadata["mean"] = float(df.mean().item())
            pandas_metadata["std"] = float(df.std().item())
            pandas_metadata["min"] = float(df.min().item())
            pandas_metadata["max"] = float(df.max().item())
        else:
            pandas_metadata["dtype"] = {
                str(key): DType(value.type) for key, value in df.dtypes.items()
            }
            for stat_name, stat in {
                "mean": df.mean,
                "std": df.std,
                "min": df.min,
                "max": df.max,
            }.items():
                pandas_metadata[stat_name] = {
                    str(key): float(value)
                    for key, value in stat(numeric_only=True).to_dict().items()
                }

        return pandas_metadata
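In practice the materializer is picked up automatically for `pd.DataFrame` and `pd.Series` step outputs once the integration is active, but it can also be pinned explicitly. A hedged sketch using ZenML's `@step` decorator and its `output_materializers` parameter:

```python
import pandas as pd
from zenml import step
from zenml.integrations.pandas.materializers.pandas_materializer import (
    PandasMaterializer,
)


@step(output_materializers=PandasMaterializer)
def make_frame() -> pd.DataFrame:
    # The returned frame is persisted as .parquet (if pyarrow is
    # installed) or as .csv otherwise.
    return pd.DataFrame({"a": [1, 2], "b": [3.0, 4.0]})
```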
__init__(self, uri, artifact_store=None)
special
Define `self.parquet_path` and `self.csv_path`.
Parameters:
Name | Type | Description | Default
---|---|---|---
uri | str | The URI where the artifact data is stored. | required
artifact_store | Optional[zenml.artifact_stores.base_artifact_store.BaseArtifactStore] | The artifact store where the artifact data is stored. | None
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Define `self.parquet_path` and `self.csv_path`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    try:
        import pyarrow  # type: ignore # noqa

        self.pyarrow_exists = True
    except ImportError:
        self.pyarrow_exists = False
        logger.warning(
            "By default, the `PandasMaterializer` stores data as a "
            "`.csv` file. If you want to store data more efficiently, "
            "you can install `pyarrow` by running "
            "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
            "to automatically store the data as a `.parquet` file instead."
        )
    finally:
        self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
        self.csv_path = os.path.join(self.uri, CSV_FILENAME)
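As a rough illustration of the fallback logic above, constructing the materializer directly exposes the resolved file paths. The URI here is hypothetical; during a pipeline run ZenML supplies it:

```python
from zenml.integrations.pandas.materializers.pandas_materializer import (
    PandasMaterializer,
)

materializer = PandasMaterializer(uri="/tmp/pandas-artifact")  # hypothetical URI
print(materializer.pyarrow_exists)  # False means the CSV fallback (and warning) applies
print(materializer.parquet_path)    # parquet and csv paths are both precomputed
print(materializer.csv_path)
```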
extract_metadata(self, df)
Extract metadata from the given pandas dataframe or series.
Parameters:
Name | Type | Description | Default
---|---|---|---
df | Union[pandas.DataFrame, pandas.Series] | The pandas dataframe or series to extract metadata from. | required
Returns:
Type | Description
---|---
Dict[str, MetadataType] | The extracted metadata as a dictionary.
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def extract_metadata(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given pandas dataframe or series.

    Args:
        df: The pandas dataframe or series to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

    if isinstance(df, pd.Series):
        pandas_metadata["dtype"] = DType(df.dtype.type)
        pandas_metadata["mean"] = float(df.mean().item())
        pandas_metadata["std"] = float(df.std().item())
        pandas_metadata["min"] = float(df.min().item())
        pandas_metadata["max"] = float(df.max().item())
    else:
        pandas_metadata["dtype"] = {
            str(key): DType(value.type) for key, value in df.dtypes.items()
        }
        for stat_name, stat in {
            "mean": df.mean,
            "std": df.std,
            "min": df.min,
            "max": df.max,
        }.items():
            pandas_metadata[stat_name] = {
                str(key): float(value)
                for key, value in stat(numeric_only=True).to_dict().items()
            }

    return pandas_metadata
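The returned dictionary mirrors plain pandas statistics. A small standalone sketch of what the DataFrame branch computes:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
print(df.shape)                              # (3, 2) -> stored under "shape"
print(df.mean(numeric_only=True).to_dict())  # {'a': 2.0, 'b': 5.0} -> "mean", per column
print(df.std(numeric_only=True).to_dict())   # likewise for "std", "min", "max"
```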
load(self, data_type)
Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.
Parameters:
Name | Type | Description | Default
---|---|---|---
data_type | Type[Any] | The type of the data to read. | required
Exceptions:
Type | Description
---|---
ImportError | If pyarrow or fastparquet is not installed.
Returns:
Type | Description
---|---
Union[pandas.DataFrame, pandas.Series] | The pandas dataframe or series.
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
    """Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

    Args:
        data_type: The type of the data to read.

    Raises:
        ImportError: If pyarrow or fastparquet is not installed.

    Returns:
        The pandas dataframe or series.
    """
    if self.artifact_store.exists(self.parquet_path):
        if self.pyarrow_exists:
            with self.artifact_store.open(
                self.parquet_path, mode="rb"
            ) as f:
                df = pd.read_parquet(f)
        else:
            raise ImportError(
                "You have an old version of a `PandasMaterializer` "
                "data artifact stored in the artifact store "
                "as a `.parquet` file, which requires `pyarrow` "
                "for reading. You can install `pyarrow` by running "
                "'`pip install pyarrow fastparquet`'."
            )
    else:
        with self.artifact_store.open(self.csv_path, mode="rb") as f:
            df = pd.read_csv(f, index_col=0, parse_dates=True)

    # Validate the type of the data.
    def is_dataframe_or_series(
        df: Union[pd.DataFrame, pd.Series],
    ) -> Union[pd.DataFrame, pd.Series]:
        """Checks if the data is a `pd.DataFrame` or `pd.Series`.

        Args:
            df: The data to check.

        Returns:
            The data if it is a `pd.DataFrame` or `pd.Series`.
        """
        if issubclass(data_type, pd.Series):
            # Take the first column if it's a series, as the assumption
            # is that there will only be one.
            assert len(df.columns) == 1
            df = df[df.columns[0]]
            return df
        else:
            return df

    return is_dataframe_or_series(df)
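Note the role of `data_type`: when a `pd.Series` is requested, the single stored column is squeezed back into a series. A hedged sketch, assuming an initialized ZenML environment (so the artifact store resolves), the `materializer` from the `__init__` example, and a prior `save(...)` call:

```python
import pandas as pd

frame = materializer.load(pd.DataFrame)  # returns the stored frame as-is
series = materializer.load(pd.Series)    # extracts the single stored column
```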
save(self, df)
Writes a pandas dataframe or series to the specified filename.
Parameters:
Name | Type | Description | Default
---|---|---|---
df | Union[pandas.DataFrame, pandas.Series] | The pandas dataframe or series to write. | required
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename.

    Args:
        df: The pandas dataframe or series to write.
    """
    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    if self.pyarrow_exists:
        with self.artifact_store.open(self.parquet_path, mode="wb") as f:
            df.to_parquet(f, compression=COMPRESSION_TYPE)
    else:
        with self.artifact_store.open(self.csv_path, mode="wb") as f:
            df.to_csv(f, index=True)
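Because a series is wrapped into a one-column frame named `"series"` before writing, `save` and `load` form a symmetric pair. A minimal sketch under the same assumptions as the `load` example above:

```python
import pandas as pd

materializer.save(pd.Series([1, 2, 3]))  # stored as a one-column frame named "series"
restored = materializer.load(pd.Series)  # squeezed back into a pd.Series
```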
save_visualizations(self, df)
Save visualizations of the given pandas dataframe or series.
Parameters:
Name | Type | Description | Default
---|---|---|---
df | Union[pandas.DataFrame, pandas.Series] | The pandas dataframe or series to visualize. | required
Returns:
Type | Description
---|---
Dict[str, zenml.enums.VisualizationType] | A dictionary of visualization URIs and their types.
Source code in zenml/integrations/pandas/materializers/pandas_materializer.py
def save_visualizations(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, VisualizationType]:
    """Save visualizations of the given pandas dataframe or series.

    Args:
        df: The pandas dataframe or series to visualize.

    Returns:
        A dictionary of visualization URIs and their types.
    """
    describe_uri = os.path.join(self.uri, "describe.csv")
    describe_uri = describe_uri.replace("\\", "/")
    with self.artifact_store.open(describe_uri, mode="wb") as f:
        df.describe().to_csv(f)
    return {describe_uri: VisualizationType.CSV}
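The persisted visualization is simply the output of `df.describe()` serialized as CSV. A standalone sketch of what it contains:

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
print(df.describe())  # count, mean, std, min, quartiles, max per numeric column
```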