Skip to content

Great Expectations

zenml.integrations.great_expectations

Great Expectations integration for ZenML.

The Great Expectations integration enables you to use Great Expectations as a way of profiling and validating your data.

Attributes

GREAT_EXPECTATIONS = 'great_expectations' module-attribute

GREAT_EXPECTATIONS_DATA_VALIDATOR_FLAVOR = 'great_expectations' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        # The model's source points at a Flavor subclass; load the class
        # and instantiate it.
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            # Path (relative to the source root) where the custom flavor
            # module is expected to live, used to build a helpful message.
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            # Chain the original import failure so the root cause stays
            # visible in the traceback (PEP 3134).
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            ) from err
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            ) from err
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    # The docs site uses dashes while component/flavor names use
    # underscores.
    component_path = self.type.plural.replace("_", "-")
    flavor_path = self.name.replace("_", "-")

    try:
        on_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        on_latest = True

    base_url = (
        "https://docs.zenml.io"
        if on_latest
        else f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    )
    return f"{base_url}/stack-components/{component_path}/{flavor_path}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"
    component_type = self.type.plural
    module_path = self.__module__

    if "zenml.integrations" not in module_path:
        # Core (non-integration) flavors are documented under the core
        # code docs.
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{module_path}"
        )

    # The module path looks like "zenml.integrations.<integration>....";
    # pull the integration name out of the segment after the prefix.
    after_prefix = module_path.split("zenml.integrations.", maxsplit=1)[1]
    integration = after_prefix.split(".", maxsplit=1)[0]
    return (
        f"{base}/integration_code_docs"
        f"/integrations-{integration}/#{module_path}"
    )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor. Custom flavors are then scoped by user and workspace.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor. Custom flavors
            are then scoped by user and workspace

    Returns:
        The model.
    """
    # Pull the connector-related fields from the (optional) service
    # connector requirements.
    requirements = self.service_connector_requirements
    if requirements is not None:
        connector_type = requirements.connector_type
        resource_type = requirements.resource_type
        resource_id_attr = requirements.resource_id_attr
    else:
        connector_type = None
        resource_type = None
        resource_id_attr = None

    # Custom flavors are scoped to the active user and workspace and use
    # the public request model; built-in flavors use the internal one.
    if is_custom:
        client = Client()
        user = client.active_user.id
        workspace = client.active_workspace.id
        model_class = FlavorRequest
    else:
        user = None
        workspace = None
        model_class = InternalFlavorRequest

    return model_class(
        user=user,
        workspace=workspace,
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )

GreatExpectationsIntegration

Bases: Integration

Definition of Great Expectations integration for ZenML.

Functions
activate() -> None classmethod

Activate the Great Expectations integration.

Source code in src/zenml/integrations/great_expectations/__init__.py
37
38
39
40
@classmethod
def activate(cls) -> None:
    """Activate the Great Expectations integration.

    Importing the materializers module registers the Great Expectations
    materializers as an import side effect.
    """
    from zenml.integrations.great_expectations import (  # noqa
        materializers,
    )
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Great Expectations integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/great_expectations/__init__.py
42
43
44
45
46
47
48
49
50
51
52
53
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Great Expectations integration.

    Returns:
        List of stack component flavors for this integration.
    """
    # Imported lazily so the integration module can be loaded without
    # pulling in the flavor's dependencies.
    from zenml.integrations.great_expectations.flavors import (
        GreatExpectationsDataValidatorFlavor,
    )

    flavor_classes: List[Type[Flavor]] = [
        GreatExpectationsDataValidatorFlavor,
    ]
    return flavor_classes
get_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/great_expectations/__init__.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    from zenml.integrations.pandas import PandasIntegration

    # Great Expectations works on pandas dataframes, so the pandas
    # integration requirements are merged in as well.
    pandas_requirements = PandasIntegration.get_requirements(
        target_os=target_os
    )
    return cls.REQUIREMENTS + pandas_requirements

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
170
171
172
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration.

    The base implementation is a no-op; subclasses override this hook to
    run activation side effects (e.g. importing materializer modules).
    """
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Verifies each top-level requirement of the integration and then each
    of that requirement's own dependencies (including any extras named in
    the requirement string).

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            # (raises DistributionNotFound / VersionConflict, both
            # handled by the outer except clauses below).
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                # `extras` is a bracketed list like "[a,b]"; strip the
                # brackets and collect each extra's dependencies.
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            # Verify every transitive dependency is present at a
            # compatible version.
            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
174
175
176
177
178
179
180
181
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    The base implementation declares none; integrations that ship stack
    component flavors override this.

    Returns:
        A list of new stack component flavors.
    """
    flavor_classes: List[Type[Flavor]] = []
    return flavor_classes
get_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
135
136
137
138
139
140
141
142
143
144
145
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    # The base implementation ignores the target OS and simply exposes
    # the integration's declared requirement list.
    requirements: List[str] = cls.REQUIREMENTS
    return requirements
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    # Requirements whose name starts with any ignored prefix are kept
    # installed (not uninstalled) and therefore excluded from the result.
    # `str.startswith` accepts a tuple of prefixes; an empty tuple
    # matches nothing, so everything is returned in that case.
    ignored_prefixes = tuple(cls.REQUIREMENTS_IGNORED_ON_UNINSTALL)
    return [
        requirement
        for requirement in cls.get_requirements(target_os=target_os)
        if not requirement.startswith(ignored_prefixes)
    ]
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
183
184
185
186
187
188
189
190
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    The base implementation declares none; integrations that ship plugin
    flavors override this.

    Returns:
        A list of new plugin flavors.
    """
    plugin_flavor_classes: List[Type["BasePluginFlavor"]] = []
    return plugin_flavor_classes

Modules

data_validators

Initialization of the Great Expectations data validator for ZenML.

Classes
GreatExpectationsDataValidator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], workspace: UUID, created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseDataValidator

Great Expectations data validator stack component.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    workspace: UUID,
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        workspace: The ID of the workspace the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    # Component names must be literal strings, never secret references.
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    # The config is stored privately (exposed via a `config` property in
    # subclasses).
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.workspace = workspace
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    # NOTE(review): starts unset; presumably populated lazily by a
    # connector accessor elsewhere — confirm.
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: GreatExpectationsDataValidatorConfig property

Returns the GreatExpectationsDataValidatorConfig config.

Returns:

Type Description
GreatExpectationsDataValidatorConfig

The configuration.

context_config: Optional[DataContextConfig] property

Get the Great Expectations data context configuration.

Raises:

Type Description
ValueError

In case there is an invalid context_config value

Returns:

Type Description
Optional[DataContextConfig]

A dictionary with the GE data context configuration.

data_context: AbstractDataContext property

Returns the Great Expectations data context configured for this component.

Returns:

Type Description
AbstractDataContext

The Great Expectations data context configured for this component.

local_path: Optional[str] property

Return a local path where this component stores information.

If an existing local GE data context is used, it is interpreted as a local path that needs to be accessible in all runtime environments.

Returns:

Type Description
Optional[str]

The local path where this component stores information.

root_directory: str property

Returns path to the root directory for all local files concerning this data validator.

Returns:

Type Description
str

Path to the root directory.

Functions
data_profiling(dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, profile_list: Optional[Sequence[str]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, profiler_kwargs: Optional[Dict[str, Any]] = None, overwrite_existing_suite: bool = True, **kwargs: Any) -> ExpectationSuite

Infer a Great Expectation Expectation Suite from a given dataset.

This Great Expectations specific data profiling method implementation builds an Expectation Suite automatically by running a UserConfigurableProfiler on an input dataset as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset from which the expectation suite will be inferred.

required
comparison_dataset Optional[Any]

Optional dataset used to generate data comparison (i.e. data drift) profiles. Not supported by the Great Expectation data validator.

None
profile_list Optional[Sequence[str]]

Optional list identifying the categories of data profiles to be generated. Not supported by the Great Expectation data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to create or update. If not supplied, a unique name will be generated from the current pipeline and step name, if running in the context of a pipeline step.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
profiler_kwargs Optional[Dict[str, Any]]

A dictionary of custom keyword arguments to pass to the profiler.

None
overwrite_existing_suite bool

Whether to overwrite an existing expectation suite, if one exists with that name.

True
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
ExpectationSuite

The inferred Expectation Suite.

Raises:

Type Description
ValueError

if an expectation_suite_name value is not supplied and a name for the expectation suite cannot be generated from the current step name and pipeline name.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def data_profiling(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    profile_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    profiler_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_existing_suite: bool = True,
    **kwargs: Any,
) -> ExpectationSuite:
    """Infer a Great Expectation Expectation Suite from a given dataset.

    This Great Expectations specific data profiling method implementation
    builds an Expectation Suite automatically by running a
    UserConfigurableProfiler on an input dataset [as covered in the official
    GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

    Args:
        dataset: The dataset from which the expectation suite will be
            inferred.
        comparison_dataset: Optional dataset used to generate data
            comparison (i.e. data drift) profiles. Not supported by the
            Great Expectation data validator.
        profile_list: Optional list identifying the categories of data
            profiles to be generated. Not supported by the Great Expectation
            data validator.
        expectation_suite_name: The name of the expectation suite to create
            or update. If not supplied, a unique name will be generated from
            the current pipeline and step name, if running in the context of
            a pipeline step.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        profiler_kwargs: A dictionary of custom keyword arguments to pass to
            the profiler.
        overwrite_existing_suite: Whether to overwrite an existing
            expectation suite, if one exists with that name.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The inferred Expectation Suite.

    Raises:
        ValueError: if an `expectation_suite_name` value is not supplied and
            a name for the expectation suite cannot be generated from the
            current step name and pipeline name.
    """
    context = self.data_context

    # Bug fix: `profiler_kwargs` defaults to None, and `**None` raises a
    # TypeError when the profiler is constructed below. Normalize to an
    # empty dict.
    profiler_kwargs = profiler_kwargs or {}

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data profiling. Silently ignoring the supplied dataset "
        )

    if not expectation_suite_name:
        # Derive a unique suite name from the active pipeline step, or
        # fail if not running inside a step.
        try:
            step_context = get_step_context()
            pipeline_name = step_context.pipeline.name
            step_name = step_context.step_run.name
            expectation_suite_name = f"{pipeline_name}_{step_name}"
        except RuntimeError:
            raise ValueError(
                "A expectation suite name is required when not running in "
                "the context of a pipeline step."
            )

    suite_exists = False
    if context.expectations_store.has_key(  # noqa
        ExpectationSuiteIdentifier(expectation_suite_name)
    ):
        suite_exists = True
        suite = context.get_expectation_suite(expectation_suite_name)
        if not overwrite_existing_suite:
            logger.info(
                f"Expectation Suite `{expectation_suite_name}` "
                f"already exists and `overwrite_existing_suite` is not set "
                f"in the step configuration. Skipping re-running the "
                f"profiler."
            )
            return suite

    batch_request = create_batch_request(context, dataset, data_asset_name)

    try:
        if suite_exists:
            validator = context.get_validator(
                batch_request=batch_request,
                expectation_suite_name=expectation_suite_name,
            )
        else:
            validator = context.get_validator(
                batch_request=batch_request,
                create_expectation_suite_with_name=expectation_suite_name,
            )

        profiler = UserConfigurableProfiler(
            profile_dataset=validator, **profiler_kwargs
        )

        suite = profiler.build_suite()
        context.save_expectation_suite(
            expectation_suite=suite,
            expectation_suite_name=expectation_suite_name,
        )

        context.build_data_docs()
    finally:
        # Always clean up the temporary datasource created for this run.
        context.delete_datasource(batch_request.datasource_name)

    return suite
data_validation(dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, check_list: Optional[Sequence[str]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, action_list: Optional[List[Dict[str, Any]]] = None, **kwargs: Any) -> CheckpointResult

Great Expectations data validation.

This Great Expectations specific data validation method implementation validates an input dataset against an Expectation Suite (the GE definition of a profile) as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to validate.

required
comparison_dataset Optional[Any]

Optional dataset used to run data comparison (i.e. data drift) checks. Not supported by the Great Expectation data validator.

None
check_list Optional[Sequence[str]]

Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to use to validate the dataset. A value must be provided.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

None
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
CheckpointResult

The Great Expectations validation (checkpoint) result.

Raises:

Type Description
ValueError

if the expectation_suite_name argument is omitted.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
def data_validation(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    check_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    action_list: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> CheckpointResult:
    """Validate a dataset with Great Expectations.

    Runs a one-off Great Expectations checkpoint that validates the input
    dataset against an existing Expectation Suite (the GE equivalent of a
    data profile) [as covered in the official GE
    documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

    Args:
        dataset: The dataset to validate.
        comparison_dataset: Optional dataset used to run data comparison
            (i.e. data drift) checks. Not supported by the Great
            Expectations data validator.
        check_list: Optional list identifying the data validation checks
            to be performed. Not supported by the Great Expectations data
            validator.
        expectation_suite_name: The name of the expectation suite used to
            validate the dataset. A value must be provided.
        data_asset_name: The name of the data asset used to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to
            run after the validation check.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        ValueError: if the `expectation_suite_name` argument is omitted.
    """
    if not expectation_suite_name:
        raise ValueError("Missing expectation_suite_name argument value.")

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data validation. Silently ignoring the supplied dataset "
        )

    # Derive unique checkpoint identifiers from the active step context
    # when running inside a pipeline; otherwise fall back to random names.
    try:
        step_context = get_step_context()
        run_name = step_context.pipeline_run.name
        step_name = step_context.step_run.name
    except RuntimeError:
        run_name = f"pipeline_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    checkpoint_name = f"{run_name}_{step_name}"

    context = self.data_context
    batch_request = create_batch_request(context, dataset, data_asset_name)

    # Default to the standard GE post-validation actions when the caller
    # does not supply any.
    if not action_list:
        action_list = [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"},
            },
        ]

    context.add_checkpoint(  # type: ignore[has-type]
        name=checkpoint_name,
        run_name_template=run_name,
        config_version=1,
        class_name="Checkpoint",
        expectation_suite_name=expectation_suite_name,
        action_list=action_list,
    )

    try:
        results = context.run_checkpoint(
            checkpoint_name=checkpoint_name,
            validations=[{"batch_request": batch_request}],
        )
    finally:
        # Always remove the transient datasource and checkpoint, even when
        # the validation run fails.
        context.delete_datasource(batch_request.datasource_name)
        context.delete_checkpoint(checkpoint_name)

    return results
get_data_context() -> AbstractDataContext classmethod

Get the Great Expectations data context managed by ZenML.

Call this method to retrieve the data context managed by ZenML through the active Great Expectations data validator stack component.

Returns:

Type Description
AbstractDataContext

A Great Expectations data context managed by ZenML as configured

AbstractDataContext

through the active data validator stack component.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def get_data_context(cls) -> AbstractDataContext:
    """Retrieve the ZenML-managed Great Expectations data context.

    The context is obtained from the Great Expectations data validator
    registered in the currently active ZenML stack.

    Returns:
        A Great Expectations data context managed by ZenML as configured
        through the active data validator stack component.
    """
    active_validator = cls.get_active_data_validator()
    data_validator = cast(
        "GreatExpectationsDataValidator", active_validator
    )
    return data_validator.data_context
get_data_docs_config(prefix: str, local: bool = False) -> Dict[str, Any]

Generate Great Expectations data docs configuration.

Parameters:

Name Type Description Default
prefix str

The path prefix for the ZenML data docs configuration

required
local bool

Whether the data docs site is local or remote.

False

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE data docs site configuration.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def get_data_docs_config(
    self, prefix: str, local: bool = False
) -> Dict[str, Any]:
    """Build a Great Expectations data docs site configuration.

    Args:
        prefix: The path prefix for the ZenML data docs configuration
        local: Whether the data docs site is local or remote.

    Returns:
        A dictionary with the GE data docs site configuration.
    """
    # Local sites live on the filesystem under this validator's root
    # directory; remote sites are backed by the ZenML artifact store.
    store_backend = (
        {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": f"{self.root_directory}/{prefix}",
        }
        if local
        else {
            "module_name": ZenMLArtifactStoreBackend.__module__,
            "class_name": ZenMLArtifactStoreBackend.__name__,
            "prefix": f"{str(self.id)}/{prefix}",
        }
    )

    return {
        "class_name": "SiteBuilder",
        "store_backend": store_backend,
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    }
get_store_config(class_name: str, prefix: str) -> Dict[str, Any]

Generate a Great Expectations store configuration.

Parameters:

Name Type Description Default
class_name str

The store class name

required
prefix str

The path prefix for the ZenML store configuration

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE store configuration.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
    """Assemble a Great Expectations store configuration.

    Args:
        class_name: The store class name
        prefix: The path prefix for the ZenML store configuration

    Returns:
        A dictionary with the GE store configuration.
    """
    # ZenML-managed stores are backed by the artifact store, under a path
    # namespaced by this component's UUID.
    backend = {
        "module_name": ZenMLArtifactStoreBackend.__module__,
        "class_name": ZenMLArtifactStoreBackend.__name__,
        "prefix": f"{str(self.id)}/{prefix}",
    }
    return {"class_name": class_name, "store_backend": backend}
Modules
ge_data_validator

Implementation of the Great Expectations data validator.

Classes
GreatExpectationsDataValidator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], workspace: UUID, created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseDataValidator

Great Expectations data validator stack component.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    workspace: UUID,
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        workspace: The ID of the workspace the component belongs to.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    # The component name must be a plain string, never a secret reference.
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    # Core identity and configuration.
    self.name = name
    self.id = id
    self.flavor = flavor
    self.type = type
    self._config = config

    # Ownership and audit metadata.
    self.user = user
    self.workspace = workspace
    self.created = created
    self.updated = updated
    self.labels = labels

    # Service connector wiring; the connector client itself is resolved
    # lazily and cached in `_connector_instance`.
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: GreatExpectationsDataValidatorConfig property

Returns the GreatExpectationsDataValidatorConfig config.

Returns:

Type Description
GreatExpectationsDataValidatorConfig

The configuration.

context_config: Optional[DataContextConfig] property

Get the Great Expectations data context configuration.

Raises:

Type Description
ValueError

In case there is an invalid context_config value

Returns:

Type Description
Optional[DataContextConfig]

A dictionary with the GE data context configuration.

data_context: AbstractDataContext property

Returns the Great Expectations data context configured for this component.

Returns:

Type Description
AbstractDataContext

The Great Expectations data context configured for this component.

local_path: Optional[str] property

Return a local path where this component stores information.

If an existing local GE data context is used, it is interpreted as a local path that needs to be accessible in all runtime environments.

Returns:

Type Description
Optional[str]

The local path where this component stores information.

root_directory: str property

Returns path to the root directory for all local files concerning this data validator.

Returns:

Type Description
str

Path to the root directory.

Functions
data_profiling(dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, profile_list: Optional[Sequence[str]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, profiler_kwargs: Optional[Dict[str, Any]] = None, overwrite_existing_suite: bool = True, **kwargs: Any) -> ExpectationSuite

Infer a Great Expectation Expectation Suite from a given dataset.

This Great Expectations specific data profiling method implementation builds an Expectation Suite automatically by running a UserConfigurableProfiler on an input dataset as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset from which the expectation suite will be inferred.

required
comparison_dataset Optional[Any]

Optional dataset used to generate data comparison (i.e. data drift) profiles. Not supported by the Great Expectations data validator.

None
profile_list Optional[Sequence[str]]

Optional list identifying the categories of data profiles to be generated. Not supported by the Great Expectations data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to create or update. If not supplied, a unique name will be generated from the current pipeline and step name, if running in the context of a pipeline step.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
profiler_kwargs Optional[Dict[str, Any]]

A dictionary of custom keyword arguments to pass to the profiler.

None
overwrite_existing_suite bool

Whether to overwrite an existing expectation suite, if one exists with that name.

True
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
ExpectationSuite

The inferred Expectation Suite.

Raises:

Type Description
ValueError

if an expectation_suite_name value is not supplied and a name for the expectation suite cannot be generated from the current step name and pipeline name.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
def data_profiling(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    profile_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    profiler_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_existing_suite: bool = True,
    **kwargs: Any,
) -> ExpectationSuite:
    """Infer a Great Expectation Expectation Suite from a given dataset.

    This Great Expectations specific data profiling method implementation
    builds an Expectation Suite automatically by running a
    UserConfigurableProfiler on an input dataset [as covered in the official
    GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

    Args:
        dataset: The dataset from which the expectation suite will be
            inferred.
        comparison_dataset: Optional dataset used to generate data
            comparison (i.e. data drift) profiles. Not supported by the
            Great Expectations data validator.
        profile_list: Optional list identifying the categories of data
            profiles to be generated. Not supported by the Great
            Expectations data validator.
        expectation_suite_name: The name of the expectation suite to create
            or update. If not supplied, a unique name will be generated from
            the current pipeline and step name, if running in the context of
            a pipeline step.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        profiler_kwargs: A dictionary of custom keyword arguments to pass to
            the profiler. Defaults to no extra arguments.
        overwrite_existing_suite: Whether to overwrite an existing
            expectation suite, if one exists with that name.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The inferred Expectation Suite.

    Raises:
        ValueError: if an `expectation_suite_name` value is not supplied and
            a name for the expectation suite cannot be generated from the
            current step name and pipeline name.
    """
    context = self.data_context

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data profiling. Silently ignoring the supplied dataset "
        )

    # BUGFIX: `**None` raises `TypeError: argument after ** must be a
    # mapping`, so normalize an omitted `profiler_kwargs` to an empty dict
    # before splatting it into the profiler below.
    profiler_kwargs = profiler_kwargs or {}

    if not expectation_suite_name:
        # Derive a unique suite name from the active pipeline step; outside
        # a step there is nothing to derive it from, so fail loudly.
        try:
            step_context = get_step_context()
            pipeline_name = step_context.pipeline.name
            step_name = step_context.step_run.name
            expectation_suite_name = f"{pipeline_name}_{step_name}"
        except RuntimeError:
            raise ValueError(
                "A expectation suite name is required when not running in "
                "the context of a pipeline step."
            )

    suite_exists = False
    if context.expectations_store.has_key(  # noqa
        ExpectationSuiteIdentifier(expectation_suite_name)
    ):
        suite_exists = True
        suite = context.get_expectation_suite(expectation_suite_name)
        if not overwrite_existing_suite:
            # Reuse the existing suite instead of re-profiling the dataset.
            logger.info(
                f"Expectation Suite `{expectation_suite_name}` "
                f"already exists and `overwrite_existing_suite` is not set "
                f"in the step configuration. Skipping re-running the "
                f"profiler."
            )
            return suite

    batch_request = create_batch_request(context, dataset, data_asset_name)

    try:
        if suite_exists:
            validator = context.get_validator(
                batch_request=batch_request,
                expectation_suite_name=expectation_suite_name,
            )
        else:
            validator = context.get_validator(
                batch_request=batch_request,
                create_expectation_suite_with_name=expectation_suite_name,
            )

        profiler = UserConfigurableProfiler(
            profile_dataset=validator, **profiler_kwargs
        )

        suite = profiler.build_suite()
        context.save_expectation_suite(
            expectation_suite=suite,
            expectation_suite_name=expectation_suite_name,
        )

        context.build_data_docs()
    finally:
        # Always clean up the transient datasource, even when profiling
        # fails.
        context.delete_datasource(batch_request.datasource_name)

    return suite
data_validation(dataset: pd.DataFrame, comparison_dataset: Optional[Any] = None, check_list: Optional[Sequence[str]] = None, expectation_suite_name: Optional[str] = None, data_asset_name: Optional[str] = None, action_list: Optional[List[Dict[str, Any]]] = None, **kwargs: Any) -> CheckpointResult

Great Expectations data validation.

This Great Expectations specific data validation method implementation validates an input dataset against an Expectation Suite (the GE definition of a profile) as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to validate.

required
comparison_dataset Optional[Any]

Optional dataset used to run data comparison (i.e. data drift) checks. Not supported by the Great Expectations data validator.

None
check_list Optional[Sequence[str]]

Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to use to validate the dataset. A value must be provided.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

None
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
CheckpointResult

The Great Expectations validation (checkpoint) result.

Raises:

Type Description
ValueError

if the expectation_suite_name argument is omitted.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
def data_validation(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    check_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    action_list: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> CheckpointResult:
    """Validate a dataset with Great Expectations.

    Runs a one-off Great Expectations checkpoint that validates the input
    dataset against an existing Expectation Suite (the GE equivalent of a
    data profile) [as covered in the official GE
    documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

    Args:
        dataset: The dataset to validate.
        comparison_dataset: Optional dataset used to run data comparison
            (i.e. data drift) checks. Not supported by the Great
            Expectations data validator.
        check_list: Optional list identifying the data validation checks
            to be performed. Not supported by the Great Expectations data
            validator.
        expectation_suite_name: The name of the expectation suite used to
            validate the dataset. A value must be provided.
        data_asset_name: The name of the data asset used to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to
            run after the validation check.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        ValueError: if the `expectation_suite_name` argument is omitted.
    """
    if not expectation_suite_name:
        raise ValueError("Missing expectation_suite_name argument value.")

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data validation. Silently ignoring the supplied dataset "
        )

    # Derive unique checkpoint identifiers from the active step context
    # when running inside a pipeline; otherwise fall back to random names.
    try:
        step_context = get_step_context()
        run_name = step_context.pipeline_run.name
        step_name = step_context.step_run.name
    except RuntimeError:
        run_name = f"pipeline_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    checkpoint_name = f"{run_name}_{step_name}"

    context = self.data_context
    batch_request = create_batch_request(context, dataset, data_asset_name)

    # Default to the standard GE post-validation actions when the caller
    # does not supply any.
    if not action_list:
        action_list = [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"},
            },
        ]

    context.add_checkpoint(  # type: ignore[has-type]
        name=checkpoint_name,
        run_name_template=run_name,
        config_version=1,
        class_name="Checkpoint",
        expectation_suite_name=expectation_suite_name,
        action_list=action_list,
    )

    try:
        results = context.run_checkpoint(
            checkpoint_name=checkpoint_name,
            validations=[{"batch_request": batch_request}],
        )
    finally:
        # Always remove the transient datasource and checkpoint, even when
        # the validation run fails.
        context.delete_datasource(batch_request.datasource_name)
        context.delete_checkpoint(checkpoint_name)

    return results
get_data_context() -> AbstractDataContext classmethod

Get the Great Expectations data context managed by ZenML.

Call this method to retrieve the data context managed by ZenML through the active Great Expectations data validator stack component.

Returns:

Type Description
AbstractDataContext

A Great Expectations data context managed by ZenML as configured

AbstractDataContext

through the active data validator stack component.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@classmethod
def get_data_context(cls) -> AbstractDataContext:
    """Retrieve the ZenML-managed Great Expectations data context.

    The context is obtained from the Great Expectations data validator
    registered in the currently active ZenML stack.

    Returns:
        A Great Expectations data context managed by ZenML as configured
        through the active data validator stack component.
    """
    active_validator = cls.get_active_data_validator()
    data_validator = cast(
        "GreatExpectationsDataValidator", active_validator
    )
    return data_validator.data_context
get_data_docs_config(prefix: str, local: bool = False) -> Dict[str, Any]

Generate Great Expectations data docs configuration.

Parameters:

Name Type Description Default
prefix str

The path prefix for the ZenML data docs configuration

required
local bool

Whether the data docs site is local or remote.

False

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE data docs site configuration.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def get_data_docs_config(
    self, prefix: str, local: bool = False
) -> Dict[str, Any]:
    """Build a Great Expectations data docs site configuration.

    Args:
        prefix: The path prefix for the ZenML data docs configuration
        local: Whether the data docs site is local or remote.

    Returns:
        A dictionary with the GE data docs site configuration.
    """
    # Local sites live on the filesystem under this validator's root
    # directory; remote sites are backed by the ZenML artifact store.
    store_backend = (
        {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": f"{self.root_directory}/{prefix}",
        }
        if local
        else {
            "module_name": ZenMLArtifactStoreBackend.__module__,
            "class_name": ZenMLArtifactStoreBackend.__name__,
            "prefix": f"{str(self.id)}/{prefix}",
        }
    )

    return {
        "class_name": "SiteBuilder",
        "store_backend": store_backend,
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    }
get_store_config(class_name: str, prefix: str) -> Dict[str, Any]

Generate a Great Expectations store configuration.

Parameters:

Name Type Description Default
class_name str

The store class name

required
prefix str

The path prefix for the ZenML store configuration

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE store configuration.

Source code in src/zenml/integrations/great_expectations/data_validators/ge_data_validator.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
    """Assemble a Great Expectations store configuration.

    Args:
        class_name: The store class name
        prefix: The path prefix for the ZenML store configuration

    Returns:
        A dictionary with the GE store configuration.
    """
    # ZenML-managed stores are backed by the artifact store, under a path
    # namespaced by this component's UUID.
    backend = {
        "module_name": ZenMLArtifactStoreBackend.__module__,
        "class_name": ZenMLArtifactStoreBackend.__name__,
        "prefix": f"{str(self.id)}/{prefix}",
    }
    return {"class_name": class_name, "store_backend": backend}
Functions Modules

flavors

Great Expectations integration flavors.

Classes
GreatExpectationsDataValidatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseDataValidatorConfig

Config for the Great Expectations data validator.

Attributes:

Name Type Description
context_root_dir Optional[str]

location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators.

context_config Optional[Dict[str, Any]]

in-line Great Expectations data context configuration. If the context_root_dir attribute is also set, this configuration will be ignored.

configure_zenml_stores bool

if set, ZenML will automatically configure stores that use the Artifact Store as a backend. If neither context_root_dir nor context_config are set, this is the default behavior.

configure_local_docs bool

configure a local data docs site where Great Expectations docs are generated and can be visualized locally.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        # Unset values cannot be secret references; let pydantic apply
        # its own default/None handling.
        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            # Plain-text value: optionally warn when the field is marked
            # as sensitive, then fall through to normal validation.
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        # A secret reference bypasses pydantic validation entirely, so it
        # must be rejected on any field that has custom validators.
        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

Functions
validate_context_config(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Convert the context configuration if given in JSON/YAML format.

Parameters:

Name Type Description Default
data Dict[str, Any]

The configuration values.

required

Returns:

Type Description
Dict[str, Any]

The validated configuration values.

Raises:

Type Description
ValueError

If the context configuration is not a valid JSON/YAML object.

Source code in src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@model_validator(mode="before")
@classmethod
@before_validator_handler
def validate_context_config(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the context configuration if given in JSON/YAML format.

    Args:
        data: The configuration values.

    Returns:
        The validated configuration values.

    Raises:
        ValueError: If the context configuration is not a valid
            JSON/YAML object.
    """
    if isinstance(data.get("context_config"), str):
        try:
            data["context_config"] = yaml.safe_load(data["context_config"])
        # Catch the YAML base error class, not just ParserError: scanner
        # errors (e.g. invalid tokens) would otherwise escape as raw YAML
        # exceptions instead of the documented ValueError.
        except yaml.YAMLError as e:
            raise ValueError(
                f"Malformed `context_config` value. Only JSON and YAML "
                f"formats are supported: {str(e)}"
            ) from e

    return data
GreatExpectationsDataValidatorFlavor

Bases: BaseDataValidatorFlavor

Great Expectations data validator flavor.

Attributes
config_class: Type[GreatExpectationsDataValidatorConfig] property

Returns GreatExpectationsDataValidatorConfig config class.

Returns:

Type Description
Type[GreatExpectationsDataValidatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GreatExpectationsDataValidator] property

Implementation class for this flavor.

Returns:

Type Description
Type[GreatExpectationsDataValidator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

Modules
great_expectations_data_validator_flavor

Great Expectations data validator flavor.

Classes
GreatExpectationsDataValidatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseDataValidatorConfig

Config for the Great Expectations data validator.

Attributes:

Name Type Description
context_root_dir Optional[str]

location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators.

context_config Optional[Dict[str, Any]]

in-line Great Expectations data context configuration. If the context_root_dir attribute is also set, this configuration will be ignored.

configure_zenml_stores bool

if set, ZenML will automatically configure stores that use the Artifact Store as a backend. If neither context_root_dir nor context_config are set, this is the default behavior.

configure_local_docs bool

configure a local data docs site where Great Expectations docs are generated and can be visualized locally.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        # Unset values cannot be secret references; let pydantic apply
        # its own default/None handling.
        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            # Plain-text value: optionally warn when the field is marked
            # as sensitive, then fall through to normal validation.
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        # A secret reference bypasses pydantic validation entirely, so it
        # must be rejected on any field that has custom validators.
        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

Functions
validate_context_config(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Convert the context configuration if given in JSON/YAML format.

Parameters:

Name Type Description Default
data Dict[str, Any]

The configuration values.

required

Returns:

Type Description
Dict[str, Any]

The validated configuration values.

Raises:

Type Description
ValueError

If the context configuration is not a valid JSON/YAML object.

Source code in src/zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
@model_validator(mode="before")
@classmethod
@before_validator_handler
def validate_context_config(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    """Convert the context configuration if given in JSON/YAML format.

    Args:
        data: The configuration values.

    Returns:
        The validated configuration values.

    Raises:
        ValueError: If the context configuration is not a valid
            JSON/YAML object.
    """
    if isinstance(data.get("context_config"), str):
        try:
            data["context_config"] = yaml.safe_load(data["context_config"])
        # Catch the YAML base error class, not just ParserError: scanner
        # errors (e.g. invalid tokens) would otherwise escape as raw YAML
        # exceptions instead of the documented ValueError.
        except yaml.YAMLError as e:
            raise ValueError(
                f"Malformed `context_config` value. Only JSON and YAML "
                f"formats are supported: {str(e)}"
            ) from e

    return data
GreatExpectationsDataValidatorFlavor

Bases: BaseDataValidatorFlavor

Great Expectations data validator flavor.

Attributes
config_class: Type[GreatExpectationsDataValidatorConfig] property

Returns GreatExpectationsDataValidatorConfig config class.

Returns:

Type Description
Type[GreatExpectationsDataValidatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GreatExpectationsDataValidator] property

Implementation class for this flavor.

Returns:

Type Description
Type[GreatExpectationsDataValidator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

Functions Modules

ge_store_backend

Great Expectations store plugin for ZenML.

Classes
ZenMLArtifactStoreBackend(prefix: str = '', **kwargs: Any)

Bases: TupleStoreBackend

Great Expectations store backend that uses the active ZenML Artifact Store as a store.

Create a Great Expectations ZenML store backend instance.

Parameters:

Name Type Description Default
prefix str

Subpath prefix to use for this store backend.

''
kwargs Any

Additional keyword arguments passed by the Great Expectations core. These are transparently passed to the TupleStoreBackend constructor.

{}
Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __init__(
    self,
    prefix: str = "",
    **kwargs: Any,
) -> None:
    """Create a Great Expectations ZenML store backend instance.

    Args:
        prefix: Subpath prefix to use for this store backend.
        kwargs: Additional keyword arguments passed by the Great Expectations
            core. These are transparently passed to the `TupleStoreBackend`
            constructor.
    """
    super().__init__(**kwargs)

    # All GE objects live under a `great_expectations` subdirectory of
    # the active ZenML artifact store.
    client = Client()
    artifact_store = client.active_stack.artifact_store
    self.root_path = os.path.join(
        artifact_store.path, "great_expectations"
    )

    # extract the protocol used in the artifact store root path
    protocols = [
        scheme
        for scheme in artifact_store.config.SUPPORTED_SCHEMES
        if self.root_path.startswith(scheme)
    ]
    if protocols:
        self.proto = protocols[0]
    else:
        # No recognized scheme (e.g. a plain local filesystem path).
        self.proto = ""

    # Normalize the prefix: strip the platform separator (when enabled)
    # and forward slashes from both ends.
    if prefix:
        if self.platform_specific_separator:
            prefix = prefix.strip(os.sep)
        prefix = prefix.strip("/")
    self.prefix = prefix

    # Initialize with store_backend_id if not part of an HTMLSiteStore
    if not self._suppress_store_backend_id:
        _ = self.store_backend_id

    # Record the effective configuration (dropping falsy entries) so GE
    # can serialize and round-trip this backend's config.
    self._config = {
        "prefix": prefix,
        "module_name": self.__class__.__module__,
        "class_name": self.__class__.__name__,
    }
    self._config.update(kwargs)
    filter_properties_dict(
        properties=self._config, clean_falsy=True, inplace=True
    )
Attributes
config: Dict[str, Any] property

Get the store configuration.

Returns:

Type Description
Dict[str, Any]

The store configuration.

Functions
get_public_url_for_key(key: str, protocol: Optional[str] = None) -> str

Get the public URL of an object in the store.

Parameters:

Name Type Description Default
key str

object key identifier.

required
protocol Optional[str]

optional protocol to use instead of the store protocol.

None

Returns:

Type Description
str

The public URL where the object can be accessed.

Raises:

Type Description
StoreBackendError

if a base_public_path attribute was not configured for the store.

Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def get_public_url_for_key(
    self, key: str, protocol: Optional[str] = None
) -> str:
    """Get the public URL of an object in the store.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.
            NOTE(review): this parameter is currently unused by the
            implementation — confirm whether it should affect the URL.

    Returns:
        The public URL where the object can be accessed.

    Raises:
        StoreBackendError: if a `base_public_path` attribute was not
            configured for the store.
    """
    if not self.base_public_path:
        raise StoreBackendError(
            f"Error: No base_public_path was configured! A public URL was "
            f"requested but `base_public_path` was not configured for the "
            f"{self.__class__.__name__}"
        )
    filepath = self._convert_key_to_filepath(key)  # type: ignore[no-untyped-call]
    # NOTE(review): `str.replace` removes *every* occurrence of the store
    # protocol, not only a leading scheme — verify this is intended.
    public_url = self.base_public_path + filepath.replace(self.proto, "")
    return cast(str, public_url)
get_url_for_key(key: Tuple[str, ...], protocol: Optional[str] = None) -> str

Get the URL of an object in the store.

Parameters:

Name Type Description Default
key Tuple[str, ...]

object key identifier.

required
protocol Optional[str]

optional protocol to use instead of the store protocol.

None

Returns:

Type Description
str

The URL of the object in the store.

Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def get_url_for_key(  # type: ignore[override]
    self, key: Tuple[str, ...], protocol: Optional[str] = None
) -> str:
    """Get the URL of an object in the store.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.

    Returns:
        The URL of the object in the store.
    """
    filepath = self._build_object_path(key)
    scheme = protocol
    # Local filesystem paths get an explicit file: scheme when no
    # protocol override was supplied.
    if not scheme and not io_utils.is_remote(filepath):
        scheme = "file:"
    if scheme:
        # Swap only the first occurrence of the store protocol.
        filepath = filepath.replace(self.proto, f"{scheme}//", 1)
    return filepath
list_keys(prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]

List the keys of all objects identified by a partial key.

Parameters:

Name Type Description Default
prefix Tuple[str, ...]

partial object key identifier.

()

Returns:

Type Description
List[Tuple[str, ...]]

List of keys identifying all objects present in the store that

List[Tuple[str, ...]]

match the input partial key.

Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def list_keys(self, prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
    """List the keys of all objects identified by a partial key.

    Args:
        prefix: partial object key identifier.

    Returns:
        List of keys identifying all objects present in the store that
        match the input partial key.
    """
    key_list = []
    # Walk the subtree addressed by the partial key; each file path is
    # converted back into a key relative to the store root.
    list_path = self._build_object_path(prefix, is_prefix=True)
    root_path = self._build_object_path(tuple(), is_prefix=True)
    for root, dirs, files in fileio.walk(list_path):
        for file_ in files:
            filepath = os.path.relpath(
                os.path.join(str(root), str(file_)), root_path
            )

            # Honor the configured filename prefix/suffix filters.
            if self.filepath_prefix and not filepath.startswith(
                self.filepath_prefix
            ):
                continue
            elif self.filepath_suffix and not filepath.endswith(
                self.filepath_suffix
            ):
                continue
            key = self._convert_filepath_to_key(filepath)  # type: ignore[no-untyped-call]
            # Skip paths that don't map to a key or that GE ignores.
            if key and not self.is_ignored_key(key):  # type: ignore[no-untyped-call]
                key_list.append(key)
    return key_list
remove_key(key: Tuple[str, ...]) -> bool

Delete an object from the store.

Parameters:

Name Type Description Default
key Tuple[str, ...]

object key identifier.

required

Returns:

Type Description
bool

True if the object existed in the store and was removed, otherwise

bool

False.

Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def remove_key(self, key: Tuple[str, ...]) -> bool:  # type: ignore[override]
    """Delete an object from the store.

    Args:
        key: object key identifier.

    Returns:
        True if the object existed in the store and was removed, otherwise
        False.
    """
    filepath: str = self._build_object_path(key)
    if not fileio.exists(filepath):
        return False

    fileio.remove(filepath)
    # For local stores, also prune directories left empty by the removal,
    # walking up towards (but never removing) the store root.
    if not io_utils.is_remote(filepath):
        self.rrmdir(self.root_path, str(Path(filepath).parent))
    return True
rrmdir(start_path: str, end_path: str) -> None staticmethod

Recursively removes empty dirs walking upward from end_path toward start_path; start_path itself is never removed.

Parameters:

Name Type Description Default
start_path str

Directory to use as a starting point.

required
end_path str

Directory to use as a destination point.

required
Source code in src/zenml/integrations/great_expectations/ge_store_backend.py
312
313
314
315
316
317
318
319
320
321
322
@staticmethod
def rrmdir(start_path: str, end_path: str) -> None:
    """Recursively removes empty dirs between start_path and end_path inclusive.

    Args:
        start_path: Directory to use as a starting point.
        end_path: Directory to use as a destination point.
    """
    while not os.listdir(end_path) and start_path != end_path:
        os.rmdir(end_path)
        end_path = os.path.dirname(end_path)
Functions
Modules

materializers

Materializers for Great Expectation serializable objects.

Classes
Modules
ge_materializer

Implementation of the Great Expectations materializers.

Classes
GreatExpectationsMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read/write Great Expectation objects.

Source code in src/zenml/materializers/base_materializer.py
125
126
127
128
129
130
131
132
133
134
135
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Create a materializer bound to a storage location.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    # Record where the artifact lives and which store (if any) backs it.
    self._artifact_store = artifact_store
    self.uri = uri
Functions
extract_metadata(data: Union[ExpectationSuite, CheckpointResult]) -> Dict[str, MetadataType]

Extract metadata from the given Great Expectations object.

Parameters:

Name Type Description Default
data Union[ExpectationSuite, CheckpointResult]

The Great Expectations object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/integrations/great_expectations/materializers/ge_materializer.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def extract_metadata(
    self, data: Union[ExpectationSuite, CheckpointResult]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given Great Expectations object.

    Args:
        data: The Great Expectations object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    metadata: Dict[str, "MetadataType"] = {}
    if isinstance(data, CheckpointResult):
        # Checkpoint results expose their name and overall pass/fail.
        metadata["checkpoint_result_name"] = data.name
        metadata["checkpoint_result_passed"] = data.success
    elif isinstance(data, ExpectationSuite):
        metadata["expectation_suite_name"] = data.name
    return metadata
load(data_type: Type[Any]) -> SerializableDictDot

Reads and returns a Great Expectations object.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
SerializableDictDot

A loaded Great Expectations object.

Source code in src/zenml/integrations/great_expectations/materializers/ge_materializer.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def load(self, data_type: Type[Any]) -> SerializableDictDot:
    """Reads and returns a Great Expectations object.

    Args:
        data_type: The type of the data to read.

    Returns:
        A loaded Great Expectations object.
    """
    artifact_dict = yaml_utils.read_json(
        os.path.join(self.uri, ARTIFACT_FILENAME)
    )
    # The concrete class is recorded inside the artifact itself; load it
    # instead of relying on the `data_type` argument.
    stored_type = source_utils.load(artifact_dict.pop("data_type"))

    if stored_type is CheckpointResult:
        stored_type  # CheckpointResult needs its dict rehydrated first.
        self.preprocess_checkpoint_result_dict(artifact_dict)

    return stored_type(**artifact_dict)
preprocess_checkpoint_result_dict(artifact_dict: Dict[str, Any]) -> None staticmethod

Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.

The GE CheckpointResult object is not fully de-serializable due to some missing code in the GE codebase. We need to compensate for this by manually converting some of the attributes to their correct data types.

Parameters:

Name Type Description Default
artifact_dict Dict[str, Any]

A dict containing the GE checkpoint result.

required
Source code in src/zenml/integrations/great_expectations/materializers/ge_materializer.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@staticmethod
def preprocess_checkpoint_result_dict(
    artifact_dict: Dict[str, Any],
) -> None:
    """Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.

    The GE CheckpointResult object is not fully de-serializable
    due to some missing code in the GE codebase. We need to compensate
    for this by manually converting some of the attributes to
    their correct data types.

    Args:
        artifact_dict: A dict containing the GE checkpoint result.
    """

    def preprocess_run_result(key: str, value: Any) -> Any:
        # Only the validation result needs re-hydration into its GE
        # class; every other entry is passed through unchanged.
        if key == "validation_result":
            return ExpectationSuiteValidationResult(**value)
        return value

    # Re-hydrate the checkpoint configuration object.
    artifact_dict["checkpoint_config"] = CheckpointConfig(
        **artifact_dict["checkpoint_config"]
    )
    validation_dict = {}
    for result_ident, results in artifact_dict["run_results"].items():
        # Serialized identifiers have the form `<class>::<a>/<b>/...`;
        # the part after `::` is the identifier's fixed-length tuple.
        validation_ident = (
            ValidationResultIdentifier.from_fixed_length_tuple(  # type: ignore[no-untyped-call]
                result_ident.split("::")[1].split("/")
            )
        )
        validation_results = {
            result_name: preprocess_run_result(result_name, result)
            for result_name, result in results.items()
        }
        validation_dict[validation_ident] = validation_results
    artifact_dict["run_results"] = validation_dict
save(obj: SerializableDictDot) -> None

Writes a Great Expectations object.

Parameters:

Name Type Description Default
obj SerializableDictDot

A Great Expectations object.

required
Source code in src/zenml/integrations/great_expectations/materializers/ge_materializer.py
118
119
120
121
122
123
124
125
126
127
128
129
130
def save(self, obj: SerializableDictDot) -> None:
    """Writes a Great Expectations object.

    Args:
        obj: A Great Expectations object.
    """
    artifact_dict = obj.to_json_dict()
    # Record the fully-qualified type so `load` can re-instantiate it.
    obj_type = type(obj)
    artifact_dict["data_type"] = (
        f"{obj_type.__module__}.{obj_type.__name__}"
    )
    yaml_utils.write_json(
        os.path.join(self.uri, ARTIFACT_FILENAME), artifact_dict
    )
save_visualizations(data: Union[ExpectationSuite, CheckpointResult]) -> Dict[str, VisualizationType]

Saves visualizations for the given Great Expectations object.

Parameters:

Name Type Description Default
data Union[ExpectationSuite, CheckpointResult]

The Great Expectations object to save visualizations for.

required

Returns:

Type Description
Dict[str, VisualizationType]

A dictionary of visualization URIs and their types.

Source code in src/zenml/integrations/great_expectations/materializers/ge_materializer.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def save_visualizations(
    self, data: Union[ExpectationSuite, CheckpointResult]
) -> Dict[str, VisualizationType]:
    """Saves visualizations for the given Great Expectations object.

    Args:
        data: The Great Expectations object to save visualizations for.

    Returns:
        A dictionary of visualization URIs and their types.
    """
    visualizations: Dict[str, VisualizationType] = {}

    # `isinstance` already narrows the type; no `cast` calls needed.
    if isinstance(data, CheckpointResult):
        # Use the identifier of the first validation run result.
        identifier = next(iter(data.run_results.keys()))
    else:
        # Consistency fix: read `name`, as `extract_metadata` does for
        # `ExpectationSuite`; `expectation_suite_name` is not available
        # on newer Great Expectations suite objects.
        identifier = ExpectationSuiteIdentifier(data.name)

    context = GreatExpectationsDataValidator.get_data_context()
    # Each configured data docs site yields one HTML visualization URL.
    sites = context.get_docs_sites_urls(identifier)
    for site in sites:
        visualizations[site["site_url"]] = VisualizationType.HTML

    return visualizations
Modules

steps

Great Expectations data profiling and validation standard steps.

Functions
Modules
ge_profiler

Great Expectations data profiling standard step.

Classes Functions
great_expectations_profiler_step(dataset: pd.DataFrame, expectation_suite_name: str, data_asset_name: Optional[str] = None, profiler_kwargs: Optional[Dict[str, Any]] = None, overwrite_existing_suite: bool = True) -> ExpectationSuite

Infer data validation rules from a pandas dataset.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset from which the expectation suite will be inferred.

required
expectation_suite_name str

The name of the expectation suite to infer.

required
data_asset_name Optional[str]

The name of the data asset to profile.

None
profiler_kwargs Optional[Dict[str, Any]]

A dictionary of keyword arguments to pass to the profiler.

None
overwrite_existing_suite bool

Whether to overwrite an existing expectation suite.

True

Returns:

Type Description
ExpectationSuite

The generated Great Expectations suite.

Source code in src/zenml/integrations/great_expectations/steps/ge_profiler.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@step
def great_expectations_profiler_step(
    dataset: pd.DataFrame,
    expectation_suite_name: str,
    data_asset_name: Optional[str] = None,
    profiler_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_existing_suite: bool = True,
) -> ExpectationSuite:
    """Infer data validation rules from a pandas dataset.

    Args:
        dataset: The dataset from which the expectation suite will be inferred.
        expectation_suite_name: The name of the expectation suite to infer.
        data_asset_name: The name of the data asset to profile.
        profiler_kwargs: A dictionary of keyword arguments to pass to the
            profiler.
        overwrite_existing_suite: Whether to overwrite an existing expectation
            suite.

    Returns:
        The generated Great Expectations suite.
    """
    # Delegate the actual profiling to the active GE data validator
    # registered in the current ZenML stack.
    validator = GreatExpectationsDataValidator.get_active_data_validator()
    return validator.data_profiling(
        dataset,
        expectation_suite_name=expectation_suite_name,
        data_asset_name=data_asset_name,
        profiler_kwargs=profiler_kwargs or {},
        overwrite_existing_suite=overwrite_existing_suite,
    )
ge_validator

Great Expectations data validation standard step.

Classes Functions
great_expectations_validator_step(dataset: pd.DataFrame, expectation_suite_name: str, data_asset_name: Optional[str] = None, action_list: Optional[List[Dict[str, Any]]] = None, exit_on_error: bool = False) -> CheckpointResult

Shortcut function to create a new instance of the GreatExpectationsValidatorStep step.

The returned GreatExpectationsValidatorStep can be used in a pipeline to validate an input pd.DataFrame dataset and return the result as a Great Expectations CheckpointResult object. The validation results are also persisted in the Great Expectations validation store.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to run the expectation suite on.

required
expectation_suite_name str

The name of the expectation suite to use to validate the dataset.

required
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

None
exit_on_error bool

Set this flag to raise an error and exit the pipeline early if the validation fails.

False

Returns:

Type Description
CheckpointResult

The Great Expectations validation (checkpoint) result.

Raises:

Type Description
RuntimeError

if the step is configured to exit on error and the data validation failed.

Source code in src/zenml/integrations/great_expectations/steps/ge_validator.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@step
def great_expectations_validator_step(
    dataset: pd.DataFrame,
    expectation_suite_name: str,
    data_asset_name: Optional[str] = None,
    action_list: Optional[List[Dict[str, Any]]] = None,
    exit_on_error: bool = False,
) -> CheckpointResult:
    """Shortcut function to create a new instance of the GreatExpectationsValidatorStep step.

    The returned GreatExpectationsValidatorStep can be used in a pipeline to
    validate an input pd.DataFrame dataset and return the result as a Great
    Expectations CheckpointResult object. The validation results are also
    persisted in the Great Expectations validation store.

    Args:
        dataset: The dataset to run the expectation suite on.
        expectation_suite_name: The name of the expectation suite to use to
            validate the dataset.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to run
            after the validation check.
        exit_on_error: Set this flag to raise an error and exit the pipeline
            early if the validation fails.

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        RuntimeError: if the step is configured to exit on error and the
            data validation failed.
    """
    # Run the validation through the GE data validator in the active stack.
    validator = GreatExpectationsDataValidator.get_active_data_validator()
    checkpoint_result = validator.data_validation(
        dataset,
        expectation_suite_name=expectation_suite_name,
        data_asset_name=data_asset_name,
        action_list=action_list,
    )

    # Optionally abort the pipeline early on a failed validation.
    if not checkpoint_result.success and exit_on_error:
        raise RuntimeError(
            "The Great Expectations validation failed. Check "
            "the logs or the Great Expectations data docs for more "
            "information."
        )

    return checkpoint_result

utils

Utility functions for the Great Expectations integration.

Functions
create_batch_request(context: AbstractDataContext, dataset: pd.DataFrame, data_asset_name: Optional[str]) -> RuntimeBatchRequest

Create a temporary runtime GE batch request from a dataset step artifact.

Parameters:

Name Type Description Default
context AbstractDataContext

Great Expectations data context.

required
dataset DataFrame

Input dataset.

required
data_asset_name Optional[str]

Optional custom name for the data asset.

required

Returns:

Type Description
RuntimeBatchRequest

A Great Expectations runtime batch request.

Source code in src/zenml/integrations/great_expectations/utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def create_batch_request(
    context: AbstractDataContext,
    dataset: pd.DataFrame,
    data_asset_name: Optional[str],
) -> RuntimeBatchRequest:
    """Create a temporary runtime GE batch request from a dataset step artifact.

    Registers a throwaway in-memory pandas datasource (named after the current
    pipeline run and step) on the data context and builds a runtime batch
    request that wraps the dataset.

    Args:
        context: Great Expectations data context.
        dataset: Input dataset.
        data_asset_name: Optional custom name for the data asset.

    Returns:
        A Great Expectations runtime batch request.
    """
    try:
        # get pipeline name, step name and run id
        step_context = get_step_context()
        pipeline_name = step_context.pipeline.name
        run_name = step_context.pipeline_run.name
        step_name = step_context.step_run.name
    except RuntimeError:
        # if not running inside a pipeline step, use random values
        pipeline_name = f"pipeline_{random_str(5)}"
        # Fixed copy-paste bug: the fallback run name previously used the
        # "pipeline_" prefix, producing misleading datasource names.
        run_name = f"run_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    # Datasource/connector names are scoped to this run+step so repeated runs
    # do not collide; the asset name defaults to pipeline+step.
    datasource_name = f"{run_name}_{step_name}"
    data_connector_name = datasource_name
    data_asset_name = data_asset_name or f"{pipeline_name}_{step_name}"
    batch_identifier = "default"

    # In-memory pandas datasource with a runtime connector, so the DataFrame
    # can be passed directly as batch data instead of being read from storage.
    datasource_config: Dict[str, Any] = {
        "name": datasource_name,
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "module_name": "great_expectations.execution_engine",
            "class_name": "PandasExecutionEngine",
        },
        "data_connectors": {
            data_connector_name: {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": [batch_identifier],
            },
        },
    }

    context.add_datasource(**datasource_config)
    batch_request = RuntimeBatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=data_asset_name,
        runtime_parameters={"batch_data": dataset},
        batch_identifiers={batch_identifier: batch_identifier},
    )

    return batch_request