Skip to content

Polars

zenml.integrations.polars

Initialization of the Polars integration.

Attributes

POLARS = 'polars' module-attribute

Classes

Integration

Base class for integrations in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
140
141
142
@classmethod
def activate(cls) -> None:
    """Hook that activates the integration.

    This base implementation performs no work; integration subclasses
    override it to do their own activation.
    """
    return None
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def check_installation(cls) -> bool:
    """Check whether the packages required by the integration are installed.

    Every requirement string is parsed and checked. When the requirement
    itself is satisfied, everything `get_dependencies` reports for it is
    checked too. The first unsatisfied entry is logged at debug level and
    ends the check.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    # Resolve the requirement list once so that the loop and the final
    # log message describe the same list.
    requirements = cls.get_requirements()

    for requirement in requirements:
        parsed_requirement = Requirement(requirement)

        if not requirement_installed(parsed_requirement):
            logger.debug(
                "Requirement '%s' for integration '%s' is not installed "
                "or installed with the wrong version.",
                requirement,
                cls.NAME,
            )
            return False

        # Also verify everything `get_dependencies` reports for this
        # requirement.
        for dependency in get_dependencies(parsed_requirement):
            if not requirement_installed(dependency):
                logger.debug(
                    "Requirement '%s' for integration '%s' is not "
                    "installed or installed with the wrong version.",
                    dependency,
                    cls.NAME,
                )
                return False

    # Lazy %-style arguments, consistent with the other log calls in this
    # method: the message is only formatted if the record is emitted.
    logger.debug(
        "Integration '%s' is installed correctly with requirements %s.",
        cls.NAME,
        requirements,
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
144
145
146
147
148
149
150
151
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the new stack component flavors of the integration.

    This base implementation declares none.

    Returns:
        A list of new stack component flavors (empty here).
    """
    declared_flavors: List[Type[Flavor]] = []
    return declared_flavors
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Return the requirements of the integration.

    This implementation hands back the class-level ``REQUIREMENTS`` as is;
    neither ``target_os`` nor ``python_version`` is consulted here.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    requirements: List[str] = cls.REQUIREMENTS
    return requirements
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Return the requirements to remove when uninstalling the integration.

    The result is the integration's requirement list with every entry
    dropped whose text starts with one of the prefixes listed in
    ``REQUIREMENTS_IGNORED_ON_UNINSTALL``. The original order is kept.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    return [
        requirement
        for requirement in cls.get_requirements(target_os=target_os)
        if not any(
            requirement.startswith(prefix)
            for prefix in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL
        )
    ]
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
153
154
155
156
157
158
159
160
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Declare the new plugin flavors of the integration.

    This base implementation declares none.

    Returns:
        A list of new plugin flavors (empty here).
    """
    declared_flavors: List[Type["BasePluginFlavor"]] = []
    return declared_flavors

PolarsIntegration

Bases: Integration

Definition of Polars integration for ZenML.

Functions
activate() -> None classmethod

Activates the integration.

Source code in src/zenml/integrations/polars/__init__.py
30
31
32
33
@classmethod
def activate(cls) -> None:
    """Activates the integration.

    Activation consists solely of importing the Polars ``materializers``
    subpackage.
    """
    # The import lives inside the method, so it only runs when the
    # integration is activated. The imported name is not used afterwards,
    # hence the `noqa`; presumably the import is wanted for its
    # import-time side effects -- TODO confirm.
    from zenml.integrations.polars import materializers  # noqa

Modules

materializers

Initialization for the Polars materializers.

Classes
Modules
dataframe_materializer

Polars materializer.

Classes
PolarsMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read/write Polars dataframes.

Source code in src/zenml/materializers/base_materializer.py
125
126
127
128
129
130
131
132
133
134
135
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Create a materializer bound to the given artifact URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    # Targets are assigned left to right: `uri` first, then the store.
    self.uri, self._artifact_store = uri, artifact_store
Functions
load(data_type: Type[Any]) -> Any

Reads and returns Polars data after copying it to a temporary path.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

A Polars data frame or series.

Source code in src/zenml/integrations/polars/materializers/dataframe_materializer.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def load(self, data_type: Type[Any]) -> Any:
    """Load Polars data from the Parquet file stored at the artifact URI.

    The artifact directory is copied into a temporary directory first and
    ``dataframe.parquet`` is read from that copy. Whether a data frame or
    a series comes back is decided by the schema metadata of the file;
    ``data_type`` is not consulted by this implementation.

    Args:
        data_type: The type of the data to read.

    Returns:
        A Polars data frame or series.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        io_utils.copy_dir(self.uri, temp_dir)

        # Read the Parquet file from the temporary copy, with the path
        # normalised to forward slashes.
        parquet_path = os.path.join(temp_dir, "dataframe.parquet")
        arrow_data = pq.read_table(parquet_path.replace("\\", "/"))

        # A one-byte flag in the schema metadata marks a stored pl.Series.
        # When it is set, hand Polars the single column instead of the
        # whole table.
        # NOTE(review): only the column data is passed on here -- confirm
        # whether the series name survives the round trip.
        schema_metadata = arrow_data.schema.metadata or {}
        series_flag = schema_metadata.get(b"zenml_is_pl_series")
        if series_flag is not None and bool.from_bytes(series_flag, "big"):
            arrow_data = arrow_data.column(0)

        # Convert the Arrow data to a Polars data frame or series
        return pl.from_arrow(arrow_data)
save(data: Union[pl.DataFrame, pl.Series]) -> None

Writes Polars data to the artifact store.

Parameters:

Name Type Description Default
data Union[DataFrame, Series]

The data to write.

required

Raises:

Type Description
TypeError

If the data is not of type pl.DataFrame or pl.Series.

Source code in src/zenml/integrations/polars/materializers/dataframe_materializer.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def save(self, data: Union[pl.DataFrame, pl.Series]) -> None:
    """Writes Polars data to the artifact store.

    The data is converted to an Apache Arrow table, tagged with a schema
    metadata flag recording whether it was a pl.Series, written to
    ``dataframe.parquet`` in a temporary directory, and that directory is
    then copied to the artifact URI.

    Args:
        data: The data to write.

    Raises:
        TypeError: If the data is not an instance of the materializer's
            ``ASSOCIATED_TYPES`` (expected to be pl.DataFrame and
            pl.Series).
    """
    # Data type check
    if not isinstance(data, self.ASSOCIATED_TYPES):
        raise TypeError(
            f"Expected data of type {self.ASSOCIATED_TYPES}, "
            f"got {type(data)}"
        )

    # Evaluate the Series check once; the result is persisted below so
    # that the read step can convert the data back appropriately.
    is_series = isinstance(data, pl.Series)

    # Convert the data to an Apache Arrow Table
    if isinstance(data, pl.DataFrame):
        table = data.to_arrow()
    else:
        # A single pl.Series becomes a one-column table whose column is
        # named after the series.
        table = pa.Table.from_arrays([data.to_arrow()], names=[data.name])

    # Note: this replaces any schema metadata already on the table.
    table = table.replace_schema_metadata(
        {b"zenml_is_pl_series": is_series.to_bytes(1, "big")}
    )

    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        # Write the table to a Parquet file
        path = os.path.join(temp_dir, "dataframe.parquet").replace(
            "\\", "/"
        )
        # No `compression` argument is passed, so pyarrow's own default
        # codec applies (documented as snappy, not lz4).
        pq.write_table(table, path)
        io_utils.copy_dir(temp_dir, self.uri)
Modules