Config
zenml.config
special
The config
module contains classes and functions that manage user-specific configuration.
ZenML's configuration is stored in a file called `config.yaml`, located in the user's directory for configuration files.
(The exact location differs from operating system to operating system.)
The GlobalConfiguration
class is the main class in this module. It provides
a Pydantic configuration object that is used to store and retrieve
configuration. This GlobalConfiguration
object handles the serialization and
deserialization of the configuration options that are stored in the file in
order to persist the configuration across sessions.
base_settings
Base class for all ZenML settings.
BaseSettings (SecretReferenceMixin)
pydantic-model
Base class for settings.
The LEVEL
class variable defines on which level the settings can be
specified. By default, subclasses can be defined on both pipelines and
steps.
Source code in zenml/config/base_settings.py
class BaseSettings(SecretReferenceMixin):
    """Common parent class of all ZenML settings.

    Where a settings class may be used is controlled by its `LEVEL` class
    variable. The default defined here combines both configuration levels,
    so subclasses that do not override it can be specified on pipelines as
    well as on steps.
    """

    LEVEL: ClassVar[ConfigurationLevel] = (
        ConfigurationLevel.STEP | ConfigurationLevel.PIPELINE
    )

    class Config:
        """Pydantic configuration class."""

        # Fields of an existing instance cannot be reassigned.
        allow_mutation = False
        # Unknown attributes are accepted so that this base class can be
        # used to parse dicts that belong to arbitrary subclasses.
        extra = Extra.allow
Config
Pydantic configuration class.
Source code in zenml/config/base_settings.py
class Config:
"""Pydantic configuration class."""
# public attributes are immutable: assigning to a field of an existing
# instance is rejected
allow_mutation = False
# allow extra attributes so this class can be used to parse dicts
# of arbitrary subclasses
extra = Extra.allow
ConfigurationLevel (IntFlag)
Settings configuration level.
Bit flag that can be used to specify where a BaseSettings
subclass
can be specified.
Source code in zenml/config/base_settings.py
class ConfigurationLevel(IntFlag):
    """Levels on which settings can be configured.

    The members are bit flags, so they can be combined with `|` to express
    that a `BaseSettings` subclass may be specified on more than one level.
    """

    STEP = auto()
    PIPELINE = auto()
compiler
Class for compiling ZenML pipelines into a serializable format.
Compiler
Compiles ZenML pipelines to serializable representations.
Source code in zenml/config/compiler.py
class Compiler:
"""Compiles ZenML pipelines to serializable representations."""
def compile(
self,
pipeline: "BasePipeline",
stack: "Stack",
run_configuration: PipelineRunConfiguration,
) -> PipelineDeployment:
"""Compiles a ZenML pipeline to a serializable representation.
The pipeline is deep-copied first, so the run configuration is applied to
the copy and the pipeline object passed by the caller is left untouched.
Args:
pipeline: The pipeline to compile.
stack: The stack on which the pipeline will run.
run_configuration: The run configuration for this pipeline.
Returns:
The compiled pipeline.
"""
logger.debug("Compiling pipeline `%s`.", pipeline.name)
# Copy the pipeline before we apply any run-level configurations so
# we don't mess with the pipeline object/step objects in any way
pipeline = copy.deepcopy(pipeline)
self._apply_run_configuration(
pipeline=pipeline, config=run_configuration
)
pipeline.connect(**pipeline.steps)
pb2_pipeline = self._compile_proto_pipeline(
pipeline=pipeline, stack=stack
)
# Resolve the pipeline-level settings; settings that can not be specified
# on a pipeline make the helper raise a TypeError
pipeline_settings = self._filter_and_validate_settings(
settings=pipeline.configuration.settings,
configuration_level=ConfigurationLevel.PIPELINE,
stack=stack,
)
# NOTE(review): `merge=False` presumably replaces the existing settings
# instead of merging into them -- confirm against `configure`
pipeline.configure(settings=pipeline_settings, merge=False)
# Only settings that may also be specified on a step are handed down to
# the steps
settings_to_passdown = {
key: settings
for key, settings in pipeline_settings.items()
if ConfigurationLevel.STEP in settings.LEVEL
}
# Steps are compiled in the order of the nodes of the proto pipeline
steps = {
name: self._compile_step(
step=step,
pipeline_settings=settings_to_passdown,
pipeline_extra=pipeline.configuration.extra,
stack=stack,
)
for name, step in self._get_sorted_steps(
pb2_pipeline, steps=pipeline.steps
)
}
self._ensure_required_stack_components_exist(
stack=stack, steps=list(steps.values())
)
# Use the configured run name, or generate a default one if it is not set
run_name = run_configuration.run_name or self._get_default_run_name(
pipeline_name=pipeline.name
)
# The proto pipeline is stored in the deployment as encoded JSON
encoded_pb2_pipeline = string_utils.b64_encode(
json_format.MessageToJson(pb2_pipeline)
)
deployment = PipelineDeployment(
run_name=run_name,
stack_id=stack.id,
schedule=run_configuration.schedule,
pipeline=pipeline.configuration,
proto_pipeline=encoded_pb2_pipeline,
steps=steps,
)
logger.debug("Compiled pipeline deployment: %s", deployment)
return deployment
def _apply_run_configuration(
    self, pipeline: "BasePipeline", config: PipelineRunConfiguration
) -> None:
    """Configures the pipeline and its steps from a run configuration.

    The pipeline-level options are applied first, afterwards each step
    entry of the run configuration is applied to the step of that name.

    Args:
        pipeline: The pipeline to configure.
        config: The run configurations.

    Raises:
        KeyError: If the run configuration contains options for a
            non-existent step.
    """
    pipeline.configure(
        enable_cache=config.enable_cache,
        settings=config.settings,
        extra=config.extra,
    )

    for step_name, step_config in config.steps.items():
        if step_name in pipeline.steps:
            pipeline.steps[step_name]._apply_configuration(step_config)
        else:
            raise KeyError(f"No step with name {step_name}.")
def _filter_and_validate_settings(
    self,
    settings: Dict[str, "BaseSettings"],
    configuration_level: ConfigurationLevel,
    stack: "Stack",
) -> Dict[str, "BaseSettings"]:
    """Filters and validates settings.

    Every settings object is resolved against the stack with a
    `SettingsResolver`. Settings whose resolution raises a `KeyError` are
    logged and left out of the result. All remaining settings must be
    allowed on the given configuration level.

    Args:
        settings: The settings to check.
        configuration_level: The level on which these settings
            were configured.
        stack: The stack on which the pipeline will run.

    Raises:
        TypeError: If settings with an unsupported configuration
            level were specified.

    Returns:
        The filtered settings.
    """
    validated_settings = {}

    for key, settings_instance in settings.items():
        resolver = SettingsResolver(key=key, settings=settings_instance)
        try:
            settings_instance = resolver.resolve(stack=stack)
        except KeyError:
            # NOTE(review): presumably raised when the stack has no
            # component matching the settings key -- confirm in
            # `SettingsResolver.resolve`.
            logger.info(
                "Not including stack component settings with key `%s`.", key
            )
            # Skip the entry. Without this `continue` the unresolved
            # settings object would still be level-checked and returned,
            # contradicting the log message above.
            continue

        if configuration_level not in settings_instance.LEVEL:
            raise TypeError(
                f"The settings class {settings_instance.__class__} can not "
                f"be specified on a {configuration_level.name} level."
            )
        validated_settings[key] = settings_instance

    return validated_settings
def _get_step_spec(self, step: "BaseStep") -> StepSpec:
    """Builds the spec of a step.

    The spec consists of the resolved source of the step class and the
    sorted names of the upstream steps.

    Args:
        step: The step for which to get the spec.

    Returns:
        The step spec.
    """
    step_source = source_utils.resolve_class(step.__class__)
    ordered_upstream_steps = sorted(step.upstream_steps)
    return StepSpec(
        source=step_source, upstream_steps=ordered_upstream_steps
    )
def _compile_step(
    self,
    step: "BaseStep",
    pipeline_settings: Dict[str, "BaseSettings"],
    pipeline_extra: Dict[str, Any],
    stack: "Stack",
) -> Step:
    """Compiles a ZenML step.

    Settings and extras of the pipeline are combined with those of the
    step; on duplicate keys the value configured on the step wins.

    Args:
        step: The step to compile.
        pipeline_settings: settings configured on the
            pipeline of the step.
        pipeline_extra: Extra values configured on the pipeline of the step.
        stack: The stack on which the pipeline will be run.

    Returns:
        The compiled step.
    """
    spec = self._get_step_spec(step=step)
    own_settings = self._filter_and_validate_settings(
        settings=step.configuration.settings,
        configuration_level=ConfigurationLevel.STEP,
        stack=stack,
    )

    # Start from the pipeline values and let the step values override them.
    combined_settings = dict(pipeline_settings)
    combined_settings.update(own_settings)
    combined_extra = dict(pipeline_extra)
    combined_extra.update(step.configuration.extra)

    step.configure(
        settings=combined_settings,
        extra=combined_extra,
        merge=False,
    )

    # The configuration is read only after `configure`, so it contains the
    # combined settings and extras.
    config = StepConfiguration(
        docstring=step.__doc__, **step.configuration.dict()
    )
    return Step(spec=spec, config=config)
def _compile_proto_pipeline(
    self, pipeline: "BasePipeline", stack: "Stack"
) -> Pb2Pipeline:
    """Compiles a ZenML pipeline into a TFX protobuf pipeline.

    Args:
        pipeline: The pipeline to compile.
        stack: The stack on which the pipeline will run.

    Raises:
        KeyError: If any step of the pipeline contains an invalid upstream
            step.

    Returns:
        The compiled proto pipeline.
    """
    # Collect the component of every step, keyed by the step name
    components_by_name = {}
    for pipeline_step in pipeline.steps.values():
        components_by_name[pipeline_step.name] = pipeline_step.component

    # Register the upstream dependencies that users specified explicitly
    for downstream_step in pipeline.steps.values():
        for upstream_name in downstream_step.upstream_steps:
            try:
                upstream_component = components_by_name[upstream_name]
            except KeyError:
                raise KeyError(
                    f"Unable to find upstream step `{upstream_name}` for step "
                    f"`{downstream_step.name}`. Available steps: "
                    f"{set(components_by_name)}."
                )
            downstream_step.component.add_upstream_node(upstream_component)

    store = stack.artifact_store

    # The metadata connection config is not passed here as it might not be
    # accessible. Instead it is queried from the active stack right before
    # a step is executed (see `BaseOrchestrator.run_step(...)`)
    tfx_representation = tfx_pipeline.Pipeline(
        pipeline_name=pipeline.name,
        components=list(components_by_name.values()),
        pipeline_root=store.path,
        enable_cache=pipeline.enable_cache,
    )
    return TFXCompiler().compile(tfx_representation)
@staticmethod
def _get_default_run_name(pipeline_name: str) -> str:
"""Gets the default name for a pipeline run.
Args:
pipeline_name: Name of the pipeline which will be run.
Returns:
Run name.
"""
return (
f"{pipeline_name}-"
f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
)
@staticmethod
def _get_sorted_steps(
    pb2_pipeline: Pb2Pipeline, steps: Dict[str, "BaseStep"]
) -> List[Tuple[str, "BaseStep"]]:
    """Sorts the steps of a pipeline.

    The steps are returned in the order of the nodes of the proto
    pipeline, which is an order that can be executed sequentially without
    any conflicts.

    Args:
        pb2_pipeline: Pipeline proto representation.
        steps: ZenML pipeline steps.

    Returns:
        The sorted steps.
    """
    # The id of a proto node is the name of the step, which may differ
    # from the key under which the step is stored in the pipeline.
    steps_by_name = {
        step.name: (pipeline_key, step)
        for pipeline_key, step in steps.items()
    }
    return [
        steps_by_name[node.pipeline_node.node_info.id]
        for node in pb2_pipeline.nodes
    ]
@staticmethod
def _ensure_required_stack_components_exist(
stack: "Stack", steps: Sequence["Step"]
) -> None:
"""Ensures that the stack components required for each step exist.
Args:
stack: The stack on which the pipeline should be deployed.
steps: The steps of the pipeline.
Raises:
StackValidationError: If a required stack component is missing.
"""
available_step_operators = (
{stack.step_operator.name} if stack.step_operator else set()
)
available_experiment_trackers = (
{stack.experiment_tracker.name}
if stack.experiment_tracker
else set()
)
for step in steps:
step_operator = step.config.step_operator
if step_operator and step_operator not in available_step_operators:
raise StackValidationError(
f"Step '{step.config.name}' requires step operator "
f"'{step_operator}' which is not configured in "
f"the stack '{stack.name}'. Available step operators: "
f"{available_step_operators}."
)
experiment_tracker = step.config.experiment_tracker
if (
experiment_tracker
and experiment_tracker not in available_experiment_trackers
):
raise StackValidationError(
f"Step '{step.config.name}' requires experiment tracker "
f"'{experiment_tracker}' which is not "
f"configured in the stack '{stack.name}'. Available "
f"experiment trackers: {available_experiment_trackers}."
)
compile(self, pipeline, stack, run_configuration)
Compiles a ZenML pipeline to a serializable representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
BasePipeline |
The pipeline to compile. |
required |
stack |
Stack |
The stack on which the pipeline will run. |
required |
run_configuration |
PipelineRunConfiguration |
The run configuration for this pipeline. |
required |
Returns:
Type | Description |
---|---|
PipelineDeployment |
The compiled pipeline. |
Source code in zenml/config/compiler.py
def compile(
self,
pipeline: "BasePipeline",
stack: "Stack",
run_configuration: PipelineRunConfiguration,
) -> PipelineDeployment:
"""Compiles a ZenML pipeline to a serializable representation.
The pipeline is deep-copied first, so the run configuration is applied to
the copy and the pipeline object passed by the caller is left untouched.
Args:
pipeline: The pipeline to compile.
stack: The stack on which the pipeline will run.
run_configuration: The run configuration for this pipeline.
Returns:
The compiled pipeline.
"""
logger.debug("Compiling pipeline `%s`.", pipeline.name)
# Copy the pipeline before we apply any run-level configurations so
# we don't mess with the pipeline object/step objects in any way
pipeline = copy.deepcopy(pipeline)
self._apply_run_configuration(
pipeline=pipeline, config=run_configuration
)
pipeline.connect(**pipeline.steps)
pb2_pipeline = self._compile_proto_pipeline(
pipeline=pipeline, stack=stack
)
# Resolve the pipeline-level settings; settings that can not be specified
# on a pipeline make the helper raise a TypeError
pipeline_settings = self._filter_and_validate_settings(
settings=pipeline.configuration.settings,
configuration_level=ConfigurationLevel.PIPELINE,
stack=stack,
)
# NOTE(review): `merge=False` presumably replaces the existing settings
# instead of merging into them -- confirm against `configure`
pipeline.configure(settings=pipeline_settings, merge=False)
# Only settings that may also be specified on a step are handed down to
# the steps
settings_to_passdown = {
key: settings
for key, settings in pipeline_settings.items()
if ConfigurationLevel.STEP in settings.LEVEL
}
# Steps are compiled in the order of the nodes of the proto pipeline
steps = {
name: self._compile_step(
step=step,
pipeline_settings=settings_to_passdown,
pipeline_extra=pipeline.configuration.extra,
stack=stack,
)
for name, step in self._get_sorted_steps(
pb2_pipeline, steps=pipeline.steps
)
}
self._ensure_required_stack_components_exist(
stack=stack, steps=list(steps.values())
)
# Use the configured run name, or generate a default one if it is not set
run_name = run_configuration.run_name or self._get_default_run_name(
pipeline_name=pipeline.name
)
# The proto pipeline is stored in the deployment as encoded JSON
encoded_pb2_pipeline = string_utils.b64_encode(
json_format.MessageToJson(pb2_pipeline)
)
deployment = PipelineDeployment(
run_name=run_name,
stack_id=stack.id,
schedule=run_configuration.schedule,
pipeline=pipeline.configuration,
proto_pipeline=encoded_pb2_pipeline,
steps=steps,
)
logger.debug("Compiled pipeline deployment: %s", deployment)
return deployment
config_keys
Validates global configuration values.
ConfigKeys
Class to validate dictionary configurations.
Source code in zenml/config/config_keys.py
class ConfigKeys:
    """Base class for validating the keys of configuration dicts.

    Subclasses declare the allowed keys as class attributes. The attribute
    value is the key itself; an attribute name that ends with an underscore
    marks the key as optional, any other name marks it as required.
    """

    @classmethod
    def get_keys(cls) -> Tuple[List[str], List[str]]:
        """Collects the required and optional config keys of this class.

        Only attributes defined directly on the class are considered.

        Returns:
            A tuple (required, optional) which are lists of the
            required/optional keys for this class.
        """
        required: List[str] = []
        optional: List[str] = []

        for attr_name, attr_value in cls.__dict__.items():
            if attr_name.startswith("__"):
                continue
            if isinstance(attr_value, (classmethod, staticmethod)):
                continue
            if callable(attr_value):
                continue

            if attr_name.endswith("_"):
                optional.append(attr_value)
            else:
                required.append(attr_value)

        return required, optional

    @classmethod
    def key_check(cls, config: Dict[str, Any]) -> None:
        """Verifies that a dict has all required keys and no unknown ones.

        Args:
            config: The configuration dict to verify.

        Raises:
            TypeError: If no config dictionary is passed.
            ValueError: If required keys are missing or unknown keys are found.
        """
        if not isinstance(config, dict):
            raise TypeError(f"Please specify a dict for {cls.__name__}")

        required, optional = cls.get_keys()

        # Missing keys are reported first
        absent = [key for key in required if key not in config]
        if absent:
            raise ValueError(f"Missing key(s) {absent} in {cls.__name__}")

        # Anything that is neither required nor optional is unknown
        allowed = required + optional
        unexpected = [key for key in config if key not in allowed]
        if unexpected:
            raise ValueError(
                f"Unknown key(s) {unexpected} in {cls.__name__}. "
                f"Required keys: {required}, optional keys: {optional}."
            )
get_keys()
classmethod
Gets all the required and optional config keys for this class.
Returns:
Type | Description |
---|---|
Tuple[List[str], List[str]] |
A tuple (required, optional) which are lists of the required/optional keys for this class. |
Source code in zenml/config/config_keys.py
@classmethod
def get_keys(cls) -> Tuple[List[str], List[str]]:
"""Gets all the required and optional config keys for this class.
Returns:
A tuple (required, optional) which are lists of the
required/optional keys for this class.
"""
keys = {
key: value
for key, value in cls.__dict__.items()
if not isinstance(value, classmethod)
and not isinstance(value, staticmethod)
and not callable(value)
and not key.startswith("__")
}
required = [v for k, v in keys.items() if not k.endswith("_")]
optional = [v for k, v in keys.items() if k.endswith("_")]
return required, optional
key_check(config)
classmethod
Checks whether a configuration dict contains all required keys and no unknown keys.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Dict[str, Any] |
The configuration dict to verify. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If no config dictionary is passed. |
ValueError |
If required keys are missing or unknown keys are found. |
Source code in zenml/config/config_keys.py
@classmethod
def key_check(cls, config: Dict[str, Any]) -> None:
"""Checks whether a configuration dict contains all required keys and no unknown keys.
Args:
config: The configuration dict to verify.
Raises:
TypeError: If no config dictionary is passed.
ValueError: If required keys are missing or unknown keys are found.
"""
if not isinstance(config, dict):
raise TypeError(f"Please specify a dict for {cls.__name__}")
# Required and optional keys for the config dict
required, optional = cls.get_keys()
# Check for missing keys
missing_keys = [k for k in required if k not in config.keys()]
if missing_keys:
raise ValueError(f"Missing key(s) {missing_keys} in {cls.__name__}")
# Check for unknown keys
unknown_keys = [
k for k in config.keys() if k not in required and k not in optional
]
if unknown_keys:
raise ValueError(
f"Unknown key(s) {unknown_keys} in {cls.__name__}. "
f"Required keys: {required}, optional keys: {optional}."
)
PipelineConfigurationKeys (ConfigKeys)
Keys for a pipeline configuration dict.
Source code in zenml/config/config_keys.py
class PipelineConfigurationKeys(ConfigKeys):
"""Keys for a pipeline configuration dict."""
# Neither attribute name ends with an underscore, so `ConfigKeys.get_keys`
# reports both keys as required.
NAME = "name"
STEPS = "steps"
SourceConfigurationKeys (ConfigKeys)
Keys for a step configuration source dict.
Source code in zenml/config/config_keys.py
class SourceConfigurationKeys(ConfigKeys):
"""Keys for a step configuration source dict."""
# The trailing underscore of the attribute names makes `ConfigKeys.get_keys`
# report both keys as optional.
FILE_ = "file"
NAME_ = "name"
StepConfigurationKeys (ConfigKeys)
Keys for a step configuration dict.
Source code in zenml/config/config_keys.py
class StepConfigurationKeys(ConfigKeys):
"""Keys for a step configuration dict."""
# The trailing underscore of the attribute names makes `ConfigKeys.get_keys`
# report all three keys as optional.
SOURCE_ = "source"
PARAMETERS_ = "parameters"
MATERIALIZERS_ = "materializers"
constants
ZenML settings constants.
docker_settings
Docker settings.
DockerSettings (BaseSettings)
pydantic-model
Settings for building Docker images to run ZenML pipelines.
Build process:
- No `dockerfile` specified: If any of the options regarding requirements, environment variables or copying files require us to build an image, ZenML will build this image. Otherwise the `parent_image` will be used to run the pipeline.
- `dockerfile` specified: ZenML will first build an image based on the specified Dockerfile. If any of the options regarding requirements, environment variables or copying files require an additional image built on top of that, ZenML will build a second image. If not, the image built from the specified Dockerfile will be used to run the pipeline.
Requirements installation order:
Depending on the configuration of this object, requirements will be
installed in the following order (each step optional):
- The packages installed in your local python environment
- The packages specified via the `requirements` attribute
- The packages specified via the `required_integrations` and potentially stack requirements
Attributes:
Name | Type | Description |
---|---|---|
parent_image |
Optional[str] |
Full name of the Docker image that should be used as the parent for the image that will be built. Defaults to a ZenML image built for the active Python and ZenML version. Additional notes:
* If you specify a custom image here, you need to make sure it has
ZenML installed.
* If this is a non-local image, the environment which is running
the pipeline and building the Docker image needs to be able to pull
this image.
* If a custom `dockerfile` is specified for this settings object, this parent image will be ignored. |
dockerfile |
Optional[str] |
Path to a custom Dockerfile that should be built. Depending on the other values you specify in this object, the resulting image will be used directly to run your pipeline or ZenML will use it as a parent image to build on top of. See the general docstring of this class for more information. Additional notes:
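* If you specify this, the `parent_image` attribute will be ignored.
* If you specify this, the image built from this Dockerfile needs to have ZenML installed. |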
build_context_root |
Optional[str] |
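Build context root for the Docker build, only used when the `dockerfile` attribute is set. If this is left empty, the build context will only contain the Dockerfile. |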
build_options |
Dict[str, Any] |
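Additional options that will be passed unmodified to the Docker build call when building an image using the specified `dockerfile`. You can use this to for example specify build args or a target stage. See https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build for a full list of available options. |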
target_repository |
str |
Name of the Docker repository to which the image should be pushed. This repository will be appended to the registry URI of the container registry of your stack and should therefore not include any registry. |
replicate_local_python_environment |
Optional[zenml.config.docker_settings.PythonEnvironmentExportMethod] |
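If not `None`, ZenML will use the specified method to generate a requirements file that replicates the packages installed in the currently running python environment. This requirements file will then be installed in the Docker image. |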
requirements |
Union[NoneType, str, List[str]] |
Path to a requirements file or a list of required pip packages. During the image build, these requirements will be installed using pip. If you need to use a different tool to resolve and/or install your packages, please use a custom parent image or specify a custom `dockerfile`. |
required_integrations |
List[str] |
List of ZenML integrations that should be installed. All requirements for the specified integrations will be installed inside the Docker image. |
install_stack_requirements |
bool |
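If `True`, ZenML will automatically detect if components of your active stack are part of a ZenML integration and install the corresponding requirements. If you set this to `False` or use custom components in your stack, you need to make sure these get installed by specifying them in the `requirements` attribute. |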
environment |
Dict[str, Any] |
Dictionary of environment variables to set inside the Docker image. |
dockerignore |
Optional[str] |
Path to a dockerignore file to use when building the Docker image. |
copy_files |
bool |
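If `True`, user files will be copied into the Docker image. If this is set to `False`, ZenML will not copy any of your files into the Docker image and you're responsible that all the files to run your pipeline exist in the right place. |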
copy_global_config |
bool |
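If `True`, the global configuration (contains connection info for your ZenStore) will be copied into the Docker image. If this is set to `False`, ZenML will not copy this configuration and you're responsible for making sure ZenML can access the ZenStore in the Docker image. |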
user |
Optional[str] |
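If not `None`, will use the USER instruction to set the username and run the commands of the dockerfile as `user` instead of root. Specifically, the specified user is used for RUN instructions and at runtime, runs the relevant ENTRYPOINT and CMD commands. |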
Source code in zenml/config/docker_settings.py
class DockerSettings(BaseSettings):
"""Settings for building Docker images to run ZenML pipelines.
Build process:
--------------
* No `dockerfile` specified: If any of the options regarding
requirements, environment variables or copying files require us to build an
image, ZenML will build this image. Otherwise the `parent_image` will be
used to run the pipeline.
* `dockerfile` specified: ZenML will first build an image based on the
specified Dockerfile. If any of the options regarding
requirements, environment variables or copying files require an additional
image built on top of that, ZenML will build a second image. If not, the
image built from the specified Dockerfile will be used to run the pipeline.
Requirements installation order:
--------------------------------
Depending on the configuration of this object, requirements will be
installed in the following order (each step optional):
- The packages installed in your local python environment
- The packages specified via the `requirements` attribute
- The packages specified via the `required_integrations` and potentially
stack requirements
Attributes:
parent_image: Full name of the Docker image that should be
used as the parent for the image that will be built. Defaults to
a ZenML image built for the active Python and ZenML version.
Additional notes:
* If you specify a custom image here, you need to make sure it has
ZenML installed.
* If this is a non-local image, the environment which is running
the pipeline and building the Docker image needs to be able to pull
this image.
* If a custom `dockerfile` is specified for this settings
object, this parent image will be ignored.
dockerfile: Path to a custom Dockerfile that should be built. Depending
on the other values you specify in this object, the resulting
image will be used directly to run your pipeline or ZenML will use
it as a parent image to build on top of. See the general docstring
of this class for more information.
Additional notes:
* If you specify this, the `parent_image` attribute will be ignored.
* If you specify this, the image built from this Dockerfile needs
to have ZenML installed.
build_context_root: Build context root for the Docker build, only used
when the `dockerfile` attribute is set. If this is left empty, the
build context will only contain the Dockerfile.
build_options: Additional options that will be passed unmodified to the
Docker build call when building an image using the specified
`dockerfile`. You can use this to for example specify build
args or a target stage. See
https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
for a full list of available options.
target_repository: Name of the Docker repository to which the
image should be pushed. This repository will be appended to the
registry URI of the container registry of your stack and should
therefore **not** include any registry.
replicate_local_python_environment: If not `None`, ZenML will use the
specified method to generate a requirements file that replicates
the packages installed in the currently running python environment.
This requirements file will then be installed in the Docker image.
requirements: Path to a requirements file or a list of required pip
packages. During the image build, these requirements will be
installed using pip. If you need to use a different tool to
resolve and/or install your packages, please use a custom parent
image or specify a custom `dockerfile`.
required_integrations: List of ZenML integrations that should be
installed. All requirements for the specified integrations will
be installed inside the Docker image.
install_stack_requirements: If `True`, ZenML will automatically detect
if components of your active stack are part of a ZenML integration
and install the corresponding requirements. If you set this to
`False` or use custom components in your stack, you need to make
sure these get installed by specifying them in the `requirements`
attribute.
environment: Dictionary of environment variables to set inside the
Docker image.
dockerignore: Path to a dockerignore file to use when building the
Docker image.
copy_files: If `True`, user files will be copied into the Docker image.
If this is set to `False`, ZenML will not copy any of your files
into the Docker image and you're responsible that all the files
to run your pipeline exist in the right place.
copy_global_config: If `True`, the global configuration (contains
connection info for your ZenStore) will be copied into the Docker
image. If this is set to `False`, ZenML will not copy this
configuration and you're responsible for making sure ZenML can
access the ZenStore in the Docker image.
user: If not `None`, will use the USER instruction to set the username and
run the commands of the dockerfile as `user` instead of root.
Specifically, the specified user is used for RUN instructions
and at runtime, runs the relevant ENTRYPOINT and CMD commands.
"""
# Docker settings can only be specified on a pipeline, not on single steps
LEVEL = ConfigurationLevel.PIPELINE
parent_image: Optional[str] = None
dockerfile: Optional[str] = None
build_context_root: Optional[str] = None
build_options: Dict[str, Any] = {}
target_repository: str = "zenml"
replicate_local_python_environment: Optional[
PythonEnvironmentExportMethod
] = None
requirements: Union[None, str, List[str]] = None
required_integrations: List[str] = []
install_stack_requirements: bool = True
environment: Dict[str, Any] = {}
dockerignore: Optional[str] = None
copy_files: bool = True
copy_global_config: bool = True
user: Optional[str] = None
class Config:
"""Pydantic configuration class."""
# public attributes are immutable
allow_mutation = False
# prevent extra attributes during model initialization
extra = Extra.forbid
Config
Pydantic configuration class.
Source code in zenml/config/docker_settings.py
class Config:
"""Pydantic configuration class."""
# public attributes are immutable
allow_mutation = False
# prevent extra attributes during model initialization: unknown
# attributes are rejected instead of being stored
extra = Extra.forbid
PythonEnvironmentExportMethod (Enum)
Different methods to export the local Python environment.
Source code in zenml/config/docker_settings.py
class PythonEnvironmentExportMethod(Enum):
    """Supported ways of exporting the local Python environment."""

    PIP_FREEZE = "pip_freeze"
    POETRY_EXPORT = "poetry_export"

    @property
    def command(self) -> str:
        """Shell command that prints the locally installed packages.

        The output of the command must be usable as a requirements file
        for pip once it is written to a file.

        Returns:
            Shell command.
        """
        commands_by_method = {
            PythonEnvironmentExportMethod.PIP_FREEZE: "pip freeze",
            PythonEnvironmentExportMethod.POETRY_EXPORT: (
                "poetry export --format=requirements.txt"
            ),
        }
        return commands_by_method[self]
global_config
Functionality to support ZenML GlobalConfiguration.
GlobalConfigMetaClass (ModelMetaclass)
Global configuration metaclass.
This metaclass is used to enforce a singleton instance of the GlobalConfiguration class with the following additional properties:
- the GlobalConfiguration is initialized automatically on import with the default configuration, if no config file exists yet.
- the GlobalConfiguration undergoes a schema migration if the version of the config file is older than the current version of the ZenML package.
- a default store is set if no store is configured yet.
Source code in zenml/config/global_config.py
class GlobalConfigMetaClass(ModelMetaclass):
    """Metaclass that turns GlobalConfiguration into a singleton.

    In addition to enforcing a single default instance, the metaclass gives
    the GlobalConfiguration the following properties:

    * the GlobalConfiguration is initialized automatically on import with the
    default configuration, if no config file exists yet.
    * the GlobalConfiguration undergoes a schema migration if the version of
    the config file is older than the current version of the ZenML package.
    * a default store is set if no store is configured yet.
    """

    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Initialize a singleton class.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments
        """
        super().__init__(*args, **kwargs)
        # No singleton exists yet; `__call__` creates it on first use.
        cls._global_config: Optional["GlobalConfiguration"] = None

    def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
        """Create or return the default global config instance.

        Calling the GlobalConfiguration constructor with custom arguments
        bypasses the singleton: a fresh instance is created and returned
        right away, without being stored as the global singleton.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments

        Returns:
            The global GlobalConfiguration instance.
        """
        if args or kwargs:
            custom_instance = super().__call__(*args, **kwargs)
            return cast("GlobalConfiguration", custom_instance)

        if cls._global_config:
            return cls._global_config

        # First argument-less call: create the singleton, store it and
        # only then run the config migration on it.
        cls._global_config = cast(
            "GlobalConfiguration", super().__call__(*args, **kwargs)
        )
        cls._global_config._migrate_config()
        return cls._global_config
__call__(cls, *args, **kwargs)
special
Create or return the default global config instance.
If the GlobalConfiguration constructor is called with custom arguments, the singleton functionality of the metaclass is bypassed: a new GlobalConfiguration instance is created and returned immediately and without saving it as the global GlobalConfiguration singleton.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
positional arguments |
() |
**kwargs |
Any |
keyword arguments |
{} |
Returns:
Type | Description |
---|---|
GlobalConfiguration |
The global GlobalConfiguration instance. |
Source code in zenml/config/global_config.py
def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
    """Create or return the default global config instance.

    Calling the GlobalConfiguration constructor with custom arguments
    bypasses the singleton: a fresh instance is created and returned right
    away, without being stored as the global singleton.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments

    Returns:
        The global GlobalConfiguration instance.
    """
    if args or kwargs:
        custom_instance = super().__call__(*args, **kwargs)
        return cast("GlobalConfiguration", custom_instance)

    if cls._global_config:
        return cls._global_config

    # First argument-less call: create the singleton, store it and only
    # then run the config migration on it.
    cls._global_config = cast(
        "GlobalConfiguration", super().__call__(*args, **kwargs)
    )
    cls._global_config._migrate_config()
    return cls._global_config
__init__(cls, *args, **kwargs)
special
Initialize a singleton class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
positional arguments |
() |
**kwargs |
Any |
keyword arguments |
{} |
Source code in zenml/config/global_config.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Set up a class that is managed as a singleton.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments
    """
    super().__init__(*args, **kwargs)
    # Slot for the singleton; stays None until an instance is stored here.
    cls._global_config: Optional["GlobalConfiguration"] = None
GlobalConfiguration (BaseModel)
pydantic-model
Stores global configuration options.
Configuration options are read from a config file, but can be overwritten
by environment variables. See GlobalConfiguration.__getattribute__
for
more details.
Attributes:
Name | Type | Description |
---|---|---|
user_id |
Unique user id. |
|
user_email |
Email address associated with this client. |
|
user_email_opt_in |
Whether the user has opted in to email communication. |
|
analytics_opt_in |
If a user agreed to sending analytics or not. |
|
version |
Version of ZenML that was last used to create or update the global config. |
|
store |
Store configuration. |
|
active_stack_id |
The ID of the active stack. |
|
active_project_name |
The name of the active project. |
|
jwt_secret_key |
The secret key used to sign and verify JWT tokens. |
|
_config_path |
Directory where the global config file is stored. |
Source code in zenml/config/global_config.py
class GlobalConfiguration(BaseModel, metaclass=GlobalConfigMetaClass):
"""Stores global configuration options.
Configuration options are read from a config file, but can be overwritten
by environment variables. See `GlobalConfiguration.__getattribute__` for
more details. Instances are created through `GlobalConfigMetaClass`.
Attributes:
user_id: Unique user id.
user_email: Email address associated with this client.
user_email_opt_in: Whether the user has opted in to email communication.
analytics_opt_in: If a user agreed to sending analytics or not.
version: Version of ZenML that was last used to create or update the
global config.
store: Store configuration.
active_stack_id: The ID of the active stack.
active_project_name: The name of the active project.
jwt_secret_key: The secret key used to sign and verify JWT tokens.
_config_path: Directory where the global config file is stored.
"""
# Generated with uuid4 when no value is supplied; declared with
# allow_mutation=False.
user_id: uuid.UUID = Field(default_factory=uuid.uuid4, allow_mutation=False)
user_email: Optional[str] = None
user_email_opt_in: Optional[bool] = None
analytics_opt_in: bool = True
# The next four fields are declared without an explicit default value.
version: Optional[str]
store: Optional[StoreConfiguration]
active_stack_id: Optional[uuid.UUID]
active_project_name: Optional[str]
# Generated by generate_jwt_secret_key when no value is supplied.
jwt_secret_key: str = Field(default_factory=generate_jwt_secret_key)
# Underscore-prefixed attributes: private state kept alongside the fields.
_config_path: str
_zen_store: Optional["BaseZenStore"] = None
_active_project: Optional["ProjectModel"] = None
def __init__(
    self, config_path: Optional[str] = None, **kwargs: Any
) -> None:
    """Initializes a GlobalConfiguration object using values from the config file.

    Values are read from the config file in the configuration directory
    (when that file exists) and are then overridden by any keyword
    arguments. If no config file exists after initialization, the
    current values are written to disk.

    The `config_path` argument is only meant for internal use and testing
    purposes. User code must never pass it to the constructor. When a
    custom `config_path` value is passed, an anonymous GlobalConfiguration
    instance is created and returned independently of the
    GlobalConfiguration singleton.

    Args:
        config_path: (internal use) custom configuration directory. When
            not specified, the default global configuration directory is
            used.
        **kwargs: attribute values that take precedence over the values
            read from the config file.
    """
    # The directory must be known before anything can be read from disk.
    self._config_path = config_path or self.default_config_directory()

    # Explicit keyword arguments win over what is stored in the file.
    initial_values = self._read_config()
    initial_values.update(**kwargs)
    super().__init__(**initial_values)

    config_file = self._config_file(config_path)
    if not fileio.exists(config_file):
        self._write_config()
@classmethod
def get_instance(cls) -> Optional["GlobalConfiguration"]:
    """Return the GlobalConfiguration singleton instance.

    Returns:
        The instance currently stored as the singleton, or None if no
        instance has been stored yet.
    """
    current_instance = cls._global_config
    return current_instance
@classmethod
def _reset_instance(
    cls, config: Optional["GlobalConfiguration"] = None
) -> None:
    """Replace the GlobalConfiguration singleton instance.

    This method is only meant for internal use and testing purposes.

    Args:
        config: The instance to store as the global singleton. Passing
            None (the default) clears the stored singleton.
    """
    replacement = config
    cls._global_config = replacement
@validator("version")
def _validate_version(cls, v: Optional[str]) -> Optional[str]:
    """Validate the version attribute.

    Args:
        v: The version attribute value.

    Returns:
        The version attribute value, unchanged.

    Raises:
        RuntimeError: If the value does not parse to a `version.Version`.
    """
    # A missing version is accepted as-is. Otherwise the parsed result must
    # be an actual `Version` object: when parsing fails, a `LegacyVersion`
    # is returned instead, which does not represent a valid version.
    if v is not None and not isinstance(version.parse(v), version.Version):
        raise RuntimeError(f"Invalid version in global configuration: {v}.")
    return v
def __setattr__(self, key: str, value: Any) -> None:
    """Sets an attribute on the config and persists the new value in the global configuration.

    Args:
        key: The attribute name.
        value: The attribute value.
    """
    super().__setattr__(key, value)
    # Only public attributes trigger a rewrite of the config file;
    # underscore-prefixed ones are set without persisting anything.
    is_public = not key.startswith("_")
    if is_public:
        self._write_config()
def __custom_getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    If an environment variable named
    `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` (attribute name in upper
    case) is set and its value can be assigned to the attribute, the
    value derived from the environment variable is returned instead of
    the stored one. Underscore-prefixed attributes are never overridden.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    stored_value = super().__getattribute__(key)
    if key.startswith("_"):
        return stored_value

    env_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        raw_override = os.environ[env_name]
        # Assign the raw string so that Pydantic's type conversion and
        # validation are applied, then read the converted value back.
        super().__setattr__(key, raw_override)
        converted_override = super().__getattribute__(key)
        # Restore the stored value: the environment value must not be
        # kept on the object permanently.
        super().__setattr__(key, stored_value)
    except (ValidationError, KeyError, TypeError):
        # No such variable, or the assignment was rejected.
        return stored_value
    return converted_override

if not TYPE_CHECKING:
    # When defining __getattribute__, mypy allows accessing non-existent
    # attributes without failing
    # (see https://github.com/python/mypy/issues/13319).
    __getattribute__ = __custom_getattribute__
def _migrate_config(self) -> None:
    """Migrates the global config to the latest version.

    Nothing is changed when the stored version equals the running ZenML
    version, or when it is newer (an error is logged in that case).
    Otherwise the stored version is set to the running version.
    """
    curr_version = version.parse(__version__)

    if self.version is not None:
        config_version = version.parse(self.version)
        if config_version > curr_version:
            logger.error(
                "The ZenML global configuration version (%s) is higher "
                "than the version of ZenML currently being used (%s). "
                "This may happen if you recently downgraded ZenML to an "
                "earlier version, or if you have already used a more "
                "recent ZenML version on the same machine. "
                "It is highly recommended that you update ZenML to at "
                "least match the global configuration version, otherwise "
                "you may run into unexpected issues such as model schema "
                "validation failures or even loss of information.",
                config_version,
                curr_version,
            )
            # TODO [ENG-899]: Give more detailed instruction on how to resolve
            #  version mismatch.
            return

        if config_version == curr_version:
            # Already up to date.
            return

        logger.info(
            "Migrating the ZenML global configuration from version %s "
            "to version %s...",
            config_version,
            curr_version,
        )
    else:
        logger.info(
            "Initializing the ZenML global configuration version to %s",
            curr_version,
        )

    # this will also trigger rewriting the config file to disk
    # to ensure the schema migration results are persisted
    self.version = __version__
def _read_config(self) -> Dict[str, Any]:
    """Reads configuration options from disk.

    If the config file doesn't exist yet, or exists but holds no content,
    this method returns an empty dictionary.

    Returns:
        A dictionary containing the configuration options.
    """
    config_file = self._config_file()
    if not fileio.exists(config_file):
        return {}

    loaded_values = yaml_utils.read_yaml(config_file)
    # NOTE(review): an empty YAML document presumably comes back as None
    # (that is what PyYAML's loaders return for an empty stream) — confirm
    # against `yaml_utils.read_yaml`. Falling back to an empty dict keeps
    # the documented return type, so callers can safely call `.update()`.
    return cast(Dict[str, Any], loaded_values or {})
def _write_config(self, config_path: Optional[str] = None) -> None:
    """Writes the global configuration options to disk.

    Args:
        config_path: custom configuration directory. When not specified,
            the directory of this configuration object is used.
    """
    config_file = self._config_file(config_path)
    # Round-trip through JSON to obtain a plain dictionary for the YAML dump.
    serialized_options = json.loads(self.json())
    logger.debug(f"Writing config to {config_file}")

    file_is_missing = not fileio.exists(config_file)
    if file_is_missing:
        # Make sure the target directory exists before the first write.
        target_directory = config_path or self.config_directory
        io_utils.create_dir_recursive_if_not_exists(target_directory)

    yaml_utils.write_yaml(config_file, serialized_options)
def _configure_store(
self,
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Configure the global zen store.
This method creates a store from the supplied configuration through
`BaseZenStore.create_store` and uses it to update the store-related
state of this global configuration.
Args:
config: The new store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store
constructor.
"""
# Imported locally rather than at module level.
from zenml.zen_stores.base_zen_store import BaseZenStore
store = BaseZenStore.create_store(
config, skip_default_registrations, **kwargs
)
# True when the stored configuration differs from the one reported by the
# new store, or when no store object has been set on this object yet.
if self.store != store.config or not self._zen_store:
logger.debug(f"Configuring the global store to {store.config}")
self.store = store.config
# We want to check if the active user has opted in or out for using
# an email address for marketing purposes and if so, record it in
# the analytics.
active_user = store.active_user
if active_user.email_opted_in is not None:
self.record_email_opt_in_out(
opted_in=active_user.email_opted_in,
email=active_user.email,
source=AnalyticsEventSource.ZENML_SERVER,
)
self._zen_store = store
# Sanitize the global configuration to reflect the new store
self._sanitize_config()
self._write_config()
def _sanitize_config(self) -> None:
    """Sanitize and save the global configuration.

    The active project name and active stack ID are passed to the store
    for validation, and the project and stack it returns are set as the
    active ones.
    """
    validated = self.zen_store.validate_active_config(
        self.active_project_name,
        self.active_stack_id,
        config_name="global",
    )
    validated_project, validated_stack = validated
    self.set_active_project(validated_project)
    self.active_stack_id = validated_stack.id
@staticmethod
def default_config_directory() -> str:
    """Path to the default global configuration directory.

    Returns:
        The directory reported by `io_utils.get_global_config_directory`.
    """
    default_directory = io_utils.get_global_config_directory()
    return default_directory
def _config_file(self, config_path: Optional[str] = None) -> str:
    """Path to the file where global configuration options are stored.

    Args:
        config_path: custom configuration directory. When not specified
            (or empty), the directory of this configuration object is used.

    Returns:
        The path to the `config.yaml` file inside the chosen directory.
    """
    base_directory = config_path or self._config_path
    return os.path.join(base_directory, "config.yaml")
def copy_configuration(
    self,
    config_path: str,
    load_config_path: Optional[PurePath] = None,
    store_config: Optional[StoreConfiguration] = None,
) -> "GlobalConfiguration":
    """Create a copy of the global config using a different configuration path.

    The current configuration is written to `config_path` and loaded back
    from there as a new GlobalConfiguration object. The store
    configuration of the copy is then set either to `store_config` or,
    when none is given and a store is configured, to the result of the
    store class's `copy_local_store`.

    Args:
        config_path: path where the active configuration copy should be saved
        load_config_path: path that will be used to load the configuration
            copy. This can be set to a value different from `config_path`
            if the configuration copy will be loaded from a different
            path, e.g. when the global config copy is copied to a
            container image. It is forwarded to `copy_local_store`.
        store_config: custom store configuration to use for the copied
            global configuration. If not specified, the current global store
            configuration is used.

    Returns:
        A new global configuration object copied to the specified path.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    # TODO: shouldn't this be done at the end?
    self._write_config(config_path)
    duplicate = GlobalConfiguration(config_path=config_path)

    if store_config:
        # An explicit store configuration takes precedence.
        duplicate.store = store_config
        return duplicate

    if self.store:
        store_class = BaseZenStore.get_store_class(self.store.type)
        duplicate.store = store_class.copy_local_store(
            self.store, config_path, load_config_path
        )
    return duplicate
@property
def config_directory(self) -> str:
    """Directory where the global configuration file is located.

    Returns:
        The configuration directory of this configuration object.
    """
    directory = self._config_path
    return directory
@property
def local_stores_path(self) -> str:
    """Path where local stores information is stored.

    Returns:
        The local stores directory inside the configuration directory.
    """
    parent_directory = self.config_directory
    return os.path.join(parent_directory, LOCAL_STORES_DIRECTORY_NAME)
def get_default_store(self) -> StoreConfiguration:
    """Get the default store configuration.

    Non-empty environment variables whose names start with
    `ENV_ZENML_STORE_PREFIX` take precedence: if any exist, they are used
    (prefix removed, name lower-cased) as the store configuration values.
    Otherwise the default store configuration located under the local
    stores path is returned.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    prefix_length = len(ENV_ZENML_STORE_PREFIX)
    env_config: Dict[str, str] = {
        name[prefix_length:].lower(): value
        for name, value in os.environ.items()
        if value != "" and name.startswith(ENV_ZENML_STORE_PREFIX)
    }
    if env_config:
        logger.debug(
            "Using environment variables to configure the default store"
        )
        return StoreConfiguration(**env_config)

    default_store_path = os.path.join(
        self.local_stores_path,
        DEFAULT_STORE_DIRECTORY_NAME,
    )
    return BaseZenStore.get_default_store_config(path=default_store_path)
def set_default_store(self) -> None:
    """Creates and sets the default store configuration.

    Call this method to initialize or revert the store configuration to
    the default store. An `INITIALIZED_STORE` analytics event carrying the
    store type is tracked afterwards.
    """
    default_config = self.get_default_store()
    self._configure_store(default_config)
    logger.info("Using the default store for the global config.")

    event_metadata = {"store_type": default_config.type.value}
    track_event(AnalyticsEvent.INITIALIZED_STORE, event_metadata)
def set_store(
self,
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Update the active store configuration.
Call this method to validate and update the active store configuration.
The arguments are forwarded to `_configure_store`; analytics calls
(`identify_group` / `track_event`) are made afterwards.
Args:
config: The new store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store
constructor.
"""
self._configure_store(config, skip_default_registrations, **kwargs)
logger.info("Updated the global store configuration.")
# REST stores get additional analytics, described below.
if self.zen_store.type == StoreType.REST:
# Every time a client connects to a ZenML server, we want to
# group the client ID and the server ID together. This records
# only that a particular client has successfully connected to a
# particular server at least once, but no information about the
# user account is recorded here.
server_info = self.zen_store.get_store_info()
identify_group(
AnalyticsGroup.ZENML_SERVER_GROUP,
group_id=str(server_info.id),
group_metadata={
"version": server_info.version,
"deployment_type": str(server_info.deployment_type),
"database_type": str(server_info.database_type),
},
)
track_event(AnalyticsEvent.ZENML_SERVER_CONNECTED)
# The store type reported here comes from the `config` argument.
track_event(
AnalyticsEvent.INITIALIZED_STORE, {"store_type": config.type.value}
)
@property
def zen_store(self) -> "BaseZenStore":
    """Initialize and/or return the global zen store.

    When no store configuration is set, the default store is configured.
    When a configuration is set but no store object exists yet, the store
    is created from that configuration.

    Returns:
        The current zen store.
    """
    if self.store:
        if self._zen_store is None:
            # Configuration is known but the store object is still missing.
            self._configure_store(self.store)
    else:
        self.set_default_store()

    assert self._zen_store is not None
    return self._zen_store
@property
def active_project(self) -> "ProjectModel":
    """Get the currently active project of the local client.

    The project object is cached on this configuration object. The cache
    is dropped when its name no longer matches `active_project_name`, and
    refilled from the zen store when empty.

    Returns:
        The active project.

    Raises:
        RuntimeError: If no project is active.
    """
    cached_project = self._active_project
    if cached_project and cached_project.name != self.active_project_name:
        # in case someone tries to set the active project name directly
        # outside of this class
        self._active_project = None

    if not self._active_project:
        if not self.active_project_name:
            raise RuntimeError(
                "No active project is configured. Run "
                "`zenml project set PROJECT_NAME` to set the active "
                "project."
            )
        self._active_project = self.zen_store.get_project(
            project_name_or_id=self.active_project_name
        )
    return self._active_project
def set_active_project(self, project: "ProjectModel") -> None:
    """Set the project for the local client.

    Both the stored project name and the cached project object are updated.

    Args:
        project: The project to set active.
    """
    project_name = project.name
    self.active_project_name = project_name
    self._active_project = project
def record_email_opt_in_out(
self, opted_in: bool, email: Optional[str], source: AnalyticsEventSource
) -> None:
"""Record the email opt-in/opt-out choice and email address of this client.
Depending on the arguments and the values currently stored in
`user_email` and `user_email_opt_in`, this identifies the user by email
address and/or tracks an `OPT_IN_OUT_EMAIL` analytics event.
Args:
opted_in: Whether the user has opted in to email communication.
email: The email address to use for this client, if given.
source: The analytics event source.
"""
# Whenever a new email address is associated with the client, we want
# to identify the client by that email address. If the email address has
# been changed, we also want to update the information.
if opted_in and email and self.user_email != email:
identify_user(
{
"email": email,
"source": source,
}
)
self.user_email = email
# `and` binds tighter than `or`, so this condition reads as:
# user_email_opt_in is None or (opted_in and not user_email_opt_in)
if (
self.user_email_opt_in is None
or opted_in
and not self.user_email_opt_in
):
# When the user opts out giving the email for the first time, or
# when the user opts in after opting out (e.g. when connecting to
# a new server where the account has opt-in enabled), we want to
# record the information as an analytics event.
track_event(
AnalyticsEvent.OPT_IN_OUT_EMAIL,
{"opted_in": opted_in, "source": source},
)
self.user_email_opt_in = opted_in
class Config:
    """Pydantic configuration class."""

    # Attributes are validated on assignment; this is needed in order to
    # have a mix of mutable and immutable attributes.
    validate_assignment = True

    # Extra attributes coming from configs of previous ZenML versions are
    # allowed, to permit downgrading.
    extra = "allow"

    # Attributes with a leading underscore are private: they are mutable
    # and not included in serialization.
    underscore_attrs_are_private = True
active_project: ProjectModel
property
readonly
Get the currently active project of the local client.
Returns:
Type | Description |
---|---|
ProjectModel |
The active project. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If no project is active. |
config_directory: str
property
readonly
Directory where the global configuration file is located.
Returns:
Type | Description |
---|---|
str |
The directory where the global configuration file is located. |
local_stores_path: str
property
readonly
Path where local stores information is stored.
Returns:
Type | Description |
---|---|
str |
The path where local stores information is stored. |
zen_store: BaseZenStore
property
readonly
Initialize and/or return the global zen store.
If the store hasn't been initialized yet, it is initialized when this property is first accessed according to the global store configuration.
Returns:
Type | Description |
---|---|
BaseZenStore |
The current zen store. |
Config
Pydantic configuration class.
Source code in zenml/config/global_config.py
class Config:
    """Pydantic configuration class."""

    # Attributes are validated on assignment; this is needed in order to
    # have a mix of mutable and immutable attributes.
    validate_assignment = True

    # Extra attributes coming from configs of previous ZenML versions are
    # allowed, to permit downgrading.
    extra = "allow"

    # Attributes with a leading underscore are private: they are mutable
    # and not included in serialization.
    underscore_attrs_are_private = True
__custom_getattribute__(self, key)
special
Gets an attribute value for a specific key.
If a value for this attribute was specified using an environment
variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)
and its
value can be parsed to the attribute type, the value from this
environment variable is returned instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The attribute name. |
required |
Returns:
Type | Description |
---|---|
Any |
The attribute value. |
Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    If an environment variable named
    `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` (attribute name in upper
    case) is set and its value can be assigned to the attribute, the
    value derived from the environment variable is returned instead of
    the stored one. Underscore-prefixed attributes are never overridden.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    stored_value = super().__getattribute__(key)
    if key.startswith("_"):
        return stored_value

    env_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        raw_override = os.environ[env_name]
        # Assign the raw string so that Pydantic's type conversion and
        # validation are applied, then read the converted value back.
        super().__setattr__(key, raw_override)
        converted_override = super().__getattribute__(key)
        # Restore the stored value: the environment value must not be
        # kept on the object permanently.
        super().__setattr__(key, stored_value)
    except (ValidationError, KeyError, TypeError):
        # No such variable, or the assignment was rejected.
        return stored_value
    return converted_override
__getattribute__(self, key)
special
Gets an attribute value for a specific key.
If a value for this attribute was specified using an environment
variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)
and its
value can be parsed to the attribute type, the value from this
environment variable is returned instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The attribute name. |
required |
Returns:
Type | Description |
---|---|
Any |
The attribute value. |
Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    If an environment variable named
    `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` (attribute name in upper
    case) is set and its value can be assigned to the attribute, the
    value derived from the environment variable is returned instead of
    the stored one. Underscore-prefixed attributes are never overridden.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    stored_value = super().__getattribute__(key)
    if key.startswith("_"):
        return stored_value

    env_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        raw_override = os.environ[env_name]
        # Assign the raw string so that Pydantic's type conversion and
        # validation are applied, then read the converted value back.
        super().__setattr__(key, raw_override)
        converted_override = super().__getattribute__(key)
        # Restore the stored value: the environment value must not be
        # kept on the object permanently.
        super().__setattr__(key, stored_value)
    except (ValidationError, KeyError, TypeError):
        # No such variable, or the assignment was rejected.
        return stored_value
    return converted_override
__init__(self, config_path=None, **kwargs)
special
Initializes a GlobalConfiguration object using values from the config file.
GlobalConfiguration is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance (see the exception below).
The config_path
argument is only meant for internal use and testing
purposes. User code must never pass it to the constructor. When a custom
config_path
value is passed, an anonymous GlobalConfiguration instance
is created and returned independently of the GlobalConfiguration
singleton and that will have no effect as far as the rest of the ZenML
core code is concerned.
If the config file doesn't exist yet, we try to read values from the legacy (ZenML version < 0.6) config file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path |
Optional[str] |
(internal use) custom config file path. When not specified, the default global configuration path is used and the global configuration singleton instance is returned. Only used to create configuration copies for transfer to different runtime environments. |
None |
**kwargs |
Any |
keyword arguments |
{} |
Source code in zenml/config/global_config.py
def __init__(
    self, config_path: Optional[str] = None, **kwargs: Any
) -> None:
    """Initializes a GlobalConfiguration object using values from the config file.

    Values are read from the config file in the configuration directory
    (when that file exists) and are then overridden by any keyword
    arguments. If no config file exists after initialization, the
    current values are written to disk.

    The `config_path` argument is only meant for internal use and testing
    purposes. User code must never pass it to the constructor. When a
    custom `config_path` value is passed, an anonymous GlobalConfiguration
    instance is created and returned independently of the
    GlobalConfiguration singleton.

    Args:
        config_path: (internal use) custom configuration directory. When
            not specified, the default global configuration directory is
            used.
        **kwargs: attribute values that take precedence over the values
            read from the config file.
    """
    # The directory must be known before anything can be read from disk.
    self._config_path = config_path or self.default_config_directory()

    # Explicit keyword arguments win over what is stored in the file.
    initial_values = self._read_config()
    initial_values.update(**kwargs)
    super().__init__(**initial_values)

    config_file = self._config_file(config_path)
    if not fileio.exists(config_file):
        self._write_config()
__setattr__(self, key, value)
special
Sets an attribute on the config and persists the new value in the global configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The attribute name. |
required |
value |
Any |
The attribute value. |
required |
Source code in zenml/config/global_config.py
def __setattr__(self, key: str, value: Any) -> None:
    """Sets an attribute on the config and persists the new value in the global configuration.

    Args:
        key: The attribute name.
        value: The attribute value.
    """
    super().__setattr__(key, value)
    # Only public attributes trigger a rewrite of the config file;
    # underscore-prefixed ones are set without persisting anything.
    is_public = not key.startswith("_")
    if is_public:
        self._write_config()
copy_configuration(self, config_path, load_config_path=None, store_config=None)
Create a copy of the global config using a different configuration path.
This method is used to copy the global configuration and store it in a different configuration path, where it can be loaded in the context of a new environment, such as a container image.
If the global store configuration uses a local database, the database is also copied to the new location.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path |
str |
path where the active configuration copy should be saved |
required |
load_config_path |
Optional[pathlib.PurePath] |
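path that will be used to load the configuration copy. This can be set to a value different from `config_path` if the configuration copy will be loaded from a different path, e.g. when the global config copy is copied to a container image. This will be reflected in the paths and URLs encoded in the copied store configuration. |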
None |
store_config |
Optional[zenml.config.store_config.StoreConfiguration] |
custom store configuration to use for the copied global configuration. If not specified, the current global store configuration is used. |
None |
Returns:
Type | Description |
---|---|
GlobalConfiguration |
A new global configuration object copied to the specified path. |
Source code in zenml/config/global_config.py
def copy_configuration(
    self,
    config_path: str,
    load_config_path: Optional[PurePath] = None,
    store_config: Optional[StoreConfiguration] = None,
) -> "GlobalConfiguration":
    """Create a copy of the global config using a different configuration path.

    The current configuration is written to `config_path` and loaded back
    from there as a new GlobalConfiguration object. The store
    configuration of the copy is then set either to `store_config` or,
    when none is given and a store is configured, to the result of the
    store class's `copy_local_store`.

    Args:
        config_path: path where the active configuration copy should be saved
        load_config_path: path that will be used to load the configuration
            copy. This can be set to a value different from `config_path`
            if the configuration copy will be loaded from a different
            path, e.g. when the global config copy is copied to a
            container image. It is forwarded to `copy_local_store`.
        store_config: custom store configuration to use for the copied
            global configuration. If not specified, the current global store
            configuration is used.

    Returns:
        A new global configuration object copied to the specified path.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    # TODO: shouldn't this be done at the end?
    self._write_config(config_path)
    duplicate = GlobalConfiguration(config_path=config_path)

    if store_config:
        # An explicit store configuration takes precedence.
        duplicate.store = store_config
        return duplicate

    if self.store:
        store_class = BaseZenStore.get_store_class(self.store.type)
        duplicate.store = store_class.copy_local_store(
            self.store, config_path, load_config_path
        )
    return duplicate
default_config_directory()
staticmethod
Path to the default global configuration directory.
Returns:
Type | Description |
---|---|
str |
The default global configuration directory. |
Source code in zenml/config/global_config.py
@staticmethod
def default_config_directory() -> str:
    """Path to the default global configuration directory.

    Returns:
        The directory reported by `io_utils.get_global_config_directory`.
    """
    default_directory = io_utils.get_global_config_directory()
    return default_directory
get_default_store(self)
Get the default store configuration.
Returns:
Type | Description |
---|---|
StoreConfiguration |
The default store configuration. |
Source code in zenml/config/global_config.py
def get_default_store(self) -> StoreConfiguration:
    """Get the default store configuration.

    Non-empty environment variables whose names start with
    `ENV_ZENML_STORE_PREFIX` take precedence: if any exist, they are used
    (prefix removed, name lower-cased) as the store configuration values.
    Otherwise the default store configuration located under the local
    stores path is returned.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    prefix_length = len(ENV_ZENML_STORE_PREFIX)
    env_config: Dict[str, str] = {
        name[prefix_length:].lower(): value
        for name, value in os.environ.items()
        if value != "" and name.startswith(ENV_ZENML_STORE_PREFIX)
    }
    if env_config:
        logger.debug(
            "Using environment variables to configure the default store"
        )
        return StoreConfiguration(**env_config)

    default_store_path = os.path.join(
        self.local_stores_path,
        DEFAULT_STORE_DIRECTORY_NAME,
    )
    return BaseZenStore.get_default_store_config(path=default_store_path)
get_instance()
classmethod
Return the GlobalConfiguration singleton instance.
Returns:
Type | Description |
---|---|
Optional[GlobalConfiguration] |
The GlobalConfiguration singleton instance or None, if the GlobalConfiguration hasn't been initialized yet. |
Source code in zenml/config/global_config.py
@classmethod
def get_instance(cls) -> Optional["GlobalConfiguration"]:
    """Return the GlobalConfiguration singleton instance.

    Returns:
        The instance currently stored as the singleton, or None if no
        instance has been stored yet.
    """
    current_instance = cls._global_config
    return current_instance
record_email_opt_in_out(self, opted_in, email, source)
Set the email address associated with this client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
opted_in |
bool |
Whether the user has opted in to email communication. |
required |
email |
Optional[str] |
The email address to use for this client, if given. |
required |
source |
AnalyticsEventSource |
The analytics event source. |
required |
Source code in zenml/config/global_config.py
def record_email_opt_in_out(
self, opted_in: bool, email: Optional[str], source: AnalyticsEventSource
) -> None:
"""Record the email opt-in/opt-out choice and the email of this client.
Calls `identify_user` with the email and the event source when the user
opted in with a non-empty email that differs from `self.user_email`, and
tracks an `OPT_IN_OUT_EMAIL` analytics event when no choice has been stored
yet (`self.user_email_opt_in is None`) or when the user opts in while the
stored choice is falsy.
Args:
opted_in: Whether the user has opted in to email communication.
email: The email address to use for this client, if given.
source: The analytics event source.
"""
# NOTE(review): indentation was lost in this listing. The assignments to
# `self.user_email` and `self.user_email_opt_in` presumably belong to the
# body of the `if` statement that precedes each of them - confirm against
# zenml/config/global_config.py before relying on this.
# Whenever a new email address is associated with the client, we want
# to identify the client by that email address. If the email address has
# been changed, we also want to update the information.
if opted_in and email and self.user_email != email:
identify_user(
{
"email": email,
"source": source,
}
)
self.user_email = email
# `and` binds tighter than `or`, so the condition below reads as
# `opt-in is None or (opted_in and not opt-in)`.
if (
self.user_email_opt_in is None
or opted_in
and not self.user_email_opt_in
):
# When the user makes the opt-in/opt-out choice for the first time, or
# when the user opts in after opting out (e.g. when connecting to
# a new server where the account has opt-in enabled), we want to
# record the information as an analytics event.
track_event(
AnalyticsEvent.OPT_IN_OUT_EMAIL,
{"opted_in": opted_in, "source": source},
)
self.user_email_opt_in = opted_in
set_active_project(self, project)
Set the project for the local client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
ProjectModel |
The project to set active. |
required |
Source code in zenml/config/global_config.py
def set_active_project(self, project: "ProjectModel") -> None:
    """Make the given project the active one for the local client.

    Stores the project's name in `active_project_name` and keeps the project
    object itself in `_active_project`.

    Args:
        project: The project to set active.
    """
    project_name = project.name
    self.active_project_name = project_name
    self._active_project = project
set_default_store(self)
Creates and sets the default store configuration.
Call this method to initialize or revert the store configuration to the default store.
Source code in zenml/config/global_config.py
def set_default_store(self) -> None:
    """Create the default store configuration and make it the active one.

    Call this method to initialize or revert the store configuration to the
    default store. After the store has been configured, an
    `INITIALIZED_STORE` analytics event carrying the store type is tracked.
    """
    store_config = self.get_default_store()
    self._configure_store(store_config)
    logger.info("Using the default store for the global config.")

    event_metadata = {"store_type": store_config.type.value}
    track_event(AnalyticsEvent.INITIALIZED_STORE, event_metadata)
set_store(self, config, skip_default_registrations=False, **kwargs)
Update the active store configuration.
Call this method to validate and update the active store configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The new store configuration to use. |
required |
skip_default_registrations |
bool |
If `True`, the creation of the default stack and user in the store will be skipped. |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the store constructor. |
{} |
Source code in zenml/config/global_config.py
def set_store(
    self,
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Validate and activate a new store configuration.

    After the store has been configured, an `INITIALIZED_STORE` analytics
    event is tracked. If the resulting store is a REST store, the client is
    additionally grouped with the server it connected to and a
    `ZENML_SERVER_CONNECTED` event is tracked.

    Args:
        config: The new store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments forwarded to
            `_configure_store` (intended for the store constructor).
    """
    self._configure_store(config, skip_default_registrations, **kwargs)
    logger.info("Updated the global store configuration.")

    connected_to_server = self.zen_store.type == StoreType.REST
    if connected_to_server:
        # Every time a client connects to a ZenML server, the client ID and
        # the server ID are grouped together. This only records that a
        # particular client has successfully connected to a particular
        # server at least once; no information about the user account is
        # recorded here.
        server_info = self.zen_store.get_store_info()
        server_id = str(server_info.id)
        server_metadata = {
            "version": server_info.version,
            "deployment_type": str(server_info.deployment_type),
            "database_type": str(server_info.database_type),
        }
        identify_group(
            AnalyticsGroup.ZENML_SERVER_GROUP,
            group_id=server_id,
            group_metadata=server_metadata,
        )
        track_event(AnalyticsEvent.ZENML_SERVER_CONNECTED)

    store_metadata = {"store_type": config.type.value}
    track_event(AnalyticsEvent.INITIALIZED_STORE, store_metadata)
generate_jwt_secret_key()
Generate a random JWT secret key.
This key is used to sign and verify generated JWT tokens.
Returns:
Type | Description |
---|---|
str |
A random JWT secret key. |
Source code in zenml/config/global_config.py
def generate_jwt_secret_key() -> str:
    """Create a fresh random secret key for JWT tokens.

    This key is used to sign and verify generated JWT tokens.

    Returns:
        A random JWT secret key: 32 random bytes rendered as a hexadecimal
        string.
    """
    key_size_in_bytes = 32
    return token_hex(key_size_in_bytes)
pipeline_configurations
Pipeline configuration classes.
PipelineConfiguration (PipelineConfigurationUpdate)
pydantic-model
Pipeline configuration class.
Source code in zenml/config/pipeline_configurations.py
class PipelineConfiguration(PipelineConfigurationUpdate):
    """Complete configuration of a pipeline.

    Extends `PipelineConfigurationUpdate` with a `name` and redeclares
    `enable_cache` as a plain `bool` without a default value.
    """

    name: str
    enable_cache: bool

    @validator("name")
    def ensure_pipeline_name_allowed(cls, name: str) -> str:
        """Reject pipeline names that are reserved.

        Args:
            name: Name of the pipeline.

        Returns:
            The validated name of the pipeline.

        Raises:
            ValueError: If the name is contained in
                `DISALLOWED_PIPELINE_NAMES`.
        """
        if name not in DISALLOWED_PIPELINE_NAMES:
            return name

        raise ValueError(
            f"Pipeline name '{name}' is not allowed since '{name}' is a "
            "reserved key word. Please choose another name."
        )

    @property
    def docker_settings(self) -> "DockerSettings":
        """Docker settings of this pipeline.

        The value stored under `DOCKER_SETTINGS_KEY` in `self.settings` (an
        empty dict if there is none) is parsed into a `DockerSettings`
        object.

        Returns:
            The Docker settings of this pipeline.
        """
        # Imported locally, exactly like in the original implementation.
        from zenml.config import DockerSettings

        raw_settings: SettingsOrDict = self.settings.get(
            DOCKER_SETTINGS_KEY, {}
        )
        return DockerSettings.parse_obj(raw_settings)
docker_settings: DockerSettings
property
readonly
Docker settings of this pipeline.
Returns:
Type | Description |
---|---|
DockerSettings |
The Docker settings of this pipeline. |
ensure_pipeline_name_allowed(name)
classmethod
Ensures the pipeline name is allowed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the pipeline. |
required |
Returns:
Type | Description |
---|---|
str |
The validated name of the pipeline. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the name is not allowed. |
Source code in zenml/config/pipeline_configurations.py
@validator("name")
def ensure_pipeline_name_allowed(cls, name: str) -> str:
    """Reject pipeline names that are reserved.

    Args:
        name: Name of the pipeline.

    Returns:
        The validated name of the pipeline.

    Raises:
        ValueError: If the name is contained in `DISALLOWED_PIPELINE_NAMES`.
    """
    if name not in DISALLOWED_PIPELINE_NAMES:
        return name

    raise ValueError(
        f"Pipeline name '{name}' is not allowed since '{name}' is a "
        "reserved key word. Please choose another name."
    )
PipelineConfigurationUpdate (StrictBaseModel)
pydantic-model
Class for pipeline configuration updates.
Source code in zenml/config/pipeline_configurations.py
class PipelineConfigurationUpdate(StrictBaseModel):
    """Set of values with which a pipeline configuration can be updated.

    Every field has a default, so an update may specify any subset of them.
    """

    # `None` means that no caching preference is specified by this update.
    enable_cache: Optional[bool] = None
    # Settings objects, keyed by their settings key.
    settings: Dict[str, BaseSettings] = {}
    # Arbitrary additional key-value pairs.
    extra: Dict[str, Any] = {}
PipelineRunConfiguration (StrictBaseModel)
pydantic-model
Class for pipeline run configurations.
Source code in zenml/config/pipeline_configurations.py
class PipelineRunConfiguration(StrictBaseModel):
    """Configuration values for a single pipeline run.

    Every field has a default, so a run configuration may specify any subset
    of them.
    """

    run_name: Optional[str] = None
    enable_cache: Optional[bool] = None
    schedule: Optional[Schedule] = None
    # Configuration updates for individual steps, keyed by a string
    # (presumably the step name - confirm against the callers).
    steps: Dict[str, StepConfigurationUpdate] = {}
    # Settings objects, keyed by their settings key.
    settings: Dict[str, BaseSettings] = {}
    # Arbitrary additional key-value pairs.
    extra: Dict[str, Any] = {}
PipelineSpec (StrictBaseModel)
pydantic-model
Specification of a pipeline.
Source code in zenml/config/pipeline_configurations.py
class PipelineSpec(StrictBaseModel):
    """Specification of a pipeline.

    Two specs are considered equal when their `steps` are equal; the
    `version` field does not take part in the comparison.
    """

    version: str = "0.1"
    steps: List[StepSpec]

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same pipeline.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is a `PipelineSpec` with equal steps,
            `NotImplemented` if it is not a `PipelineSpec` at all.
        """
        if not isinstance(other, PipelineSpec):
            return NotImplemented
        return self.steps == other.steps
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same pipeline. |
Source code in zenml/config/pipeline_configurations.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same pipeline.

    Only the `steps` of both objects are compared.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is a `PipelineSpec` with equal steps,
        `NotImplemented` if it is not a `PipelineSpec` at all.
    """
    if not isinstance(other, PipelineSpec):
        return NotImplemented
    return self.steps == other.steps
pipeline_deployment
Pipeline deployment.
PipelineDeployment (StrictBaseModel)
pydantic-model
Class representing the deployment of a ZenML pipeline.
Source code in zenml/config/pipeline_deployment.py
class PipelineDeployment(StrictBaseModel):
    """Class representing the deployment of a ZenML pipeline."""

    # Defaults to the version of the ZenML package that creates the object.
    zenml_version: str = zenml.__version__
    run_name: str
    schedule: Optional[Schedule] = None
    stack_id: UUID
    pipeline: PipelineConfiguration
    pipeline_id: Optional[UUID] = None
    proto_pipeline: str
    steps: Dict[str, Step] = {}

    def add_extra(self, key: str, value: Any) -> None:
        """Store an extra key-value pair in the pipeline configuration.

        The pair is written into `self.pipeline.extra`; an existing value
        for the same key is replaced.

        Args:
            key: Key for which to add the extra value.
            value: The extra value.
        """
        extras = self.pipeline.extra
        extras[key] = value

    def yaml(self, **kwargs: Any) -> str:
        """Yaml representation of the deployment.

        The deployment is first serialized with the pydantic `json(...)`
        method, parsed back into plain Python objects and then dumped as
        yaml. Keys are not sorted in either step.

        Args:
            **kwargs: Kwargs to pass to the pydantic json(...) method.

        Returns:
            Yaml string representation of the deployment.
        """
        json_string = self.json(**kwargs, sort_keys=False)
        plain_content = json.loads(json_string)
        yaml_string = yaml.dump(plain_content, sort_keys=False)
        return cast(str, yaml_string)
add_extra(self, key, value)
Adds an extra key-value pair to the pipeline configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Key for which to add the extra value. |
required |
value |
Any |
The extra value. |
required |
Source code in zenml/config/pipeline_deployment.py
def add_extra(self, key: str, value: Any) -> None:
    """Store an extra key-value pair in the pipeline configuration.

    The pair is written into `self.pipeline.extra`; an existing value for
    the same key is replaced.

    Args:
        key: Key for which to add the extra value.
        value: The extra value.
    """
    extras = self.pipeline.extra
    extras[key] = value
yaml(self, **kwargs)
Yaml representation of the deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Kwargs to pass to the pydantic json(...) method. |
{} |
Returns:
Type | Description |
---|---|
str |
Yaml string representation of the deployment. |
Source code in zenml/config/pipeline_deployment.py
def yaml(self, **kwargs: Any) -> str:
    """Yaml representation of the deployment.

    The deployment is first serialized with the pydantic `json(...)` method,
    parsed back into plain Python objects and then dumped as yaml. Keys are
    not sorted in either step.

    Args:
        **kwargs: Kwargs to pass to the pydantic json(...) method.

    Returns:
        Yaml string representation of the deployment.
    """
    json_string = self.json(**kwargs, sort_keys=False)
    plain_content = json.loads(json_string)
    yaml_string = yaml.dump(plain_content, sort_keys=False)
    return cast(str, yaml_string)
resource_settings
Resource settings class used to specify resources for a step.
ByteUnit (Enum)
Enum for byte units.
Source code in zenml/config/resource_settings.py
class ByteUnit(Enum):
    """Enum for byte units.

    Contains both the decimal units (KB, MB, ...: powers of 1000) and the
    binary units (KiB, MiB, ...: powers of 1024).
    """

    KB = "KB"
    KIB = "KiB"
    MB = "MB"
    MIB = "MiB"
    GB = "GB"
    GIB = "GiB"
    TB = "TB"
    TIB = "TiB"
    PB = "PB"
    PIB = "PiB"

    @property
    def byte_value(self) -> int:
        """Returns the amount of bytes that this unit represents.

        Returns:
            The byte value of this unit.
        """
        bytes_per_unit = {
            ByteUnit.KB: 1000**1,
            ByteUnit.KIB: 1024**1,
            ByteUnit.MB: 1000**2,
            ByteUnit.MIB: 1024**2,
            ByteUnit.GB: 1000**3,
            ByteUnit.GIB: 1024**3,
            ByteUnit.TB: 1000**4,
            ByteUnit.TIB: 1024**4,
            ByteUnit.PB: 1000**5,
            ByteUnit.PIB: 1024**5,
        }
        return bytes_per_unit[self]
ResourceSettings (BaseSettings)
pydantic-model
Hardware resource settings.
Attributes:
Name | Type | Description |
---|---|---|
cpu_count |
Optional[pydantic.types.PositiveFloat] |
The amount of CPU cores that should be configured. |
gpu_count |
Optional[pydantic.types.NonNegativeInt] |
The amount of GPUs that should be configured. |
memory |
Optional[str] |
The amount of memory that should be configured. |
Source code in zenml/config/resource_settings.py
class ResourceSettings(BaseSettings):
    """Hardware resource settings.

    Attributes:
        cpu_count: The amount of CPU cores that should be configured.
        gpu_count: The amount of GPUs that should be configured.
        memory: The amount of memory that should be configured. The value
            has to match `MEMORY_REGEX`.
    """

    cpu_count: Optional[PositiveFloat] = None
    gpu_count: Optional[NonNegativeInt] = None
    memory: Optional[str] = Field(regex=MEMORY_REGEX)

    @property
    def empty(self) -> bool:
        """Returns if this object is "empty" (=no values configured) or not.

        Returns:
            `True` if no values were configured, `False` otherwise.
        """
        # Only attributes that were explicitly set to something other than
        # `None` count as configured.
        configured_values = self.dict(exclude_unset=True, exclude_none=True)
        return not configured_values

    def get_memory(
        self, unit: Union[str, ByteUnit] = ByteUnit.GB
    ) -> Optional[float]:
        """Gets the memory configuration in a specific unit.

        Args:
            unit: The unit to which the memory should be converted. A string
                is converted to the `ByteUnit` with that value.

        Raises:
            ValueError: If the memory string is invalid.

        Returns:
            The memory configuration converted to the requested unit, or None
            if no memory was configured.
        """
        if not self.memory:
            return None

        if isinstance(unit, str):
            unit = ByteUnit(unit)

        memory = self.memory
        for candidate in ByteUnit:
            suffix = candidate.value
            if not memory.endswith(suffix):
                continue
            # Everything in front of the unit suffix is the integer amount.
            amount = int(memory[: -len(suffix)])
            return amount * candidate.byte_value / unit.byte_value

        # Should never happen due to the regex validation
        raise ValueError(f"Unable to parse memory unit from '{memory}'.")

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False

        # prevent extra attributes during model initialization
        extra = Extra.forbid
empty: bool
property
readonly
Returns if this object is "empty" (=no values configured) or not.
Returns:
Type | Description |
---|---|
bool |
`True` if no values were configured, `False` otherwise. |
Config
Pydantic configuration class.
Source code in zenml/config/resource_settings.py
class Config:
    """Pydantic configuration class."""

    # Public attributes cannot be reassigned after initialization.
    allow_mutation = False

    # Attributes that are not declared on the model are rejected during
    # model initialization.
    extra = Extra.forbid
get_memory(self, unit=<ByteUnit.GB: 'GB'>)
Gets the memory configuration in a specific unit.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unit |
Union[str, zenml.config.resource_settings.ByteUnit] |
The unit to which the memory should be converted. |
<ByteUnit.GB: 'GB'> |
Exceptions:
Type | Description |
---|---|
ValueError |
If the memory string is invalid. |
Returns:
Type | Description |
---|---|
Optional[float] |
The memory configuration converted to the requested unit, or None if no memory was configured. |
Source code in zenml/config/resource_settings.py
def get_memory(
    self, unit: Union[str, ByteUnit] = ByteUnit.GB
) -> Optional[float]:
    """Gets the memory configuration in a specific unit.

    Args:
        unit: The unit to which the memory should be converted. A string is
            converted to the `ByteUnit` with that value.

    Raises:
        ValueError: If the memory string is invalid.

    Returns:
        The memory configuration converted to the requested unit, or None
        if no memory was configured.
    """
    if not self.memory:
        return None

    if isinstance(unit, str):
        unit = ByteUnit(unit)

    memory = self.memory
    for candidate in ByteUnit:
        suffix = candidate.value
        if not memory.endswith(suffix):
            continue
        # Everything in front of the unit suffix is the integer amount.
        amount = int(memory[: -len(suffix)])
        return amount * candidate.byte_value / unit.byte_value

    # Should never happen due to the regex validation
    raise ValueError(f"Unable to parse memory unit from '{memory}'.")
schedule
Class for defining a pipeline schedule.
Schedule (BaseModel)
pydantic-model
Class for defining a pipeline schedule.
Attributes:
Name | Type | Description |
---|---|---|
cron_expression |
Optional[str] |
Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval. |
start_time |
Optional[datetime.datetime] |
datetime object to indicate when to start the schedule. |
end_time |
Optional[datetime.datetime] |
datetime object to indicate when to end the schedule. |
interval_second |
Optional[datetime.timedelta] |
datetime timedelta indicating the seconds between two recurring runs for a periodic schedule. |
catchup |
bool |
Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill. |
Source code in zenml/config/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    A schedule is valid if it has a cron expression, or both a start time
    and an interval.

    Attributes:
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        If both are set, the schedule is accepted but a warning is logged.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )

        if not (cron_expression or periodic_schedule):
            raise ValueError(
                "Either a cron expression or start time and interval seconds "
                "need to be set for a valid schedule."
            )

        if cron_expression and periodic_schedule:
            # The message names the attribute exactly as it is declared on
            # this class (`interval_second`), so users can find it.
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behavior depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )

        return values

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            The start time converted to UTC in ISO format, or None if no
            start time is set.
        """
        if not self.start_time:
            return None

        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            The end time converted to UTC in ISO format, or None if no end
            time is set.
        """
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str]
property
readonly
Optional ISO-formatted string of the UTC end time.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional ISO-formatted string of the UTC end time. |
utc_start_time: Optional[str]
property
readonly
Optional ISO-formatted string of the UTC start time.
Returns:
Type | Description |
---|---|
Optional[str] |
Optional ISO-formatted string of the UTC start time. |
secret_reference_mixin
Secret reference mixin implementation.
SecretReferenceMixin (BaseModel)
pydantic-model
Mixin class for secret references in pydantic model attributes.
Source code in zenml/config/secret_reference_mixin.py
class SecretReferenceMixin(BaseModel):
    """Mixin class for secret references in pydantic model attributes.

    Attribute values may be given as secret references. Such references are
    checked when the object is created and are resolved through the secrets
    manager of the active stack when the attribute is read.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Ensures that secret references are only passed for valid fields.

        This method ensures that secret references are not passed for fields
        that explicitly prevent them or require pydantic validation. A
        warning is logged for secret fields that receive a plain-text value.

        Args:
            **kwargs: Arguments to initialize this object.

        Raises:
            ValueError: If an attribute that requires custom pydantic
                validation or an attribute which explicitly disallows secret
                references is passed as a secret reference.
        """
        for key, value in kwargs.items():
            # Values for private attributes or non-existing fields have no
            # entry here; they will fail during the upcoming pydantic
            # validation. `None` values need no check either.
            field = self.__class__.__fields__.get(key)
            if field is None or value is None:
                continue

            if not secret_utils.is_secret_reference(value):
                if secret_utils.is_secret_field(field):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}`. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in sensitive information as secrets. Check out the "
                        "documentation on how to configure values with secrets "
                        "here: https://docs.zenml.io/advanced-guide/practical/secrets-management"
                    )
                continue

            # From here on, `value` is a secret reference.
            if secret_utils.is_clear_text_field(field):
                raise ValueError(
                    f"Passing the `{key}` attribute as a secret reference is "
                    "not allowed."
                )

            if field.pre_validators or field.post_validators:
                raise ValueError(
                    f"Passing the attribute `{key}` as a secret reference is "
                    "not allowed as additional validation is required for "
                    "this attribute."
                )

        super().__init__(**kwargs)

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            RuntimeError: If the active stack is missing a secrets manager.
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value.
        """
        raw_value = super().__getattribute__(key)
        if not secret_utils.is_secret_reference(raw_value):
            return raw_value

        # Imported locally, exactly like in the original implementation.
        from zenml.client import Client

        error_prefix = f"Failed to resolve secret reference for attribute {key}: "

        manager = Client().active_stack.secrets_manager
        if not manager:
            raise RuntimeError(
                error_prefix
                + "The active stack does not have a secrets manager."
            )

        reference = secret_utils.parse_secret_reference(raw_value)
        try:
            secret = manager.get_secret(reference.name)
        except KeyError:
            raise KeyError(
                error_prefix + f"The secret {reference.name} does not exist."
            )

        try:
            resolved = secret.content[reference.key]
        except KeyError:
            raise KeyError(
                error_prefix
                + f"The secret {reference.name} does not contain a value for key "
                f"{reference.key}. Available keys: {set(secret.content)}."
            )

        return str(resolved)

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this object.

        Returns:
            The parsed secret references found among the values of
            `self.dict()`.
        """
        references = set()
        for attribute_value in self.dict().values():
            if secret_utils.is_secret_reference(attribute_value):
                references.add(
                    secret_utils.parse_secret_reference(attribute_value)
                )
        return references
required_secrets: Set[zenml.utils.secret_utils.SecretReference]
property
readonly
All required secrets for this object.
Returns:
Type | Description |
---|---|
Set[zenml.utils.secret_utils.SecretReference] |
The required secrets of this object. |
__custom_getattribute__(self, key)
special
Returns the (potentially resolved) attribute value for the given key.
An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key for which to get the attribute value. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the active stack is missing a secrets manager. |
KeyError |
If the secret or secret key don't exist. |
Returns:
Type | Description |
---|---|
Any |
The (potentially resolved) attribute value. |
Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    raw_value = super().__getattribute__(key)
    if not secret_utils.is_secret_reference(raw_value):
        return raw_value

    # Imported locally, exactly like in the original implementation.
    from zenml.client import Client

    error_prefix = f"Failed to resolve secret reference for attribute {key}: "

    manager = Client().active_stack.secrets_manager
    if not manager:
        raise RuntimeError(
            error_prefix + "The active stack does not have a secrets manager."
        )

    reference = secret_utils.parse_secret_reference(raw_value)
    try:
        secret = manager.get_secret(reference.name)
    except KeyError:
        raise KeyError(
            error_prefix + f"The secret {reference.name} does not exist."
        )

    try:
        resolved = secret.content[reference.key]
    except KeyError:
        raise KeyError(
            error_prefix
            + f"The secret {reference.name} does not contain a value for key "
            f"{reference.key}. Available keys: {set(secret.content)}."
        )

    return str(resolved)
__getattribute__(self, key)
special
Returns the (potentially resolved) attribute value for the given key.
An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
The key for which to get the attribute value. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the active stack is missing a secrets manager. |
KeyError |
If the secret or secret key don't exist. |
Returns:
Type | Description |
---|---|
Any |
The (potentially resolved) attribute value. |
Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    raw_value = super().__getattribute__(key)
    if not secret_utils.is_secret_reference(raw_value):
        return raw_value

    # Imported locally, exactly like in the original implementation.
    from zenml.client import Client

    error_prefix = f"Failed to resolve secret reference for attribute {key}: "

    manager = Client().active_stack.secrets_manager
    if not manager:
        raise RuntimeError(
            error_prefix + "The active stack does not have a secrets manager."
        )

    reference = secret_utils.parse_secret_reference(raw_value)
    try:
        secret = manager.get_secret(reference.name)
    except KeyError:
        raise KeyError(
            error_prefix + f"The secret {reference.name} does not exist."
        )

    try:
        resolved = secret.content[reference.key]
    except KeyError:
        raise KeyError(
            error_prefix
            + f"The secret {reference.name} does not contain a value for key "
            f"{reference.key}. Available keys: {set(secret.content)}."
        )

    return str(resolved)
__init__(self, **kwargs)
special
Ensures that secret references are only passed for valid fields.
This method ensures that secret references are not passed for fields that explicitly prevent them or require pydantic validation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Arguments to initialize this object. |
{} |
Exceptions:
Type | Description |
---|---|
ValueError |
If an attribute that requires custom pydantic validation or an attribute which explicitly disallows secret references is passed as a secret reference. |
Source code in zenml/config/secret_reference_mixin.py
def __init__(self, **kwargs: Any) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation. A warning
    is logged for secret fields that receive a plain-text value.

    Args:
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references is
            passed as a secret reference.
    """
    for key, value in kwargs.items():
        # Values for private attributes or non-existing fields have no entry
        # here; they will fail during the upcoming pydantic validation.
        # `None` values need no check either.
        field = self.__class__.__fields__.get(key)
        if field is None or value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if secret_utils.is_secret_field(field):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/advanced-guide/practical/secrets-management"
                )
            continue

        # From here on, `value` is a secret reference.
        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        if field.pre_validators or field.post_validators:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
settings_resolver
Class for resolving settings.
SettingsResolver
Class for resolving settings.
This class converts a BaseSettings
instance to the correct subclass
depending on the key for which these settings were specified.
Source code in zenml/config/settings_resolver.py
class SettingsResolver:
    """Class for resolving settings.

    This class converts a `BaseSettings` instance to the correct subclass
    depending on the key for which these settings were specified.
    """

    def __init__(self, key: str, settings: "BaseSettings"):
        """Checks if the settings key is valid and stores key and settings.

        Args:
            key: Settings key.
            settings: The settings.

        Raises:
            ValueError: If the settings key is invalid.
        """
        key_is_valid = settings_utils.is_valid_setting_key(key)
        if not key_is_valid:
            raise ValueError(
                f"Invalid setting key `{key}`. Setting keys can either refer "
                "to general settings (available keys: "
                f"{set(settings_utils.get_general_settings())}) or stack "
                "component specific settings. Stack component specific keys "
                "are of the format "
                "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
            )

        self._key = key
        self._settings = settings

    def resolve(self, stack: "Stack") -> "BaseSettings":
        """Resolves settings for the given stack.

        The target class is looked up among the general settings if the key
        is a general settings key, and among the setting classes of the
        stack otherwise.

        Args:
            stack: The stack for which to resolve the settings.

        Returns:
            The resolved settings.
        """
        target_class = (
            self._resolve_general_settings_class()
            if settings_utils.is_general_setting_key(self._key)
            else self._resolve_stack_component_setting_class(stack=stack)
        )
        return self._convert_settings(target_class=target_class)

    def _resolve_general_settings_class(
        self,
    ) -> Type["BaseSettings"]:
        """Resolves the class of general settings.

        Returns:
            The general settings class registered for the key.
        """
        general_settings = settings_utils.get_general_settings()
        return general_settings[self._key]

    def _resolve_stack_component_setting_class(
        self, stack: "Stack"
    ) -> Type["BaseSettings"]:
        """Resolves the stack component settings class with the given stack.

        Args:
            stack: The stack to use for resolving.

        Raises:
            KeyError: If the stack contains no settings for the key.

        Returns:
            The settings class that the stack provides for the key.
        """
        settings_class = stack.setting_classes.get(self._key)
        if settings_class:
            return settings_class

        raise KeyError(
            f"Failed to resolve settings for key {self._key}: "
            "No settings for this key exist in the stack. "
            "Available settings: "
            f"{set(stack.setting_classes)}"
        )

    def _convert_settings(self, target_class: Type["T"]) -> "T":
        """Converts the settings to their correct class.

        Args:
            target_class: The correct settings class.

        Raises:
            SettingsResolvingError: If the conversion failed.

        Returns:
            The converted settings.
        """
        settings_dict = self._settings.dict()
        try:
            converted = target_class(**settings_dict)
        except ValidationError:
            raise SettingsResolvingError(
                f"Failed to convert settings `{settings_dict}` to expected "
                f"class {target_class}."
            )
        return converted
__init__(self, key, settings)
special
Checks if the settings key is valid.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
str |
Settings key. |
required |
settings |
BaseSettings |
The settings. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If the settings key is invalid. |
Source code in zenml/config/settings_resolver.py
def __init__(self, key: str, settings: "BaseSettings"):
    """Validate the settings key and keep it together with the settings.

    Args:
        key: Settings key.
        settings: The settings.

    Raises:
        ValueError: If the settings key is invalid.
    """
    if settings_utils.is_valid_setting_key(key):
        self._key = key
        self._settings = settings
        return

    # Only reached for invalid keys: explain which keys are accepted.
    message = (
        f"Invalid setting key `{key}`. Setting keys can either refer "
        "to general settings (available keys: "
        f"{set(settings_utils.get_general_settings())}) or stack "
        "component specific settings. Stack component specific keys "
        "are of the format "
        "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
    )
    raise ValueError(message)
resolve(self, stack)
Resolves settings for the given stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
Stack |
The stack for which to resolve the settings. |
required |
Returns:
Type | Description |
---|---|
BaseSettings |
The resolved settings. |
Source code in zenml/config/settings_resolver.py
def resolve(self, stack: "Stack") -> "BaseSettings":
    """Convert the stored settings to the class that belongs to their key.

    Args:
        stack: The stack for which to resolve the settings.

    Returns:
        The resolved settings.
    """
    # General keys use the general settings classes; any other key is
    # looked up in the settings classes of the given stack.
    target_class = (
        self._resolve_general_settings_class()
        if settings_utils.is_general_setting_key(self._key)
        else self._resolve_stack_component_setting_class(stack=stack)
    )
    return self._convert_settings(target_class=target_class)
step_configurations
Step configuration classes.
ArtifactConfiguration (PartialArtifactConfiguration)
pydantic-model
Class representing a complete input/output artifact configuration.
Source code in zenml/config/step_configurations.py
class ArtifactConfiguration(PartialArtifactConfiguration):
    """Complete configuration of a single input/output artifact.

    Redeclares both fields of the partial configuration as plain `str`
    without a default value.
    """

    # Annotated as `str` with no default.
    artifact_source: str
    # Annotated as `str` with no default.
    materializer_source: str
PartialArtifactConfiguration (StrictBaseModel)
pydantic-model
Class representing a partial input/output artifact configuration.
Source code in zenml/config/step_configurations.py
class PartialArtifactConfiguration(StrictBaseModel):
    """Partial configuration of a single input/output artifact.

    Both fields are optional and default to `None`.
    """

    # Optional; `None` when not set.
    artifact_source: Optional[str] = None
    # Optional; `None` when not set.
    materializer_source: Optional[str] = None
PartialStepConfiguration (StepConfigurationUpdate)
pydantic-model
Class representing a partial step configuration.
Source code in zenml/config/step_configurations.py
class PartialStepConfiguration(StepConfigurationUpdate):
    """Partial configuration of a step.

    Adds a name and input artifact configurations to the update model and
    declares `enable_cache` as a plain `bool` without a default.
    """

    name: str
    enable_cache: bool
    # Artifact configurations keyed by string; empty by default.
    inputs: Mapping[str, PartialArtifactConfiguration] = {}
    outputs: Mapping[str, PartialArtifactConfiguration] = {}
Step (StrictBaseModel)
pydantic-model
Class representing a ZenML step.
Source code in zenml/config/step_configurations.py
class Step(StrictBaseModel):
    """A ZenML step, made up of its spec and its configuration."""

    # Specification part of the step (see `StepSpec`).
    spec: StepSpec
    # Configuration part of the step (see `StepConfiguration`).
    config: StepConfiguration
StepConfiguration (PartialStepConfiguration)
pydantic-model
Step configuration class.
Source code in zenml/config/step_configurations.py
class StepConfiguration(PartialStepConfiguration):
    """Step configuration class.

    Narrows the input and output mappings to `ArtifactConfiguration` values
    and adds an optional docstring field.
    """

    docstring: Optional[str]
    inputs: Mapping[str, ArtifactConfiguration] = {}
    outputs: Mapping[str, ArtifactConfiguration] = {}

    @property
    def resource_settings(self) -> "ResourceSettings":
        """Resource settings of this step configuration.

        Returns:
            The resource settings of this step configuration.
        """
        # Imported at call time rather than at module level.
        from zenml.config import ResourceSettings

        # Fall back to an empty dict when no resource settings are stored,
        # so parsing is done on whatever is available.
        configured: SettingsOrDict = self.settings.get(
            RESOURCE_SETTINGS_KEY, {}
        )
        return ResourceSettings.parse_obj(configured)
resource_settings: ResourceSettings
property
readonly
Resource settings of this step configuration.
Returns:
Type | Description |
---|---|
ResourceSettings |
The resource settings of this step configuration. |
StepConfigurationUpdate (StrictBaseModel)
pydantic-model
Class for step configuration updates.
Source code in zenml/config/step_configurations.py
class StepConfigurationUpdate(StrictBaseModel):
    """Model holding updates to a step configuration.

    Every field has a default, so an instance can be created from any subset
    of the fields.
    """

    # Optional values default to `None`.
    enable_cache: Optional[bool] = None
    step_operator: Optional[str] = None
    experiment_tracker: Optional[str] = None

    # Mapping values default to empty containers.
    parameters: Dict[str, Any] = {}
    settings: Dict[str, BaseSettings] = {}
    extra: Dict[str, Any] = {}
    outputs: Mapping[str, PartialArtifactConfiguration] = {}
StepSpec (StrictBaseModel)
pydantic-model
Specification of a step.
Source code in zenml/config/step_configurations.py
class StepSpec(StrictBaseModel):
    """Specification of a step."""

    source: str
    upstream_steps: List[str]

    @property
    def module_name(self) -> str:
        """The step module name.

        Returns:
            The part of `source` before its last `.`.
        """
        module_part, _class_part = self.source.rsplit(".", maxsplit=1)
        return module_part

    @property
    def class_name(self) -> str:
        """The step class name.

        Returns:
            The part of `source` after its last `.`.
        """
        _module_part, class_part = self.source.rsplit(".", maxsplit=1)
        return class_part

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        This is the case if the other object is a `StepSpec` with the same
        `upstream_steps` and a `source` that meets one of the following
        conditions:
            - it is the same as the `source` of this step
            - it refers to the same absolute path as the `source` of this
              step

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step.
        """
        if not isinstance(other, StepSpec):
            return NotImplemented

        if self.upstream_steps != other.upstream_steps:
            return False

        mine, theirs = self.source, other.source
        # Identical sources also satisfy the suffix checks, so no separate
        # equality test is needed.
        # NOTE(review): this is a plain string suffix match; it is not
        # aligned to `.` boundaries of the source path.
        return mine.endswith(theirs) or theirs.endswith(mine)
class_name: str
property
readonly
The step class name.
Returns:
Type | Description |
---|---|
str |
The step class name. |
module_name: str
property
readonly
The step module name.
Returns:
Type | Description |
---|---|
str |
The step module name. |
__eq__(self, other)
special
Returns whether the other object is referring to the same step.
This is the case if the other object is a StepSpec with the same upstream_steps and a source that meets one of the following conditions:
- it is the same as the source of this step
- it refers to the same absolute path as the source of this step
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
Any |
The other object to compare to. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the other object is referring to the same step. |
Source code in zenml/config/step_configurations.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same step.

    This is the case if the other object is a `StepSpec` with the same
    `upstream_steps` and a `source` that meets one of the following
    conditions:
        - it is the same as the `source` of this step
        - it refers to the same absolute path as the `source` of this step

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is referring to the same step.
    """
    if not isinstance(other, StepSpec):
        return NotImplemented

    if self.upstream_steps != other.upstream_steps:
        return False

    mine, theirs = self.source, other.source
    # Identical sources also satisfy the suffix checks, so no separate
    # equality test is needed.
    return mine.endswith(theirs) or theirs.endswith(mine)
step_run_info
Step run info.
StepRunInfo (StrictBaseModel)
pydantic-model
All information necessary to run a step.
Source code in zenml/config/step_run_info.py
class StepRunInfo(StrictBaseModel):
    """Bundle of all information necessary to run a step."""

    # Configuration of the step.
    config: StepConfiguration
    # Configuration of the pipeline.
    pipeline: PipelineConfiguration
    # Name of the run.
    run_name: str
store_config
Functionality to support ZenML store configurations.
StoreConfiguration (BaseModel)
pydantic-model
Generic store configuration.
The store configurations of concrete store implementations must inherit from this class and validate any extra attributes that are configured in addition to those defined in this class.
Attributes:
Name | Type | Description |
---|---|---|
type |
StoreType |
The type of store backend. |
url |
str |
The URL of the store backend. |
Source code in zenml/config/store_config.py
class StoreConfiguration(BaseModel):
    """Generic store configuration.

    Concrete store implementations must derive their configuration from this
    class and validate any extra attributes that are configured on top of
    the ones defined here.

    Attributes:
        type: The type of store backend.
        url: The URL of the store backend.
    """

    type: StoreType
    url: str

    class Config:
        """Pydantic configuration class."""

        # Assignments are validated; this is needed to support a mix of
        # mutable and immutable attributes.
        validate_assignment = True
        # Extra attributes are accepted by the base class; validating them
        # is the job of the concrete classes.
        extra = "allow"
        # Attributes with a leading underscore are private: mutable and
        # left out of serialization.
        underscore_attrs_are_private = True
Config
Pydantic configuration class.
Source code in zenml/config/store_config.py
class Config:
    """Pydantic configuration class."""

    # Assignments are validated; this is needed to support a mix of
    # mutable and immutable attributes.
    validate_assignment = True
    # Extra attributes are accepted by the base class; validating them is
    # the job of the concrete classes.
    extra = "allow"
    # Attributes with a leading underscore are private: mutable and left
    # out of serialization.
    underscore_attrs_are_private = True
strict_base_model
Strict immutable pydantic model.
StrictBaseModel (BaseModel)
pydantic-model
Immutable pydantic model which prevents extra attributes.
Source code in zenml/config/strict_base_model.py
class StrictBaseModel(BaseModel):
    """Pydantic model that is immutable and rejects extra attributes."""

    class Config:
        """Pydantic config class."""

        # Attributes cannot be reassigned.
        allow_mutation = False
        # Attributes that are not declared on the model are not accepted.
        extra = Extra.forbid
Config
Pydantic config class.
Source code in zenml/config/strict_base_model.py
class Config:
    """Pydantic config class."""

    # Attributes cannot be reassigned.
    allow_mutation = False
    # Attributes that are not declared on the model are not accepted.
    extra = Extra.forbid