Skip to content

Great Expectations

zenml.integrations.great_expectations special

Great Expectation integration for ZenML.

The Great Expectations integration enables you to use Great Expectations as a way of profiling and validating your data.

GreatExpectationsIntegration (Integration)

Definition of Great Expectations integration for ZenML.

Source code in zenml/integrations/great_expectations/__init__.py
class GreatExpectationsIntegration(Integration):
    """Definition of Great Expectations integration for ZenML."""

    NAME = GREAT_EXPECTATIONS
    REQUIREMENTS = [
        "great-expectations>=0.15.0,<=0.15.47",
    ]

    @staticmethod
    def activate() -> None:
        """Activate the Great Expectations integration."""
        from zenml.integrations.great_expectations import materializers  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Great Expectations integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.great_expectations.flavors import (
            GreatExpectationsDataValidatorFlavor,
        )

        return [GreatExpectationsDataValidatorFlavor]

activate() staticmethod

Activate the Great Expectations integration.

Source code in zenml/integrations/great_expectations/__init__.py
@staticmethod
def activate() -> None:
    """Activate the Great Expectations integration."""
    from zenml.integrations.great_expectations import materializers  # noqa

flavors() classmethod

Declare the stack component flavors for the Great Expectations integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/great_expectations/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Great Expectations integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.great_expectations.flavors import (
        GreatExpectationsDataValidatorFlavor,
    )

    return [GreatExpectationsDataValidatorFlavor]

data_validators special

Initialization of the Great Expectations data validator for ZenML.

ge_data_validator

Implementation of the Great Expectations data validator.

GreatExpectationsDataValidator (BaseDataValidator)

Great Expectations data validator stack component.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
class GreatExpectationsDataValidator(BaseDataValidator):
    """Great Expectations data validator stack component."""

    NAME: ClassVar[str] = "Great Expectations"
    FLAVOR: ClassVar[
        Type[BaseDataValidatorFlavor]
    ] = GreatExpectationsDataValidatorFlavor

    _context: BaseDataContext = None
    _context_config: Optional[Dict[str, Any]] = None

    @property
    def config(self) -> GreatExpectationsDataValidatorConfig:
        """Returns the `GreatExpectationsDataValidatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(GreatExpectationsDataValidatorConfig, self._config)

    @classmethod
    def get_data_context(cls) -> BaseDataContext:
        """Get the Great Expectations data context managed by ZenML.

        Call this method to retrieve the data context managed by ZenML
        through the active Great Expectations data validator stack component.

        Returns:
            A Great Expectations data context managed by ZenML as configured
            through the active data validator stack component.
        """
        data_validator = cast(
            "GreatExpectationsDataValidator", cls.get_active_data_validator()
        )
        return data_validator.data_context

    @property
    def context_config(self) -> Optional[Dict[str, Any]]:
        """Get the Great Expectations data context configuration.

        The first time the context config is loaded from the stack component
        config, it is converted from JSON/YAML string format to a dict.

        Raises:
            ValueError: If the context_config value is not a valid JSON/YAML or
                if the GE configuration extracted from it fails GE validation.

        Returns:
            A dictionary with the GE data context configuration.
        """
        # If the context config is already loaded, return it
        if self._context_config is not None:
            return self._context_config

        # Otherwise, load it from the stack component config
        context_config = self.config.context_config
        if context_config is None:
            return None
        if isinstance(context_config, dict):
            self._context_config = context_config
            return self._context_config

        # If the context config is a string, try to parse it as JSON/YAML
        try:
            context_config_dict = yaml.safe_load(context_config)
        except yaml.parser.ParserError as e:
            raise ValueError(
                f"Malformed `context_config` value. Only JSON and YAML "
                f"formats are supported: {str(e)}"
            )

        # Validate that the context config is a valid GE config
        try:
            context_config = DataContextConfig(**context_config_dict)
            BaseDataContext(project_config=context_config)
        except Exception as e:
            raise ValueError(f"Invalid `context_config` value: {str(e)}")

        self._context_config = cast(Dict[str, Any], context_config_dict)
        return self._context_config

    @property
    def local_path(self) -> Optional[str]:
        """Return a local path where this component stores information.

        If an existing local GE data context is used, it is
        interpreted as a local path that needs to be accessible in
        all runtime environments.

        Returns:
            The local path where this component stores information.
        """
        return self.config.context_root_dir

    def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
        """Generate a Great Expectations store configuration.

        Args:
            class_name: The store class name
            prefix: The path prefix for the ZenML store configuration

        Returns:
            A dictionary with the GE store configuration.
        """
        return {
            "class_name": class_name,
            "store_backend": {
                "module_name": ZenMLArtifactStoreBackend.__module__,
                "class_name": ZenMLArtifactStoreBackend.__name__,
                "prefix": f"{str(self.id)}/{prefix}",
            },
        }

    def get_data_docs_config(
        self, prefix: str, local: bool = False
    ) -> Dict[str, Any]:
        """Generate Great Expectations data docs configuration.

        Args:
            prefix: The path prefix for the ZenML data docs configuration
            local: Whether the data docs site is local or remote.

        Returns:
            A dictionary with the GE data docs site configuration.
        """
        if local:
            store_backend = {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": f"{self.root_directory}/{prefix}",
            }
        else:
            store_backend = {
                "module_name": ZenMLArtifactStoreBackend.__module__,
                "class_name": ZenMLArtifactStoreBackend.__name__,
                "prefix": f"{str(self.id)}/{prefix}",
            }

        return {
            "class_name": "SiteBuilder",
            "store_backend": store_backend,
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
            },
        }

    @property
    def data_context(self) -> BaseDataContext:
        """Returns the Great Expectations data context configured for this component.

        Returns:
            The Great Expectations data context configured for this component.
        """
        if not self._context:
            expectations_store_name = "zenml_expectations_store"
            validations_store_name = "zenml_validations_store"
            checkpoint_store_name = "zenml_checkpoint_store"
            profiler_store_name = "zenml_profiler_store"
            evaluation_parameter_store_name = "evaluation_parameter_store"

            zenml_context_config = dict(
                stores={
                    expectations_store_name: self.get_store_config(
                        "ExpectationsStore", "expectations"
                    ),
                    validations_store_name: self.get_store_config(
                        "ValidationsStore", "validations"
                    ),
                    checkpoint_store_name: self.get_store_config(
                        "CheckpointStore", "checkpoints"
                    ),
                    profiler_store_name: self.get_store_config(
                        "ProfilerStore", "profilers"
                    ),
                    evaluation_parameter_store_name: {
                        "class_name": "EvaluationParameterStore"
                    },
                },
                expectations_store_name=expectations_store_name,
                validations_store_name=validations_store_name,
                checkpoint_store_name=checkpoint_store_name,
                profiler_store_name=profiler_store_name,
                evaluation_parameter_store_name=evaluation_parameter_store_name,
                data_docs_sites={
                    "zenml_artifact_store": self.get_data_docs_config(
                        "data_docs"
                    )
                },
            )

            configure_zenml_stores = self.config.configure_zenml_stores
            if self.config.context_root_dir:
                # initialize the local data context, if a local path was
                # configured
                self._context = DataContext(self.config.context_root_dir)
            else:
                # create an in-memory data context configuration that is not
                # backed by a local YAML file (see https://docs.greatexpectations.io/docs/guides/setup/configuring_data_contexts/how_to_instantiate_a_data_context_without_a_yml_file/).
                if self.context_config:
                    context_config = DataContextConfig(**self.context_config)
                else:
                    context_config = DataContextConfig(**zenml_context_config)
                    # skip adding the stores after initialization, as they are
                    # already baked in the initial configuration
                    configure_zenml_stores = False
                self._context = BaseDataContext(project_config=context_config)

            if configure_zenml_stores:
                self._context.config.expectations_store_name = (
                    expectations_store_name
                )
                self._context.config.validations_store_name = (
                    validations_store_name
                )
                self._context.config.checkpoint_store_name = (
                    checkpoint_store_name
                )
                self._context.config.profiler_store_name = profiler_store_name
                self._context.config.evaluation_parameter_store_name = (
                    evaluation_parameter_store_name
                )
                for store_name, store_config in zenml_context_config[  # type: ignore[attr-defined]
                    "stores"
                ].items():
                    self._context.add_store(
                        store_name=store_name,
                        store_config=store_config,
                    )
                for site_name, site_config in zenml_context_config[  # type: ignore[attr-defined]
                    "data_docs_sites"
                ].items():
                    self._context.config.data_docs_sites[
                        site_name
                    ] = site_config

            if self.config.configure_local_docs:

                client = Client()
                artifact_store = client.active_stack.artifact_store
                if artifact_store.flavor != "local":
                    self._context.config.data_docs_sites[
                        "zenml_local"
                    ] = self.get_data_docs_config("data_docs", local=True)

        return self._context

    @property
    def root_directory(self) -> str:
        """Returns path to the root directory for all local files concerning this data validator.

        Returns:
            Path to the root directory.
        """
        path = os.path.join(
            io_utils.get_global_config_directory(),
            self.flavor,
            str(self.id),
        )

        if not os.path.exists(path):
            fileio.makedirs(path)

        return path

    def data_profiling(
        self,
        dataset: pd.DataFrame,
        comparison_dataset: Optional[Any] = None,
        profile_list: Optional[Sequence[str]] = None,
        expectation_suite_name: Optional[str] = None,
        data_asset_name: Optional[str] = None,
        profiler_kwargs: Optional[Dict[str, Any]] = None,
        overwrite_existing_suite: bool = True,
        **kwargs: Any,
    ) -> ExpectationSuite:
        """Infer a Great Expectation Expectation Suite from a given dataset.

        This Great Expectations specific data profiling method implementation
        builds an Expectation Suite automatically by running a
        UserConfigurableProfiler on an input dataset [as covered in the official
        GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

        Args:
            dataset: The dataset from which the expectation suite will be
                inferred.
            comparison_dataset: Optional dataset used to generate data
                comparison (i.e. data drift) profiles. Not supported by the
                Great Expectation data validator.
            profile_list: Optional list identifying the categories of data
                profiles to be generated. Not supported by the Great Expectation
                data validator.
            expectation_suite_name: The name of the expectation suite to create
                or update. If not supplied, a unique name will be generated from
                the current pipeline and step name, if running in the context of
                a pipeline step.
            data_asset_name: The name of the data asset to use to identify the
                dataset in the Great Expectations docs.
            profiler_kwargs: A dictionary of custom keyword arguments to pass to
                the profiler.
            overwrite_existing_suite: Whether to overwrite an existing
                expectation suite, if one exists with that name.
            kwargs: Additional keyword arguments (unused).

        Returns:
            The inferred Expectation Suite.

        Raises:
            ValueError: if an `expectation_suite_name` value is not supplied and
                a name for the expectation suite cannot be generated from the
                current step name and pipeline name.
        """
        context = self.data_context

        if comparison_dataset is not None:
            logger.warning(
                "A comparison dataset is not required by Great Expectations "
                "to do data profiling. Silently ignoring the supplied dataset "
            )

        if not expectation_suite_name:
            try:
                # get pipeline name and step name
                step_env = cast(
                    StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME]
                )
                pipeline_name = step_env.pipeline_name
                step_name = step_env.step_name
                expectation_suite_name = f"{pipeline_name}_{step_name}"
            except KeyError:
                raise ValueError(
                    "A expectation suite name is required when not running in "
                    "the context of a pipeline step."
                )

        suite_exists = False
        if context.expectations_store.has_key(  # noqa
            ExpectationSuiteIdentifier(expectation_suite_name)
        ):
            suite_exists = True
            suite = context.get_expectation_suite(expectation_suite_name)
            if not overwrite_existing_suite:
                logger.info(
                    f"Expectation Suite `{expectation_suite_name}` "
                    f"already exists and `overwrite_existing_suite` is not set "
                    f"in the step configuration. Skipping re-running the "
                    f"profiler."
                )
                return suite

        batch_request = create_batch_request(context, dataset, data_asset_name)

        try:
            if suite_exists:
                validator = context.get_validator(
                    batch_request=batch_request,
                    expectation_suite_name=expectation_suite_name,
                )
            else:
                validator = context.get_validator(
                    batch_request=batch_request,
                    create_expectation_suite_with_name=expectation_suite_name,
                )

            profiler = UserConfigurableProfiler(
                profile_dataset=validator, **profiler_kwargs
            )

            suite = profiler.build_suite()
            context.save_expectation_suite(
                expectation_suite=suite,
                expectation_suite_name=expectation_suite_name,
            )

            context.build_data_docs()
        finally:
            context.delete_datasource(batch_request.datasource_name)

        return suite

    def data_validation(
        self,
        dataset: pd.DataFrame,
        comparison_dataset: Optional[Any] = None,
        check_list: Optional[Sequence[str]] = None,
        expectation_suite_name: Optional[str] = None,
        data_asset_name: Optional[str] = None,
        action_list: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> CheckpointResult:
        """Great Expectations data validation.

        This Great Expectations specific data validation method
        implementation validates an input dataset against an Expectation Suite
        (the GE definition of a profile) [as covered in the official GE
        documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

        Args:
            dataset: The dataset to validate.
            comparison_dataset: Optional dataset used to run data
                comparison (i.e. data drift) checks. Not supported by the
                Great Expectation data validator.
            check_list: Optional list identifying the data validation checks to
                be performed. Not supported by the Great Expectations data
                validator.
            expectation_suite_name: The name of the expectation suite to use to
                validate the dataset. A value must be provided.
            data_asset_name: The name of the data asset to use to identify the
                dataset in the Great Expectations docs.
            action_list: A list of additional Great Expectations actions to run after
                the validation check.
            kwargs: Additional keyword arguments (unused).

        Returns:
            The Great Expectations validation (checkpoint) result.

        Raises:
            ValueError: if the `expectation_suite_name` argument is omitted.
        """
        if not expectation_suite_name:
            raise ValueError("Missing expectation_suite_name argument value.")

        if comparison_dataset is not None:
            logger.warning(
                "A comparison dataset is not required by Great Expectations "
                "to do data validation. Silently ignoring the supplied dataset "
            )

        try:
            # get pipeline name, step name and run id
            step_env = cast(
                StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME]
            )
            run_name = step_env.run_name
            step_name = step_env.step_name
        except KeyError:
            # if not running inside a pipeline step, use random values
            run_name = f"pipeline_{random_str(5)}"
            step_name = f"step_{random_str(5)}"

        context = self.data_context

        checkpoint_name = f"{run_name}_{step_name}"

        batch_request = create_batch_request(context, dataset, data_asset_name)

        action_list = action_list or [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"},
            },
        ]

        checkpoint_config = {
            "name": checkpoint_name,
            "run_name_template": run_name,
            "config_version": 1,
            "class_name": "Checkpoint",
            "expectation_suite_name": expectation_suite_name,
            "action_list": action_list,
        }
        context.add_checkpoint(**checkpoint_config)

        try:
            results = context.run_checkpoint(
                checkpoint_name=checkpoint_name,
                validations=[{"batch_request": batch_request}],
            )
        finally:
            context.delete_datasource(batch_request.datasource_name)
            context.delete_checkpoint(checkpoint_name)

        return results
config: GreatExpectationsDataValidatorConfig property readonly

Returns the GreatExpectationsDataValidatorConfig config.

Returns:

Type Description
GreatExpectationsDataValidatorConfig

The configuration.

context_config: Optional[Dict[str, Any]] property readonly

Get the Great Expectations data context configuration.

The first time the context config is loaded from the stack component config, it is converted from JSON/YAML string format to a dict.

Exceptions:

Type Description
ValueError

If the context_config value is not a valid JSON/YAML or if the GE configuration extracted from it fails GE validation.

Returns:

Type Description
Optional[Dict[str, Any]]

A dictionary with the GE data context configuration.

data_context: <function BaseDataContext at 0x7f5d84753790> property readonly

Returns the Great Expectations data context configured for this component.

Returns:

Type Description
<function BaseDataContext at 0x7f5d84753790>

The Great Expectations data context configured for this component.

local_path: Optional[str] property readonly

Return a local path where this component stores information.

If an existing local GE data context is used, it is interpreted as a local path that needs to be accessible in all runtime environments.

Returns:

Type Description
Optional[str]

The local path where this component stores information.

root_directory: str property readonly

Returns path to the root directory for all local files concerning this data validator.

Returns:

Type Description
str

Path to the root directory.

FLAVOR (BaseDataValidatorFlavor)

Great Expectations data validator flavor.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
class GreatExpectationsDataValidatorFlavor(BaseDataValidatorFlavor):
    """Great Expectations data validator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return GREAT_EXPECTATIONS_DATA_VALIDATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/data_validator/greatexpectations.jpeg"

    @property
    def config_class(self) -> Type[GreatExpectationsDataValidatorConfig]:
        """Returns `GreatExpectationsDataValidatorConfig` config class.

        Returns:
                The config class.
        """
        return GreatExpectationsDataValidatorConfig

    @property
    def implementation_class(self) -> Type["GreatExpectationsDataValidator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.great_expectations.data_validators import (
            GreatExpectationsDataValidator,
        )

        return GreatExpectationsDataValidator
config_class: Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig] property readonly

Returns GreatExpectationsDataValidatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GreatExpectationsDataValidator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[GreatExpectationsDataValidator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

data_profiling(self, dataset, comparison_dataset=None, profile_list=None, expectation_suite_name=None, data_asset_name=None, profiler_kwargs=None, overwrite_existing_suite=True, **kwargs)

Infer a Great Expectation Expectation Suite from a given dataset.

This Great Expectations specific data profiling method implementation builds an Expectation Suite automatically by running a UserConfigurableProfiler on an input dataset as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset from which the expectation suite will be inferred.

required
comparison_dataset Optional[Any]

Optional dataset used to generate data comparison (i.e. data drift) profiles. Not supported by the Great Expectation data validator.

None
profile_list Optional[Sequence[str]]

Optional list identifying the categories of data profiles to be generated. Not supported by the Great Expectation data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to create or update. If not supplied, a unique name will be generated from the current pipeline and step name, if running in the context of a pipeline step.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
profiler_kwargs Optional[Dict[str, Any]]

A dictionary of custom keyword arguments to pass to the profiler.

None
overwrite_existing_suite bool

Whether to overwrite an existing expectation suite, if one exists with that name.

True
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
ExpectationSuite

The inferred Expectation Suite.

Exceptions:

Type Description
ValueError

if an expectation_suite_name value is not supplied and a name for the expectation suite cannot be generated from the current step name and pipeline name.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def data_profiling(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    profile_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    profiler_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_existing_suite: bool = True,
    **kwargs: Any,
) -> ExpectationSuite:
    """Infer a Great Expectation Expectation Suite from a given dataset.

    This Great Expectations specific data profiling method implementation
    builds an Expectation Suite automatically by running a
    UserConfigurableProfiler on an input dataset [as covered in the official
    GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

    Args:
        dataset: The dataset from which the expectation suite will be
            inferred.
        comparison_dataset: Optional dataset used to generate data
            comparison (i.e. data drift) profiles. Not supported by the
            Great Expectation data validator.
        profile_list: Optional list identifying the categories of data
            profiles to be generated. Not supported by the Great Expectation
            data validator.
        expectation_suite_name: The name of the expectation suite to create
            or update. If not supplied, a unique name will be generated from
            the current pipeline and step name, if running in the context of
            a pipeline step.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        profiler_kwargs: A dictionary of custom keyword arguments to pass to
            the profiler.
        overwrite_existing_suite: Whether to overwrite an existing
            expectation suite, if one exists with that name.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The inferred Expectation Suite.

    Raises:
        ValueError: if an `expectation_suite_name` value is not supplied and
            a name for the expectation suite cannot be generated from the
            current step name and pipeline name.
    """
    context = self.data_context

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data profiling. Silently ignoring the supplied dataset "
        )

    if not expectation_suite_name:
        try:
            # get pipeline name and step name
            step_env = cast(
                StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME]
            )
            pipeline_name = step_env.pipeline_name
            step_name = step_env.step_name
            expectation_suite_name = f"{pipeline_name}_{step_name}"
        except KeyError:
            raise ValueError(
                "A expectation suite name is required when not running in "
                "the context of a pipeline step."
            )

    suite_exists = False
    if context.expectations_store.has_key(  # noqa
        ExpectationSuiteIdentifier(expectation_suite_name)
    ):
        suite_exists = True
        suite = context.get_expectation_suite(expectation_suite_name)
        if not overwrite_existing_suite:
            logger.info(
                f"Expectation Suite `{expectation_suite_name}` "
                f"already exists and `overwrite_existing_suite` is not set "
                f"in the step configuration. Skipping re-running the "
                f"profiler."
            )
            return suite

    batch_request = create_batch_request(context, dataset, data_asset_name)

    try:
        if suite_exists:
            validator = context.get_validator(
                batch_request=batch_request,
                expectation_suite_name=expectation_suite_name,
            )
        else:
            validator = context.get_validator(
                batch_request=batch_request,
                create_expectation_suite_with_name=expectation_suite_name,
            )

        profiler = UserConfigurableProfiler(
            profile_dataset=validator, **profiler_kwargs
        )

        suite = profiler.build_suite()
        context.save_expectation_suite(
            expectation_suite=suite,
            expectation_suite_name=expectation_suite_name,
        )

        context.build_data_docs()
    finally:
        context.delete_datasource(batch_request.datasource_name)

    return suite
data_validation(self, dataset, comparison_dataset=None, check_list=None, expectation_suite_name=None, data_asset_name=None, action_list=None, **kwargs)

Great Expectations data validation.

This Great Expectations specific data validation method implementation validates an input dataset against an Expectation Suite (the GE definition of a profile) as covered in the official GE documentation.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to validate.

required
comparison_dataset Optional[Any]

Optional dataset used to run data comparison (i.e. data drift) checks. Not supported by the Great Expectation data validator.

None
check_list Optional[Sequence[str]]

Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator.

None
expectation_suite_name Optional[str]

The name of the expectation suite to use to validate the dataset. A value must be provided.

None
data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

None
action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

None
kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
CheckpointResult

The Great Expectations validation (checkpoint) result.

Exceptions:

Type Description
ValueError

if the expectation_suite_name argument is omitted.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def data_validation(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    check_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    action_list: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> CheckpointResult:
    """Great Expectations data validation.

    This Great Expectations specific data validation method
    implementation validates an input dataset against an Expectation Suite
    (the GE definition of a profile) [as covered in the official GE
    documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

    Args:
        dataset: The dataset to validate.
        comparison_dataset: Optional dataset used to run data
            comparison (i.e. data drift) checks. Not supported by the
            Great Expectation data validator.
        check_list: Optional list identifying the data validation checks to
            be performed. Not supported by the Great Expectations data
            validator.
        expectation_suite_name: The name of the expectation suite to use to
            validate the dataset. A value must be provided.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to run after
            the validation check.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        ValueError: if the `expectation_suite_name` argument is omitted.
    """
    if not expectation_suite_name:
        raise ValueError("Missing expectation_suite_name argument value.")

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data validation. Silently ignoring the supplied dataset "
        )

    try:
        # get pipeline name, step name and run id
        step_env = cast(
            StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME]
        )
        run_name = step_env.run_name
        step_name = step_env.step_name
    except KeyError:
        # if not running inside a pipeline step, use random values
        run_name = f"pipeline_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    context = self.data_context

    checkpoint_name = f"{run_name}_{step_name}"

    batch_request = create_batch_request(context, dataset, data_asset_name)

    action_list = action_list or [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ]

    checkpoint_config = {
        "name": checkpoint_name,
        "run_name_template": run_name,
        "config_version": 1,
        "class_name": "Checkpoint",
        "expectation_suite_name": expectation_suite_name,
        "action_list": action_list,
    }
    context.add_checkpoint(**checkpoint_config)

    try:
        results = context.run_checkpoint(
            checkpoint_name=checkpoint_name,
            validations=[{"batch_request": batch_request}],
        )
    finally:
        context.delete_datasource(batch_request.datasource_name)
        context.delete_checkpoint(checkpoint_name)

    return results
get_data_context() classmethod

Get the Great Expectations data context managed by ZenML.

Call this method to retrieve the data context managed by ZenML through the active Great Expectations data validator stack component.

Returns:

Type Description
<function BaseDataContext at 0x7f5d84753790>

A Great Expectations data context managed by ZenML as configured through the active data validator stack component.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
@classmethod
def get_data_context(cls) -> BaseDataContext:
    """Get the Great Expectations data context managed by ZenML.

    Call this method to retrieve the data context managed by ZenML
    through the active Great Expectations data validator stack component.

    Returns:
        A Great Expectations data context managed by ZenML as configured
        through the active data validator stack component.
    """
    data_validator = cast(
        "GreatExpectationsDataValidator", cls.get_active_data_validator()
    )
    return data_validator.data_context
get_data_docs_config(self, prefix, local=False)

Generate Great Expectations data docs configuration.

Parameters:

Name Type Description Default
prefix str

The path prefix for the ZenML data docs configuration

required
local bool

Whether the data docs site is local or remote.

False

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE data docs site configuration.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def get_data_docs_config(
    self, prefix: str, local: bool = False
) -> Dict[str, Any]:
    """Generate Great Expectations data docs configuration.

    Args:
        prefix: The path prefix for the ZenML data docs configuration
        local: Whether the data docs site is local or remote.

    Returns:
        A dictionary with the GE data docs site configuration.
    """
    if local:
        store_backend = {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": f"{self.root_directory}/{prefix}",
        }
    else:
        store_backend = {
            "module_name": ZenMLArtifactStoreBackend.__module__,
            "class_name": ZenMLArtifactStoreBackend.__name__,
            "prefix": f"{str(self.id)}/{prefix}",
        }

    return {
        "class_name": "SiteBuilder",
        "store_backend": store_backend,
        "site_index_builder": {
            "class_name": "DefaultSiteIndexBuilder",
        },
    }
get_store_config(self, class_name, prefix)

Generate a Great Expectations store configuration.

Parameters:

Name Type Description Default
class_name str

The store class name

required
prefix str

The path prefix for the ZenML store configuration

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the GE store configuration.

Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
    """Generate a Great Expectations store configuration.

    Args:
        class_name: The store class name
        prefix: The path prefix for the ZenML store configuration

    Returns:
        A dictionary with the GE store configuration.
    """
    return {
        "class_name": class_name,
        "store_backend": {
            "module_name": ZenMLArtifactStoreBackend.__module__,
            "class_name": ZenMLArtifactStoreBackend.__name__,
            "prefix": f"{str(self.id)}/{prefix}",
        },
    }

flavors special

Great Expectations integration flavors.

great_expectations_data_validator_flavor

Great Expectations data validator flavor.

GreatExpectationsDataValidatorConfig (BaseDataValidatorConfig) pydantic-model

Config for the Great Expectations data validator.

Attributes:

Name Type Description
context_root_dir Optional[str]

location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators.

context_config Optional[Dict[str, Any]]

in-line Great Expectations data context configuration.

configure_zenml_stores bool

if set, ZenML will automatically configure stores that use the Artifact Store as a backend. If neither context_root_dir nor context_config are set, this is the default behavior.

configure_local_docs bool

configure a local data docs site where Great Expectations docs are generated and can be visualized locally.

Source code in zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig):
    """Config for the Great Expectations data validator.

    Attributes:
        context_root_dir: location of an already initialized Great Expectations
            data context. If configured, the data validator will only be usable
            with local orchestrators.
        context_config: in-line Great Expectations data context configuration.
        configure_zenml_stores: if set, ZenML will automatically configure
            stores that use the Artifact Store as a backend. If neither
            `context_root_dir` nor `context_config` are set, this is the default
            behavior.
        configure_local_docs: configure a local data docs site where Great
            Expectations docs are generated and can be visualized locally.
    """

    context_root_dir: Optional[str] = None
    context_config: Optional[Dict[str, Any]] = None
    configure_zenml_stores: bool = False
    configure_local_docs: bool = True

    @validator("context_root_dir")
    def _ensure_valid_context_root_dir(
        cls, context_root_dir: Optional[str] = None
    ) -> Optional[str]:
        """Ensures that the root directory is an absolute path and points to an existing path.

        Args:
            context_root_dir: The context_root_dir value to validate.

        Returns:
            The context_root_dir if it is valid.

        Raises:
            ValueError: If the context_root_dir is not valid.
        """
        if context_root_dir:
            context_root_dir = os.path.abspath(context_root_dir)
            if not fileio.exists(context_root_dir):
                raise ValueError(
                    f"The Great Expectations context_root_dir value doesn't "
                    f"point to an existing data context path: {context_root_dir}"
                )
        return context_root_dir

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        # If an existing local GE data context is used, it is
        # interpreted as a local path that needs to be accessible in
        # all runtime environments.
        return self.context_root_dir is not None
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

GreatExpectationsDataValidatorFlavor (BaseDataValidatorFlavor)

Great Expectations data validator flavor.

Source code in zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
class GreatExpectationsDataValidatorFlavor(BaseDataValidatorFlavor):
    """Great Expectations data validator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return GREAT_EXPECTATIONS_DATA_VALIDATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/data_validator/greatexpectations.jpeg"

    @property
    def config_class(self) -> Type[GreatExpectationsDataValidatorConfig]:
        """Returns `GreatExpectationsDataValidatorConfig` config class.

        Returns:
                The config class.
        """
        return GreatExpectationsDataValidatorConfig

    @property
    def implementation_class(self) -> Type["GreatExpectationsDataValidator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.great_expectations.data_validators import (
            GreatExpectationsDataValidator,
        )

        return GreatExpectationsDataValidator
config_class: Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig] property readonly

Returns GreatExpectationsDataValidatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GreatExpectationsDataValidator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[GreatExpectationsDataValidator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

ge_store_backend

Great Expectations store plugin for ZenML.

ZenMLArtifactStoreBackend (TupleStoreBackend)

Great Expectations store backend that uses the active ZenML Artifact Store as a store.

Source code in zenml/integrations/great_expectations/ge_store_backend.py
class ZenMLArtifactStoreBackend(TupleStoreBackend):  # type: ignore[misc]
    """Great Expectations store backend that uses the active ZenML Artifact Store as a store."""

    def __init__(
        self,
        prefix: str = "",
        **kwargs: Any,
    ) -> None:
        """Create a Great Expectations ZenML store backend instance.

        Args:
            prefix: Subpath prefix to use for this store backend.
            kwargs: Additional keyword arguments passed by the Great Expectations
                core. These are transparently passed to the `TupleStoreBackend`
                constructor.
        """
        super().__init__(**kwargs)

        client = Client()
        artifact_store = client.active_stack.artifact_store
        self.root_path = os.path.join(
            artifact_store.path, "great_expectations"
        )

        # extract the protocol used in the artifact store root path
        protocols = [
            scheme
            for scheme in artifact_store.config.SUPPORTED_SCHEMES
            if self.root_path.startswith(scheme)
        ]
        if protocols:
            self.proto = protocols[0]
        else:
            self.proto = ""

        if prefix:
            if self.platform_specific_separator:
                prefix = prefix.strip(os.sep)
            prefix = prefix.strip("/")
        self.prefix = prefix

        # Initialize with store_backend_id if not part of an HTMLSiteStore
        if not self._suppress_store_backend_id:
            _ = self.store_backend_id

        self._config = {
            "prefix": prefix,
            "module_name": self.__class__.__module__,
            "class_name": self.__class__.__name__,
        }
        self._config.update(kwargs)
        filter_properties_dict(
            properties=self._config, clean_falsy=True, inplace=True
        )

    def _build_object_path(
        self, key: Tuple[str, ...], is_prefix: bool = False
    ) -> str:
        """Build a filepath corresponding to an object key.

        Args:
            key: Great Expectation object key.
            is_prefix: If True, the key will be interpreted as a prefix instead
                of a full key identifier.

        Returns:
            The file path pointing to where the object is stored.
        """
        if not isinstance(key, tuple):
            key = key.to_tuple()
        if not is_prefix:
            object_relative_path = self._convert_key_to_filepath(key)
        elif key:
            object_relative_path = os.path.join(*key)
        else:
            object_relative_path = ""
        if self.prefix:
            object_key = os.path.join(self.prefix, object_relative_path)
        else:
            object_key = object_relative_path
        return os.path.join(self.root_path, object_key)

    def _get(self, key: Tuple[str, ...]) -> str:
        """Get the value of an object from the store.

        Args:
            key: object key identifier.

        Raises:
            InvalidKeyError: if the key doesn't point to an existing object.

        Returns:
            str: the object's contents
        """
        filepath: str = self._build_object_path(key)
        if fileio.exists(filepath):
            contents = io_utils.read_file_contents_as_string(filepath).rstrip(
                "\n"
            )
        else:
            raise InvalidKeyError(
                f"Unable to retrieve object from {self.__class__.__name__} with "
                f"the following Key: {str(filepath)}"
            )
        return contents

    def _set(self, key: Tuple[str, ...], value: str, **kwargs: Any) -> str:
        """Set the value of an object in the store.

        Args:
            key: object key identifier.
            value: object value to set.
            kwargs: additional keyword arguments (ignored).

        Returns:
            The file path where the object was stored.
        """
        filepath: str = self._build_object_path(key)
        if not io_utils.is_remote(filepath):
            parent_dir = str(Path(filepath).parent)
            os.makedirs(parent_dir, exist_ok=True)

        with fileio.open(filepath, "wb") as outfile:
            if isinstance(value, str):
                outfile.write(value.encode("utf-8"))
            else:
                outfile.write(value)
        return filepath

    def _move(
        self,
        source_key: Tuple[str, ...],
        dest_key: Tuple[str, ...],
        **kwargs: Any,
    ) -> None:
        """Associate an object with a different key in the store.

        Args:
            source_key: current object key identifier.
            dest_key: new object key identifier.
            kwargs: additional keyword arguments (ignored).
        """
        source_path = self._build_object_path(source_key)
        dest_path = self._build_object_path(dest_key)

        if fileio.exists(source_path):
            if not io_utils.is_remote(dest_path):
                parent_dir = str(Path(dest_path).parent)
                os.makedirs(parent_dir, exist_ok=True)
            fileio.rename(source_path, dest_path, overwrite=True)

    def list_keys(self, prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
        """List the keys of all objects identified by a partial key.

        Args:
            prefix: partial object key identifier.

        Returns:
            List of keys identifying all objects present in the store that
            match the input partial key.
        """
        key_list = []
        list_path = self._build_object_path(prefix, is_prefix=True)
        root_path = self._build_object_path(tuple(), is_prefix=True)
        for root, dirs, files in fileio.walk(list_path):
            for file_ in files:
                filepath = os.path.relpath(
                    os.path.join(str(root), str(file_)), root_path
                )

                if self.filepath_prefix and not filepath.startswith(
                    self.filepath_prefix
                ):
                    continue
                elif self.filepath_suffix and not filepath.endswith(
                    self.filepath_suffix
                ):
                    continue
                key = self._convert_filepath_to_key(filepath)
                if key and not self.is_ignored_key(key):
                    key_list.append(key)
        return key_list

    def remove_key(self, key: Tuple[str, ...]) -> bool:
        """Delete an object from the store.

        Args:
            key: object key identifier.

        Returns:
            True if the object existed in the store and was removed, otherwise
            False.
        """
        filepath: str = self._build_object_path(key)

        if fileio.exists(filepath):
            fileio.remove(filepath)
            if not io_utils.is_remote(filepath):
                parent_dir = str(Path(filepath).parent)
                self.rrmdir(self.root_path, str(parent_dir))
            return True
        return False

    def _has_key(self, key: Tuple[str, ...]) -> bool:
        """Check if an object is present in the store.

        Args:
            key: object key identifier.

        Returns:
            True if the object is present in the store, otherwise False.
        """
        filepath: str = self._build_object_path(key)
        result = fileio.exists(filepath)
        return result

    def get_url_for_key(
        self, key: Tuple[str, ...], protocol: Optional[str] = None
    ) -> str:
        """Get the URL of an object in the store.

        Args:
            key: object key identifier.
            protocol: optional protocol to use instead of the store protocol.

        Returns:
            The URL of the object in the store.
        """
        filepath = self._build_object_path(key)
        if not protocol and not io_utils.is_remote(filepath):
            protocol = "file:"
        if protocol:
            filepath = filepath.replace(self.proto, f"{protocol}//", 1)

        return filepath

    def get_public_url_for_key(
        self, key: str, protocol: Optional[str] = None
    ) -> str:
        """Get the public URL of an object in the store.

        Args:
            key: object key identifier.
            protocol: optional protocol to use instead of the store protocol.

        Returns:
            The public URL where the object can be accessed.

        Raises:
            StoreBackendError: if a `base_public_path` attribute was not
                configured for the store.
        """
        if not self.base_public_path:
            raise StoreBackendError(
                f"Error: No base_public_path was configured! A public URL was "
                f"requested but `base_public_path` was not configured for the "
                f"{self.__class__.__name__}"
            )
        filepath = self._convert_key_to_filepath(key)
        public_url = self.base_public_path + filepath.replace(self.proto, "")
        return cast(str, public_url)

    @staticmethod
    def rrmdir(start_path: str, end_path: str) -> None:
        """Recursively removes empty dirs between start_path and end_path inclusive.

        Args:
            start_path: Directory to use as a starting point.
            end_path: Directory to use as a destination point.
        """
        while not os.listdir(end_path) and start_path != end_path:
            os.rmdir(end_path)
            end_path = os.path.dirname(end_path)

    @property
    def config(self) -> Dict[str, Any]:
        """Get the store configuration.

        Returns:
            The store configuration.
        """
        return self._config
config: Dict[str, Any] property readonly

Get the store configuration.

Returns:

Type Description
Dict[str, Any]

The store configuration.

__init__(self, prefix='', **kwargs) special

Create a Great Expectations ZenML store backend instance.

Parameters:

Name Type Description Default
prefix str

Subpath prefix to use for this store backend.

''
kwargs Any

Additional keyword arguments passed by the Great Expectations core. These are transparently passed to the TupleStoreBackend constructor.

{}
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def __init__(
    self,
    prefix: str = "",
    **kwargs: Any,
) -> None:
    """Create a Great Expectations ZenML store backend instance.

    Args:
        prefix: Subpath prefix to use for this store backend.
        kwargs: Additional keyword arguments passed by the Great Expectations
            core. These are transparently passed to the `TupleStoreBackend`
            constructor.
    """
    super().__init__(**kwargs)

    client = Client()
    artifact_store = client.active_stack.artifact_store
    self.root_path = os.path.join(
        artifact_store.path, "great_expectations"
    )

    # extract the protocol used in the artifact store root path
    protocols = [
        scheme
        for scheme in artifact_store.config.SUPPORTED_SCHEMES
        if self.root_path.startswith(scheme)
    ]
    if protocols:
        self.proto = protocols[0]
    else:
        self.proto = ""

    if prefix:
        if self.platform_specific_separator:
            prefix = prefix.strip(os.sep)
        prefix = prefix.strip("/")
    self.prefix = prefix

    # Initialize with store_backend_id if not part of an HTMLSiteStore
    if not self._suppress_store_backend_id:
        _ = self.store_backend_id

    self._config = {
        "prefix": prefix,
        "module_name": self.__class__.__module__,
        "class_name": self.__class__.__name__,
    }
    self._config.update(kwargs)
    filter_properties_dict(
        properties=self._config, clean_falsy=True, inplace=True
    )
get_public_url_for_key(self, key, protocol=None)

Get the public URL of an object in the store.

Parameters:

Name Type Description Default
key str

object key identifier.

required
protocol Optional[str]

optional protocol to use instead of the store protocol.

None

Returns:

Type Description
str

The public URL where the object can be accessed.

Exceptions:

Type Description
StoreBackendError

if a base_public_path attribute was not configured for the store.

Source code in zenml/integrations/great_expectations/ge_store_backend.py
def get_public_url_for_key(
    self, key: str, protocol: Optional[str] = None
) -> str:
    """Get the public URL of an object in the store.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.

    Returns:
        The public URL where the object can be accessed.

    Raises:
        StoreBackendError: if a `base_public_path` attribute was not
            configured for the store.
    """
    if not self.base_public_path:
        raise StoreBackendError(
            f"Error: No base_public_path was configured! A public URL was "
            f"requested but `base_public_path` was not configured for the "
            f"{self.__class__.__name__}"
        )
    filepath = self._convert_key_to_filepath(key)
    public_url = self.base_public_path + filepath.replace(self.proto, "")
    return cast(str, public_url)
get_url_for_key(self, key, protocol=None)

Get the URL of an object in the store.

Parameters:

Name Type Description Default
key Tuple[str, ...]

object key identifier.

required
protocol Optional[str]

optional protocol to use instead of the store protocol.

None

Returns:

Type Description
str

The URL of the object in the store.

Source code in zenml/integrations/great_expectations/ge_store_backend.py
def get_url_for_key(
    self, key: Tuple[str, ...], protocol: Optional[str] = None
) -> str:
    """Get the URL of an object in the store.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.

    Returns:
        The URL of the object in the store.
    """
    filepath = self._build_object_path(key)
    if not protocol and not io_utils.is_remote(filepath):
        protocol = "file:"
    if protocol:
        filepath = filepath.replace(self.proto, f"{protocol}//", 1)

    return filepath
list_keys(self, prefix=())

List the keys of all objects identified by a partial key.

Parameters:

Name Type Description Default
prefix Tuple[str, ...]

partial object key identifier.

()

Returns:

Type Description
List[Tuple[str, ...]]

List of keys identifying all objects present in the store that match the input partial key.

Source code in zenml/integrations/great_expectations/ge_store_backend.py
def list_keys(self, prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
    """List the keys of all objects identified by a partial key.

    Args:
        prefix: partial object key identifier.

    Returns:
        List of keys identifying all objects present in the store that
        match the input partial key.
    """
    key_list = []
    list_path = self._build_object_path(prefix, is_prefix=True)
    root_path = self._build_object_path(tuple(), is_prefix=True)
    for root, dirs, files in fileio.walk(list_path):
        for file_ in files:
            filepath = os.path.relpath(
                os.path.join(str(root), str(file_)), root_path
            )

            if self.filepath_prefix and not filepath.startswith(
                self.filepath_prefix
            ):
                continue
            elif self.filepath_suffix and not filepath.endswith(
                self.filepath_suffix
            ):
                continue
            key = self._convert_filepath_to_key(filepath)
            if key and not self.is_ignored_key(key):
                key_list.append(key)
    return key_list
remove_key(self, key)

Delete an object from the store.

Parameters:

Name Type Description Default
key Tuple[str, ...]

object key identifier.

required

Returns:

Type Description
bool

True if the object existed in the store and was removed, otherwise False.

Source code in zenml/integrations/great_expectations/ge_store_backend.py
def remove_key(self, key: Tuple[str, ...]) -> bool:
    """Delete an object from the store.

    Args:
        key: object key identifier.

    Returns:
        True if the object existed in the store and was removed, otherwise
        False.
    """
    filepath: str = self._build_object_path(key)

    if fileio.exists(filepath):
        fileio.remove(filepath)
        if not io_utils.is_remote(filepath):
            parent_dir = str(Path(filepath).parent)
            self.rrmdir(self.root_path, str(parent_dir))
        return True
    return False
rrmdir(start_path, end_path) staticmethod

Recursively removes empty dirs between start_path and end_path inclusive.

Parameters:

Name Type Description Default
start_path str

Directory to use as a starting point.

required
end_path str

Directory to use as a destination point.

required
Source code in zenml/integrations/great_expectations/ge_store_backend.py
@staticmethod
def rrmdir(start_path: str, end_path: str) -> None:
    """Recursively removes empty dirs between start_path and end_path inclusive.

    Args:
        start_path: Directory to use as a starting point.
        end_path: Directory to use as a destination point.
    """
    while not os.listdir(end_path) and start_path != end_path:
        os.rmdir(end_path)
        end_path = os.path.dirname(end_path)

materializers special

Materializers for Great Expectation serializable objects.

ge_materializer

Implementation of the Great Expectations materializers.

GreatExpectationsMaterializer (BaseMaterializer)

Materializer to read/write Great Expectation objects.

Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
class GreatExpectationsMaterializer(BaseMaterializer):
    """Materializer to read/write Great Expectation objects."""

    ASSOCIATED_TYPES = (
        ExpectationSuite,
        CheckpointResult,
    )
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA_ANALYSIS

    @staticmethod
    def preprocess_checkpoint_result_dict(
        artifact_dict: Dict[str, Any]
    ) -> None:
        """Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.

        The GE CheckpointResult object is not fully de-serializable
        due to some missing code in the GE codebase. We need to compensate
        for this by manually converting some of the attributes to
        their correct data types.

        Args:
            artifact_dict: A dict containing the GE checkpoint result.
        """

        def preprocess_run_result(key: str, value: Any) -> Any:
            if key == "validation_result":
                return ExpectationSuiteValidationResult(**value)
            return value

        artifact_dict["checkpoint_config"] = CheckpointConfig(
            **artifact_dict["checkpoint_config"]
        )
        validation_dict = {}
        for result_ident, results in artifact_dict["run_results"].items():
            validation_ident = (
                ValidationResultIdentifier.from_fixed_length_tuple(
                    result_ident.split("::")[1].split("/")
                )
            )
            validation_results = {
                result_name: preprocess_run_result(result_name, result)
                for result_name, result in results.items()
            }
            validation_dict[validation_ident] = validation_results
        artifact_dict["run_results"] = validation_dict

    def load(self, data_type: Type[Any]) -> SerializableDictDot:
        """Reads and returns a Great Expectations object.

        Args:
            data_type: The type of the data to read.

        Returns:
            A loaded Great Expectations object.
        """
        super().load(data_type)
        filepath = os.path.join(self.uri, ARTIFACT_FILENAME)
        artifact_dict = yaml_utils.read_json(filepath)
        data_type = import_class_by_path(artifact_dict.pop("data_type"))

        if data_type is CheckpointResult:
            self.preprocess_checkpoint_result_dict(artifact_dict)

        return data_type(**artifact_dict)

    def save(self, obj: SerializableDictDot) -> None:
        """Writes a Great Expectations object.

        Args:
            obj: A Great Expectations object.
        """
        super().save(obj)
        filepath = os.path.join(self.uri, ARTIFACT_FILENAME)
        artifact_dict = obj.to_json_dict()
        artifact_type = type(obj)
        artifact_dict[
            "data_type"
        ] = f"{artifact_type.__module__}.{artifact_type.__name__}"
        yaml_utils.write_json(filepath, artifact_dict)

    def extract_metadata(
        self, data: Union[ExpectationSuite, CheckpointResult]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given Great Expectations object.

        Args:
            data: The Great Expectations object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(data)
        ge_metadata: Dict[str, "MetadataType"] = {}
        if isinstance(data, CheckpointResult):
            ge_metadata = {
                "checkpoint_result_name": data.name,
                "checkpoint_result_passed": data.success,
            }
        elif isinstance(data, ExpectationSuite):
            ge_metadata = {
                "expectation_suite_name": data.name,
            }
        return {**base_metadata, **ge_metadata}
extract_metadata(self, data)

Extract metadata from the given Great Expectations object.

Parameters:

Name Type Description Default
data Union[great_expectations.core.expectation_suite.ExpectationSuite, great_expectations.checkpoint.types.checkpoint_result.CheckpointResult]

The Great Expectations object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def extract_metadata(
    self, data: Union[ExpectationSuite, CheckpointResult]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given Great Expectations object.

    Args:
        data: The Great Expectations object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(data)
    ge_metadata: Dict[str, "MetadataType"] = {}
    if isinstance(data, CheckpointResult):
        ge_metadata = {
            "checkpoint_result_name": data.name,
            "checkpoint_result_passed": data.success,
        }
    elif isinstance(data, ExpectationSuite):
        ge_metadata = {
            "expectation_suite_name": data.name,
        }
    return {**base_metadata, **ge_metadata}
load(self, data_type)

Reads and returns a Great Expectations object.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
SerializableDictDot

A loaded Great Expectations object.

Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def load(self, data_type: Type[Any]) -> SerializableDictDot:
    """Reads and returns a Great Expectations object.

    Args:
        data_type: The type of the data to read.

    Returns:
        A loaded Great Expectations object.
    """
    super().load(data_type)
    filepath = os.path.join(self.uri, ARTIFACT_FILENAME)
    artifact_dict = yaml_utils.read_json(filepath)
    data_type = import_class_by_path(artifact_dict.pop("data_type"))

    if data_type is CheckpointResult:
        self.preprocess_checkpoint_result_dict(artifact_dict)

    return data_type(**artifact_dict)
preprocess_checkpoint_result_dict(artifact_dict) staticmethod

Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.

The GE CheckpointResult object is not fully de-serializable due to some missing code in the GE codebase. We need to compensate for this by manually converting some of the attributes to their correct data types.

Parameters:

Name Type Description Default
artifact_dict Dict[str, Any]

A dict containing the GE checkpoint result.

required
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
@staticmethod
def preprocess_checkpoint_result_dict(
    artifact_dict: Dict[str, Any]
) -> None:
    """Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.

    The GE CheckpointResult object is not fully de-serializable
    due to some missing code in the GE codebase. We need to compensate
    for this by manually converting some of the attributes to
    their correct data types.

    Args:
        artifact_dict: A dict containing the GE checkpoint result.
    """

    def preprocess_run_result(key: str, value: Any) -> Any:
        if key == "validation_result":
            return ExpectationSuiteValidationResult(**value)
        return value

    artifact_dict["checkpoint_config"] = CheckpointConfig(
        **artifact_dict["checkpoint_config"]
    )
    validation_dict = {}
    for result_ident, results in artifact_dict["run_results"].items():
        validation_ident = (
            ValidationResultIdentifier.from_fixed_length_tuple(
                result_ident.split("::")[1].split("/")
            )
        )
        validation_results = {
            result_name: preprocess_run_result(result_name, result)
            for result_name, result in results.items()
        }
        validation_dict[validation_ident] = validation_results
    artifact_dict["run_results"] = validation_dict
save(self, obj)

Writes a Great Expectations object.

Parameters:

Name Type Description Default
obj SerializableDictDot

A Great Expectations object.

required
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def save(self, obj: SerializableDictDot) -> None:
    """Writes a Great Expectations object.

    Args:
        obj: A Great Expectations object.
    """
    super().save(obj)
    filepath = os.path.join(self.uri, ARTIFACT_FILENAME)
    artifact_dict = obj.to_json_dict()
    artifact_type = type(obj)
    artifact_dict[
        "data_type"
    ] = f"{artifact_type.__module__}.{artifact_type.__name__}"
    yaml_utils.write_json(filepath, artifact_dict)

steps special

Great Expectations data profiling and validation standard steps.

ge_profiler

Great Expectations data profiling standard step.

GreatExpectationsProfilerParameters (BaseParameters) pydantic-model

Parameters class for a Great Expectations profiler step.

Attributes:

Name Type Description
expectation_suite_name str

The name of the expectation suite to create or update.

data_asset_name Optional[str]

The name of the data asset to run the expectation suite on.

profiler_kwargs Optional[Dict[str, Any]]

A dictionary of keyword arguments to pass to the profiler.

overwrite_existing_suite bool

Whether to overwrite an existing expectation suite.

Source code in zenml/integrations/great_expectations/steps/ge_profiler.py
class GreatExpectationsProfilerParameters(BaseParameters):
    """Parameters class for a Great Expectations profiler step.

    Attributes:
        expectation_suite_name: The name of the expectation suite to create
            or update.
        data_asset_name: The name of the data asset to run the expectation suite on.
        profiler_kwargs: A dictionary of keyword arguments to pass to the profiler.
        overwrite_existing_suite: Whether to overwrite an existing expectation suite.
    """

    expectation_suite_name: str
    data_asset_name: Optional[str] = None
    profiler_kwargs: Optional[Dict[str, Any]] = Field(default_factory=dict)
    overwrite_existing_suite: bool = True
GreatExpectationsProfilerStep (BaseStep)

Standard Great Expectations profiling step implementation.

Use this standard Great Expectations profiling step to build an Expectation Suite automatically by running a UserConfigurableProfiler on an input dataset as covered in the official GE documentation.

Source code in zenml/integrations/great_expectations/steps/ge_profiler.py
class GreatExpectationsProfilerStep(BaseStep):
    """Standard Great Expectations profiling step implementation.

    Use this standard Great Expectations profiling step to build an Expectation
    Suite automatically by running a UserConfigurableProfiler on an input
    dataset [as covered in the official GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).
    """

    def entrypoint(
        self,
        dataset: pd.DataFrame,
        params: GreatExpectationsProfilerParameters,
    ) -> ExpectationSuite:
        """Standard Great Expectations data profiling step entrypoint.

        Args:
            dataset: The dataset from which the expectation suite will be inferred.
            params: The parameters for the step.

        Returns:
            The generated Great Expectations suite.
        """
        data_validator = (
            GreatExpectationsDataValidator.get_active_data_validator()
        )

        return data_validator.data_profiling(
            dataset,
            expectation_suite_name=params.expectation_suite_name,
            data_asset_name=params.data_asset_name,
            profiler_kwargs=params.profiler_kwargs,
            overwrite_existing_suite=params.overwrite_existing_suite,
        )
PARAMETERS_CLASS (BaseParameters) pydantic-model

Parameters class for a Great Expectations profiler step.

Attributes:

Name Type Description
expectation_suite_name str

The name of the expectation suite to create or update.

data_asset_name Optional[str]

The name of the data asset to run the expectation suite on.

profiler_kwargs Optional[Dict[str, Any]]

A dictionary of keyword arguments to pass to the profiler.

overwrite_existing_suite bool

Whether to overwrite an existing expectation suite.

Source code in zenml/integrations/great_expectations/steps/ge_profiler.py
class GreatExpectationsProfilerParameters(BaseParameters):
    """Parameters class for a Great Expectations profiler step.

    Attributes:
        expectation_suite_name: The name of the expectation suite to create
            or update.
        data_asset_name: The name of the data asset to run the expectation suite on.
        profiler_kwargs: A dictionary of keyword arguments to pass to the profiler.
        overwrite_existing_suite: Whether to overwrite an existing expectation suite.
    """

    expectation_suite_name: str
    data_asset_name: Optional[str] = None
    profiler_kwargs: Optional[Dict[str, Any]] = Field(default_factory=dict)
    overwrite_existing_suite: bool = True
entrypoint(self, dataset, params)

Standard Great Expectations data profiling step entrypoint.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset from which the expectation suite will be inferred.

required
params GreatExpectationsProfilerParameters

The parameters for the step.

required

Returns:

Type Description
ExpectationSuite

The generated Great Expectations suite.

Source code in zenml/integrations/great_expectations/steps/ge_profiler.py
def entrypoint(
    self,
    dataset: pd.DataFrame,
    params: GreatExpectationsProfilerParameters,
) -> ExpectationSuite:
    """Standard Great Expectations data profiling step entrypoint.

    Args:
        dataset: The dataset from which the expectation suite will be inferred.
        params: The parameters for the step.

    Returns:
        The generated Great Expectations suite.
    """
    data_validator = (
        GreatExpectationsDataValidator.get_active_data_validator()
    )

    return data_validator.data_profiling(
        dataset,
        expectation_suite_name=params.expectation_suite_name,
        data_asset_name=params.data_asset_name,
        profiler_kwargs=params.profiler_kwargs,
        overwrite_existing_suite=params.overwrite_existing_suite,
    )
great_expectations_profiler_step(step_name, params)

Shortcut function to create a new instance of the GreatExpectationsProfilerStep step.

The returned GreatExpectationsProfilerStep can be used in a pipeline to infer data validation rules from an input pd.DataFrame dataset and return them as an Expectation Suite. The Expectation Suite is also persisted in the Great Expectations expectation store.

Parameters:

Name Type Description Default
step_name str

The name of the step

required
params GreatExpectationsProfilerParameters

The parameters for the step

required

Returns:

Type Description
BaseStep

a GreatExpectationsProfilerStep step instance

Source code in zenml/integrations/great_expectations/steps/ge_profiler.py
def great_expectations_profiler_step(
    step_name: str,
    params: GreatExpectationsProfilerParameters,
) -> BaseStep:
    """Shortcut function to create a new instance of the GreatExpectationsProfilerStep step.

    The returned GreatExpectationsProfilerStep can be used in a pipeline to
    infer data validation rules from an input pd.DataFrame dataset and return
    them as an Expectation Suite. The Expectation Suite is also persisted in the
    Great Expectations expectation store.

    Args:
        step_name: The name of the step
        params: The parameters for the step

    Returns:
        a GreatExpectationsProfilerStep step instance
    """
    return GreatExpectationsProfilerStep(name=step_name, params=params)

ge_validator

Great Expectations data validation standard step.

GreatExpectationsValidatorParameters (BaseParameters) pydantic-model

Parameters class for a Great Expectations checkpoint step.

Attributes:

Name Type Description
expectation_suite_name str

The name of the expectation suite to use to validate the dataset.

data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

exit_on_error bool

Set this flag to raise an error and exit the pipeline early if the validation fails.

Source code in zenml/integrations/great_expectations/steps/ge_validator.py
class GreatExpectationsValidatorParameters(BaseParameters):
    """Parameters class for a Great Expectations checkpoint step.

    Attributes:
        expectation_suite_name: The name of the expectation suite to use to
            validate the dataset.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to run
            after the validation check.
        exit_on_error: Set this flag to raise an error and exit the pipeline
            early if the validation fails.
    """

    expectation_suite_name: str
    data_asset_name: Optional[str] = None
    action_list: Optional[List[Dict[str, Any]]] = None
    exit_on_error: bool = False
GreatExpectationsValidatorStep (BaseStep)

Standard Great Expectations data validation step implementation.

Use this standard Great Expectations data validation step to run an existing Expectation Suite on an input dataset as covered in the official GE documentation.

Source code in zenml/integrations/great_expectations/steps/ge_validator.py
class GreatExpectationsValidatorStep(BaseStep):
    """Standard Great Expectations data validation step implementation.

    Use this standard Great Expectations data validation step to run an
    existing Expectation Suite on an input dataset [as covered in the official GE documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).
    """

    def entrypoint(
        self,
        dataset: pd.DataFrame,
        condition: bool,
        params: GreatExpectationsValidatorParameters,
    ) -> CheckpointResult:
        """Standard Great Expectations data validation step entrypoint.

        Args:
            dataset: The dataset to run the expectation suite on.
            condition: This dummy argument can be used as a condition to enforce
                that this step is only run after another step has completed. This
                is useful for example if the Expectation Suite used to validate
                the data is computed in a `GreatExpectationsProfilerStep` that
                is part of the same pipeline.
            params: The parameters for the step.

        Returns:
            The Great Expectations validation (checkpoint) result.

        Raises:
            RuntimeError: if the step is configured to exit on error and the
                data validation failed.
        """
        data_validator = (
            GreatExpectationsDataValidator.get_active_data_validator()
        )

        results = data_validator.data_validation(
            dataset,
            expectation_suite_name=params.expectation_suite_name,
            data_asset_name=params.data_asset_name,
            action_list=params.action_list,
        )

        if params.exit_on_error and not results.success:
            raise RuntimeError(
                "The Great Expectations validation failed. Check "
                "the logs or the Great Expectations data docs for more "
                "information."
            )

        return results
PARAMETERS_CLASS (BaseParameters) pydantic-model

Parameters class for a Great Expectations checkpoint step.

Attributes:

Name Type Description
expectation_suite_name str

The name of the expectation suite to use to validate the dataset.

data_asset_name Optional[str]

The name of the data asset to use to identify the dataset in the Great Expectations docs.

action_list Optional[List[Dict[str, Any]]]

A list of additional Great Expectations actions to run after the validation check.

exit_on_error bool

Set this flag to raise an error and exit the pipeline early if the validation fails.

Source code in zenml/integrations/great_expectations/steps/ge_validator.py
class GreatExpectationsValidatorParameters(BaseParameters):
    """Parameters class for a Great Expectations checkpoint step.

    Attributes:
        expectation_suite_name: The name of the expectation suite to use to
            validate the dataset.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        action_list: A list of additional Great Expectations actions to run
            after the validation check.
        exit_on_error: Set this flag to raise an error and exit the pipeline
            early if the validation fails.
    """

    expectation_suite_name: str
    data_asset_name: Optional[str] = None
    action_list: Optional[List[Dict[str, Any]]] = None
    exit_on_error: bool = False
entrypoint(self, dataset, condition, params)

Standard Great Expectations data validation step entrypoint.

Parameters:

Name Type Description Default
dataset DataFrame

The dataset to run the expectation suite on.

required
condition bool

This dummy argument can be used as a condition to enforce that this step is only run after another step has completed. This is useful for example if the Expectation Suite used to validate the data is computed in a GreatExpectationsProfilerStep that is part of the same pipeline.

required
params GreatExpectationsValidatorParameters

The parameters for the step.

required

Returns:

Type Description
CheckpointResult

The Great Expectations validation (checkpoint) result.

Exceptions:

Type Description
RuntimeError

if the step is configured to exit on error and the data validation failed.

Source code in zenml/integrations/great_expectations/steps/ge_validator.py
def entrypoint(
    self,
    dataset: pd.DataFrame,
    condition: bool,
    params: GreatExpectationsValidatorParameters,
) -> CheckpointResult:
    """Standard Great Expectations data validation step entrypoint.

    Args:
        dataset: The dataset to run the expectation suite on.
        condition: This dummy argument can be used as a condition to enforce
            that this step is only run after another step has completed. This
            is useful for example if the Expectation Suite used to validate
            the data is computed in a `GreatExpectationsProfilerStep` that
            is part of the same pipeline.
        params: The parameters for the step.

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        RuntimeError: if the step is configured to exit on error and the
            data validation failed.
    """
    data_validator = (
        GreatExpectationsDataValidator.get_active_data_validator()
    )

    results = data_validator.data_validation(
        dataset,
        expectation_suite_name=params.expectation_suite_name,
        data_asset_name=params.data_asset_name,
        action_list=params.action_list,
    )

    if params.exit_on_error and not results.success:
        raise RuntimeError(
            "The Great Expectations validation failed. Check "
            "the logs or the Great Expectations data docs for more "
            "information."
        )

    return results
great_expectations_validator_step(step_name, params)

Shortcut function to create a new instance of the GreatExpectationsValidatorStep step.

The returned GreatExpectationsValidatorStep can be used in a pipeline to validate an input pd.DataFrame dataset and return the result as a Great Expectations CheckpointResult object. The validation results are also persisted in the Great Expectations validation store.

Parameters:

Name Type Description Default
step_name str

The name of the step

required
params GreatExpectationsValidatorParameters

The parameters for the step

required

Returns:

Type Description
BaseStep

a GreatExpectationsProfilerStep step instance

Source code in zenml/integrations/great_expectations/steps/ge_validator.py
def great_expectations_validator_step(
    step_name: str,
    params: GreatExpectationsValidatorParameters,
) -> BaseStep:
    """Shortcut function to create a new instance of the GreatExpectationsValidatorStep step.

    The returned GreatExpectationsValidatorStep can be used in a pipeline to
    validate an input pd.DataFrame dataset and return the result as a Great
    Expectations CheckpointResult object. The validation results are also
    persisted in the Great Expectations validation store.

    Args:
        step_name: The name of the step
        params: The parameters for the step

    Returns:
        a GreatExpectationsProfilerStep step instance
    """
    return GreatExpectationsValidatorStep(name=step_name, params=params)

utils

Great Expectations data profiling standard step.

create_batch_request(context, dataset, data_asset_name)

Create a temporary runtime GE batch request from a dataset step artifact.

Parameters:

Name Type Description Default
context <function BaseDataContext at 0x7f5d84753790>

Great Expectations data context.

required
dataset DataFrame

Input dataset.

required
data_asset_name Optional[str]

Optional custom name for the data asset.

required

Returns:

Type Description
RuntimeBatchRequest

A Great Expectations runtime batch request.

Source code in zenml/integrations/great_expectations/utils.py
def create_batch_request(
    context: BaseDataContext,
    dataset: pd.DataFrame,
    data_asset_name: Optional[str],
) -> RuntimeBatchRequest:
    """Create a temporary runtime GE batch request from a dataset step artifact.

    Args:
        context: Great Expectations data context.
        dataset: Input dataset.
        data_asset_name: Optional custom name for the data asset.

    Returns:
        A Great Expectations runtime batch request.
    """
    try:
        # get pipeline name, step name and run id
        step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
        pipeline_name = step_env.pipeline_name
        run_name = step_env.run_name
        step_name = step_env.step_name
    except KeyError:
        # if not running inside a pipeline step, use random values
        pipeline_name = f"pipeline_{random_str(5)}"
        run_name = f"pipeline_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    datasource_name = f"{run_name}_{step_name}"
    data_connector_name = datasource_name
    data_asset_name = data_asset_name or f"{pipeline_name}_{step_name}"
    batch_identifier = "default"

    datasource_config = {
        "name": datasource_name,
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "module_name": "great_expectations.execution_engine",
            "class_name": "PandasExecutionEngine",
        },
        "data_connectors": {
            data_connector_name: {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": [batch_identifier],
            },
        },
    }

    context.add_datasource(**datasource_config)
    batch_request = RuntimeBatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=data_asset_name,
        runtime_parameters={"batch_data": dataset},
        batch_identifiers={batch_identifier: batch_identifier},
    )

    return batch_request

visualizers special

Visualizers for expectation suites and validation results.

ge_visualizer

Great Expectations visualizers for expectation suites and validation results.

GreatExpectationsVisualizer (BaseVisualizer)

The implementation of a Great Expectations Visualizer.

Source code in zenml/integrations/great_expectations/visualizers/ge_visualizer.py
class GreatExpectationsVisualizer(BaseVisualizer):
    """The implementation of a Great Expectations Visualizer."""

    def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
        """Method to visualize a Great Expectations resource.

        Args:
            object: StepView fetched from run.get_step().
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        for artifact_view in object.outputs.values():
            # filter out anything but Great Expectations data analysis artifacts
            if (
                artifact_view.type == ArtifactType.DATA_ANALYSIS
                and artifact_view.data_type.startswith("great_expectations.")
            ):
                artifact = artifact_view.read()
                if isinstance(artifact, CheckpointResult):
                    result = cast(CheckpointResult, artifact)
                    identifier = next(iter(result.run_results.keys()))
                else:
                    suite = cast(ExpectationSuite, artifact)
                    identifier = ExpectationSuiteIdentifier(
                        suite.expectation_suite_name
                    )

                context = GreatExpectationsDataValidator.get_data_context()
                context.open_data_docs(identifier)
visualize(self, object, *args, **kwargs)

Method to visualize a Great Expectations resource.

Parameters:

Name Type Description Default
object StepView

StepView fetched from run.get_step().

required
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in zenml/integrations/great_expectations/visualizers/ge_visualizer.py
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
    """Method to visualize a Great Expectations resource.

    Args:
        object: StepView fetched from run.get_step().
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    for artifact_view in object.outputs.values():
        # filter out anything but Great Expectations data analysis artifacts
        if (
            artifact_view.type == ArtifactType.DATA_ANALYSIS
            and artifact_view.data_type.startswith("great_expectations.")
        ):
            artifact = artifact_view.read()
            if isinstance(artifact, CheckpointResult):
                result = cast(CheckpointResult, artifact)
                identifier = next(iter(result.run_results.keys()))
            else:
                suite = cast(ExpectationSuite, artifact)
                identifier = ExpectationSuiteIdentifier(
                    suite.expectation_suite_name
                )

            context = GreatExpectationsDataValidator.get_data_context()
            context.open_data_docs(identifier)