Skip to content

Pandas

zenml.integrations.pandas

Initialization of the Pandas integration.

Attributes

PANDAS = 'pandas' module-attribute

Classes

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
175
176
177
@classmethod
def activate(cls) -> None:
    """Activate the integration.

    The base implementation is a deliberate no-op; subclasses override it
    to perform whatever setup their integration needs.
    """
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Every requirement string returned by `cls.get_requirements()` is looked
    up with `pkg_resources`: first the package itself, then each of its
    dependencies (restricted to the requested extras when the requirement
    names any). The check stops at the first missing package, version
    conflict or unknown extra. The reason is only logged at debug level;
    callers just see `False`.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            # Only the second element (the extras) of the parsed requirement
            # is used here.
            _, extras = parse_requirement(r)
            if extras:
                # The first and last characters are dropped before splitting
                # on commas -- presumably `extras` is a bracketed string such
                # as "[a,b]"; confirm against `parse_requirement`.
                # NOTE(review): entries are not stripped, so "[a, b]" would
                # yield " b" with a leading space.
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        # The installed distribution does not declare this
                        # extra, so the requirement cannot be satisfied.
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                # No extras requested: check the plain dependency list.
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    # Raises if the dependency is missing or its installed
                    # version conflicts; both cases are handled below.
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        # These handlers cover the lookup of the requirement itself (the
        # first `get_distribution` call); dependency failures are handled
        # in the inner loop above.
        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    # Reached only when no requirement triggered an early `return False`.
    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
179
180
181
182
183
184
185
186
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors this integration provides.

    The base implementation contributes none; subclasses override this to
    return their own flavor classes.

    Returns:
        A list of new stack component flavors.
    """
    no_flavors: List[Type[Flavor]] = list()
    return no_flavors
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Return the requirement strings declared for the integration.

    This implementation ignores both arguments and hands back the
    class-level `REQUIREMENTS` list as is.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    requirements: List[str] = cls.REQUIREMENTS
    return requirements
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Return the requirements that should be removed on uninstall.

    Starts from `cls.get_requirements(target_os=...)` and drops every entry
    that begins with one of the strings listed in
    `cls.REQUIREMENTS_IGNORED_ON_UNINSTALL`. The original order is kept.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    return [
        requirement
        for requirement in cls.get_requirements(target_os=target_os)
        if not any(
            requirement.startswith(ignored_prefix)
            for ignored_prefix in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL
        )
    ]
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
188
189
190
191
192
193
194
195
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Declare the plugin flavors this integration provides.

    The base implementation contributes none; subclasses override this to
    return their own plugin flavor classes.

    Returns:
        A list of new plugin flavors.
    """
    no_plugin_flavors: List[Type["BasePluginFlavor"]] = list()
    return no_plugin_flavors

PandasIntegration

Bases: Integration

Definition of Pandas integration for ZenML.

Functions
activate() -> None classmethod

Activates the integration.

Source code in src/zenml/integrations/pandas/__init__.py
26
27
28
29
@classmethod
def activate(cls) -> None:
    """Activates the integration.

    Activation consists solely of importing the Pandas `materializers`
    package; nothing is returned.
    """
    # Imported only for its import-time side effects (the name is unused,
    # hence the `noqa`). Presumably this is what registers the Pandas
    # materializers -- confirm in the `materializers` package.
    from zenml.integrations.pandas import materializers  # noqa

Modules

materializers

Initialization of the Pandas materializer.

Classes
Modules
pandas_materializer

Materializer for Pandas.

This materializer handles pandas DataFrame and Series objects.

Special features
  • Handles pandas DataFrames and Series with various data types
  • Provides helpful error messages for custom data type errors
  • Warns when custom data types are detected that might need additional libraries
Environment Variables

ZENML_PANDAS_SAMPLE_ROWS: Controls the number of sample rows to include in visualizations. Defaults to 10 if not set.

Classes
PandasMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write pandas DataFrame and Series objects.

Define self.parquet_path and self.csv_path, and record whether pyarrow is available.

Parameters:

Name Type Description Default
uri str

The URI where the artifact data is stored.

required
artifact_store Optional[BaseArtifactStore]

The artifact store where the artifact data is stored.

None
Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Set up the storage paths and detect whether `pyarrow` is usable.

    Sets `self.pyarrow_exists` depending on whether `pyarrow` can be
    imported (logging a warning when it cannot) and defines
    `self.parquet_path` and `self.csv_path` underneath `self.uri`.

    Args:
        uri: The URI where the artifact data is stored.
        artifact_store: The artifact store where the artifact data is stored.
    """
    super().__init__(uri, artifact_store)
    try:
        import pyarrow  # type: ignore # noqa
    except ImportError:
        # Without pyarrow the materializer falls back to CSV storage.
        self.pyarrow_exists = False
        logger.warning(
            "By default, the `PandasMaterializer` stores data as a "
            "`.csv` file. If you want to store data more efficiently, "
            "you can install `pyarrow` by running "
            "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
            "to automatically store the data as a `.parquet` file instead."
        )
    else:
        self.pyarrow_exists = True
    finally:
        # Both locations are always defined, whichever format ends up
        # being used.
        self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
        self.csv_path = os.path.join(self.uri, CSV_FILENAME)
Functions
extract_metadata(df: Union[pd.DataFrame, pd.Series]) -> Dict[str, MetadataType]

Extract metadata from the given pandas dataframe or series.

Parameters:

Name Type Description Default
df Union[DataFrame, Series]

The pandas dataframe or series to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def extract_metadata(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given pandas dataframe or series.

    The result always contains the original `shape` and the `dtype`
    information. Columns whose dtype string is not recognised by
    `is_standard_dtype` are additionally listed under `custom_types`.

    Summary statistics (`mean`, `std`, `min`, `max`) are best-effort for a
    Series: a statistic that cannot be computed or converted to a float
    (for example on string or datetime data) is left out instead of
    aborting the whole extraction. For a DataFrame they are computed over
    the numeric columns only.

    Args:
        df: The pandas dataframe or series to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    # Store whether it's a Series for later reference
    is_series = isinstance(df, pd.Series)

    # Store original shape before conversion
    original_shape = df.shape

    # Convert Series to DataFrame for consistent handling of dtypes
    if is_series:
        series_obj = df  # Keep original Series for some calculations
        df = df.to_frame(name="series")

    pandas_metadata: Dict[str, "MetadataType"] = {"shape": original_shape}

    # Add information about custom data types to metadata. This part is
    # purely informational, so any failure is logged and ignored.
    custom_types = {}
    try:
        for col, dtype in df.dtypes.items():
            dtype_str = str(dtype)
            if not is_standard_dtype(dtype_str):
                col_name = "series" if is_series else str(col)
                custom_types[col_name] = dtype_str
                # Try to get module information if available
                try:
                    module_name = dtype.type.__module__
                    custom_types[f"{col_name}_module"] = module_name
                except (AttributeError, TypeError):
                    pass

        if custom_types:
            pandas_metadata["custom_types"] = custom_types
    except Exception as e:
        logger.debug(f"Error extracting custom type metadata: {e}")

    if is_series:
        # For Series, use the original series object for statistics
        pandas_metadata["dtype"] = DType(series_obj.dtype.type)
        for stat_name in ("mean", "std", "min", "max"):
            try:
                stat_value = getattr(series_obj, stat_name)()
                pandas_metadata[stat_name] = float(stat_value.item())
            except (
                AttributeError,
                NotImplementedError,
                TypeError,
                ValueError,
            ) as e:
                # Non-numeric data either cannot be aggregated or yields a
                # result that is not a numeric scalar; skip that statistic
                # rather than losing all of the metadata.
                logger.debug(
                    f"Skipping '{stat_name}' metadata for series with "
                    f"dtype '{series_obj.dtype}': {e}"
                )

    else:
        pandas_metadata["dtype"] = {
            str(key): DType(value.type) for key, value in df.dtypes.items()
        }
        for stat_name, stat in {
            "mean": df.mean,
            "std": df.std,
            "min": df.min,
            "max": df.max,
        }.items():
            # `numeric_only=True` keeps non-numeric columns out of the
            # statistics.
            pandas_metadata[stat_name] = {
                str(key): float(value)
                for key, value in stat(numeric_only=True).to_dict().items()
            }

    return pandas_metadata
load(data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]

Reads pd.DataFrame or pd.Series from a .parquet or .csv file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Raises:

Type Description
ImportError

If pyarrow or fastparquet is not installed.

TypeError

Raised if there is an error when reading parquet files.

TypeError

If loading fails because the data uses a custom data type, typically one whose supporting library is not installed or imported.

Returns:

Type Description
Union[DataFrame, Series]

The pandas dataframe or series.

Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
    """Read a `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

    The parquet file is preferred when it exists in the artifact store;
    otherwise the CSV file is read. When `data_type` is a `pd.Series`
    subclass, the single column of the stored frame is returned as a
    series.

    Args:
        data_type: The type of the data to read.

    Raises:
        ImportError: If a parquet file exists but `pyarrow` is not
            available.
        TypeError: If reading fails with a type error. Errors that look
            like they were caused by a custom data type are re-raised with
            a more helpful message; all others are re-raised unchanged.

    Returns:
        The pandas dataframe or series.
    """
    try:
        store = self.artifact_store
        if not store.exists(self.parquet_path):
            # No parquet artifact present: read the CSV file instead.
            with store.open(self.csv_path, mode="rb") as csv_file:
                frame = pd.read_csv(csv_file, index_col=0, parse_dates=True)
        elif not self.pyarrow_exists:
            raise ImportError(
                "You have an old version of a `PandasMaterializer` "
                "data artifact stored in the artifact store "
                "as a `.parquet` file, which requires `pyarrow` "
                "for reading, You can install `pyarrow` by running "
                "'`pip install pyarrow fastparquet`'."
            )
        else:
            with store.open(self.parquet_path, mode="rb") as parquet_file:
                frame = pd.read_parquet(parquet_file)
    except TypeError as e:
        # Message fragments that commonly indicate a data type problem.
        lowered_message = str(e).lower()
        dtype_error_markers = (
            "not understood",
            "no type",
            "cannot deserialize",
            "data type",
        )
        if any(marker in lowered_message for marker in dtype_error_markers):
            # Chain a more descriptive error onto the original one so the
            # root cause stays visible in the traceback.
            raise TypeError(
                "Encountered an error with custom data types. This may be due to "
                "missing libraries that were used when the data was originally created. "
                "For example, you might need to install libraries like 'geopandas' for "
                "GeoPandas data types, 'pandas-gbq' for BigQuery data types, or "
                "'pyarrow' for Arrow data types. Make sure to import these libraries "
                "in your step code as well as adding them to your step requirements, "
                "even if you're not directly using them in your code. Pandas needs "
                "these libraries to be imported to properly load the custom data types. "
                "Try installing any packages that were used in previous pipeline steps "
                "but might not be available in the current environment."
            ) from e
        # Anything else is not ours to interpret.
        raise e

    if not issubclass(data_type, pd.Series):
        return frame

    # A series is stored as a frame, which is expected to hold exactly one
    # column.
    assert len(frame.columns) == 1
    return frame[frame.columns[0]]
save(df: Union[pd.DataFrame, pd.Series]) -> None

Writes a pandas dataframe or series to the specified filename.

Parameters:

Name Type Description Default
df Union[DataFrame, Series]

The pandas dataframe or series to write.

required
Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Write a pandas dataframe or series to the artifact store.

    A series is first wrapped in a single-column frame named "series".
    The data is written as parquet when `pyarrow` is available and as CSV
    (including the index) otherwise.

    Args:
        df: The pandas dataframe or series to write.
    """
    frame = df.to_frame(name="series") if isinstance(df, pd.Series) else df

    if not self.pyarrow_exists:
        with self.artifact_store.open(self.csv_path, mode="wb") as csv_file:
            frame.to_csv(csv_file, index=True)
        return

    with self.artifact_store.open(
        self.parquet_path, mode="wb"
    ) as parquet_file:
        frame.to_parquet(parquet_file, compression=COMPRESSION_TYPE)
save_visualizations(df: Union[pd.DataFrame, pd.Series]) -> Dict[str, VisualizationType]

Save visualizations of the given pandas dataframe or series.

Creates two visualizations:

1. A statistical description of the data (using df.describe()).
2. A sample of the data (the first N rows, where N is controlled by ZENML_PANDAS_SAMPLE_ROWS).

Note

The number of sample rows shown can be controlled with the ZENML_PANDAS_SAMPLE_ROWS environment variable.

Parameters:

Name Type Description Default
df Union[DataFrame, Series]

The pandas dataframe or series to visualize.

required

Returns:

Type Description
Dict[str, VisualizationType]

A dictionary of visualization URIs and their types.

Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def save_visualizations(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, VisualizationType]:
    """Save visualizations of the given pandas dataframe or series.

    Creates two visualizations:
    1. A statistical description of the data (using df.describe())
    2. A sample of the data (first N rows controlled by ZENML_PANDAS_SAMPLE_ROWS)

    Note:
        The number of sample rows shown can be controlled with the
        ZENML_PANDAS_SAMPLE_ROWS environment variable. A value that cannot
        be parsed as an integer is ignored (with a warning) and the default
        is used instead.

    Args:
        df: The pandas dataframe or series to visualize.

    Returns:
        A dictionary of visualization URIs and their types.
    """
    visualizations = {}

    def _write_csv(filename: str, frame: Union[pd.DataFrame, pd.Series]) -> None:
        """Write `frame` as CSV next to the artifact and register its URI."""
        # URIs always use forward slashes, even if `os.path.join` produced
        # backslashes.
        csv_uri = os.path.join(self.uri, filename).replace("\\", "/")
        with self.artifact_store.open(csv_uri, mode="wb") as f:
            frame.to_csv(f)
        visualizations[csv_uri] = VisualizationType.CSV

    _write_csv("describe.csv", df.describe())

    # Get the number of sample rows from environment variable or use default
    raw_sample_rows = os.environ.get(
        "ZENML_PANDAS_SAMPLE_ROWS", DEFAULT_SAMPLE_ROWS
    )
    try:
        sample_rows = int(raw_sample_rows)
    except (TypeError, ValueError):
        # A malformed setting should not prevent the visualizations from
        # being created.
        logger.warning(
            f"Invalid value '{raw_sample_rows}' for the "
            f"`ZENML_PANDAS_SAMPLE_ROWS` environment variable. Falling "
            f"back to the default of {DEFAULT_SAMPLE_ROWS} sample rows."
        )
        sample_rows = int(DEFAULT_SAMPLE_ROWS)

    # Add our sample visualization (with configurable number of rows)
    if isinstance(df, pd.Series):
        sample_df = df.head(sample_rows).to_frame()
    else:
        sample_df = df.head(sample_rows)

    _write_csv("sample.csv", sample_df)

    return visualizations
Functions
is_standard_dtype(dtype_str: str) -> bool

Check if a dtype string represents a standard pandas/numpy dtype.

Parameters:

Name Type Description Default
dtype_str str

String representation of the dtype

required

Returns:

Name Type Description
bool bool

True if it's a standard dtype, False otherwise

Source code in src/zenml/integrations/pandas/materializers/pandas_materializer.py
63
64
65
66
67
68
69
70
71
72
73
def is_standard_dtype(dtype_str: str) -> bool:
    """Tell whether a dtype string names a standard pandas/numpy dtype.

    The comparison is case-insensitive on the dtype string, and an entry of
    `STANDARD_DTYPE_PREFIXES` counts as a match when it occurs anywhere in
    it, not only at the start.

    Args:
        dtype_str: String representation of the dtype

    Returns:
        bool: True if it's a standard dtype, False otherwise
    """
    lowered = dtype_str.lower()
    for known_prefix in STANDARD_DTYPE_PREFIXES:
        if known_prefix in lowered:
            return True
    return False