Great Expectations
zenml.integrations.great_expectations
special
Great Expectations integration for ZenML.
The Great Expectations integration enables you to use Great Expectations as a way of profiling and validating your data.
GreatExpectationsIntegration (Integration)
Definition of Great Expectations integration for ZenML.
Source code in zenml/integrations/great_expectations/__init__.py
class GreatExpectationsIntegration(Integration):
    """ZenML integration definition for Great Expectations."""

    NAME = GREAT_EXPECTATIONS
    REQUIREMENTS = ["great-expectations>=0.17.15,<1.0"]

    @staticmethod
    def activate() -> None:
        """Activate the integration by importing its materializers module."""
        # The imported name is intentionally unused (hence the `noqa`).
        from zenml.integrations.great_expectations import materializers  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors this integration provides.

        Returns:
            A list holding the Great Expectations data validator flavor.
        """
        # Imported lazily, inside the method, exactly as in the original.
        from zenml.integrations.great_expectations.flavors import (
            GreatExpectationsDataValidatorFlavor as validator_flavor,
        )

        provided_flavors: List[Type[Flavor]] = [validator_flavor]
        return provided_flavors
activate()
staticmethod
Activate the Great Expectations integration.
Source code in zenml/integrations/great_expectations/__init__.py
@staticmethod
def activate() -> None:
    """Activate the integration by importing its materializers module."""
    # The imported name is intentionally unused (hence the `noqa`).
    from zenml.integrations.great_expectations import materializers  # noqa
flavors()
classmethod
Declare the stack component flavors for the Great Expectations integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/great_expectations/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors this integration provides.

    Returns:
        A list holding the Great Expectations data validator flavor.
    """
    # Imported lazily, inside the method, exactly as in the original.
    from zenml.integrations.great_expectations.flavors import (
        GreatExpectationsDataValidatorFlavor as validator_flavor,
    )

    provided_flavors: List[Type[Flavor]] = [validator_flavor]
    return provided_flavors
data_validators
special
Initialization of the Great Expectations data validator for ZenML.
ge_data_validator
Implementation of the Great Expectations data validator.
GreatExpectationsDataValidator (BaseDataValidator)
Great Expectations data validator stack component.
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
class GreatExpectationsDataValidator(BaseDataValidator):
    """Great Expectations data validator stack component."""

    NAME: ClassVar[str] = "Great Expectations"
    FLAVOR: ClassVar[Type[BaseDataValidatorFlavor]] = (
        GreatExpectationsDataValidatorFlavor
    )

    # Caches filled in on first access of the `data_context` and
    # `context_config` properties respectively.
    _context: Optional[AbstractDataContext] = None
    _context_config: Optional[DataContextConfig] = None

    @property
    def config(self) -> GreatExpectationsDataValidatorConfig:
        """Returns the `GreatExpectationsDataValidatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(GreatExpectationsDataValidatorConfig, self._config)

    @classmethod
    def get_data_context(cls) -> AbstractDataContext:
        """Get the Great Expectations data context managed by ZenML.

        Call this method to retrieve the data context managed by ZenML
        through the active Great Expectations data validator stack component.

        Returns:
            A Great Expectations data context managed by ZenML as configured
            through the active data validator stack component.
        """
        data_validator = cast(
            "GreatExpectationsDataValidator", cls.get_active_data_validator()
        )
        return data_validator.data_context

    @property
    def context_config(self) -> Optional[DataContextConfig]:
        """Get the Great Expectations data context configuration.

        Raises:
            ValueError: In case there is an invalid context_config value

        Returns:
            The GE data context configuration built from the stack component
            configuration, or None if no `context_config` is set there.
        """
        # If the context config is already loaded, return it
        if self._context_config is not None:
            return self._context_config

        # Otherwise, use the configuration from the stack component config, if
        # set
        context_config_dict = self.config.context_config
        if context_config_dict is None:
            return None

        # Validate that the context config is a valid GE config
        try:
            self._context_config = DataContextConfig(**context_config_dict)
        except Exception as e:
            # Chain the original error so the root cause stays visible in
            # the traceback.
            raise ValueError(
                f"Invalid `context_config` value: {str(e)}"
            ) from e

        return self._context_config

    @property
    def local_path(self) -> Optional[str]:
        """Return a local path where this component stores information.

        If an existing local GE data context is used, it is
        interpreted as a local path that needs to be accessible in
        all runtime environments.

        Returns:
            The local path where this component stores information.
        """
        return self.config.context_root_dir

    def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
        """Generate a Great Expectations store configuration.

        Args:
            class_name: The store class name
            prefix: The path prefix for the ZenML store configuration

        Returns:
            A dictionary with the GE store configuration.
        """
        return {
            "class_name": class_name,
            "store_backend": {
                "module_name": ZenMLArtifactStoreBackend.__module__,
                "class_name": ZenMLArtifactStoreBackend.__name__,
                "prefix": f"{str(self.id)}/{prefix}",
            },
        }

    def get_data_docs_config(
        self, prefix: str, local: bool = False
    ) -> Dict[str, Any]:
        """Generate Great Expectations data docs configuration.

        Args:
            prefix: The path prefix for the ZenML data docs configuration
            local: Whether the data docs site is local or remote.

        Returns:
            A dictionary with the GE data docs site configuration.
        """
        if local:
            store_backend = {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory": f"{self.root_directory}/{prefix}",
            }
        else:
            store_backend = {
                "module_name": ZenMLArtifactStoreBackend.__module__,
                "class_name": ZenMLArtifactStoreBackend.__name__,
                "prefix": f"{str(self.id)}/{prefix}",
            }

        return {
            "class_name": "SiteBuilder",
            "store_backend": store_backend,
            "site_index_builder": {
                "class_name": "DefaultSiteIndexBuilder",
            },
        }

    @property
    def data_context(self) -> AbstractDataContext:
        """Returns the Great Expectations data context configured for this component.

        The context is created on first access and cached on the instance.

        Returns:
            The Great Expectations data context configured for this component.
        """
        if not self._context:
            expectations_store_name = "zenml_expectations_store"
            validations_store_name = "zenml_validations_store"
            checkpoint_store_name = "zenml_checkpoint_store"
            profiler_store_name = "zenml_profiler_store"
            evaluation_parameter_store_name = "evaluation_parameter_store"

            # Define default configuration options that plug the GX stores
            # in the active ZenML artifact store
            zenml_context_config: Dict[str, Any] = dict(
                stores={
                    expectations_store_name: self.get_store_config(
                        "ExpectationsStore", "expectations"
                    ),
                    validations_store_name: self.get_store_config(
                        "ValidationsStore", "validations"
                    ),
                    checkpoint_store_name: self.get_store_config(
                        "CheckpointStore", "checkpoints"
                    ),
                    profiler_store_name: self.get_store_config(
                        "ProfilerStore", "profilers"
                    ),
                    evaluation_parameter_store_name: {
                        "class_name": "EvaluationParameterStore"
                    },
                },
                expectations_store_name=expectations_store_name,
                validations_store_name=validations_store_name,
                checkpoint_store_name=checkpoint_store_name,
                profiler_store_name=profiler_store_name,
                evaluation_parameter_store_name=evaluation_parameter_store_name,
                data_docs_sites={
                    "zenml_artifact_store": self.get_data_docs_config(
                        "data_docs"
                    )
                },
            )

            configure_zenml_stores = self.config.configure_zenml_stores
            if self.config.context_root_dir:
                # initialize the local data context, if a local path was
                # configured
                self._context = get_context(
                    context_root_dir=self.config.context_root_dir
                )
            else:
                # create an ephemeral in-memory data context that is not
                # backed by a local YAML file (see https://docs.greatexpectations.io/docs/oss/guides/setup/configuring_data_contexts/instantiating_data_contexts/instantiate_data_context/).
                if self.context_config:
                    # Use the data context configuration provided in the stack
                    # component configuration
                    context_config = self.context_config
                else:
                    # Initialize the data context with the default ZenML
                    # configuration options effectively plugging the GX stores
                    # into the ZenML artifact store
                    context_config = DataContextConfig(**zenml_context_config)
                    # skip adding the stores after initialization, as they are
                    # already baked in the initial configuration
                    configure_zenml_stores = False

                self._context = EphemeralDataContext(
                    project_config=context_config
                )

            if configure_zenml_stores:
                # Point the context at the ZenML store names, then register
                # each ZenML store and data docs site on it.
                self._context.config.expectations_store_name = (
                    expectations_store_name
                )
                self._context.config.validations_store_name = (
                    validations_store_name
                )
                self._context.config.checkpoint_store_name = (
                    checkpoint_store_name
                )
                self._context.config.profiler_store_name = profiler_store_name
                self._context.config.evaluation_parameter_store_name = (
                    evaluation_parameter_store_name
                )
                for store_name, store_config in zenml_context_config[
                    "stores"
                ].items():
                    self._context.add_store(
                        store_name=store_name,
                        store_config=store_config,
                    )
                if self._context.config.data_docs_sites is not None:
                    for site_name, site_config in zenml_context_config[
                        "data_docs_sites"
                    ].items():
                        self._context.config.data_docs_sites[site_name] = (
                            site_config
                        )

            if (
                self.config.configure_local_docs
                and self._context.config.data_docs_sites is not None
            ):
                client = Client()
                artifact_store = client.active_stack.artifact_store
                # A separate local docs site is only added when the active
                # artifact store is not itself of the "local" flavor.
                if artifact_store.flavor != "local":
                    self._context.config.data_docs_sites["zenml_local"] = (
                        self.get_data_docs_config("data_docs", local=True)
                    )

        return self._context

    @property
    def root_directory(self) -> str:
        """Returns path to the root directory for all local files concerning this data validator.

        The directory is created if it does not exist yet.

        Returns:
            Path to the root directory.
        """
        path = os.path.join(
            io_utils.get_global_config_directory(),
            self.flavor,
            str(self.id),
        )

        if not os.path.exists(path):
            fileio.makedirs(path)

        return path

    def data_profiling(
        self,
        dataset: pd.DataFrame,
        comparison_dataset: Optional[Any] = None,
        profile_list: Optional[Sequence[str]] = None,
        expectation_suite_name: Optional[str] = None,
        data_asset_name: Optional[str] = None,
        profiler_kwargs: Optional[Dict[str, Any]] = None,
        overwrite_existing_suite: bool = True,
        **kwargs: Any,
    ) -> ExpectationSuite:
        """Infer a Great Expectations Expectation Suite from a given dataset.

        This Great Expectations specific data profiling method implementation
        builds an Expectation Suite automatically by running a
        UserConfigurableProfiler on an input dataset [as covered in the official
        GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

        Args:
            dataset: The dataset from which the expectation suite will be
                inferred.
            comparison_dataset: Optional dataset used to generate data
                comparison (i.e. data drift) profiles. Not supported by the
                Great Expectations data validator.
            profile_list: Optional list identifying the categories of data
                profiles to be generated. Not supported by the Great
                Expectations data validator.
            expectation_suite_name: The name of the expectation suite to create
                or update. If not supplied, a unique name will be generated from
                the current pipeline and step name, if running in the context of
                a pipeline step.
            data_asset_name: The name of the data asset to use to identify the
                dataset in the Great Expectations docs.
            profiler_kwargs: A dictionary of custom keyword arguments to pass to
                the profiler. `None` is treated as no extra arguments.
            overwrite_existing_suite: Whether to overwrite an existing
                expectation suite, if one exists with that name.
            kwargs: Additional keyword arguments (unused).

        Returns:
            The inferred Expectation Suite.

        Raises:
            ValueError: if an `expectation_suite_name` value is not supplied and
                a name for the expectation suite cannot be generated from the
                current step name and pipeline name.
        """
        context = self.data_context

        if comparison_dataset is not None:
            logger.warning(
                "A comparison dataset is not required by Great Expectations "
                "to do data profiling. Silently ignoring the supplied dataset "
            )

        if not expectation_suite_name:
            try:
                step_context = get_step_context()
                pipeline_name = step_context.pipeline.name
                step_name = step_context.step_run.name
                expectation_suite_name = f"{pipeline_name}_{step_name}"
            except RuntimeError as e:
                raise ValueError(
                    "A expectation suite name is required when not running in "
                    "the context of a pipeline step."
                ) from e

        suite_exists = False
        if context.expectations_store.has_key(  # noqa
            ExpectationSuiteIdentifier(expectation_suite_name)
        ):
            suite_exists = True
            suite = context.get_expectation_suite(expectation_suite_name)
            if not overwrite_existing_suite:
                logger.info(
                    f"Expectation Suite `{expectation_suite_name}` "
                    f"already exists and `overwrite_existing_suite` is not set "
                    f"in the step configuration. Skipping re-running the "
                    f"profiler."
                )
                return suite

        batch_request = create_batch_request(context, dataset, data_asset_name)

        try:
            if suite_exists:
                validator = context.get_validator(
                    batch_request=batch_request,
                    expectation_suite_name=expectation_suite_name,
                )
            else:
                validator = context.get_validator(
                    batch_request=batch_request,
                    create_expectation_suite_with_name=expectation_suite_name,
                )

            # `profiler_kwargs` defaults to None; unpacking None with `**`
            # raises a TypeError, so fall back to an empty mapping.
            profiler = UserConfigurableProfiler(
                profile_dataset=validator, **(profiler_kwargs or {})
            )

            suite = profiler.build_suite()
            context.save_expectation_suite(
                expectation_suite=suite,
                expectation_suite_name=expectation_suite_name,
            )

            context.build_data_docs()
        finally:
            # The datasource backing the batch request is temporary; remove
            # it whether or not profiling succeeded.
            context.delete_datasource(batch_request.datasource_name)

        return suite

    def data_validation(
        self,
        dataset: pd.DataFrame,
        comparison_dataset: Optional[Any] = None,
        check_list: Optional[Sequence[str]] = None,
        expectation_suite_name: Optional[str] = None,
        data_asset_name: Optional[str] = None,
        action_list: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> CheckpointResult:
        """Great Expectations data validation.

        This Great Expectations specific data validation method
        implementation validates an input dataset against an Expectation Suite
        (the GE definition of a profile) [as covered in the official GE
        documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

        Args:
            dataset: The dataset to validate.
            comparison_dataset: Optional dataset used to run data
                comparison (i.e. data drift) checks. Not supported by the
                Great Expectations data validator.
            check_list: Optional list identifying the data validation checks to
                be performed. Not supported by the Great Expectations data
                validator.
            expectation_suite_name: The name of the expectation suite to use to
                validate the dataset. A value must be provided.
            data_asset_name: The name of the data asset to use to identify the
                dataset in the Great Expectations docs.
            action_list: A list of Great Expectations actions to run after
                the validation check. If empty or omitted, a default list
                (store validation result, store evaluation parameters,
                update data docs) is used instead.
            kwargs: Additional keyword arguments (unused).

        Returns:
            The Great Expectations validation (checkpoint) result.

        Raises:
            ValueError: if the `expectation_suite_name` argument is omitted.
        """
        if not expectation_suite_name:
            raise ValueError("Missing expectation_suite_name argument value.")

        if comparison_dataset is not None:
            logger.warning(
                "A comparison dataset is not required by Great Expectations "
                "to do data validation. Silently ignoring the supplied dataset "
            )

        try:
            step_context = get_step_context()
            run_name = step_context.pipeline_run.name
            step_name = step_context.step_run.name
        except RuntimeError:
            # if not running inside a pipeline step, use random values
            run_name = f"pipeline_{random_str(5)}"
            step_name = f"step_{random_str(5)}"

        context = self.data_context

        checkpoint_name = f"{run_name}_{step_name}"

        batch_request = create_batch_request(context, dataset, data_asset_name)

        action_list = action_list or [
            {
                "name": "store_validation_result",
                "action": {"class_name": "StoreValidationResultAction"},
            },
            {
                "name": "store_evaluation_params",
                "action": {"class_name": "StoreEvaluationParametersAction"},
            },
            {
                "name": "update_data_docs",
                "action": {"class_name": "UpdateDataDocsAction"},
            },
        ]

        checkpoint_config: Dict[str, Any] = {
            "name": checkpoint_name,
            "run_name_template": run_name,
            "config_version": 1,
            "class_name": "Checkpoint",
            "expectation_suite_name": expectation_suite_name,
            "action_list": action_list,
        }

        # The outer `finally` always removes the temporary datasource, even
        # if registering the checkpoint fails. The inner `finally` removes
        # the checkpoint only once it has actually been added.
        try:
            context.add_checkpoint(**checkpoint_config)  # type: ignore[has-type]
            try:
                results = context.run_checkpoint(
                    checkpoint_name=checkpoint_name,
                    validations=[{"batch_request": batch_request}],
                )
            finally:
                context.delete_checkpoint(checkpoint_name)
        finally:
            context.delete_datasource(batch_request.datasource_name)

        return results
config: GreatExpectationsDataValidatorConfig
property
readonly
Returns the `GreatExpectationsDataValidatorConfig` config.
Returns:
Type | Description |
---|---|
GreatExpectationsDataValidatorConfig |
The configuration. |
context_config: Optional[great_expectations.data_context.types.base.DataContextConfig]
property
readonly
Get the Great Expectations data context configuration.
Exceptions:
Type | Description |
---|---|
ValueError |
In case there is an invalid context_config value |
Returns:
Type | Description |
---|---|
Optional[great_expectations.data_context.types.base.DataContextConfig] |
The GE data context configuration, or None if no `context_config` is set in the stack component configuration. |
data_context: great_expectations.data_context.data_context.abstract_data_context.AbstractDataContext
property
readonly
Returns the Great Expectations data context configured for this component.
Returns:
Type | Description |
---|---|
great_expectations.data_context.data_context.abstract_data_context.AbstractDataContext |
The Great Expectations data context configured for this component. |
local_path: Optional[str]
property
readonly
Return a local path where this component stores information.
If an existing local GE data context is used, it is interpreted as a local path that needs to be accessible in all runtime environments.
Returns:
Type | Description |
---|---|
Optional[str] |
The local path where this component stores information. |
root_directory: str
property
readonly
Returns path to the root directory for all local files concerning this data validator.
Returns:
Type | Description |
---|---|
str |
Path to the root directory. |
FLAVOR (BaseDataValidatorFlavor)
Great Expectations data validator flavor.
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
class GreatExpectationsDataValidatorFlavor(BaseDataValidatorFlavor):
    """Flavor definition for the Great Expectations data validator."""

    @property
    def name(self) -> str:
        """The flavor's name.

        Returns:
            The Great Expectations data validator flavor name constant.
        """
        flavor_name: str = GREAT_EXPECTATIONS_DATA_VALIDATOR_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the documentation page for this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        default_docs_url = self.generate_default_docs_url()
        return default_docs_url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK documentation page for this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        default_sdk_docs_url = self.generate_default_sdk_docs_url()
        return default_sdk_docs_url

    @property
    def logo_url(self) -> str:
        """URL of the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/data_validator/greatexpectations.jpeg"
        return logo

    @property
    def config_class(self) -> Type[GreatExpectationsDataValidatorConfig]:
        """The configuration class used by this flavor.

        Returns:
            The `GreatExpectationsDataValidatorConfig` class.
        """
        config_cls = GreatExpectationsDataValidatorConfig
        return config_cls

    @property
    def implementation_class(self) -> Type["GreatExpectationsDataValidator"]:
        """The stack component class implementing this flavor.

        Returns:
            The `GreatExpectationsDataValidator` class.
        """
        # Imported lazily, inside the property, exactly as in the original.
        from zenml.integrations.great_expectations.data_validators import (
            GreatExpectationsDataValidator as validator_cls,
        )

        return validator_cls
config_class: Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig]
property
readonly
Returns the `GreatExpectationsDataValidatorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[GreatExpectationsDataValidator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[GreatExpectationsDataValidator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
data_profiling(self, dataset, comparison_dataset=None, profile_list=None, expectation_suite_name=None, data_asset_name=None, profiler_kwargs=None, overwrite_existing_suite=True, **kwargs)
Infer a Great Expectations Expectation Suite from a given dataset.
This Great Expectations specific data profiling method implementation builds an Expectation Suite automatically by running a UserConfigurableProfiler on an input dataset as covered in the official GE documentation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset |
DataFrame |
The dataset from which the expectation suite will be inferred. |
required |
comparison_dataset |
Optional[Any] |
Optional dataset used to generate data comparison (i.e. data drift) profiles. Not supported by the Great Expectation data validator. |
None |
profile_list |
Optional[Sequence[str]] |
Optional list identifying the categories of data profiles to be generated. Not supported by the Great Expectation data validator. |
None |
expectation_suite_name |
Optional[str] |
The name of the expectation suite to create or update. If not supplied, a unique name will be generated from the current pipeline and step name, if running in the context of a pipeline step. |
None |
data_asset_name |
Optional[str] |
The name of the data asset to use to identify the dataset in the Great Expectations docs. |
None |
profiler_kwargs |
Optional[Dict[str, Any]] |
A dictionary of custom keyword arguments to pass to the profiler. |
None |
overwrite_existing_suite |
bool |
Whether to overwrite an existing expectation suite, if one exists with that name. |
True |
kwargs |
Any |
Additional keyword arguments (unused). |
{} |
Returns:
Type | Description |
---|---|
great_expectations.core.ExpectationSuite |
The inferred Expectation Suite. |
Exceptions:
Type | Description |
---|---|
ValueError |
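if an `expectation_suite_name` value is not supplied and a name for the expectation suite cannot be generated from the current step name and pipeline name. |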
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def data_profiling(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    profile_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    profiler_kwargs: Optional[Dict[str, Any]] = None,
    overwrite_existing_suite: bool = True,
    **kwargs: Any,
) -> ExpectationSuite:
    """Infer a Great Expectations Expectation Suite from a given dataset.

    This Great Expectations specific data profiling method implementation
    builds an Expectation Suite automatically by running a
    UserConfigurableProfiler on an input dataset [as covered in the official
    GE documentation](https://docs.greatexpectations.io/docs/guides/expectations/how_to_create_and_edit_expectations_with_a_profiler).

    Args:
        dataset: The dataset from which the expectation suite will be
            inferred.
        comparison_dataset: Optional dataset used to generate data
            comparison (i.e. data drift) profiles. Not supported by the
            Great Expectations data validator.
        profile_list: Optional list identifying the categories of data
            profiles to be generated. Not supported by the Great
            Expectations data validator.
        expectation_suite_name: The name of the expectation suite to create
            or update. If not supplied, a unique name will be generated from
            the current pipeline and step name, if running in the context of
            a pipeline step.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        profiler_kwargs: A dictionary of custom keyword arguments to pass to
            the profiler. `None` is treated as no extra arguments.
        overwrite_existing_suite: Whether to overwrite an existing
            expectation suite, if one exists with that name.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The inferred Expectation Suite.

    Raises:
        ValueError: if an `expectation_suite_name` value is not supplied and
            a name for the expectation suite cannot be generated from the
            current step name and pipeline name.
    """
    context = self.data_context

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data profiling. Silently ignoring the supplied dataset "
        )

    if not expectation_suite_name:
        try:
            step_context = get_step_context()
            pipeline_name = step_context.pipeline.name
            step_name = step_context.step_run.name
            expectation_suite_name = f"{pipeline_name}_{step_name}"
        except RuntimeError as e:
            raise ValueError(
                "A expectation suite name is required when not running in "
                "the context of a pipeline step."
            ) from e

    suite_exists = False
    if context.expectations_store.has_key(  # noqa
        ExpectationSuiteIdentifier(expectation_suite_name)
    ):
        suite_exists = True
        suite = context.get_expectation_suite(expectation_suite_name)
        if not overwrite_existing_suite:
            logger.info(
                f"Expectation Suite `{expectation_suite_name}` "
                f"already exists and `overwrite_existing_suite` is not set "
                f"in the step configuration. Skipping re-running the "
                f"profiler."
            )
            return suite

    batch_request = create_batch_request(context, dataset, data_asset_name)

    try:
        if suite_exists:
            validator = context.get_validator(
                batch_request=batch_request,
                expectation_suite_name=expectation_suite_name,
            )
        else:
            validator = context.get_validator(
                batch_request=batch_request,
                create_expectation_suite_with_name=expectation_suite_name,
            )

        # `profiler_kwargs` defaults to None; unpacking None with `**`
        # raises a TypeError, so fall back to an empty mapping.
        profiler = UserConfigurableProfiler(
            profile_dataset=validator, **(profiler_kwargs or {})
        )

        suite = profiler.build_suite()
        context.save_expectation_suite(
            expectation_suite=suite,
            expectation_suite_name=expectation_suite_name,
        )

        context.build_data_docs()
    finally:
        # The datasource backing the batch request is temporary; remove it
        # whether or not profiling succeeded.
        context.delete_datasource(batch_request.datasource_name)

    return suite
data_validation(self, dataset, comparison_dataset=None, check_list=None, expectation_suite_name=None, data_asset_name=None, action_list=None, **kwargs)
Great Expectations data validation.
This Great Expectations specific data validation method implementation validates an input dataset against an Expectation Suite (the GE definition of a profile) as covered in the official GE documentation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset |
DataFrame |
The dataset to validate. |
required |
comparison_dataset |
Optional[Any] |
Optional dataset used to run data comparison (i.e. data drift) checks. Not supported by the Great Expectation data validator. |
None |
check_list |
Optional[Sequence[str]] |
Optional list identifying the data validation checks to be performed. Not supported by the Great Expectations data validator. |
None |
expectation_suite_name |
Optional[str] |
The name of the expectation suite to use to validate the dataset. A value must be provided. |
None |
data_asset_name |
Optional[str] |
The name of the data asset to use to identify the dataset in the Great Expectations docs. |
None |
action_list |
Optional[List[Dict[str, Any]]] |
A list of additional Great Expectations actions to run after the validation check. |
None |
kwargs |
Any |
Additional keyword arguments (unused). |
{} |
Returns:
Type | Description |
---|---|
great_expectations.checkpoint.types.checkpoint_result.CheckpointResult |
The Great Expectations validation (checkpoint) result. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the `expectation_suite_name` argument is omitted. |
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def data_validation(
    self,
    dataset: pd.DataFrame,
    comparison_dataset: Optional[Any] = None,
    check_list: Optional[Sequence[str]] = None,
    expectation_suite_name: Optional[str] = None,
    data_asset_name: Optional[str] = None,
    action_list: Optional[List[Dict[str, Any]]] = None,
    **kwargs: Any,
) -> CheckpointResult:
    """Great Expectations data validation.

    This Great Expectations specific data validation method
    implementation validates an input dataset against an Expectation Suite
    (the GE definition of a profile) [as covered in the official GE
    documentation](https://docs.greatexpectations.io/docs/guides/validation/how_to_validate_data_by_running_a_checkpoint).

    Args:
        dataset: The dataset to validate.
        comparison_dataset: Optional dataset used to run data
            comparison (i.e. data drift) checks. Not supported by the
            Great Expectations data validator.
        check_list: Optional list identifying the data validation checks to
            be performed. Not supported by the Great Expectations data
            validator.
        expectation_suite_name: The name of the expectation suite to use to
            validate the dataset. A value must be provided.
        data_asset_name: The name of the data asset to use to identify the
            dataset in the Great Expectations docs.
        action_list: A list of Great Expectations actions to run after
            the validation check. If empty or omitted, a default list
            (store validation result, store evaluation parameters,
            update data docs) is used instead.
        kwargs: Additional keyword arguments (unused).

    Returns:
        The Great Expectations validation (checkpoint) result.

    Raises:
        ValueError: if the `expectation_suite_name` argument is omitted.
    """
    if not expectation_suite_name:
        raise ValueError("Missing expectation_suite_name argument value.")

    if comparison_dataset is not None:
        logger.warning(
            "A comparison dataset is not required by Great Expectations "
            "to do data validation. Silently ignoring the supplied dataset "
        )

    try:
        step_context = get_step_context()
        run_name = step_context.pipeline_run.name
        step_name = step_context.step_run.name
    except RuntimeError:
        # if not running inside a pipeline step, use random values
        run_name = f"pipeline_{random_str(5)}"
        step_name = f"step_{random_str(5)}"

    context = self.data_context

    checkpoint_name = f"{run_name}_{step_name}"

    batch_request = create_batch_request(context, dataset, data_asset_name)

    action_list = action_list or [
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"},
        },
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"},
        },
    ]

    checkpoint_config: Dict[str, Any] = {
        "name": checkpoint_name,
        "run_name_template": run_name,
        "config_version": 1,
        "class_name": "Checkpoint",
        "expectation_suite_name": expectation_suite_name,
        "action_list": action_list,
    }

    # The outer `finally` always removes the temporary datasource, even if
    # registering the checkpoint fails. The inner `finally` removes the
    # checkpoint only once it has actually been added.
    try:
        context.add_checkpoint(**checkpoint_config)  # type: ignore[has-type]
        try:
            results = context.run_checkpoint(
                checkpoint_name=checkpoint_name,
                validations=[{"batch_request": batch_request}],
            )
        finally:
            context.delete_checkpoint(checkpoint_name)
    finally:
        context.delete_datasource(batch_request.datasource_name)

    return results
get_data_context()
classmethod
Get the Great Expectations data context managed by ZenML.
Call this method to retrieve the data context managed by ZenML through the active Great Expectations data validator stack component.
Returns:
Type | Description |
---|---|
great_expectations.data_context.data_context.abstract_data_context.AbstractDataContext |
A Great Expectations data context managed by ZenML as configured through the active data validator stack component. |
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
@classmethod
def get_data_context(cls) -> AbstractDataContext:
    """Fetch the Great Expectations data context that ZenML manages.

    The context is taken from the active data validator stack component,
    which is treated as a `GreatExpectationsDataValidator`.

    Returns:
        The data context exposed by the active data validator's
        `data_context` property.
    """
    active_validator = cls.get_active_data_validator()
    # `cast` only informs the type checker; nothing is checked at runtime.
    ge_validator = cast("GreatExpectationsDataValidator", active_validator)
    return ge_validator.data_context
get_data_docs_config(self, prefix, local=False)
Generate Great Expectations data docs configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix |
str |
The path prefix for the ZenML data docs configuration |
required |
local |
bool |
Whether the data docs site is local or remote. |
False |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary with the GE data docs site configuration. |
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def get_data_docs_config(
    self, prefix: str, local: bool = False
) -> Dict[str, Any]:
    """Build the configuration of a Great Expectations data docs site.

    Args:
        prefix: Path prefix under which the ZenML data docs are stored.
        local: If True, the site uses a filesystem store backend rooted at
            `<root_directory>/<prefix>`; otherwise it uses the ZenML
            artifact store backend with the prefix `<id>/<prefix>`.

    Returns:
        The GE data docs site configuration as a dictionary.
    """
    store_backend = (
        {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory": f"{self.root_directory}/{prefix}",
        }
        if local
        else {
            "module_name": ZenMLArtifactStoreBackend.__module__,
            "class_name": ZenMLArtifactStoreBackend.__name__,
            "prefix": f"{str(self.id)}/{prefix}",
        }
    )
    site_config: Dict[str, Any] = {
        "class_name": "SiteBuilder",
        "store_backend": store_backend,
        "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
    }
    return site_config
get_store_config(self, class_name, prefix)
Generate a Great Expectations store configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_name |
str |
The store class name |
required |
prefix |
str |
The path prefix for the ZenML store configuration |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary with the GE store configuration. |
Source code in zenml/integrations/great_expectations/data_validators/ge_data_validator.py
def get_store_config(self, class_name: str, prefix: str) -> Dict[str, Any]:
    """Build the configuration of a Great Expectations store.

    The store always uses the ZenML artifact store backend, scoped to the
    prefix `<id>/<prefix>`.

    Args:
        class_name: Name of the GE store class to configure.
        prefix: Path prefix for the ZenML store configuration.

    Returns:
        The GE store configuration as a dictionary.
    """
    backend_cls = ZenMLArtifactStoreBackend
    store_backend = {
        "module_name": backend_cls.__module__,
        "class_name": backend_cls.__name__,
        "prefix": f"{str(self.id)}/{prefix}",
    }
    return {"class_name": class_name, "store_backend": store_backend}
flavors
special
Great Expectations integration flavors.
great_expectations_data_validator_flavor
Great Expectations data validator flavor.
GreatExpectationsDataValidatorConfig (BaseDataValidatorConfig)
Config for the Great Expectations data validator.
Attributes:
Name | Type | Description |
---|---|---|
context_root_dir |
Optional[str] |
location of an already initialized Great Expectations data context. If configured, the data validator will only be usable with local orchestrators. |
context_config |
Optional[Dict[str, Any]] |
in-line Great Expectations data context configuration. If the `context_root_dir` attribute is also set, this configuration will be ignored. |
configure_zenml_stores |
bool |
if set, ZenML will automatically configure stores that use the Artifact Store as a backend. If neither `context_root_dir` nor `context_config` are set, this is the default behavior. |
configure_local_docs |
bool |
configure a local data docs site where Great Expectations docs are generated and can be visualized locally. |
Source code in zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
class GreatExpectationsDataValidatorConfig(BaseDataValidatorConfig):
    """Config for the Great Expectations data validator.

    Attributes:
        context_root_dir: location of an already initialized Great Expectations
            data context. If configured, the data validator will only be usable
            with local orchestrators.
        context_config: in-line Great Expectations data context configuration.
            If the `context_root_dir` attribute is also set, this configuration
            will be ignored.
        configure_zenml_stores: if set, ZenML will automatically configure
            stores that use the Artifact Store as a backend. If neither
            `context_root_dir` nor `context_config` are set, this is the default
            behavior.
        configure_local_docs: configure a local data docs site where Great
            Expectations docs are generated and can be visualized locally.
    """

    context_root_dir: Optional[str] = None
    context_config: Optional[Dict[str, Any]] = None
    configure_zenml_stores: bool = False
    configure_local_docs: bool = True

    @field_validator("context_root_dir")
    @classmethod
    def _ensure_valid_context_root_dir(
        cls, context_root_dir: Optional[str] = None
    ) -> Optional[str]:
        """Ensures that the root directory is an absolute path and points to an existing path.

        Args:
            context_root_dir: The context_root_dir value to validate.

        Returns:
            The absolute context_root_dir if it is set and exists, otherwise
            the unchanged (empty or None) value.

        Raises:
            ValueError: If the context_root_dir is not valid.
        """
        if context_root_dir:
            context_root_dir = os.path.abspath(context_root_dir)
            if not fileio.exists(context_root_dir):
                raise ValueError(
                    f"The Great Expectations context_root_dir value doesn't "
                    f"point to an existing data context path: {context_root_dir}"
                )
        return context_root_dir

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def validate_context_config(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Convert the context configuration if given in JSON/YAML format.

        Args:
            data: The configuration values.

        Returns:
            The validated configuration values.

        Raises:
            ValueError: If the context configuration is not a valid
                JSON/YAML object.
        """
        if isinstance(data.get("context_config"), str):
            try:
                data["context_config"] = yaml.safe_load(data["context_config"])
            except yaml.YAMLError as e:
                # `YAMLError` is the common base of parser, scanner and
                # composer errors, so every malformed document is reported
                # as the documented `ValueError`.
                raise ValueError(
                    f"Malformed `context_config` value. Only JSON and YAML "
                    f"formats are supported: {str(e)}"
                ) from e
        return data

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        # If an existing local GE data context is used, it is
        # interpreted as a local path that needs to be accessible in
        # all runtime environments.
        return self.context_root_dir is not None
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
validate_context_config(data, validation_info)
classmethod
Wrapper method to handle the raw data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls |
the class handler |
required | |
data |
Any |
the raw input data |
required |
validation_info |
ValidationInfo |
the context of the validation. |
required |
Returns:
Type | Description |
---|---|
Any |
the validated data |
Source code in zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
def before_validator(
    cls: Type[BaseModel], data: Any, validation_info: ValidationInfo
) -> Any:
    """Pre-process the raw input and pass it on to the wrapped validator.

    The raw data is first passed through `model_validator_data_handler` and
    the result is then handed to `method`, a name resolved from the enclosing
    scope.

    Args:
        cls: the class handler
        data: the raw input data
        validation_info: the context of the validation.

    Returns:
        the validated data
    """
    handled_data = model_validator_data_handler(
        raw_data=data, base_class=cls, validation_info=validation_info
    )
    return method(cls=cls, data=handled_data)
GreatExpectationsDataValidatorFlavor (BaseDataValidatorFlavor)
Great Expectations data validator flavor.
Source code in zenml/integrations/great_expectations/flavors/great_expectations_data_validator_flavor.py
class GreatExpectationsDataValidatorFlavor(BaseDataValidatorFlavor):
    """Flavor definition for the Great Expectations data validator."""

    @property
    def name(self) -> str:
        """The flavor name.

        Returns:
            The name under which this flavor is identified.
        """
        return GREAT_EXPECTATIONS_DATA_VALIDATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the documentation describing this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK documentation describing this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """URL of the logo that represents the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/data_validator/greatexpectations.jpeg"
        return logo

    @property
    def config_class(self) -> Type[GreatExpectationsDataValidatorConfig]:
        """The `GreatExpectationsDataValidatorConfig` config class.

        Returns:
            The config class used by this flavor.
        """
        return GreatExpectationsDataValidatorConfig

    @property
    def implementation_class(self) -> Type["GreatExpectationsDataValidator"]:
        """The class implementing this flavor.

        Returns:
            The implementation class.
        """
        # Imported here rather than at module level, as in the original.
        from zenml.integrations.great_expectations.data_validators import (
            GreatExpectationsDataValidator as implementation,
        )

        return implementation
config_class: Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig]
property
readonly
Returns the `GreatExpectationsDataValidatorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.great_expectations.flavors.great_expectations_data_validator_flavor.GreatExpectationsDataValidatorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[GreatExpectationsDataValidator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[GreatExpectationsDataValidator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
ge_store_backend
Great Expectations store plugin for ZenML.
ZenMLArtifactStoreBackend (TupleStoreBackend)
Great Expectations store backend that uses the active ZenML Artifact Store as a store.
Source code in zenml/integrations/great_expectations/ge_store_backend.py
class ZenMLArtifactStoreBackend(TupleStoreBackend):
    """Great Expectations store backend that uses the active ZenML Artifact Store as a store."""

    def __init__(
        self,
        prefix: str = "",
        **kwargs: Any,
    ) -> None:
        """Create a Great Expectations ZenML store backend instance.

        Args:
            prefix: Subpath prefix to use for this store backend.
            kwargs: Additional keyword arguments passed by the Great Expectations
                core. These are transparently passed to the `TupleStoreBackend`
                constructor.
        """
        super().__init__(**kwargs)

        client = Client()
        artifact_store = client.active_stack.artifact_store
        self.root_path = os.path.join(
            artifact_store.path, "great_expectations"
        )

        # extract the protocol used in the artifact store root path
        self.proto = next(
            (
                scheme
                for scheme in artifact_store.config.SUPPORTED_SCHEMES
                if self.root_path.startswith(scheme)
            ),
            "",
        )

        if prefix:
            if self.platform_specific_separator:
                prefix = prefix.strip(os.sep)
            prefix = prefix.strip("/")
        self.prefix = prefix

        # Initialize with store_backend_id if not part of an HTMLSiteStore
        if not self._suppress_store_backend_id:
            _ = self.store_backend_id

        self._config = {
            "prefix": prefix,
            "module_name": self.__class__.__module__,
            "class_name": self.__class__.__name__,
        }
        self._config.update(kwargs)
        filter_properties_dict(
            properties=self._config, clean_falsy=True, inplace=True
        )

    def _build_object_path(
        self, key: Tuple[str, ...], is_prefix: bool = False
    ) -> str:
        """Build a filepath corresponding to an object key.

        Args:
            key: Great Expectation object key.
            is_prefix: If True, the key will be interpreted as a prefix instead
                of a full key identifier.

        Returns:
            The file path pointing to where the object is stored.
        """
        if not isinstance(key, tuple):
            key = key.to_tuple()

        if not is_prefix:
            object_relative_path = self._convert_key_to_filepath(key)  # type: ignore[no-untyped-call]
        elif key:
            object_relative_path = os.path.join(*key)
        else:
            object_relative_path = ""

        if self.prefix:
            object_key = os.path.join(self.prefix, object_relative_path)
        else:
            object_key = object_relative_path
        return os.path.join(self.root_path, object_key)

    def _get(self, key: Tuple[str, ...]) -> str:  # type: ignore[override]
        """Get the value of an object from the store.

        Args:
            key: object key identifier.

        Raises:
            InvalidKeyError: if the key doesn't point to an existing object.

        Returns:
            str: the object's contents
        """
        filepath: str = self._build_object_path(key)
        if fileio.exists(filepath):
            contents = io_utils.read_file_contents_as_string(filepath).rstrip(
                "\n"
            )
        else:
            raise InvalidKeyError(
                f"Unable to retrieve object from {self.__class__.__name__} with "
                f"the following Key: {str(filepath)}"
            )
        return contents

    def _get_all(self) -> List[Any]:
        """Get all objects in the store.

        Raises:
            NotImplementedError: if the method is not implemented for this store
                backend.
        """
        raise NotImplementedError(
            "Method `_get_all` is not implemented for this store backend."
        )

    def _set(self, key: Tuple[str, ...], value: str, **kwargs: Any) -> str:  # type: ignore[override]
        """Set the value of an object in the store.

        Args:
            key: object key identifier.
            value: object value to set.
            kwargs: additional keyword arguments (ignored).

        Returns:
            The file path where the object was stored.
        """
        filepath: str = self._build_object_path(key)
        # Parent directories are only created explicitly for local paths.
        if not io_utils.is_remote(filepath):
            parent_dir = str(Path(filepath).parent)
            os.makedirs(parent_dir, exist_ok=True)

        with fileio.open(filepath, "wb") as outfile:
            # The file is opened in binary mode, so text is encoded first.
            if isinstance(value, str):
                outfile.write(value.encode("utf-8"))
            else:
                outfile.write(value)
        return filepath

    def _move(
        self,
        source_key: Tuple[str, ...],
        dest_key: Tuple[str, ...],
        **kwargs: Any,
    ) -> None:
        """Associate an object with a different key in the store.

        Nothing happens if the source object does not exist.

        Args:
            source_key: current object key identifier.
            dest_key: new object key identifier.
            kwargs: additional keyword arguments (ignored).
        """
        source_path = self._build_object_path(source_key)
        dest_path = self._build_object_path(dest_key)

        if fileio.exists(source_path):
            if not io_utils.is_remote(dest_path):
                parent_dir = str(Path(dest_path).parent)
                os.makedirs(parent_dir, exist_ok=True)
            fileio.rename(source_path, dest_path, overwrite=True)

    def list_keys(self, prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
        """List the keys of all objects identified by a partial key.

        Args:
            prefix: partial object key identifier.

        Returns:
            List of keys identifying all objects present in the store that
            match the input partial key.
        """
        key_list = []
        list_path = self._build_object_path(prefix, is_prefix=True)
        root_path = self._build_object_path(tuple(), is_prefix=True)

        for root, _, files in fileio.walk(list_path):
            for file_ in files:
                # Keys are derived from the path relative to the store root.
                filepath = os.path.relpath(
                    os.path.join(str(root), str(file_)), root_path
                )

                if self.filepath_prefix and not filepath.startswith(
                    self.filepath_prefix
                ):
                    continue
                elif self.filepath_suffix and not filepath.endswith(
                    self.filepath_suffix
                ):
                    continue
                key = self._convert_filepath_to_key(filepath)  # type: ignore[no-untyped-call]
                if key and not self.is_ignored_key(key):  # type: ignore[no-untyped-call]
                    key_list.append(key)
        return key_list

    def remove_key(self, key: Tuple[str, ...]) -> bool:  # type: ignore[override]
        """Delete an object from the store.

        Args:
            key: object key identifier.

        Returns:
            True if the object existed in the store and was removed, otherwise
            False.
        """
        filepath: str = self._build_object_path(key)

        if fileio.exists(filepath):
            fileio.remove(filepath)
            # For local paths, also clean up directories left empty.
            if not io_utils.is_remote(filepath):
                parent_dir = str(Path(filepath).parent)
                self.rrmdir(self.root_path, parent_dir)
            return True
        return False

    def _has_key(self, key: Tuple[str, ...]) -> bool:
        """Check if an object is present in the store.

        Args:
            key: object key identifier.

        Returns:
            True if the object is present in the store, otherwise False.
        """
        filepath: str = self._build_object_path(key)
        result = fileio.exists(filepath)
        return result

    def get_url_for_key(  # type: ignore[override]
        self, key: Tuple[str, ...], protocol: Optional[str] = None
    ) -> str:
        """Get the URL of an object in the store.

        Args:
            key: object key identifier.
            protocol: optional protocol to use instead of the store protocol.

        Returns:
            The URL of the object in the store.
        """
        filepath = self._build_object_path(key)
        # Local paths default to the `file:` protocol when none is requested.
        if not protocol and not io_utils.is_remote(filepath):
            protocol = "file:"
        if protocol:
            # Only the first occurrence of the store protocol is replaced.
            filepath = filepath.replace(self.proto, f"{protocol}//", 1)
        return filepath

    def get_public_url_for_key(
        self, key: str, protocol: Optional[str] = None
    ) -> str:
        """Get the public URL of an object in the store.

        Args:
            key: object key identifier.
            protocol: optional protocol to use instead of the store protocol.
                Not used by this implementation.

        Returns:
            The public URL where the object can be accessed.

        Raises:
            StoreBackendError: if a `base_public_path` attribute was not
                configured for the store.
        """
        if not self.base_public_path:
            raise StoreBackendError(
                f"Error: No base_public_path was configured! A public URL was "
                f"requested but `base_public_path` was not configured for the "
                f"{self.__class__.__name__}"
            )
        filepath = self._convert_key_to_filepath(key)  # type: ignore[no-untyped-call]
        public_url = self.base_public_path + filepath.replace(self.proto, "")
        return cast(str, public_url)

    @staticmethod
    def rrmdir(start_path: str, end_path: str) -> None:
        """Remove empty directories from end_path up towards start_path.

        Starting at `end_path`, every empty directory is removed and the walk
        continues with its parent. It stops at the first non-empty directory,
        or as soon as it would reach `start_path` or leave its subtree, so
        `start_path` itself and its ancestors are never removed.

        Args:
            start_path: Directory that bounds the cleanup; it is never removed.
            end_path: Deepest directory at which the cleanup starts.
        """
        # Normalize both paths so that a trailing separator or a `.` segment
        # cannot make the boundary check miss and let the walk climb past
        # `start_path`.
        boundary = os.path.abspath(start_path).rstrip(os.sep) + os.sep
        current = os.path.abspath(end_path)
        while current.startswith(boundary) and not os.listdir(current):
            os.rmdir(current)
            current = os.path.dirname(current)

    @property
    def config(self) -> Dict[str, Any]:
        """Get the store configuration.

        Returns:
            The store configuration.
        """
        return self._config
config: Dict[str, Any]
property
readonly
Get the store configuration.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The store configuration. |
__init__(self, prefix='', **kwargs)
special
Create a Great Expectations ZenML store backend instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix |
str |
Subpath prefix to use for this store backend. |
'' |
kwargs |
Any |
Additional keyword arguments passed by the Great Expectations core. These are transparently passed to the `TupleStoreBackend` constructor. |
{} |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def __init__(
    self,
    prefix: str = "",
    **kwargs: Any,
) -> None:
    """Set up a store backend rooted in the active ZenML artifact store.

    Args:
        prefix: Subpath prefix to use for this store backend.
        kwargs: Extra keyword arguments supplied by the Great Expectations
            core; they are forwarded unchanged to the `TupleStoreBackend`
            constructor and recorded in the store configuration.
    """
    super().__init__(**kwargs)

    active_artifact_store = Client().active_stack.artifact_store
    self.root_path = os.path.join(
        active_artifact_store.path, "great_expectations"
    )

    # The protocol is the first supported scheme that the root path starts
    # with, or an empty string when none matches.
    matching_schemes = [
        scheme
        for scheme in active_artifact_store.config.SUPPORTED_SCHEMES
        if self.root_path.startswith(scheme)
    ]
    self.proto = matching_schemes[0] if matching_schemes else ""

    if prefix:
        if self.platform_specific_separator:
            prefix = prefix.strip(os.sep)
        prefix = prefix.strip("/")
    self.prefix = prefix

    # Initialize with store_backend_id if not part of an HTMLSiteStore
    if not self._suppress_store_backend_id:
        _ = self.store_backend_id

    store_config = {
        "prefix": prefix,
        "module_name": self.__class__.__module__,
        "class_name": self.__class__.__name__,
    }
    store_config.update(kwargs)
    self._config = store_config
    filter_properties_dict(
        properties=self._config, clean_falsy=True, inplace=True
    )
get_public_url_for_key(self, key, protocol=None)
Get the public URL of an object in the store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
object key identifier. |
required |
protocol |
Optional[str] |
optional protocol to use instead of the store protocol. |
None |
Returns:
Type | Description |
---|---|
str |
The public URL where the object can be accessed. |
Exceptions:
Type | Description |
---|---|
StoreBackendError |
if a `base_public_path` attribute was not configured for the store. |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def get_public_url_for_key(
    self, key: str, protocol: Optional[str] = None
) -> str:
    """Return the public URL under which a stored object can be reached.

    The URL is the configured `base_public_path` followed by the object's
    file path with the store protocol removed.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.
            Not used by this implementation.

    Returns:
        The public URL where the object can be accessed.

    Raises:
        StoreBackendError: if a `base_public_path` attribute was not
            configured for the store.
    """
    if self.base_public_path:
        relative_path = self._convert_key_to_filepath(key)  # type: ignore[no-untyped-call]
        stripped_path = relative_path.replace(self.proto, "")
        return cast(str, self.base_public_path + stripped_path)

    raise StoreBackendError(
        f"Error: No base_public_path was configured! A public URL was "
        f"requested but `base_public_path` was not configured for the "
        f"{self.__class__.__name__}"
    )
get_url_for_key(self, key, protocol=None)
Get the URL of an object in the store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Tuple[str, ...] |
object key identifier. |
required |
protocol |
Optional[str] |
optional protocol to use instead of the store protocol. |
None |
Returns:
Type | Description |
---|---|
str |
The URL of the object in the store. |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def get_url_for_key(  # type: ignore[override]
    self, key: Tuple[str, ...], protocol: Optional[str] = None
) -> str:
    """Return the URL of a stored object.

    Args:
        key: object key identifier.
        protocol: optional protocol to use instead of the store protocol.

    Returns:
        The URL of the object in the store.
    """
    url = self._build_object_path(key)

    # Local paths default to the `file:` protocol when none is requested.
    if not (protocol or io_utils.is_remote(url)):
        protocol = "file:"

    if not protocol:
        return url
    # Only the first occurrence of the store protocol is replaced.
    return url.replace(self.proto, f"{protocol}//", 1)
list_keys(self, prefix=())
List the keys of all objects identified by a partial key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
prefix |
Tuple[str, ...] |
partial object key identifier. |
() |
Returns:
Type | Description |
---|---|
List[Tuple[str, ...]] |
List of keys identifying all objects present in the store that match the input partial key. |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def list_keys(self, prefix: Tuple[str, ...] = ()) -> List[Tuple[str, ...]]:
    """Collect the keys of all stored objects below a partial key.

    Args:
        prefix: partial object key identifier.

    Returns:
        List of keys identifying all objects present in the store that
        match the input partial key.
    """
    matching_keys = []
    search_root = self._build_object_path(prefix, is_prefix=True)
    store_root = self._build_object_path(tuple(), is_prefix=True)

    for current_dir, _subdirs, filenames in fileio.walk(search_root):
        for filename in filenames:
            # Keys are derived from the path relative to the store root.
            absolute_path = os.path.join(str(current_dir), str(filename))
            relative_path = os.path.relpath(absolute_path, store_root)

            # Skip files that do not carry the configured prefix/suffix.
            if self.filepath_prefix and not relative_path.startswith(
                self.filepath_prefix
            ):
                continue
            if self.filepath_suffix and not relative_path.endswith(
                self.filepath_suffix
            ):
                continue

            candidate = self._convert_filepath_to_key(relative_path)  # type: ignore[no-untyped-call]
            if candidate and not self.is_ignored_key(candidate):  # type: ignore[no-untyped-call]
                matching_keys.append(candidate)
    return matching_keys
remove_key(self, key)
Delete an object from the store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Tuple[str, ...] |
object key identifier. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the object existed in the store and was removed, otherwise False. |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
def remove_key(self, key: Tuple[str, ...]) -> bool:  # type: ignore[override]
    """Remove an object from the store.

    Args:
        key: object key identifier.

    Returns:
        True if the object existed in the store and was removed, otherwise
        False.
    """
    target: str = self._build_object_path(key)
    if not fileio.exists(target):
        return False

    fileio.remove(target)
    # For local paths, also clean up directories left empty by the removal.
    if not io_utils.is_remote(target):
        containing_dir = str(Path(target).parent)
        self.rrmdir(self.root_path, str(containing_dir))
    return True
rrmdir(start_path, end_path)
staticmethod
Recursively removes empty dirs between start_path and end_path inclusive.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
start_path |
str |
Directory to use as a starting point. |
required |
end_path |
str |
Directory to use as a destination point. |
required |
Source code in zenml/integrations/great_expectations/ge_store_backend.py
@staticmethod
def rrmdir(start_path: str, end_path: str) -> None:
"""Recursively removes empty dirs between start_path and end_path inclusive.
Args:
start_path: Directory to use as a starting point.
end_path: Directory to use as a destination point.
"""
while not os.listdir(end_path) and start_path != end_path:
os.rmdir(end_path)
end_path = os.path.dirname(end_path)
materializers
special
Materializers for Great Expectation serializable objects.
ge_materializer
Implementation of the Great Expectations materializers.
GreatExpectationsMaterializer (BaseMaterializer)
Materializer to read/write Great Expectation objects.
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
class GreatExpectationsMaterializer(BaseMaterializer):
    """Materializer that reads and writes Great Expectations objects."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        ExpectationSuite,
        CheckpointResult,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = (
        ArtifactType.DATA_ANALYSIS
    )

    @staticmethod
    def preprocess_checkpoint_result_dict(
        artifact_dict: Dict[str, Any],
    ) -> None:
        """Prepare a GE checkpoint dict for de-serialization into a CheckpointResult.

        The GE CheckpointResult object is not fully de-serializable due to
        some missing code in the GE codebase, so some attributes are converted
        to their correct data types here. The dict is modified in place.

        Args:
            artifact_dict: A dict containing the GE checkpoint result.
        """
        artifact_dict["checkpoint_config"] = CheckpointConfig(
            **artifact_dict["checkpoint_config"]
        )

        converted_run_results = {}
        for ident_str, run_result in artifact_dict["run_results"].items():
            # The identifier tuple is the `/`-separated part after `::`.
            ident_parts = ident_str.split("::")[1].split("/")
            ident = ValidationResultIdentifier.from_fixed_length_tuple(  # type: ignore[no-untyped-call]
                ident_parts
            )
            # Only the `validation_result` entry needs a type conversion.
            converted_run_results[ident] = {
                entry_name: (
                    ExpectationSuiteValidationResult(**entry)
                    if entry_name == "validation_result"
                    else entry
                )
                for entry_name, entry in run_result.items()
            }
        artifact_dict["run_results"] = converted_run_results

    def load(self, data_type: Type[Any]) -> SerializableDictDot:
        """Read a Great Expectations object from the artifact location.

        Args:
            data_type: The type of the data to read.

        Returns:
            A loaded Great Expectations object.
        """
        artifact_path = os.path.join(self.uri, ARTIFACT_FILENAME)
        payload = yaml_utils.read_json(artifact_path)
        # The type recorded in the artifact file replaces the `data_type`
        # argument.
        data_type = source_utils.load(payload.pop("data_type"))

        if data_type is CheckpointResult:
            self.preprocess_checkpoint_result_dict(payload)
        return data_type(**payload)

    def save(self, obj: SerializableDictDot) -> None:
        """Write a Great Expectations object to the artifact location.

        Args:
            obj: A Great Expectations object.
        """
        obj_type = type(obj)
        payload = obj.to_json_dict()
        # Record the object's type so that `load` can restore it.
        payload["data_type"] = f"{obj_type.__module__}.{obj_type.__name__}"
        yaml_utils.write_json(
            os.path.join(self.uri, ARTIFACT_FILENAME), payload
        )

    def save_visualizations(
        self, data: Union[ExpectationSuite, CheckpointResult]
    ) -> Dict[str, VisualizationType]:
        """Collect the data docs visualizations of a Great Expectations object.

        Args:
            data: The Great Expectations object to save visualizations for.

        Returns:
            A dictionary of visualization URIs and their types.
        """
        if isinstance(data, CheckpointResult):
            # The first run result key identifies the checkpoint result.
            identifier = next(iter(data.run_results.keys()))
        else:
            identifier = ExpectationSuiteIdentifier(
                data.expectation_suite_name
            )

        context = GreatExpectationsDataValidator.get_data_context()
        docs_sites = context.get_docs_sites_urls(identifier)
        return {
            docs_site["site_url"]: VisualizationType.HTML
            for docs_site in docs_sites
        }

    def extract_metadata(
        self, data: Union[ExpectationSuite, CheckpointResult]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given Great Expectations object.

        Args:
            data: The Great Expectations object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary; empty for any other type.
        """
        if isinstance(data, CheckpointResult):
            return {
                "checkpoint_result_name": data.name,
                "checkpoint_result_passed": data.success,
            }
        if isinstance(data, ExpectationSuite):
            return {"expectation_suite_name": data.name}
        return {}
extract_metadata(self, data)
Extract metadata from the given Great Expectations object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Union[great_expectations.core.ExpectationSuite, great_expectations.checkpoint.types.checkpoint_result.CheckpointResult] |
The Great Expectations object to extract metadata from. |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
The extracted metadata as a dictionary. |
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def extract_metadata(
    self, data: Union[ExpectationSuite, CheckpointResult]
) -> Dict[str, "MetadataType"]:
    """Build the metadata dictionary for a Great Expectations object.

    Args:
        data: The Great Expectations object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary; empty for any other type.
    """
    metadata: Dict[str, "MetadataType"] = {}
    if isinstance(data, CheckpointResult):
        metadata["checkpoint_result_name"] = data.name
        metadata["checkpoint_result_passed"] = data.success
    elif isinstance(data, ExpectationSuite):
        metadata["expectation_suite_name"] = data.name
    return metadata
load(self, data_type)
Reads and returns a Great Expectations object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
great_expectations.types.SerializableDictDot |
A loaded Great Expectations object. |
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def load(self, data_type: Type[Any]) -> SerializableDictDot:
    """Read a Great Expectations object from the artifact location.

    Args:
        data_type: The type of the data to read.

    Returns:
        A loaded Great Expectations object.
    """
    artifact_path = os.path.join(self.uri, ARTIFACT_FILENAME)
    payload = yaml_utils.read_json(artifact_path)
    # The type recorded in the artifact file is used instead of the
    # `data_type` argument.
    stored_type = source_utils.load(payload.pop("data_type"))

    if stored_type is CheckpointResult:
        self.preprocess_checkpoint_result_dict(payload)
    return stored_type(**payload)
preprocess_checkpoint_result_dict(artifact_dict)
staticmethod
Pre-processes a GE checkpoint dict before it is used to de-serialize a GE CheckpointResult object.
The GE CheckpointResult object is not fully de-serializable due to some missing code in the GE codebase. We need to compensate for this by manually converting some of the attributes to their correct data types.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_dict |
Dict[str, Any] |
A dict containing the GE checkpoint result. |
required |
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
@staticmethod
def preprocess_checkpoint_result_dict(
    artifact_dict: Dict[str, Any],
) -> None:
    """Prepare a GE checkpoint dict for de-serialization into a CheckpointResult.

    The GE CheckpointResult object is not fully de-serializable due to some
    missing code in the GE codebase, so some attributes are converted to
    their correct data types here. The dict is modified in place.

    Args:
        artifact_dict: A dict containing the GE checkpoint result.
    """
    artifact_dict["checkpoint_config"] = CheckpointConfig(
        **artifact_dict["checkpoint_config"]
    )

    converted_run_results = {}
    for ident_str, run_result in artifact_dict["run_results"].items():
        # The identifier tuple is the `/`-separated part after `::`.
        ident_parts = ident_str.split("::")[1].split("/")
        ident = ValidationResultIdentifier.from_fixed_length_tuple(  # type: ignore[no-untyped-call]
            ident_parts
        )
        # Only the `validation_result` entry needs a type conversion.
        converted_run_results[ident] = {
            entry_name: (
                ExpectationSuiteValidationResult(**entry)
                if entry_name == "validation_result"
                else entry
            )
            for entry_name, entry in run_result.items()
        }
    artifact_dict["run_results"] = converted_run_results
save(self, obj)
Writes a Great Expectations object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
great_expectations.types.SerializableDictDot |
A Great Expectations object. |
required |
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def save(self, obj: SerializableDictDot) -> None:
    """Writes a Great Expectations object.

    The object's JSON dict is extended with a ``data_type`` entry holding the
    dotted ``<module>.<class name>`` path of the object's class, and the
    result is written as JSON to the artifact file under ``self.uri``.

    Args:
        obj: A Great Expectations object.
    """
    target_path = os.path.join(self.uri, ARTIFACT_FILENAME)
    payload = obj.to_json_dict()

    # Record the concrete class alongside the serialized data.
    obj_class = type(obj)
    payload["data_type"] = f"{obj_class.__module__}.{obj_class.__name__}"

    yaml_utils.write_json(target_path, payload)
save_visualizations(self, data)
Saves visualizations for the given Great Expectations object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Union[great_expectations.core.ExpectationSuite, great_expectations.checkpoint.types.checkpoint_result.CheckpointResult] |
The Great Expectations object to save visualizations for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.enums.VisualizationType] |
A dictionary of visualization URIs and their types. |
Source code in zenml/integrations/great_expectations/materializers/ge_materializer.py
def save_visualizations(
    self, data: Union[ExpectationSuite, CheckpointResult]
) -> Dict[str, VisualizationType]:
    """Saves visualizations for the given Great Expectations object.

    Looks up the docs site URLs that the Great Expectations data context
    reports for the object and registers each of them as an HTML
    visualization.

    Args:
        data: The Great Expectations object to save visualizations for.

    Returns:
        A dictionary of visualization URIs and their types.
    """
    if isinstance(data, CheckpointResult):
        # Only the first run result identifier is used for the lookup.
        identifier = next(iter(data.run_results.keys()))
    else:
        identifier = ExpectationSuiteIdentifier(data.expectation_suite_name)

    context = GreatExpectationsDataValidator.get_data_context()
    return {
        site["site_url"]: VisualizationType.HTML
        for site in context.get_docs_sites_urls(identifier)
    }
steps
special
Great Expectations data profiling and validation standard steps.
ge_profiler
Great Expectations data profiling standard step.
ge_validator
Great Expectations data validation standard step.
utils
Utility functions for the Great Expectations integration.
create_batch_request(context, dataset, data_asset_name)
Create a temporary runtime GE batch request from a dataset step artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
great_expectations.data_context.data_context.abstract_data_context.AbstractDataContext |
Great Expectations data context. |
required |
dataset |
DataFrame |
Input dataset. |
required |
data_asset_name |
Optional[str] |
Optional custom name for the data asset. |
required |
Returns:
Type | Description |
---|---|
great_expectations.core.batch.RuntimeBatchRequest |
A Great Expectations runtime batch request. |
Source code in zenml/integrations/great_expectations/utils.py
def create_batch_request(
    context: AbstractDataContext,
    dataset: pd.DataFrame,
    data_asset_name: Optional[str],
) -> RuntimeBatchRequest:
    """Create a temporary runtime GE batch request from a dataset step artifact.

    A pandas-backed datasource with a single runtime data connector is added
    to the data context, and a batch request carrying the dataset as its
    batch data is built against it. Names are derived from the current step
    context when one is available, and from random strings otherwise.

    Args:
        context: Great Expectations data context.
        dataset: Input dataset.
        data_asset_name: Optional custom name for the data asset.

    Returns:
        A Great Expectations runtime batch request.
    """
    try:
        # Derive the names from the running pipeline, run and step.
        step_context = get_step_context()
        pipeline_name, run_name, step_name = (
            step_context.pipeline.name,
            step_context.pipeline_run.name,
            step_context.step_run.name,
        )
    except RuntimeError:
        # Not running inside a pipeline step: fall back to random names.
        # NOTE(review): the run name fallback also uses the "pipeline_"
        # prefix; possibly a copy-paste of the line above - confirm.
        pipeline_name, run_name, step_name = (
            f"pipeline_{random_str(5)}",
            f"pipeline_{random_str(5)}",
            f"step_{random_str(5)}",
        )

    batch_identifier = "default"
    # The datasource and its data connector share the same name.
    datasource_name = data_connector_name = f"{run_name}_{step_name}"
    asset_name = data_asset_name or f"{pipeline_name}_{step_name}"

    context.add_datasource(
        name=datasource_name,
        class_name="Datasource",
        module_name="great_expectations.datasource",
        execution_engine={
            "module_name": "great_expectations.execution_engine",
            "class_name": "PandasExecutionEngine",
        },
        data_connectors={
            data_connector_name: {
                "class_name": "RuntimeDataConnector",
                "batch_identifiers": [batch_identifier],
            },
        },
    )

    return RuntimeBatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=asset_name,
        runtime_parameters={"batch_data": dataset},
        batch_identifiers={batch_identifier: batch_identifier},
    )