Skip to content

Config

zenml.config special

The config module contains classes and functions that manage user-specific configuration.

ZenML's configuration is stored in a file called config.yaml, located in the user's directory for configuration files. (The exact location differs between operating systems.)

The GlobalConfiguration class is the main class in this module. It provides a Pydantic configuration object that is used to store and retrieve configuration. This GlobalConfiguration object handles the serialization and deserialization of the configuration options that are stored in the file in order to persist the configuration across sessions.

base_settings

Base class for all ZenML settings.

BaseSettings (SecretReferenceMixin) pydantic-model

Base class for settings.

The LEVEL class variable defines on which level the settings can be specified. By default, subclasses can be defined on both pipelines and steps.

Source code in zenml/config/base_settings.py
class BaseSettings(SecretReferenceMixin):
    """Common parent class of all ZenML settings classes.

    Subclasses use the `LEVEL` class variable to declare the configuration
    levels on which they may be specified. Unless a subclass overrides it,
    settings can be specified on steps as well as on pipelines.
    """

    LEVEL: ClassVar[ConfigurationLevel] = (
        ConfigurationLevel.STEP | ConfigurationLevel.PIPELINE
    )

    class Config:
        """Pydantic configuration class."""

        # accept extra attributes so that this class is able to parse dicts
        # that belong to arbitrary subclasses
        extra = Extra.allow
        # public attributes can't be modified after creation
        allow_mutation = False
Config

Pydantic configuration class.

Source code in zenml/config/base_settings.py
class Config:
    """Pydantic configuration class."""

    # accept extra attributes so that this class is able to parse dicts
    # that belong to arbitrary subclasses
    extra = Extra.allow
    # public attributes can't be modified after creation
    allow_mutation = False

ConfigurationLevel (IntFlag)

Settings configuration level.

Bit flag that indicates on which levels (step and/or pipeline) a BaseSettings subclass can be specified.

Source code in zenml/config/base_settings.py
class ConfigurationLevel(IntFlag):
    """Settings configuration level.

    Each member occupies a single bit, so members can be combined with `|`
    to describe every level on which a `BaseSettings` subclass can be
    specified.
    """

    # Explicit single-bit values; identical to what `auto()` assigns.
    STEP = 1 << 0
    PIPELINE = 1 << 1

compiler

Class for compiling ZenML pipelines into a serializable format.

Compiler

Compiles ZenML pipelines to serializable representations.

Source code in zenml/config/compiler.py
class Compiler:
    """Compiles ZenML pipelines to serializable representations."""

    def compile(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        run_configuration: PipelineRunConfiguration,
    ) -> PipelineDeployment:
        """Compiles a ZenML pipeline to a serializable representation.

        Args:
            pipeline: The pipeline to compile.
            stack: The stack on which the pipeline will run.
            run_configuration: The run configuration for this pipeline.

        Returns:
            The compiled pipeline.
        """
        logger.debug("Compiling pipeline `%s`.", pipeline.name)
        # Copy the pipeline before we apply any run-level configurations so
        # we don't mess with the pipeline object/step objects in any way
        pipeline = copy.deepcopy(pipeline)
        self._apply_run_configuration(
            pipeline=pipeline, config=run_configuration
        )
        self._apply_stack_default_settings(pipeline=pipeline, stack=stack)
        self._verify_distinct_step_names(pipeline=pipeline)
        if run_configuration.run_name:
            self._verify_run_name(run_configuration.run_name)

        pipeline.connect(**pipeline.steps)

        pipeline_settings = self._filter_and_validate_settings(
            settings=pipeline.configuration.settings,
            configuration_level=ConfigurationLevel.PIPELINE,
            stack=stack,
        )
        # Replace (instead of merge) so that filtered out settings are gone
        pipeline.configure(settings=pipeline_settings, merge=False)

        # Only settings that may also be specified on a step level are
        # passed down to the individual steps
        settings_to_passdown = {
            key: settings
            for key, settings in pipeline_settings.items()
            if ConfigurationLevel.STEP in settings.LEVEL
        }

        steps = {
            name: self._compile_step(
                step=step,
                pipeline_settings=settings_to_passdown,
                pipeline_extra=pipeline.configuration.extra,
                stack=stack,
            )
            for name, step in self._get_sorted_steps(steps=pipeline.steps)
        }

        self._ensure_required_stack_components_exist(
            stack=stack, steps=list(steps.values())
        )

        run_name = run_configuration.run_name or self._get_default_run_name(
            pipeline_name=pipeline.name
        )

        deployment = PipelineDeployment(
            run_name=run_name,
            stack_id=stack.id,
            schedule=run_configuration.schedule,
            pipeline=pipeline.configuration,
            steps=steps,
            client_environment=get_run_environment_dict(),
        )
        logger.debug("Compiled pipeline deployment: %s", deployment)
        return deployment

    def _apply_run_configuration(
        self, pipeline: "BasePipeline", config: PipelineRunConfiguration
    ) -> None:
        """Applies run configurations to the pipeline and its steps.

        Args:
            pipeline: The pipeline to configure.
            config: The run configurations.

        Raises:
            KeyError: If the run configuration contains options for a
                non-existent step.
        """
        pipeline.configure(
            enable_cache=config.enable_cache,
            settings=config.settings,
            extra=config.extra,
        )

        for step_name, step_config in config.steps.items():
            if step_name not in pipeline.steps:
                raise KeyError(f"No step with name {step_name}.")
            pipeline.steps[step_name]._apply_configuration(step_config)

        # Override `enable_cache` of all steps if set at run level
        if config.enable_cache is not None:
            for step_ in pipeline.steps.values():
                step_.configure(enable_cache=config.enable_cache)

    def _apply_stack_default_settings(
        self, pipeline: "BasePipeline", stack: "Stack"
    ) -> None:
        """Applies stack default settings to a pipeline.

        Args:
            pipeline: The pipeline to which to apply the default settings.
            stack: The stack containing potential default settings.
        """
        pipeline_settings = pipeline.configuration.settings

        for component in stack.components.values():
            # Components without a settings class have no defaults to apply
            if not component.settings_class:
                continue

            settings_key = settings_utils.get_stack_component_setting_key(
                component
            )
            default_settings = self._get_default_settings(component)

            if settings_key in pipeline_settings:
                # NOTE(review): presumably the values configured on the
                # pipeline override the component defaults here -- confirm
                # against `pydantic_utils.update_model`.
                combined_settings = pydantic_utils.update_model(
                    default_settings, update=pipeline_settings[settings_key]
                )
                pipeline_settings[settings_key] = combined_settings
            else:
                pipeline_settings[settings_key] = default_settings

        pipeline.configure(settings=pipeline_settings, merge=False)

    def _get_default_settings(
        self,
        stack_component: "StackComponent",
    ) -> "BaseSettings":
        """Gets default settings configured on a stack component.

        Args:
            stack_component: The stack component for which to get the settings.

        Returns:
            The settings configured on the stack component.
        """
        assert stack_component.settings_class
        # Exclude additional config attributes that aren't part of the settings
        field_names = set(stack_component.settings_class.__fields__)
        default_settings = stack_component.settings_class.parse_obj(
            stack_component.config.dict(
                include=field_names, exclude_unset=True, exclude_defaults=True
            )
        )
        return default_settings

    def _verify_distinct_step_names(self, pipeline: "BasePipeline") -> None:
        """Verifies that all steps inside the pipeline have separate names.

        Args:
            pipeline: The pipeline to verify.

        Raises:
            PipelineInterfaceError: If multiple steps share the same name.
        """
        # Maps each step name to the pipeline argument name it was first
        # seen with
        step_names: Dict[str, str] = {}

        for step_argument_name, step in pipeline.steps.items():
            previous_argument_name = step_names.get(step.name)
            if previous_argument_name is not None:
                raise PipelineInterfaceError(
                    f"Found multiple step objects with the same name "
                    f"`{step.name}` for arguments '{previous_argument_name}' "
                    f"and '{step_argument_name}' in pipeline "
                    f"'{pipeline.name}'. All steps of a ZenML pipeline need "
                    "to have distinct names. To solve this issue, assign a new "
                    "name to one of the steps by calling "
                    "`my_step_instance.configure(name='some_distinct_name')`"
                )

            step_names[step.name] = step_argument_name

    @staticmethod
    def _verify_run_name(run_name: str) -> None:
        """Verifies that the run name contains only valid placeholders.

        Every replacement field of the run name is checked, including
        anonymous fields (`{}`, which have an empty field name) and fields
        nested inside the format spec of another field
        (e.g. `{date:{width}}`).

        Args:
            run_name: The run name to verify.

        Raises:
            ValueError: If the run name contains invalid placeholders.
        """
        valid_placeholder_names = {"date", "time"}
        formatter = string.Formatter()

        placeholders = set()
        # Templates that still need to be parsed: the run name itself plus
        # any format specs, as these may contain replacement fields too
        pending_templates = [run_name]
        while pending_templates:
            template = pending_templates.pop()
            for _, field_name, format_spec, _ in formatter.parse(template):
                # The field name is `None` for pure literal text. An empty
                # string means an anonymous placeholder, which must not be
                # skipped as it is not one of the valid placeholder names.
                if field_name is None:
                    continue
                placeholders.add(field_name)
                if format_spec:
                    pending_templates.append(format_spec)

        if not placeholders.issubset(valid_placeholder_names):
            raise ValueError(
                f"Invalid run name {run_name}. Only the placeholders "
                f"{valid_placeholder_names} are allowed in run names."
            )

    def _filter_and_validate_settings(
        self,
        settings: Dict[str, "BaseSettings"],
        configuration_level: ConfigurationLevel,
        stack: "Stack",
    ) -> Dict[str, "BaseSettings"]:
        """Filters and validates settings.

        Args:
            settings: The settings to check.
            configuration_level: The level on which these settings
                were configured.
            stack: The stack on which the pipeline will run.

        Raises:
            TypeError: If settings with an unsupported configuration
                level were specified.

        Returns:
            The filtered settings.
        """
        validated_settings = {}

        for key, settings_instance in settings.items():
            resolver = SettingsResolver(key=key, settings=settings_instance)
            try:
                settings_instance = resolver.resolve(stack=stack)
            except KeyError:
                # Settings that can't be resolved for this stack are
                # dropped instead of failing the compilation
                logger.info(
                    "Not including stack component settings with key `%s`.",
                    key,
                )
                continue

            if configuration_level not in settings_instance.LEVEL:
                raise TypeError(
                    f"The settings class {settings_instance.__class__} can not "
                    f"be specified on a {configuration_level.name} level."
                )
            validated_settings[key] = settings_instance

        return validated_settings

    def _get_step_spec(self, step: "BaseStep") -> StepSpec:
        """Gets the spec for a step.

        Args:
            step: The step for which to get the spec.

        Returns:
            The step spec.
        """
        return StepSpec(
            source=source_utils.resolve_class(step.__class__),
            # Sorted so the spec does not depend on the iteration order
            # of the upstream steps
            upstream_steps=sorted(step.upstream_steps),
            inputs=step.inputs,
        )

    def _compile_step(
        self,
        step: "BaseStep",
        pipeline_settings: Dict[str, "BaseSettings"],
        pipeline_extra: Dict[str, Any],
        stack: "Stack",
    ) -> Step:
        """Compiles a ZenML step.

        Args:
            step: The step to compile.
            pipeline_settings: settings configured on the
                pipeline of the step.
            pipeline_extra: Extra values configured on the pipeline of the step.
            stack: The stack on which the pipeline will be run.

        Returns:
            The compiled step.
        """
        step_spec = self._get_step_spec(step=step)
        step_settings = self._filter_and_validate_settings(
            settings=step.configuration.settings,
            configuration_level=ConfigurationLevel.STEP,
            stack=stack,
        )

        # For identical keys, values configured on the step win over the
        # values configured on the pipeline
        merged_settings = {
            **pipeline_settings,
            **step_settings,
        }
        merged_extras = {**pipeline_extra, **step.configuration.extra}

        step.configure(
            settings=merged_settings,
            extra=merged_extras,
            merge=False,
        )

        complete_step_configuration = StepConfiguration(
            **step.configuration.dict()
        )

        return Step(spec=step_spec, config=complete_step_configuration)

    @staticmethod
    def _get_default_run_name(pipeline_name: str) -> str:
        """Gets the default name for a pipeline run.

        Args:
            pipeline_name: Name of the pipeline which will be run.

        Returns:
            Run name.
        """
        # The doubled braces leave literal `{date}` and `{time}`
        # placeholders in the returned name
        return f"{pipeline_name}-{{date}}-{{time}}"

    @staticmethod
    def _get_sorted_steps(
        steps: Dict[str, "BaseStep"]
    ) -> List[Tuple[str, "BaseStep"]]:
        """Sorts the steps of a pipeline using topological sort.

        The resulting list of steps will be in an order that can be executed
        sequentially without any conflicts.

        Args:
            steps: ZenML pipeline steps.

        Returns:
            The sorted steps.
        """
        from zenml.orchestrators.dag_runner import reverse_dag
        from zenml.orchestrators.topsort import topsorted_layers

        # Sort step names using topological sort
        dag: Dict[str, List[str]] = {
            step.name: list(step.upstream_steps) for step in steps.values()
        }
        reversed_dag: Dict[str, List[str]] = reverse_dag(dag)
        layers = topsorted_layers(
            nodes=[step.name for step in steps.values()],
            get_node_id_fn=lambda node: node,
            get_parent_nodes=lambda node: dag[node],
            get_child_nodes=lambda node: reversed_dag[node],
        )
        sorted_step_names = [step for layer in layers for step in layer]

        # Construct pipeline name to step mapping
        step_name_to_name_in_pipeline: Dict[str, str] = {
            step.name: name_in_pipeline
            for name_in_pipeline, step in steps.items()
        }
        sorted_names_in_pipeline: List[str] = [
            step_name_to_name_in_pipeline[step_name]
            for step_name in sorted_step_names
        ]
        sorted_steps: List[Tuple[str, "BaseStep"]] = [
            (name_in_pipeline, steps[name_in_pipeline])
            for name_in_pipeline in sorted_names_in_pipeline
        ]
        return sorted_steps

    @staticmethod
    def _ensure_required_stack_components_exist(
        stack: "Stack", steps: Sequence["Step"]
    ) -> None:
        """Ensures that the stack components required for each step exist.

        Args:
            stack: The stack on which the pipeline should be deployed.
            steps: The steps of the pipeline.

        Raises:
            StackValidationError: If a required stack component is missing.
        """
        # Each set holds at most the name of the single component of that
        # type that is part of the stack
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )
        available_experiment_trackers = (
            {stack.experiment_tracker.name}
            if stack.experiment_tracker
            else set()
        )

        for step in steps:
            step_operator = step.config.step_operator
            if step_operator and step_operator not in available_step_operators:
                raise StackValidationError(
                    f"Step '{step.config.name}' requires step operator "
                    f"'{step_operator}' which is not configured in "
                    f"the stack '{stack.name}'. Available step operators: "
                    f"{available_step_operators}."
                )

            experiment_tracker = step.config.experiment_tracker
            if (
                experiment_tracker
                and experiment_tracker not in available_experiment_trackers
            ):
                raise StackValidationError(
                    f"Step '{step.config.name}' requires experiment tracker "
                    f"'{experiment_tracker}' which is not "
                    f"configured in the stack '{stack.name}'. Available "
                    f"experiment trackers: {available_experiment_trackers}."
                )
compile(self, pipeline, stack, run_configuration)

Compiles a ZenML pipeline to a serializable representation.

Parameters:

Name Type Description Default
pipeline BasePipeline

The pipeline to compile.

required
stack Stack

The stack on which the pipeline will run.

required
run_configuration PipelineRunConfiguration

The run configuration for this pipeline.

required

Returns:

Type Description
PipelineDeployment

The compiled pipeline.

Source code in zenml/config/compiler.py
def compile(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    run_configuration: PipelineRunConfiguration,
) -> PipelineDeployment:
    """Turns a ZenML pipeline into a serializable pipeline deployment.

    Args:
        pipeline: The pipeline to compile.
        stack: The stack on which the pipeline will run.
        run_configuration: The run configuration for this pipeline.

    Returns:
        The compiled pipeline.
    """
    logger.debug("Compiling pipeline `%s`.", pipeline.name)
    # All run-level configurations are applied to a deep copy, which keeps
    # the pipeline and step objects of the caller untouched
    pipeline_copy = copy.deepcopy(pipeline)
    self._apply_run_configuration(
        pipeline=pipeline_copy, config=run_configuration
    )
    self._apply_stack_default_settings(pipeline=pipeline_copy, stack=stack)
    self._verify_distinct_step_names(pipeline=pipeline_copy)

    custom_run_name = run_configuration.run_name
    if custom_run_name:
        self._verify_run_name(custom_run_name)

    pipeline_copy.connect(**pipeline_copy.steps)

    validated_settings = self._filter_and_validate_settings(
        settings=pipeline_copy.configuration.settings,
        configuration_level=ConfigurationLevel.PIPELINE,
        stack=stack,
    )
    pipeline_copy.configure(settings=validated_settings, merge=False)

    # Steps only inherit the pipeline settings which are allowed to be
    # specified on a step level
    step_level_settings = {}
    for key, settings in validated_settings.items():
        if ConfigurationLevel.STEP in settings.LEVEL:
            step_level_settings[key] = settings

    # Compile the steps in topologically sorted order
    compiled_steps = {}
    for name, step in self._get_sorted_steps(steps=pipeline_copy.steps):
        compiled_steps[name] = self._compile_step(
            step=step,
            pipeline_settings=step_level_settings,
            pipeline_extra=pipeline_copy.configuration.extra,
            stack=stack,
        )

    self._ensure_required_stack_components_exist(
        stack=stack, steps=list(compiled_steps.values())
    )

    if custom_run_name:
        run_name = custom_run_name
    else:
        run_name = self._get_default_run_name(
            pipeline_name=pipeline_copy.name
        )

    deployment = PipelineDeployment(
        run_name=run_name,
        stack_id=stack.id,
        schedule=run_configuration.schedule,
        pipeline=pipeline_copy.configuration,
        steps=compiled_steps,
        client_environment=get_run_environment_dict(),
    )
    logger.debug("Compiled pipeline deployment: %s", deployment)
    return deployment

config_keys

Validates global configuration values.

ConfigKeys

Class to validate dictionary configurations.

Source code in zenml/config/config_keys.py
class ConfigKeys:
    """Base class that validates the keys of configuration dictionaries.

    Subclasses declare the allowed keys as class attributes. Attributes
    whose name ends with an underscore are optional keys, all others are
    required keys.
    """

    @classmethod
    def get_keys(cls) -> Tuple[List[str], List[str]]:
        """Collects the required and optional config keys of this class.

        Only attributes defined directly on this class are considered.
        Dunder attributes and anything callable are skipped.

        Returns:
            A tuple (required, optional) which are lists of the
            required/optional keys for this class.
        """
        required: List[str] = []
        optional: List[str] = []

        for attribute_name, attribute_value in cls.__dict__.items():
            if attribute_name.startswith("__"):
                continue
            if isinstance(attribute_value, (classmethod, staticmethod)):
                continue
            if callable(attribute_value):
                continue

            # A trailing underscore in the attribute name marks the key as
            # optional
            if attribute_name.endswith("_"):
                optional.append(attribute_value)
            else:
                required.append(attribute_value)

        return required, optional

    @classmethod
    def key_check(cls, config: Dict[str, Any]) -> None:
        """Verifies that a dict has all required keys and no unknown keys.

        Args:
            config: The configuration dict to verify.

        Raises:
            TypeError: If no config dictionary is passed.
            ValueError: If required keys are missing or unknown keys are found.
        """
        if not isinstance(config, dict):
            raise TypeError(f"Please specify a dict for {cls.__name__}")

        required, optional = cls.get_keys()

        # Every required key has to be part of the config
        missing_keys = [key for key in required if key not in config]
        if missing_keys:
            raise ValueError(
                f"Missing key(s) {missing_keys} in {cls.__name__}"
            )

        # Every key of the config has to be either required or optional
        allowed_keys = [*required, *optional]
        unknown_keys = [key for key in config if key not in allowed_keys]
        if unknown_keys:
            raise ValueError(
                f"Unknown key(s) {unknown_keys} in {cls.__name__}. "
                f"Required keys: {required}, optional keys: {optional}."
            )
get_keys() classmethod

Gets all the required and optional config keys for this class.

Returns:

Type Description
Tuple[List[str], List[str]]

A tuple (required, optional) which are lists of the required/optional keys for this class.

Source code in zenml/config/config_keys.py
@classmethod
def get_keys(cls) -> Tuple[List[str], List[str]]:
    """Collects the required and optional config keys of this class.

    Only attributes defined directly on this class are considered. Dunder
    attributes and anything callable are skipped. Attributes whose name
    ends with an underscore are optional keys, all others are required.

    Returns:
        A tuple (required, optional) which are lists of the
        required/optional keys for this class.
    """
    required: List[str] = []
    optional: List[str] = []

    for attribute_name, attribute_value in cls.__dict__.items():
        if attribute_name.startswith("__"):
            continue
        if isinstance(attribute_value, (classmethod, staticmethod)):
            continue
        if callable(attribute_value):
            continue

        # A trailing underscore in the attribute name marks the key as
        # optional
        if attribute_name.endswith("_"):
            optional.append(attribute_value)
        else:
            required.append(attribute_value)

    return required, optional
key_check(config) classmethod

Checks whether a configuration dict contains all required keys and no unknown keys.

Parameters:

Name Type Description Default
config Dict[str, Any]

The configuration dict to verify.

required

Exceptions:

Type Description
TypeError

If no config dictionary is passed.

ValueError

If required keys are missing or unknown keys are found.

Source code in zenml/config/config_keys.py
@classmethod
def key_check(cls, config: Dict[str, Any]) -> None:
    """Verifies that a dict has all required keys and no unknown keys.

    Args:
        config: The configuration dict to verify.

    Raises:
        TypeError: If no config dictionary is passed.
        ValueError: If required keys are missing or unknown keys are found.
    """
    if not isinstance(config, dict):
        raise TypeError(f"Please specify a dict for {cls.__name__}")

    required, optional = cls.get_keys()

    # Every required key has to be part of the config
    missing_keys = [key for key in required if key not in config]
    if missing_keys:
        raise ValueError(
            f"Missing key(s) {missing_keys} in {cls.__name__}"
        )

    # Every key of the config has to be either required or optional
    allowed_keys = [*required, *optional]
    unknown_keys = [key for key in config if key not in allowed_keys]
    if unknown_keys:
        raise ValueError(
            f"Unknown key(s) {unknown_keys} in {cls.__name__}. "
            f"Required keys: {required}, optional keys: {optional}."
        )

PipelineConfigurationKeys (ConfigKeys)

Keys for a pipeline configuration dict.

Source code in zenml/config/config_keys.py
class PipelineConfigurationKeys(ConfigKeys):
    """Keys for a pipeline configuration dict.

    Both keys are required: `ConfigKeys.get_keys` treats every attribute
    whose name does not end with an underscore as a required key.
    """

    # The attribute values (not the attribute names) are the dict keys
    NAME = "name"
    STEPS = "steps"

SourceConfigurationKeys (ConfigKeys)

Keys for a step configuration source dict.

Source code in zenml/config/config_keys.py
class SourceConfigurationKeys(ConfigKeys):
    """Keys for a step configuration source dict.

    Both keys are optional: `ConfigKeys.get_keys` treats every attribute
    whose name ends with an underscore as an optional key.
    """

    # The attribute values (not the attribute names) are the dict keys
    FILE_ = "file"
    NAME_ = "name"

StepConfigurationKeys (ConfigKeys)

Keys for a step configuration dict.

Source code in zenml/config/config_keys.py
class StepConfigurationKeys(ConfigKeys):
    """Keys for a step configuration dict.

    All keys are optional: `ConfigKeys.get_keys` treats every attribute
    whose name ends with an underscore as an optional key.
    """

    # The attribute values (not the attribute names) are the dict keys
    SOURCE_ = "source"
    PARAMETERS_ = "parameters"
    MATERIALIZERS_ = "materializers"

constants

ZenML settings constants.

docker_settings

Docker settings.

DockerSettings (BaseSettings) pydantic-model

Settings for building Docker images to run ZenML pipelines.

Build process:
  • No dockerfile specified: If any of the options regarding requirements, environment variables or copying files require us to build an image, ZenML will build this image. Otherwise the parent_image will be used to run the pipeline.
  • dockerfile specified: ZenML will first build an image based on the specified Dockerfile. If any of the options regarding requirements, environment variables or copying files require an additional image built on top of that, ZenML will build a second image. If not, the image built from the specified Dockerfile will be used to run the pipeline.
Requirements installation order:

Depending on the configuration of this object, requirements will be installed in the following order (each step optional):
  • The packages installed in your local python environment
  • The packages specified via the requirements attribute
  • The packages specified via the required_integrations and potentially stack requirements

Attributes:

Name Type Description
parent_image Optional[str]

Full name of the Docker image that should be used as the parent for the image that will be built. Defaults to a ZenML image built for the active Python and ZenML version.

Additional notes:
  • If you specify a custom image here, you need to make sure it has ZenML installed.
  • If this is a non-local image, the environment which is running the pipeline and building the Docker image needs to be able to pull this image.
  • If a custom dockerfile is specified for this settings object, this parent image will be ignored.

dockerfile Optional[str]

Path to a custom Dockerfile that should be built. Depending on the other values you specify in this object, the resulting image will be used directly to run your pipeline or ZenML will use it as a parent image to build on top of. See the general docstring of this class for more information.

Additional notes:
  • If you specify this, the parent_image attribute will be ignored.
  • If you specify this, the image built from this Dockerfile needs to have ZenML installed.

build_context_root Optional[str]

Build context root for the Docker build, only used when the dockerfile attribute is set. If this is left empty, the build context will only contain the Dockerfile.

build_options Dict[str, Any]

Additional options that will be passed unmodified to the Docker build call when building an image using the specified dockerfile. You can use this, for example, to specify build args or a target stage. See https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build for a full list of available options.

docker_target_repository

Name of the Docker repository to which the image should be pushed. This repository will be appended to the registry URI of the container registry of your stack and should therefore not include any registry.

replicate_local_python_environment Union[List[str], zenml.config.docker_settings.PythonEnvironmentExportMethod]

If not None, ZenML will use the specified method to generate a requirements file that replicates the packages installed in the currently running python environment. This requirements file will then be installed in the Docker image.

requirements Union[NoneType, str, List[str]]

Path to a requirements file or a list of required pip packages. During the image build, these requirements will be installed using pip. If you need to use a different tool to resolve and/or install your packages, please use a custom parent image or specify a custom dockerfile.

required_integrations List[str]

List of ZenML integrations that should be installed. All requirements for the specified integrations will be installed inside the Docker image.

install_stack_requirements bool

If True, ZenML will automatically detect if components of your active stack are part of a ZenML integration and install the corresponding requirements and apt packages. If you set this to False or use custom components in your stack, you need to make sure these get installed by specifying them in the requirements and apt_packages attributes.

apt_packages List[str]

APT packages to install inside the Docker image.

environment Dict[str, Any]

Dictionary of environment variables to set inside the Docker image.

dockerignore Optional[str]

Path to a dockerignore file to use when building the Docker image.

copy_files bool

If True, user files will be copied into the Docker image. If this is set to False, ZenML will not copy any of your files into the Docker image and you're responsible that all the files to run your pipeline exist in the right place.

copy_global_config bool

If True, the global configuration (contains connection info for your ZenStore) will be copied into the Docker image. If this is set to False, ZenML will not copy this configuration and you're responsible for making sure ZenML can access the ZenStore in the Docker image.

user Optional[str]

If not None, will set the user, make it owner of the /app directory which contains all the user code and run the container entrypoint as this user.

Source code in zenml/config/docker_settings.py
class DockerSettings(BaseSettings):
    """Settings for building Docker images to run ZenML pipelines.

    Build process:
    --------------
    * No `dockerfile` specified: If any of the options regarding
    requirements, environment variables or copying files require us to build an
    image, ZenML will build this image. Otherwise the `parent_image` will be
    used to run the pipeline.
    * `dockerfile` specified: ZenML will first build an image based on the
    specified Dockerfile. If any of the options regarding
    requirements, environment variables or copying files require an additional
    image built on top of that, ZenML will build a second image. If not, the
    image built from the specified Dockerfile will be used to run the pipeline.

    Requirements installation order:
    --------------------------------
    Depending on the configuration of this object, requirements will be
    installed in the following order (each step optional):
    - The packages installed in your local python environment
    - The packages specified via the `requirements` attribute
    - The packages specified via the `required_integrations` and potentially
      stack requirements

    Attributes:
        parent_image: Full name of the Docker image that should be
            used as the parent for the image that will be built. Defaults to
            a ZenML image built for the active Python and ZenML version.

            Additional notes:
            * If you specify a custom image here, you need to make sure it has
            ZenML installed.
            * If this is a non-local image, the environment which is running
            the pipeline and building the Docker image needs to be able to pull
            this image.
            * If a custom `dockerfile` is specified for this settings
            object, this parent image will be ignored.
        dockerfile: Path to a custom Dockerfile that should be built. Depending
            on the other values you specify in this object, the resulting
            image will be used directly to run your pipeline or ZenML will use
            it as a parent image to build on top of. See the general docstring
            of this class for more information.

            Additional notes:
            * If you specify this, the `parent_image` attribute will be ignored.
            * If you specify this, the image built from this Dockerfile needs
            to have ZenML installed.
        build_context_root: Build context root for the Docker build, only used
            when the `dockerfile` attribute is set. If this is left empty, the
            build context will only contain the Dockerfile.
        build_options: Additional options that will be passed unmodified to the
            Docker build call when building an image using the specified
            `dockerfile`. You can use this, for example, to specify build
            args or a target stage. See
            https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
            for a full list of available options.
        target_repository: Name of the Docker repository to which the
            image should be pushed. This repository will be appended to the
            registry URI of the container registry of your stack and should
            therefore **not** include any registry.
        replicate_local_python_environment: If not `None`, ZenML will use the
            specified method to generate a requirements file that replicates
            the packages installed in the currently running python environment.
            This requirements file will then be installed in the Docker image.
        requirements: Path to a requirements file or a list of required pip
            packages. During the image build, these requirements will be
            installed using pip. If you need to use a different tool to
            resolve and/or install your packages, please use a custom parent
            image or specify a custom `dockerfile`.
        required_integrations: List of ZenML integrations that should be
            installed. All requirements for the specified integrations will
            be installed inside the Docker image.
        install_stack_requirements: If `True`, ZenML will automatically detect
            if components of your active stack are part of a ZenML integration
            and install the corresponding requirements and apt packages.
            If you set this to `False` or use custom components in your stack,
            you need to make sure these get installed by specifying them in
            the `requirements` and `apt_packages` attributes.
        apt_packages: APT packages to install inside the Docker image.
        environment: Dictionary of environment variables to set inside the
            Docker image.
        dockerignore: Path to a dockerignore file to use when building the
            Docker image.
        copy_files: If `True`, user files will be copied into the Docker image.
            If this is set to `False`, ZenML will not copy any of your files
            into the Docker image and you're responsible for making sure that
            all the files needed to run your pipeline exist in the right
            place.
        copy_global_config: If `True`, the global configuration (contains
            connection info for your ZenStore) will be copied into the Docker
            image. If this is set to `False`, ZenML will not copy this
            configuration and you're responsible for making sure ZenML can
            access the ZenStore in the Docker image.
        user: If not `None`, will set the user, make it owner of the `/app`
            directory which contains all the user code and run the container
            entrypoint as this user.
    """

    # Docker settings can only be specified on the pipeline level
    LEVEL = ConfigurationLevel.PIPELINE

    parent_image: Optional[str] = None
    dockerfile: Optional[str] = None
    build_context_root: Optional[str] = None
    build_options: Dict[str, Any] = {}

    target_repository: str = "zenml"

    replicate_local_python_environment: Optional[
        Union[List[str], PythonEnvironmentExportMethod]
    ] = None
    requirements: Union[None, str, List[str]] = None
    required_integrations: List[str] = []
    install_stack_requirements: bool = True

    apt_packages: List[str] = []

    environment: Dict[str, Any] = {}
    dockerignore: Optional[str] = None
    copy_files: bool = True
    copy_global_config: bool = True
    user: Optional[str] = None

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # prevent extra attributes during model initialization
        extra = Extra.forbid
Config

Pydantic configuration class.

Source code in zenml/config/docker_settings.py
class Config:
    """Pydantic configuration class of the `DockerSettings` model."""

    # public attributes are immutable: assigning to a field after the
    # settings object was created is not allowed
    allow_mutation = False
    # prevent extra attributes during model initialization, so unknown
    # keys are rejected instead of being silently accepted
    extra = Extra.forbid

PythonEnvironmentExportMethod (Enum)

Different methods to export the local Python environment.

Source code in zenml/config/docker_settings.py
class PythonEnvironmentExportMethod(Enum):
    """Supported ways of exporting the local Python environment."""

    PIP_FREEZE = "pip_freeze"
    POETRY_EXPORT = "poetry_export"

    @property
    def command(self) -> str:
        """Shell command that lists the locally installed python packages.

        Whatever the command prints must be usable as a pip requirements
        file once it has been written to disk.

        Returns:
            Shell command.
        """
        if self is PythonEnvironmentExportMethod.PIP_FREEZE:
            return "pip freeze"
        if self is PythonEnvironmentExportMethod.POETRY_EXPORT:
            return "poetry export --format=requirements.txt"
        # Unreachable for the members defined above; mirrors the failed
        # lookup an unmapped member would cause.
        raise KeyError(self)

global_config

Functionality to support ZenML GlobalConfiguration.

GlobalConfigMetaClass (ModelMetaclass)

Global configuration metaclass.

This metaclass is used to enforce a singleton instance of the GlobalConfiguration class with the following additional properties:

  • the GlobalConfiguration is initialized automatically on import with the default configuration, if no config file exists yet.
  • the GlobalConfiguration undergoes a schema migration if the version of the config file is older than the current version of the ZenML package.
  • a default store is set if no store is configured yet.
Source code in zenml/config/global_config.py
class GlobalConfigMetaClass(ModelMetaclass):
    """Metaclass turning GlobalConfiguration into a singleton.

    Besides enforcing a single GlobalConfiguration instance, this metaclass
    provides the following additional properties:

    * the GlobalConfiguration is initialized automatically on import with the
    default configuration, if no config file exists yet.
    * the GlobalConfiguration undergoes a schema migration if the version of the
    config file is older than the current version of the ZenML package.
    * a default store is set if no store is configured yet.
    """

    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Set up the class with an empty singleton slot.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments
        """
        super().__init__(*args, **kwargs)
        # no singleton exists until the class is first called without
        # arguments
        cls._global_config: Optional["GlobalConfiguration"] = None

    def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
        """Return the singleton instance, creating it on first use.

        Passing any custom arguments to the GlobalConfiguration constructor
        bypasses the singleton logic: a fresh, independent instance is
        built and returned without being stored as the global singleton.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments

        Returns:
            The global GlobalConfiguration instance.
        """
        if args or kwargs:
            standalone = super().__call__(*args, **kwargs)
            return cast("GlobalConfiguration", standalone)

        if cls._global_config:
            return cls._global_config

        # first call without arguments: create, register, then migrate
        created = super().__call__(*args, **kwargs)
        cls._global_config = cast("GlobalConfiguration", created)
        cls._global_config._migrate_config()
        return cls._global_config
__call__(cls, *args, **kwargs) special

Create or return the default global config instance.

If the GlobalConfiguration constructor is called with custom arguments, the singleton functionality of the metaclass is bypassed: a new GlobalConfiguration instance is created and returned immediately and without saving it as the global GlobalConfiguration singleton.

Parameters:

Name Type Description Default
*args Any

positional arguments

()
**kwargs Any

keyword arguments

{}

Returns:

Type Description
GlobalConfiguration

The global GlobalConfiguration instance.

Source code in zenml/config/global_config.py
def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
    """Return the singleton instance, creating it on first use.

    Passing any custom arguments to the GlobalConfiguration constructor
    bypasses the singleton logic: a fresh, independent instance is
    built and returned without being stored as the global singleton.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments

    Returns:
        The global GlobalConfiguration instance.
    """
    if args or kwargs:
        standalone = super().__call__(*args, **kwargs)
        return cast("GlobalConfiguration", standalone)

    if cls._global_config:
        return cls._global_config

    # first call without arguments: create, register, then migrate
    created = super().__call__(*args, **kwargs)
    cls._global_config = cast("GlobalConfiguration", created)
    cls._global_config._migrate_config()
    return cls._global_config
__init__(cls, *args, **kwargs) special

Initialize a singleton class.

Parameters:

Name Type Description Default
*args Any

positional arguments

()
**kwargs Any

keyword arguments

{}
Source code in zenml/config/global_config.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Initialize a singleton class.

    Runs the regular metaclass initialization and then resets the slot that
    holds the singleton instance.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments
    """
    super().__init__(*args, **kwargs)
    # no singleton instance exists right after the class was created
    cls._global_config: Optional["GlobalConfiguration"] = None

GlobalConfiguration (BaseModel) pydantic-model

Stores global configuration options.

Configuration options are read from a config file, but can be overridden by environment variables. See GlobalConfiguration.__getattribute__ for more details.

Attributes:

Name Type Description
user_id

Unique user id.

user_email

Email address associated with this client.

user_email_opt_in

Whether the user has opted in to email communication.

analytics_opt_in

If a user agreed to sending analytics or not.

version

Version of ZenML that was last used to create or update the global config.

store

Store configuration.

active_stack_id

The ID of the active stack.

active_project_name

The name of the active project.

jwt_secret_key

The secret key used to sign and verify JWT tokens.

_config_path

Directory where the global config file is stored.

Source code in zenml/config/global_config.py
class GlobalConfiguration(BaseModel, metaclass=GlobalConfigMetaClass):
    """Stores global configuration options.

    Configuration options are read from a config file, but can be overwritten
    by environment variables. See `GlobalConfiguration.__getattribute__` for
    more details.

    Attributes:
        user_id: Unique user id.
        user_email: Email address associated with this client.
        user_email_opt_in: Whether the user has opted in to email communication.
        analytics_opt_in: If a user agreed to sending analytics or not.
        version: Version of ZenML that was last used to create or update the
            global config.
        store: Store configuration.
        active_stack_id: The ID of the active stack.
        active_project_name: The name of the active project.
        jwt_secret_key: The secret key used to sign and verify JWT tokens.
        _config_path: Directory where the global config file is stored.
    """

    user_id: uuid.UUID = Field(
        default_factory=uuid.uuid4, allow_mutation=False
    )
    user_email: Optional[str] = None
    user_email_opt_in: Optional[bool] = None
    analytics_opt_in: bool = True
    version: Optional[str]
    store: Optional[StoreConfiguration]
    active_stack_id: Optional[uuid.UUID]
    active_project_name: Optional[str]
    jwt_secret_key: str = Field(default_factory=generate_jwt_secret_key)

    _config_path: str
    _zen_store: Optional["BaseZenStore"] = None
    _active_project: Optional["ProjectResponseModel"] = None

    def __init__(
        self, config_path: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Initializes a GlobalConfiguration using values from the config file.

        GlobalConfiguration is a singleton class: only one instance can exist.
        Calling this constructor multiple times will always yield the same
        instance (see the exception below).

        The `config_path` argument is only meant for internal use and testing
        purposes. User code must never pass it to the constructor. When a custom
        `config_path` value is passed, an anonymous GlobalConfiguration instance
        is created and returned independently of the GlobalConfiguration
        singleton and that will have no effect as far as the rest of the ZenML
        core code is concerned.

        Values are read from the config file in the configuration directory
        (if that file exists) and then updated with the supplied keyword
        arguments. If the config file doesn't exist yet, it is created from
        the resulting configuration.

        Args:
            config_path: (internal use) custom config file path. When not
                specified, the default global configuration path is used and the
                global configuration singleton instance is returned. Only used
                to create configuration copies for transfer to different
                runtime environments.
            **kwargs: keyword arguments
        """
        # the config path has to be set first because reading the config
        # file depends on it
        self._config_path = config_path or self.default_config_directory()
        config_values = self._read_config()
        # explicitly passed values take precedence over values read from disk
        config_values.update(**kwargs)
        super().__init__(**config_values)

        # persist the configuration right away if no config file exists yet
        if not fileio.exists(self._config_file(config_path)):
            self._write_config()

    @classmethod
    def get_instance(cls) -> Optional["GlobalConfiguration"]:
        """Return the GlobalConfiguration singleton instance.

        This only returns the currently stored singleton and never creates
        a new one.

        Returns:
            The GlobalConfiguration singleton instance or None, if the
            GlobalConfiguration hasn't been initialized yet.
        """
        return cls._global_config

    @classmethod
    def _reset_instance(
        cls, config: Optional["GlobalConfiguration"] = None
    ) -> None:
        """Reset the GlobalConfiguration singleton instance.

        This method is only meant for internal use and testing purposes.

        Args:
            config: The GlobalConfiguration instance to set as the global
                singleton. If None, the global GlobalConfiguration singleton is
                reset to an empty value.
        """
        # replaces the stored singleton without touching the config file
        cls._global_config = config

    @validator("version")
    def _validate_version(cls, v: Optional[str]) -> Optional[str]:
        """Validate the version attribute.

        Args:
            v: The version attribute value.

        Returns:
            The version attribute value.

        Raises:
            RuntimeError: If the version parsing fails.
        """
        if v is None:
            return v

        error_message = f"Invalid version in global configuration: {v}."

        try:
            parsed_version = version.parse(v)
        except version.InvalidVersion as e:
            # Newer releases of the version parsing library raise
            # `InvalidVersion` for unparsable strings instead of returning a
            # `LegacyVersion`. `InvalidVersion` is a `ValueError`, so letting
            # it propagate would not produce the documented `RuntimeError`.
            raise RuntimeError(error_message) from e

        if not isinstance(parsed_version, version.Version):
            # Older releases return a `LegacyVersion` when the version
            # parsing fails. Check to make sure it's an actual `Version`
            # object which represents a valid version.
            raise RuntimeError(error_message)

        return v

    def __setattr__(self, key: str, value: Any) -> None:
        """Assign an attribute and save public attributes to the config file.

        Attributes whose name starts with an underscore are only set on the
        object and never trigger a write of the configuration.

        Args:
            key: The attribute name.
            value: The attribute value.
        """
        super().__setattr__(key, value)
        is_private = key.startswith("_")
        if not is_private:
            self._write_config()

    def __custom_getattribute__(self, key: str) -> Any:
        """Gets an attribute value for a specific key.

        If a value for this attribute was specified using an environment
        variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
        value can be parsed to the attribute type, the value from this
        environment variable is returned instead.

        Only model fields can be overridden this way: private attributes
        (leading underscore) and anything that is not a field of the model
        are always returned as stored.

        Args:
            key: The attribute name.

        Returns:
            The attribute value.
        """
        value = super().__getattribute__(key)
        # private attributes and non-field attributes are returned as is
        if key.startswith("_") or key not in type(self).__fields__:
            return value

        environment_variable_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
        try:
            environment_variable_value = os.environ[environment_variable_name]
            # set the environment variable value to leverage Pydantic's type
            # conversion and validation. The parent class `__setattr__` is
            # called here, which bypasses this class' own `__setattr__` and
            # therefore doesn't write the config file.
            super().__setattr__(key, environment_variable_value)
            return_value = super().__getattribute__(key)
            # set back the old value as we don't want to permanently store
            # the environment variable value here
            super().__setattr__(key, value)
            return return_value
        except (ValidationError, KeyError, TypeError):
            # the environment variable is not set (KeyError) or assigning its
            # value failed: fall back to the stored value
            return value

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    def _migrate_config(self) -> None:
        """Bring the version stored in the global config up to date.

        Nothing is changed if the stored version already matches the ZenML
        version in use, or if it is newer (in which case an error is
        logged). Otherwise the stored version is set to the current one.
        """
        package_version = version.parse(__version__)

        if self.version is not None:
            stored_version = version.parse(self.version)

            if stored_version > package_version:
                logger.error(
                    "The ZenML global configuration version (%s) is higher "
                    "than the version of ZenML currently being used (%s). "
                    "This may happen if you recently downgraded ZenML to an "
                    "earlier version, or if you have already used a more "
                    "recent ZenML version on the same machine. "
                    "It is highly recommended that you update ZenML to at "
                    "least match the global configuration version, otherwise "
                    "you may run into unexpected issues such as model schema "
                    "validation failures or even loss of information.",
                    stored_version,
                    package_version,
                )
                # TODO [ENG-899]: Give more detailed instruction on how to
                #  resolve version mismatch.
                return

            if stored_version == package_version:
                # already up to date
                return

            logger.info(
                "Migrating the ZenML global configuration from version %s "
                "to version %s...",
                stored_version,
                package_version,
            )
        else:
            logger.info(
                "Initializing the ZenML global configuration version to %s",
                package_version,
            )

        # assigning a public attribute also rewrites the config file, which
        # persists the result of the schema migration
        self.version = __version__

    def _read_config(self) -> Dict[str, Any]:
        """Load the configuration options stored in the config file.

        A missing config file is not an error: an empty dictionary is
        returned in that case.

        Returns:
            A dictionary containing the configuration options.
        """
        config_file = self._config_file()
        if not fileio.exists(config_file):
            return {}

        return cast(Dict[str, Any], yaml_utils.read_yaml(config_file))

    def _write_config(self, config_path: Optional[str] = None) -> None:
        """Serialize the global configuration and save it as YAML.

        Attributes that are `None` are left out of the written file. The
        target directory is created if the config file doesn't exist yet.

        Args:
            config_path: custom config file path. When not specified, the
                default global configuration path is used.
        """
        target_file = self._config_file(config_path)
        # round-trip through JSON to obtain plain, YAML-serializable values
        serialized = self.json(exclude_none=True)
        payload = json.loads(serialized)
        logger.debug(f"Writing config to {target_file}")

        if not fileio.exists(target_file):
            target_directory = config_path or self.config_directory
            io_utils.create_dir_recursive_if_not_exists(target_directory)

        yaml_utils.write_yaml(target_file, payload)

    def _configure_store(
        self,
        config: StoreConfiguration,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Configure the global zen store.

        This method creates and initializes the global store according to the
        supplied configuration. The newly created store only replaces the
        current one if its configuration differs from the stored store
        configuration or if no store has been initialized yet; otherwise the
        global configuration is left unchanged.

        Args:
            config: The new store configuration to use.
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the store
                constructor.
        """
        # imported locally rather than at module level
        from zenml.zen_stores.base_zen_store import BaseZenStore

        store = BaseZenStore.create_store(
            config, skip_default_registrations, **kwargs
        )
        if self.store != store.config or not self._zen_store:
            logger.debug(f"Configuring the global store to {store.config}")
            self.store = store.config

            # We want to check if the active user has opted in or out for using
            # an email address for marketing purposes and if so, record it in
            # the analytics.
            active_user = store.get_user(include_private=True)
            if active_user.email_opted_in is not None:
                self.record_email_opt_in_out(
                    opted_in=active_user.email_opted_in,
                    email=active_user.email,
                    source=AnalyticsEventSource.ZENML_SERVER,
                )
            self._zen_store = store

            # Sanitize the global configuration to reflect the new store
            self._sanitize_config()
            self._write_config()

            # make sure the directory for local stores exists
            local_stores_path = Path(self.local_stores_path)
            local_stores_path.mkdir(parents=True, exist_ok=True)

    def _sanitize_config(self) -> None:
        """Validate the active project and stack against the zen store.

        The zen store is asked to validate the currently configured active
        project name and stack ID; the project and stack it returns are then
        set as the active ones.
        """
        store = self.zen_store
        project, stack = store.validate_active_config(
            self.active_project_name,
            self.active_stack_id,
            config_name="global",
        )
        self.active_project_name = project.name
        self._active_project = project
        self.set_active_stack(stack)

    @staticmethod
    def default_config_directory() -> str:
        """Path to the default global configuration directory.

        This is the directory used by the constructor when no custom
        `config_path` is supplied.

        Returns:
            The default global configuration directory.
        """
        return io_utils.get_global_config_directory()

    def _config_file(self, config_path: Optional[str] = None) -> str:
        """Path to the file where global configuration options are stored.

        Args:
            config_path: custom config file path. When not specified, the
                default global configuration path is used.

        Returns:
            The path to the global configuration file.
        """
        return os.path.join(config_path or self._config_path, "config.yaml")

    def copy_configuration(
        self,
        config_path: str,
        load_config_path: Optional[PurePath] = None,
        store_config: Optional[StoreConfiguration] = None,
        empty_store: bool = False,
    ) -> "GlobalConfiguration":
        """Create a copy of the global config using a different config path.

        The global configuration is written to a different configuration
        path, from which it can later be loaded in the context of a new
        environment, such as a container image.

        Unless a custom store configuration is provided or the `empty_store`
        flag is set to `True`, the configuration files accompanying the store
        configuration (e.g. certificates etc.) are copied to the new
        configuration path as well.

        If the default local store is currently in use, it will not be included
        in the configuration copy. This is the same as explicitly setting the
        `empty_store` flag to `True`.

        Args:
            config_path: path where the configuration copy should be saved
            load_config_path: absolute path that will be used to load the copied
                configuration. This can be set to a value different from
                `config_path` if the configuration copy will be loaded from
                a different environment, e.g. when the configuration is copied
                to a container image and loaded using a different absolute path.
                This will be reflected in the paths and URLs encoded in the
                copied configuration.
            store_config: custom store configuration to use for the copied
                global configuration. If not specified, the current global store
                configuration is used.
            empty_store: if `True`, an empty store configuration is used for the
                copied global configuration. This means that the copied global
                configuration will be initialized to the default local store in
                the new environment.

        Returns:
            A new global configuration object copied to the specified path.
        """
        from zenml.zen_stores.base_zen_store import BaseZenStore

        # write the current configuration to the new location and load the
        # copy from there
        self._write_config(config_path)
        config_copy = GlobalConfiguration(config_path=config_path)

        # an explicitly supplied store configuration always wins; otherwise
        # the current store is only copied if it was not requested to be
        # empty and it is not the default store
        copied_store: Optional[StoreConfiguration] = store_config
        if (
            copied_store is None
            and not empty_store
            and not self.uses_default_store()
            and self.store
        ):
            store_config_class = BaseZenStore.get_store_config_class(
                self.store.type
            )
            copied_store = store_config_class.copy_configuration(
                self.store, config_path, load_config_path
            )

        config_copy.store = copied_store
        return config_copy

    @property
    def config_directory(self) -> str:
        """Directory where the global configuration file is located.

        This is the config path that was chosen when this object was
        initialized.

        Returns:
            The directory where the global configuration file is located.
        """
        return self._config_path

    @property
    def local_stores_path(self) -> str:
        """Location of the directory holding local stores information.

        The value of the `ENV_ZENML_LOCAL_STORES_PATH` environment variable
        is used if that variable is set. Otherwise, the path points to a
        subdirectory of the global configuration directory.

        Returns:
            The path where local stores information is stored.
        """
        env_override = os.environ.get(ENV_ZENML_LOCAL_STORES_PATH)
        if env_override is not None:
            return env_override

        return os.path.join(
            self.config_directory,
            LOCAL_STORES_DIRECTORY_NAME,
        )

    def get_default_store(self) -> StoreConfiguration:
        """Build the default store configuration.

        Non-empty environment variables whose name starts with
        `ENV_ZENML_STORE_PREFIX` are collected (prefix stripped, name
        lowercased) and, if there are any, used to build the store
        configuration. Otherwise the default store configuration located
        inside the local stores path is returned.

        Returns:
            The default store configuration.
        """
        from zenml.zen_stores.base_zen_store import BaseZenStore

        prefix_length = len(ENV_ZENML_STORE_PREFIX)
        env_config: Dict[str, str] = {
            name[prefix_length:].lower(): value
            for name, value in os.environ.items()
            if value != "" and name.startswith(ENV_ZENML_STORE_PREFIX)
        }

        if not env_config:
            default_store_path = os.path.join(
                self.local_stores_path,
                DEFAULT_STORE_DIRECTORY_NAME,
            )
            return BaseZenStore.get_default_store_config(
                path=default_store_path
            )

        # derive the store type from the URL if it wasn't set explicitly
        if "type" not in env_config and "url" in env_config:
            env_config["type"] = BaseZenStore.get_store_type(
                env_config["url"]
            )

        logger.debug(
            "Using environment variables to configure the default store"
        )
        return StoreConfiguration(**env_config)

    def set_default_store(self) -> None:
        """Configure the global store using the default store configuration.

        Use this to initialize the store configuration or to revert it back
        to the default store.
        """
        store_event = event_handler(AnalyticsEvent.INITIALIZED_STORE)
        with store_event as handler:
            store_cfg = self.get_default_store()
            self._configure_store(store_cfg)
            logger.info("Using the default store for the global config.")
            # attach the store type to the analytics event
            handler.metadata = {"store_type": store_cfg.type.value}

    def uses_default_store(self) -> bool:
        """Tell whether the configured store is the default store.

        The stores are compared by URL. Without a configured store the
        answer is always `False`.

        Returns:
            `True` if the global configuration uses the default store.
        """
        if self.store is None:
            return False

        return self.store.url == self.get_default_store().url

    def set_store(
        self,
        config: StoreConfiguration,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Validate a store configuration and make it the active one.

        If the resulting store is a REST store, the server information is
        additionally used to identify the server analytics group.

        Args:
            config: The new store configuration to use.
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the store
                constructor.
        """
        with event_handler(
            event=AnalyticsEvent.INITIALIZED_STORE,
            metadata={"store_type": config.type.value},
        ):
            self._configure_store(config, skip_default_registrations, **kwargs)
            logger.info("Updated the global store configuration.")

            if self.zen_store.type != StoreType.REST:
                return

            # Every time a client connects to a ZenML server, we want to
            # group the client ID and the server ID together. This records
            # only that a particular client has successfully connected to a
            # particular server at least once, but no information about the
            # user account is recorded here.
            with event_handler(event=AnalyticsEvent.ZENML_SERVER_CONNECTED):
                info = self.zen_store.get_store_info()

                server_id = str(info.id)
                server_metadata = {
                    "version": info.version,
                    "deployment_type": str(info.deployment_type),
                    "database_type": str(info.database_type),
                }
                identify_group(
                    AnalyticsGroup.ZENML_SERVER_GROUP,
                    group_id=server_id,
                    group_metadata=server_metadata,
                )

    @property
    def zen_store(self) -> "BaseZenStore":
        """Return the global zen store, initializing it on first access.

        When no store configuration is set, the default store is set up.
        When a configuration exists but the store object has not been
        created yet, the store is configured from that configuration.

        Returns:
            The current zen store.
        """
        store_config = self.store
        if not store_config:
            self.set_default_store()
        elif self._zen_store is None:
            self._configure_store(store_config)

        active_store = self._zen_store
        assert active_store is not None

        return active_store

    def set_active_project(
        self, project: "ProjectResponseModel"
    ) -> "ProjectResponseModel":
        """Make the given project the active one for the local client.

        Args:
            project: The project to set active.

        Returns:
            The project that was set active.
        """
        active_name = project.name
        self.active_project_name = active_name
        self._active_project = project
        # The rest of the global configuration has to be brought in line
        # with the newly selected project.
        self._sanitize_config()
        return project

    def set_active_stack(self, stack: "StackResponseModel") -> None:
        """Remember the given stack as the active stack of the local client.

        Args:
            stack: The model of the stack to set active.
        """
        stack_id = stack.id
        self.active_stack_id = stack_id

    def get_active_project(self) -> "ProjectResponseModel":
        """Return the model of the active project for the local client.

        A previously stored project model is returned as is; otherwise the
        project is fetched from the zen store by name and set active.

        Returns:
            The model of the active project.
        """
        active_name = self.get_active_project_name()

        cached_project = self._active_project
        if cached_project is not None:
            return cached_project

        fetched_project = self.zen_store.get_project(
            project_name_or_id=active_name
        )
        return self.set_active_project(fetched_project)

    def get_active_project_name(self) -> str:
        """Return the name of the active project.

        If no active project name is set yet, the zen store is accessed,
        which is expected to populate it.

        Returns:
            The name of the active project.
        """
        if self.active_project_name is not None:
            return self.active_project_name

        # Accessing the property triggers the store initialization.
        _ = self.zen_store
        assert self.active_project_name is not None
        return self.active_project_name

    def get_active_stack_id(self) -> UUID:
        """Return the ID of the active stack.

        If no active stack ID is set yet, the zen store is accessed, which
        is expected to populate it.

        Returns:
            The active stack ID.
        """
        if self.active_stack_id is not None:
            return self.active_stack_id

        # Accessing the property triggers the store initialization.
        _ = self.zen_store
        assert self.active_stack_id is not None
        return self.active_stack_id

    def record_email_opt_in_out(
        self,
        opted_in: bool,
        email: Optional[str],
        source: AnalyticsEventSource,
    ) -> None:
        """Record the email opt-in/opt-out choice made for this client.

        Args:
            opted_in: Whether the user has opted in to email communication.
            email: The email address to use for this client, if given.
            source: The analytics event source.
        """
        # A new or changed email address given together with an opt-in is
        # used to identify the client and is stored on the configuration.
        if opted_in and email:
            if self.user_email != email:
                identify_user({"email": email, "source": source})
                self.user_email = email

        # An analytics event is recorded either when no choice has been
        # stored so far, or when the user opts in while the stored choice is
        # an opt-out (e.g. when connecting to a new server where the account
        # has opt-in enabled).
        previous_choice = self.user_email_opt_in
        if previous_choice is None or (opted_in and not previous_choice):
            event_metadata = {"opted_in": opted_in, "source": source}
            track_event(AnalyticsEvent.OPT_IN_OUT_EMAIL, event_metadata)

            self.user_email_opt_in = opted_in

    class Config:
        """Pydantic configuration class."""

        # Attributes whose name starts with an underscore are private: they
        # are mutable and are left out of the serialization.
        underscore_attrs_are_private = True
        # Extra attributes are accepted so that configs written by other
        # ZenML versions can still be loaded, which permits downgrading.
        extra = "allow"
        # Assignments are validated; this is required to have a mix of
        # mutable and immutable attributes.
        validate_assignment = True
config_directory: str property readonly

Directory where the global configuration file is located.

Returns:

Type Description
str

The directory where the global configuration file is located.

local_stores_path: str property readonly

Path where local stores information is stored.

Returns:

Type Description
str

The path where local stores information is stored.

zen_store: BaseZenStore property readonly

Initialize and/or return the global zen store.

If the store hasn't been initialized yet, it is initialized when this property is first accessed according to the global store configuration.

Returns:

Type Description
BaseZenStore

The current zen store.

Config

Pydantic configuration class.

Source code in zenml/config/global_config.py
class Config:
    """Pydantic configuration class."""

    # Attributes whose name starts with an underscore are private: they
    # are mutable and are left out of the serialization.
    underscore_attrs_are_private = True
    # Extra attributes are accepted so that configs written by other
    # ZenML versions can still be loaded, which permits downgrading.
    extra = "allow"
    # Assignments are validated; this is required to have a mix of
    # mutable and immutable attributes.
    validate_assignment = True
__custom_getattribute__(self, key) special

Gets an attribute value for a specific key.

If a value for this attribute was specified using an environment variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME) and its value can be parsed to the attribute type, the value from this environment variable is returned instead.

Parameters:

Name Type Description Default
key str

The attribute name.

required

Returns:

Type Description
Any

The attribute value.

Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    If a value for this attribute was specified using an environment
    variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
    value can be parsed to the attribute type, the value from this
    environment variable is returned instead.

    The stored attribute value is only replaced temporarily while the
    environment value is converted; it is set back before returning.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    value = super().__getattribute__(key)
    # Private attributes and names that are not declared model fields are
    # never overridden from the environment.
    if key.startswith("_") or key not in type(self).__fields__:
        return value

    environment_variable_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        # Raises KeyError when the environment variable is not set.
        environment_variable_value = os.environ[environment_variable_name]
        # set the environment variable value to leverage Pydantic's type
        # conversion and validation
        super().__setattr__(key, environment_variable_value)
        return_value = super().__getattribute__(key)
        # set back the old value as we don't want to permanently store
        # the environment variable value here
        super().__setattr__(key, value)
        return return_value
    except (ValidationError, KeyError, TypeError):
        # Either the variable is missing or its value could not be used for
        # this attribute: fall back to the stored value.
        return value
__getattribute__(self, key) special

Gets an attribute value for a specific key.

If a value for this attribute was specified using an environment variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME) and its value can be parsed to the attribute type, the value from this environment variable is returned instead.

Parameters:

Name Type Description Default
key str

The attribute name.

required

Returns:

Type Description
Any

The attribute value.

Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    If a value for this attribute was specified using an environment
    variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
    value can be parsed to the attribute type, the value from this
    environment variable is returned instead.

    The stored attribute value is only replaced temporarily while the
    environment value is converted; it is set back before returning.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    value = super().__getattribute__(key)
    # Private attributes and names that are not declared model fields are
    # never overridden from the environment.
    if key.startswith("_") or key not in type(self).__fields__:
        return value

    environment_variable_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        # Raises KeyError when the environment variable is not set.
        environment_variable_value = os.environ[environment_variable_name]
        # set the environment variable value to leverage Pydantic's type
        # conversion and validation
        super().__setattr__(key, environment_variable_value)
        return_value = super().__getattribute__(key)
        # set back the old value as we don't want to permanently store
        # the environment variable value here
        super().__setattr__(key, value)
        return return_value
    except (ValidationError, KeyError, TypeError):
        # Either the variable is missing or its value could not be used for
        # this attribute: fall back to the stored value.
        return value
__init__(self, config_path=None, **kwargs) special

Initializes a GlobalConfiguration using values from the config file.

GlobalConfiguration is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance (see the exception below).

The config_path argument is only meant for internal use and testing purposes. User code must never pass it to the constructor. When a custom config_path value is passed, an anonymous GlobalConfiguration instance is created and returned independently of the GlobalConfiguration singleton; this instance has no effect on the rest of the ZenML core code.

If the config file doesn't exist yet, we try to read values from the legacy (ZenML version < 0.6) config file.

Parameters:

Name Type Description Default
config_path Optional[str]

(internal use) custom config file path. When not specified, the default global configuration path is used and the global configuration singleton instance is returned. Only used to create configuration copies for transfer to different runtime environments.

None
**kwargs Any

keyword arguments

{}
Source code in zenml/config/global_config.py
def __init__(
    self, config_path: Optional[str] = None, **kwargs: Any
) -> None:
    """Initializes a GlobalConfiguration using values from the config file.

    GlobalConfiguration is a singleton class: only one instance can exist.
    Calling this constructor multiple times will always yield the same
    instance (see the exception below).

    The `config_path` argument is only meant for internal use and testing
    purposes. User code must never pass it to the constructor. When a custom
    `config_path` value is passed, an anonymous GlobalConfiguration instance
    is created and returned independently of the GlobalConfiguration
    singleton; this instance has no effect on the rest of the ZenML core
    code.

    If the config file doesn't exist yet, we try to read values from the
    legacy (ZenML version < 0.6) config file.

    Args:
        config_path: (internal use) custom config file path. When not
            specified, the default global configuration path is used and the
            global configuration singleton instance is returned. Only used
            to create configuration copies for transfer to different
            runtime environments.
        **kwargs: keyword arguments
    """
    # Fall back to the default configuration directory when no custom path
    # is given. This is assigned before the values are read; presumably
    # `_read_config` relies on it -- TODO confirm.
    self._config_path = config_path or self.default_config_directory()
    config_values = self._read_config()
    # Explicitly passed keyword arguments take precedence over the values
    # read from the config file.
    config_values.update(**kwargs)
    super().__init__(**config_values)

    # Write the configuration out if no config file exists yet.
    # NOTE(review): the raw `config_path` argument (possibly `None`) is
    # passed here rather than `self._config_path`; presumably
    # `_config_file` falls back to the stored path -- confirm.
    if not fileio.exists(self._config_file(config_path)):
        self._write_config()
__setattr__(self, key, value) special

Sets an attribute and persists it in the global configuration.

Parameters:

Name Type Description Default
key str

The attribute name.

required
value Any

The attribute value.

required
Source code in zenml/config/global_config.py
def __setattr__(self, key: str, value: Any) -> None:
    """Set an attribute and persist public ones in the global configuration.

    Attributes whose name starts with an underscore are set without
    writing the configuration.

    Args:
        key: The attribute name.
        value: The attribute value.
    """
    super().__setattr__(key, value)
    is_private = key.startswith("_")
    if not is_private:
        self._write_config()
copy_configuration(self, config_path, load_config_path=None, store_config=None, empty_store=False)

Create a copy of the global config using a different config path.

This method is used to copy the global configuration and store it in a different configuration path, where it can be loaded in the context of a new environment, such as a container image.

The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.) unless a custom store configuration is provided or the empty_store flag is set to True.

If the default local store is currently in use, it will not be included in the configuration copy. This is the same as explicitly setting the empty_store flag to True.

Parameters:

Name Type Description Default
config_path str

path where the configuration copy should be saved

required
load_config_path Optional[pathlib.PurePath]

absolute path that will be used to load the copied configuration. This can be set to a value different from config_path if the configuration copy will be loaded from a different environment, e.g. when the configuration is copied to a container image and loaded using a different absolute path. This will be reflected in the paths and URLs encoded in the copied configuration.

None
store_config Optional[zenml.config.store_config.StoreConfiguration]

custom store configuration to use for the copied global configuration. If not specified, the current global store configuration is used.

None
empty_store bool

if True, an empty store configuration is used for the copied global configuration. This means that the copied global configuration will be initialized to the default local store in the new environment.

False

Returns:

Type Description
GlobalConfiguration

A new global configuration object copied to the specified path.

Source code in zenml/config/global_config.py
def copy_configuration(
    self,
    config_path: str,
    load_config_path: Optional[PurePath] = None,
    store_config: Optional[StoreConfiguration] = None,
    empty_store: bool = False,
) -> "GlobalConfiguration":
    """Copy the global config to a different config path.

    The copy is written to `config_path` so that it can be loaded in the
    context of a new environment, such as a container image.

    Unless a custom store configuration is provided or the `empty_store`
    flag is set to `True`, the configuration files that accompany the
    store configuration (e.g. certificates etc.) are copied to the new
    configuration path as well.

    If the default local store is currently in use, it is not included in
    the configuration copy, which is the same as explicitly setting the
    `empty_store` flag to `True`.

    Args:
        config_path: path where the configuration copy should be saved
        load_config_path: absolute path that will be used to load the copied
            configuration. This can be set to a value different from
            `config_path` if the configuration copy will be loaded from
            a different environment, e.g. when the configuration is copied
            to a container image and loaded using a different absolute path.
            This will be reflected in the paths and URLs encoded in the
            copied configuration.
        store_config: custom store configuration to use for the copied
            global configuration. If not specified, the current global store
            configuration is used.
        empty_store: if `True`, an empty store configuration is used for the
            copied global configuration. This means that the copied global
            configuration will be initialized to the default local store in
            the new environment.

    Returns:
        A new global configuration object copied to the specified path.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    self._write_config(config_path)
    config_copy = GlobalConfiguration(config_path=config_path)

    # Stays `None` when an empty store is requested, when the default store
    # is in use, or when no store is configured at all.
    copied_store: Optional[StoreConfiguration] = None

    if store_config is not None:
        copied_store = store_config
    elif (
        not empty_store and not self.uses_default_store() and self.store
    ):
        config_class = BaseZenStore.get_store_config_class(self.store.type)
        copied_store = config_class.copy_configuration(
            self.store, config_path, load_config_path
        )

    config_copy.store = copied_store

    return config_copy
default_config_directory() staticmethod

Path to the default global configuration directory.

Returns:

Type Description
str

The default global configuration directory.

Source code in zenml/config/global_config.py
@staticmethod
def default_config_directory() -> str:
    """Return the path of the default global configuration directory.

    Returns:
        The default global configuration directory.
    """
    config_directory = io_utils.get_global_config_directory()
    return config_directory
get_active_project(self)

Get a model of the active project for the local client.

Returns:

Type Description
ProjectResponseModel

The model of the active project.

Source code in zenml/config/global_config.py
def get_active_project(self) -> "ProjectResponseModel":
    """Return the model of the active project for the local client.

    A previously stored project model is returned as is; otherwise the
    project is fetched from the zen store by name and set active.

    Returns:
        The model of the active project.
    """
    active_name = self.get_active_project_name()

    cached_project = self._active_project
    if cached_project is not None:
        return cached_project

    fetched_project = self.zen_store.get_project(
        project_name_or_id=active_name
    )
    return self.set_active_project(fetched_project)
get_active_project_name(self)

Get the name of the active project.

If the active project doesn't exist yet, the ZenStore is reinitialized.

Returns:

Type Description
str

The name of the active project.

Source code in zenml/config/global_config.py
def get_active_project_name(self) -> str:
    """Return the name of the active project.

    If no active project name is set yet, the zen store is accessed,
    which is expected to populate it.

    Returns:
        The name of the active project.
    """
    if self.active_project_name is not None:
        return self.active_project_name

    # Accessing the property triggers the store initialization.
    _ = self.zen_store
    assert self.active_project_name is not None
    return self.active_project_name
get_active_stack_id(self)

Get the ID of the active stack.

If the active stack doesn't exist yet, the ZenStore is reinitialized.

Returns:

Type Description
UUID

The active stack ID.

Source code in zenml/config/global_config.py
def get_active_stack_id(self) -> UUID:
    """Return the ID of the active stack.

    If no active stack ID is set yet, the zen store is accessed, which
    is expected to populate it.

    Returns:
        The active stack ID.
    """
    if self.active_stack_id is not None:
        return self.active_stack_id

    # Accessing the property triggers the store initialization.
    _ = self.zen_store
    assert self.active_stack_id is not None
    return self.active_stack_id
get_default_store(self)

Get the default store configuration.

Returns:

Type Description
StoreConfiguration

The default store configuration.

Source code in zenml/config/global_config.py
def get_default_store(self) -> StoreConfiguration:
    """Build the default store configuration.

    Non-empty environment variables whose name starts with
    `ENV_ZENML_STORE_PREFIX` take precedence: when at least one is set,
    the configuration is built from them. Otherwise the default store
    configuration located under the local stores path is used.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    prefix_length = len(ENV_ZENML_STORE_PREFIX)
    # Strip the prefix and lowercase the remainder to get the config keys.
    env_config: Dict[str, str] = {
        name[prefix_length:].lower(): value
        for name, value in os.environ.items()
        if value != "" and name.startswith(ENV_ZENML_STORE_PREFIX)
    }

    if not env_config:
        default_path = os.path.join(
            self.local_stores_path,
            DEFAULT_STORE_DIRECTORY_NAME,
        )
        return BaseZenStore.get_default_store_config(path=default_path)

    # Derive the store type from the URL when only the URL was given.
    if "type" not in env_config and "url" in env_config:
        env_config["type"] = BaseZenStore.get_store_type(env_config["url"])

    logger.debug(
        "Using environment variables to configure the default store"
    )
    return StoreConfiguration(**env_config)
get_instance() classmethod

Return the GlobalConfiguration singleton instance.

Returns:

Type Description
Optional[GlobalConfiguration]

The GlobalConfiguration singleton instance or None, if the GlobalConfiguration hasn't been initialized yet.

Source code in zenml/config/global_config.py
@classmethod
def get_instance(cls) -> Optional["GlobalConfiguration"]:
    """Return the GlobalConfiguration singleton instance.

    Returns:
        The GlobalConfiguration singleton instance or None, if the
        GlobalConfiguration hasn't been initialized yet.
    """
    singleton = cls._global_config
    return singleton
record_email_opt_in_out(self, opted_in, email, source)

Set the email address associated with this client.

Parameters:

Name Type Description Default
opted_in bool

Whether the user has opted in to email communication.

required
email Optional[str]

The email address to use for this client, if given.

required
source AnalyticsEventSource

The analytics event source.

required
Source code in zenml/config/global_config.py
def record_email_opt_in_out(
    self,
    opted_in: bool,
    email: Optional[str],
    source: AnalyticsEventSource,
) -> None:
    """Record the email opt-in/opt-out choice made for this client.

    Args:
        opted_in: Whether the user has opted in to email communication.
        email: The email address to use for this client, if given.
        source: The analytics event source.
    """
    # A new or changed email address given together with an opt-in is
    # used to identify the client and is stored on the configuration.
    if opted_in and email:
        if self.user_email != email:
            identify_user({"email": email, "source": source})
            self.user_email = email

    # An analytics event is recorded either when no choice has been
    # stored so far, or when the user opts in while the stored choice is
    # an opt-out (e.g. when connecting to a new server where the account
    # has opt-in enabled).
    previous_choice = self.user_email_opt_in
    if previous_choice is None or (opted_in and not previous_choice):
        event_metadata = {"opted_in": opted_in, "source": source}
        track_event(AnalyticsEvent.OPT_IN_OUT_EMAIL, event_metadata)

        self.user_email_opt_in = opted_in
set_active_project(self, project)

Set the project for the local client.

Parameters:

Name Type Description Default
project ProjectResponseModel

The project to set active.

required

Returns:

Type Description
ProjectResponseModel

The project that was set active.

Source code in zenml/config/global_config.py
def set_active_project(
    self, project: "ProjectResponseModel"
) -> "ProjectResponseModel":
    """Make the given project the active one for the local client.

    Args:
        project: The project to set active.

    Returns:
        The project that was set active.
    """
    active_name = project.name
    self.active_project_name = active_name
    self._active_project = project
    # The rest of the global configuration has to be brought in line
    # with the newly selected project.
    self._sanitize_config()
    return project
set_active_stack(self, stack)

Set the active stack for the local client.

Parameters:

Name Type Description Default
stack StackResponseModel

The model of the stack to set active.

required
Source code in zenml/config/global_config.py
def set_active_stack(self, stack: "StackResponseModel") -> None:
    """Remember the given stack as the active stack of the local client.

    Args:
        stack: The model of the stack to set active.
    """
    stack_id = stack.id
    self.active_stack_id = stack_id
set_default_store(self)

Creates and sets the default store configuration.

Call this method to initialize or revert the store configuration to the default store.

Source code in zenml/config/global_config.py
def set_default_store(self) -> None:
    """Configure the global config to use the default store.

    Use this either to initialize the store configuration or to revert
    it to the default store. The store type is attached to the analytics
    event after the store has been configured.
    """
    with event_handler(AnalyticsEvent.INITIALIZED_STORE) as handler:
        store_config = self.get_default_store()
        self._configure_store(store_config)
        logger.info("Using the default store for the global config.")
        store_type = store_config.type.value
        handler.metadata = {"store_type": store_type}
set_store(self, config, skip_default_registrations=False, **kwargs)

Update the active store configuration.

Call this method to validate and update the active store configuration.

Parameters:

Name Type Description Default
config StoreConfiguration

The new store configuration to use.

required
skip_default_registrations bool

If True, the creation of the default stack and user in the store will be skipped.

False
**kwargs Any

Additional keyword arguments to pass to the store constructor.

{}
Source code in zenml/config/global_config.py
def set_store(
    self,
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Validate and activate a new store configuration.

    Args:
        config: The new store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the store
            constructor.
    """
    with event_handler(
        event=AnalyticsEvent.INITIALIZED_STORE,
        metadata={"store_type": config.type.value},
    ):
        self._configure_store(config, skip_default_registrations, **kwargs)
        logger.info("Updated the global store configuration.")

        connected_to_server = self.zen_store.type == StoreType.REST
        if connected_to_server:
            # On every connection to a ZenML server, the client ID and
            # the server ID are grouped together. Only the fact that this
            # client reached this server at least once is recorded; no
            # user account information is included.
            with event_handler(
                event=AnalyticsEvent.ZENML_SERVER_CONNECTED
            ):
                info = self.zen_store.get_store_info()
                server_group_id = str(info.id)
                server_metadata = {
                    "version": info.version,
                    "deployment_type": str(info.deployment_type),
                    "database_type": str(info.database_type),
                }
                identify_group(
                    AnalyticsGroup.ZENML_SERVER_GROUP,
                    group_id=server_group_id,
                    group_metadata=server_metadata,
                )
uses_default_store(self)

Check if the global configuration uses the default store.

Returns:

Type Description
bool

True if the global configuration uses the default store.

Source code in zenml/config/global_config.py
def uses_default_store(self) -> bool:
    """Tell whether the configured store is the default store.

    Returns:
        `True` if a store is configured and its URL equals the URL of
        the default store configuration, `False` otherwise.
    """
    current_store = self.store
    if current_store is None:
        # No store configured at all: nothing to compare.
        return False
    return current_store.url == self.get_default_store().url

generate_jwt_secret_key()

Generate a random JWT secret key.

This key is used to sign and verify generated JWT tokens.

Returns:

Type Description
str

A random JWT secret key.

Source code in zenml/config/global_config.py
def generate_jwt_secret_key() -> str:
    """Create a fresh random key for signing and verifying JWT tokens.

    Returns:
        A random JWT secret key: 32 random bytes as a hexadecimal string.
    """
    secret_key = token_hex(32)
    return secret_key

pipeline_configurations

Pipeline configuration classes.

PipelineConfiguration (PipelineConfigurationUpdate) pydantic-model

Pipeline configuration class.

Source code in zenml/config/pipeline_configurations.py
class PipelineConfiguration(PipelineConfigurationUpdate):
    """Pipeline configuration class."""

    name: str

    @validator("name")
    def ensure_pipeline_name_allowed(cls, name: str) -> str:
        """Reject pipeline names that are reserved.

        Args:
            name: Name of the pipeline.

        Returns:
            The validated name of the pipeline.

        Raises:
            ValueError: If the name is not allowed.
        """
        if name not in DISALLOWED_PIPELINE_NAMES:
            return name

        raise ValueError(
            f"Pipeline name '{name}' is not allowed since '{name}' is a "
            "reserved key word. Please choose another name."
        )

    @property
    def docker_settings(self) -> "DockerSettings":
        """Docker settings of this pipeline.

        The settings stored under `DOCKER_SETTINGS_KEY` are parsed into a
        `DockerSettings` object; an empty dict is parsed if none are stored.

        Returns:
            The Docker settings of this pipeline.
        """
        from zenml.config import DockerSettings

        raw_settings: SettingsOrDict = self.settings.get(
            DOCKER_SETTINGS_KEY, {}
        )
        return DockerSettings.parse_obj(raw_settings)
docker_settings: DockerSettings property readonly

Docker settings of this pipeline.

Returns:

Type Description
DockerSettings

The Docker settings of this pipeline.

ensure_pipeline_name_allowed(name) classmethod

Ensures the pipeline name is allowed.

Parameters:

Name Type Description Default
name str

Name of the pipeline.

required

Returns:

Type Description
str

The validated name of the pipeline.

Exceptions:

Type Description
ValueError

If the name is not allowed.

Source code in zenml/config/pipeline_configurations.py
@validator("name")
def ensure_pipeline_name_allowed(cls, name: str) -> str:
    """Rejects pipeline names that are reserved key words.

    Args:
        name: The candidate pipeline name.

    Returns:
        The unchanged name, if it is permitted.

    Raises:
        ValueError: If the name is contained in `DISALLOWED_PIPELINE_NAMES`.
    """
    name_is_reserved = name in DISALLOWED_PIPELINE_NAMES
    if not name_is_reserved:
        return name

    raise ValueError(
        f"Pipeline name '{name}' is not allowed since '{name}' is a "
        "reserved key word. Please choose another name."
    )

PipelineConfigurationUpdate (StrictBaseModel) pydantic-model

Class for pipeline configuration updates.

Source code in zenml/config/pipeline_configurations.py
class PipelineConfigurationUpdate(StrictBaseModel):
    """Class for pipeline configuration updates.

    Every field has a default value, so an update may specify any subset of
    them.
    """

    # Optional cache toggle; defaults to `None` when no value is given.
    enable_cache: Optional[bool] = None
    # Settings objects by settings key (`PipelineConfiguration.docker_settings`
    # looks up `DOCKER_SETTINGS_KEY` in this mapping).
    settings: Dict[str, BaseSettings] = {}
    # Arbitrary additional key-value pairs.
    extra: Dict[str, Any] = {}

PipelineRunConfiguration (StrictBaseModel) pydantic-model

Class for pipeline run configurations.

Source code in zenml/config/pipeline_configurations.py
class PipelineRunConfiguration(StrictBaseModel):
    """Class for pipeline run configurations.

    Every field has a default value, so a run configuration may specify any
    subset of them.
    """

    # Optional name for the run; defaults to `None` when no value is given.
    run_name: Optional[str] = None
    # Optional cache toggle; defaults to `None` when no value is given.
    enable_cache: Optional[bool] = None
    # Optional schedule for the run.
    schedule: Optional[Schedule] = None
    # Step configuration updates — presumably keyed by step name; TODO confirm.
    steps: Dict[str, StepConfigurationUpdate] = {}
    # Settings objects by settings key.
    settings: Dict[str, BaseSettings] = {}
    # Arbitrary additional key-value pairs.
    extra: Dict[str, Any] = {}

PipelineSpec (StrictBaseModel) pydantic-model

Specification of a pipeline.

Source code in zenml/config/pipeline_configurations.py
class PipelineSpec(StrictBaseModel):
    """Specification of a pipeline: a spec version plus its step specs."""

    version: str = "0.1"
    steps: List[StepSpec]

    def __eq__(self, other: Any) -> bool:
        """Compares two pipeline specs by their steps only.

        The `version` field does not take part in the comparison.

        Args:
            other: The object to compare with.

        Returns:
            True if `other` is a `PipelineSpec` with equal steps, False if
            it is a `PipelineSpec` with different steps, and
            `NotImplemented` for any other type.
        """
        if not isinstance(other, PipelineSpec):
            # Let Python try the reflected comparison.
            return NotImplemented

        same_steps = self.steps == other.steps
        return same_steps
__eq__(self, other) special

Returns whether the other object is referring to the same pipeline.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same pipeline.

Source code in zenml/config/pipeline_configurations.py
def __eq__(self, other: Any) -> bool:
    """Compares two pipeline specs by their steps only.

    Args:
        other: The object to compare with.

    Returns:
        True if `other` is a `PipelineSpec` with equal steps, False if it is
        a `PipelineSpec` with different steps, and `NotImplemented` for any
        other type.
    """
    if not isinstance(other, PipelineSpec):
        # Let Python try the reflected comparison.
        return NotImplemented

    same_steps = self.steps == other.steps
    return same_steps

pipeline_deployment

Pipeline deployment.

PipelineDeployment (StrictBaseModel) pydantic-model

Class representing the deployment of a ZenML pipeline.

Source code in zenml/config/pipeline_deployment.py
class PipelineDeployment(StrictBaseModel):
    """Deployment of a ZenML pipeline.

    Bundles the pipeline configuration and its steps with the run name, the
    stack ID, an optional schedule, the ZenML version and the client
    environment.
    """

    run_name: str
    schedule: Optional[Schedule] = None
    schedule_id: Optional[UUID] = None
    stack_id: UUID
    pipeline: PipelineConfiguration
    pipeline_id: Optional[UUID] = None
    steps: Dict[str, Step] = {}
    zenml_version: str = zenml.__version__
    client_environment: Dict[str, str] = {}

    def add_extra(self, key: str, value: Any) -> None:
        """Stores an extra key-value pair in the pipeline configuration.

        An existing entry for the same key is overwritten.

        Args:
            key: Key under which to store the extra value.
            value: The extra value.
        """
        extras = self.pipeline.extra
        extras[key] = value

    def yaml(self, **kwargs: Any) -> str:
        """Serializes the deployment to a YAML string.

        The deployment is first serialized with the pydantic `json(...)`
        method and the result is then dumped as YAML. Key order is
        preserved in both steps.

        Args:
            **kwargs: Kwargs to pass to the pydantic json(...) method.

        Returns:
            Yaml string representation of the deployment.
        """
        json_string = self.json(**kwargs, sort_keys=False)
        plain_dict = json.loads(json_string)
        yaml_string = yaml.dump(plain_dict, sort_keys=False)
        return cast(str, yaml_string)
add_extra(self, key, value)

Adds an extra key-value pair to the pipeline configuration.

Parameters:

Name Type Description Default
key str

Key for which to add the extra value.

required
value Any

The extra value.

required
Source code in zenml/config/pipeline_deployment.py
def add_extra(self, key: str, value: Any) -> None:
    """Stores an extra key-value pair in the pipeline configuration.

    An existing entry for the same key is overwritten.

    Args:
        key: Key under which to store the extra value.
        value: The extra value.
    """
    extras = self.pipeline.extra
    extras[key] = value
yaml(self, **kwargs)

Yaml representation of the deployment.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs to pass to the pydantic json(...) method.

{}

Returns:

Type Description
str

Yaml string representation of the deployment.

Source code in zenml/config/pipeline_deployment.py
def yaml(self, **kwargs: Any) -> str:
    """Serializes the deployment to a YAML string.

    The deployment is first serialized with the pydantic `json(...)` method
    and the result is then dumped as YAML. Key order is preserved in both
    steps.

    Args:
        **kwargs: Kwargs to pass to the pydantic json(...) method.

    Returns:
        Yaml string representation of the deployment.
    """
    json_string = self.json(**kwargs, sort_keys=False)
    plain_dict = json.loads(json_string)
    yaml_string = yaml.dump(plain_dict, sort_keys=False)
    return cast(str, yaml_string)

resource_settings

Resource settings class used to specify resources for a step.

ByteUnit (Enum)

Enum for byte units.

Source code in zenml/config/resource_settings.py
class ByteUnit(Enum):
    """Byte units in decimal (KB, MB, ...) and binary (KiB, MiB, ...) form."""

    KB = "KB"
    KIB = "KiB"
    MB = "MB"
    MIB = "MiB"
    GB = "GB"
    GIB = "GiB"
    TB = "TB"
    TIB = "TiB"
    PB = "PB"
    PIB = "PiB"

    @property
    def byte_value(self) -> int:
        """Number of bytes that one of this unit represents.

        Returns:
            A power of 1000 for decimal units (e.g. `10**3` for KB) and a
            power of 1024 for binary units (e.g. `1 << 10` for KiB).
        """
        # The first letter of the unit fixes the magnitude: K=1, M=2, ...
        magnitude = "KMGTP".index(self.value[0]) + 1

        if self.value.endswith("iB"):
            return 1 << (10 * magnitude)
        return 10 ** (3 * magnitude)

ResourceSettings (BaseSettings) pydantic-model

Hardware resource settings.

Attributes:

Name Type Description
cpu_count Optional[pydantic.types.PositiveFloat]

The amount of CPU cores that should be configured.

gpu_count Optional[pydantic.types.NonNegativeInt]

The amount of GPUs that should be configured.

memory Optional[str]

The amount of memory that should be configured.

Source code in zenml/config/resource_settings.py
class ResourceSettings(BaseSettings):
    """Settings describing the hardware resources to configure.

    Attributes:
        cpu_count: The amount of CPU cores that should be configured.
        gpu_count: The amount of GPUs that should be configured.
        memory: The amount of memory that should be configured. The string
            has to match `MEMORY_REGEX`.
    """

    cpu_count: Optional[PositiveFloat] = None
    gpu_count: Optional[NonNegativeInt] = None
    memory: Optional[str] = Field(regex=MEMORY_REGEX)

    @property
    def empty(self) -> bool:
        """Whether no value at all was configured on this object.

        Returns:
            `True` if no values were configured, `False` otherwise.
        """
        # Only attributes that were explicitly set to something other than
        # `None` count as configured.
        configured_values = self.dict(exclude_unset=True, exclude_none=True)
        return not configured_values

    def get_memory(
        self, unit: Union[str, ByteUnit] = ByteUnit.GB
    ) -> Optional[float]:
        """Converts the configured memory into the requested unit.

        Args:
            unit: The unit to which the memory should be converted, either
                as a `ByteUnit` or as its string value.

        Raises:
            ValueError: If the memory string is invalid.

        Returns:
            The memory configuration converted to the requested unit, or None
            if no memory was configured.
        """
        if not self.memory:
            return None

        target_unit = ByteUnit(unit) if isinstance(unit, str) else unit

        memory = self.memory
        # The first unit (in enum definition order) that the memory string
        # ends with is the one it was specified in.
        source_unit = next(
            (
                candidate
                for candidate in ByteUnit
                if memory.endswith(candidate.value)
            ),
            None,
        )
        if source_unit is None:
            # Should never happen due to the regex validation
            raise ValueError(f"Unable to parse memory unit from '{memory}'.")

        amount = int(memory[: -len(source_unit.value)])
        return amount * source_unit.byte_value / target_unit.byte_value

    class Config:
        """Pydantic configuration class."""

        # instances cannot be modified after creation
        allow_mutation = False

        # unknown attributes are rejected when the model is initialized
        extra = Extra.forbid
empty: bool property readonly

Returns whether this object is "empty" (= no values configured).

Returns:

Type Description
bool

True if no values were configured, False otherwise.

Config

Pydantic configuration class.

Source code in zenml/config/resource_settings.py
class Config:
    """Pydantic configuration class.

    Makes instances immutable and rejects unknown attributes. `BaseSettings`
    itself uses `Extra.allow`; this configuration uses `Extra.forbid`.
    """

    # public attributes are immutable
    allow_mutation = False

    # prevent extra attributes during model initialization
    extra = Extra.forbid
get_memory(self, unit=<ByteUnit.GB: 'GB'>)

Gets the memory configuration in a specific unit.

Parameters:

Name Type Description Default
unit Union[str, zenml.config.resource_settings.ByteUnit]

The unit to which the memory should be converted.

<ByteUnit.GB: 'GB'>

Exceptions:

Type Description
ValueError

If the memory string is invalid.

Returns:

Type Description
Optional[float]

The memory configuration converted to the requested unit, or None if no memory was configured.

Source code in zenml/config/resource_settings.py
def get_memory(
    self, unit: Union[str, ByteUnit] = ByteUnit.GB
) -> Optional[float]:
    """Converts the configured memory into the requested unit.

    Args:
        unit: The unit to which the memory should be converted, either as a
            `ByteUnit` or as its string value.

    Raises:
        ValueError: If the memory string is invalid.

    Returns:
        The memory configuration converted to the requested unit, or None
        if no memory was configured.
    """
    if not self.memory:
        return None

    target_unit = ByteUnit(unit) if isinstance(unit, str) else unit

    memory = self.memory
    # The first unit (in enum definition order) that the memory string ends
    # with is the one it was specified in.
    source_unit = next(
        (
            candidate
            for candidate in ByteUnit
            if memory.endswith(candidate.value)
        ),
        None,
    )
    if source_unit is None:
        # Should never happen due to the regex validation
        raise ValueError(f"Unable to parse memory unit from '{memory}'.")

    amount = int(memory[: -len(source_unit.value)])
    return amount * source_unit.byte_value / target_unit.byte_value

schedule

Class for defining a pipeline schedule.

Schedule (BaseModel) pydantic-model

Class for defining a pipeline schedule.

Attributes:

Name Type Description
name Optional[str]

Optional name to give to the schedule. If not set, a default name will be generated based on the pipeline name and the current date and time.

cron_expression Optional[str]

Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval.

start_time Optional[datetime.datetime]

datetime object to indicate when to start the schedule.

end_time Optional[datetime.datetime]

datetime object to indicate when to end the schedule.

interval_second Optional[datetime.timedelta]

datetime timedelta indicating the seconds between two recurring runs for a periodic schedule.

catchup bool

Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill.

Source code in zenml/config/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    A valid schedule needs either a cron expression or both a start time and
    an interval; this is enforced by a root validator.

    Attributes:
        name: Optional name to give to the schedule. If not set, a default name
            will be generated based on the pipeline name and the current date
            and time.
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    name: Optional[str] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        If both are provided, the schedule is accepted and a warning is
        logged.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        # A periodic schedule requires both a start time and an interval.
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )

        if cron_expression and periodic_schedule:
            # The warning names the actual attribute, `interval_second`.
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behavior depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )
            return values
        elif cron_expression or periodic_schedule:
            return values
        else:
            raise ValueError(
                "Either a cron expression or start time and interval seconds "
                "need to be set for a valid schedule."
            )

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            The start time converted to UTC in ISO format, or None if no
            start time is set.
        """
        if not self.start_time:
            return None

        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            The end time converted to UTC in ISO format, or None if no end
            time is set.
        """
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC end time.

Returns:

Type Description
Optional[str]

Optional ISO-formatted string of the UTC end time.

utc_start_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC start time.

Returns:

Type Description
Optional[str]

Optional ISO-formatted string of the UTC start time.

secret_reference_mixin

Secret reference mixin implementation.

SecretReferenceMixin (BaseModel) pydantic-model

Mixin class for secret references in pydantic model attributes.

Source code in zenml/config/secret_reference_mixin.py
class SecretReferenceMixin(BaseModel):
    """Mixin class for secret references in pydantic model attributes.

    Attribute values may be passed as secret references. `__init__` rejects
    references for fields that cannot accept them, and attribute access
    resolves a reference to the secret value using the secrets manager of the
    active stack.
    """

    def __init__(self, **kwargs: Any) -> None:
        """Ensures that secret references are only passed for valid fields.

        This method ensures that secret references are not passed for fields
        that explicitly prevent them or require pydantic validation.

        Args:
            **kwargs: Arguments to initialize this object.

        Raises:
            ValueError: If an attribute that requires custom pydantic validation
                or an attribute which explicitly disallows secret references
                is passed as a secret reference.
        """
        for key, value in kwargs.items():
            try:
                field = self.__class__.__fields__[key]
            except KeyError:
                # Value for a private attribute or non-existing field, this
                # will fail during the upcoming pydantic validation
                continue

            if value is None:
                continue

            # Plain values are always accepted; for fields marked as secret
            # only a warning is logged.
            if not secret_utils.is_secret_reference(value):
                if secret_utils.is_secret_field(field):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}`. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in sensitive information as secrets. Check out the "
                        "documentation on how to configure values with secrets "
                        "here: https://docs.zenml.io/advanced-guide/practical/secrets-management"
                    )
                continue

            # From here on, `value` is a secret reference.
            if secret_utils.is_clear_text_field(field):
                raise ValueError(
                    f"Passing the `{key}` attribute as a secret reference is "
                    "not allowed."
                )

            # Fields with pre- or post-validators cannot take a reference.
            requires_validation = field.pre_validators or field.post_validators
            if requires_validation:
                raise ValueError(
                    f"Passing the attribute `{key}` as a secret reference is "
                    "not allowed as additional validation is required for "
                    "this attribute."
                )

        super().__init__(**kwargs)

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            RuntimeError: If the active stack is missing a secrets manager.
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value. Resolved secret values
            are always returned as strings.
        """
        value = super().__getattribute__(key)

        # Anything that is not a secret reference is returned unchanged.
        if not secret_utils.is_secret_reference(value):
            return value

        # NOTE(review): local import, presumably to avoid a circular import
        # — confirm.
        from zenml.client import Client

        secrets_manager = Client().active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key}: "
                "The active stack does not have a secrets manager."
            )

        secret_ref = secret_utils.parse_secret_reference(value)
        try:
            secret = secrets_manager.get_secret(secret_ref.name)
        except KeyError:
            # Re-raised with a message that names the attribute and secret.
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key}: "
                f"The secret {secret_ref.name} does not exist."
            )

        try:
            secret_value = secret.content[secret_ref.key]
        except KeyError:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key}: "
                f"The secret {secret_ref.name} does not contain a value for key "
                f"{secret_ref.key}. Available keys: {set(secret.content)}."
            )

        return str(secret_value)

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        # The hook is therefore only installed at runtime.
        __getattribute__ = __custom_getattribute__

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this object.

        Only the top-level values of `self.dict()` are inspected.

        Returns:
            The required secrets of this object.
        """
        return {
            secret_utils.parse_secret_reference(v)
            for v in self.dict().values()
            if secret_utils.is_secret_reference(v)
        }
required_secrets: Set[zenml.utils.secret_utils.SecretReference] property readonly

All required secrets for this object.

Returns:

Type Description
Set[zenml.utils.secret_utils.SecretReference]

The required secrets of this object.

__custom_getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value. Resolved secret values
        are always returned as strings.
    """
    value = super().__getattribute__(key)

    # Anything that is not a secret reference is returned unchanged.
    if not secret_utils.is_secret_reference(value):
        return value

    # NOTE(review): local import, presumably to avoid a circular import
    # — confirm.
    from zenml.client import Client

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key}: "
            "The active stack does not have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError:
        # Re-raised with a message that names the attribute and secret.
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not exist."
        )

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        )

    return str(secret_value)
__getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value. Resolved secret values
        are always returned as strings.
    """
    value = super().__getattribute__(key)

    # Anything that is not a secret reference is returned unchanged.
    if not secret_utils.is_secret_reference(value):
        return value

    # NOTE(review): local import, presumably to avoid a circular import
    # — confirm.
    from zenml.client import Client

    secrets_manager = Client().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key}: "
            "The active stack does not have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError:
        # Re-raised with a message that names the attribute and secret.
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not exist."
        )

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        )

    return str(secret_value)
__init__(self, **kwargs) special

Ensures that secret references are only passed for valid fields.

This method ensures that secret references are not passed for fields that explicitly prevent them or require pydantic validation.

Parameters:

Name Type Description Default
**kwargs Any

Arguments to initialize this object.

{}

Exceptions:

Type Description
ValueError

If an attribute that requires custom pydantic validation or an attribute which explicitly disallows secret references is passed as a secret reference.

Source code in zenml/config/secret_reference_mixin.py
def __init__(self, **kwargs: Any) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.__fields__[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        # Plain values are always accepted; for fields marked as secret only
        # a warning is logged.
        if not secret_utils.is_secret_reference(value):
            if secret_utils.is_secret_field(field):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/advanced-guide/practical/secrets-management"
                )
            continue

        # From here on, `value` is a secret reference.
        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        # Fields with pre- or post-validators cannot take a reference.
        requires_validation = field.pre_validators or field.post_validators
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)

settings_resolver

Class for resolving settings.

SettingsResolver

Class for resolving settings.

This class converts a BaseSettings instance to the correct subclass depending on the key for which these settings were specified.

Source code in zenml/config/settings_resolver.py
class SettingsResolver:
    """Resolves settings to the settings class that belongs to their key.

    A `BaseSettings` instance is converted to the correct subclass depending
    on the key for which these settings were specified.
    """

    def __init__(self, key: str, settings: "BaseSettings"):
        """Validates the settings key and stores key and settings.

        Args:
            key: Settings key.
            settings: The settings.

        Raises:
            ValueError: If the settings key is invalid.
        """
        key_is_valid = settings_utils.is_valid_setting_key(key)
        if not key_is_valid:
            message = (
                f"Invalid setting key `{key}`. Setting keys can either refer "
                "to general settings (available keys: "
                f"{set(settings_utils.get_general_settings())}) or stack "
                "component specific settings. Stack component specific keys "
                "are of the format "
                "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
            )
            raise ValueError(message)

        self._key = key
        self._settings = settings

    def resolve(self, stack: "Stack") -> "BaseSettings":
        """Resolves settings for the given stack.

        Args:
            stack: The stack for which to resolve the settings.

        Returns:
            The resolved settings.
        """
        is_general = settings_utils.is_general_setting_key(self._key)
        target_class = (
            self._resolve_general_settings_class()
            if is_general
            else self._resolve_stack_component_setting_class(stack=stack)
        )
        return self._convert_settings(target_class=target_class)

    def _resolve_general_settings_class(
        self,
    ) -> Type["BaseSettings"]:
        """Looks up the settings class for a general settings key.

        Returns:
            The settings class registered for the key.
        """
        general_settings = settings_utils.get_general_settings()
        return general_settings[self._key]

    def _resolve_stack_component_setting_class(
        self, stack: "Stack"
    ) -> Type["BaseSettings"]:
        """Looks up the settings class for the key in the given stack.

        Args:
            stack: The stack to use for resolving.

        Raises:
            KeyError: If the stack contains no settings for the key.

        Returns:
            The settings class that the stack provides for the key.
        """
        settings_class = stack.setting_classes.get(self._key)
        if settings_class:
            return settings_class

        raise KeyError(
            f"Failed to resolve settings for key {self._key}: "
            "No settings for this key exist in the stack. "
            "Available settings: "
            f"{set(stack.setting_classes)}"
        )

    def _convert_settings(self, target_class: Type["T"]) -> "T":
        """Re-creates the settings as an instance of the target class.

        Args:
            target_class: The correct settings class.

        Raises:
            SettingsResolvingError: If the conversion failed.

        Returns:
            The converted settings.
        """
        settings_dict = self._settings.dict()
        try:
            converted = target_class(**settings_dict)
        except ValidationError:
            raise SettingsResolvingError(
                f"Failed to convert settings `{settings_dict}` to expected "
                f"class {target_class}."
            )
        return converted
__init__(self, key, settings) special

Checks if the settings key is valid.

Parameters:

Name Type Description Default
key str

Settings key.

required
settings BaseSettings

The settings.

required

Exceptions:

Type Description
ValueError

If the settings key is invalid.

Source code in zenml/config/settings_resolver.py
def __init__(self, key: str, settings: "BaseSettings"):
    """Validates the settings key and stores the key and the settings.

    Args:
        key: Settings key.
        settings: The settings.

    Raises:
        ValueError: If the settings key is invalid.
    """
    if not settings_utils.is_valid_setting_key(key):
        # Only computed on the error path, to show the valid general keys.
        general_setting_keys = set(settings_utils.get_general_settings())
        error_message = (
            f"Invalid setting key `{key}`. Setting keys can either refer "
            "to general settings (available keys: "
            f"{general_setting_keys}) or stack "
            "component specific settings. Stack component specific keys "
            "are of the format "
            "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
        )
        raise ValueError(error_message)

    self._settings = settings
    self._key = key
resolve(self, stack)

Resolves settings for the given stack.

Parameters:

Name Type Description Default
stack Stack

The stack for which to resolve the settings.

required

Returns:

Type Description
BaseSettings

The resolved settings.

Source code in zenml/config/settings_resolver.py
def resolve(self, stack: "Stack") -> "BaseSettings":
    """Converts the stored settings to the class matching their key.

    Args:
        stack: The stack for which to resolve the settings.

    Returns:
        The resolved settings.
    """
    # The stack is only consulted for keys that are not general setting keys.
    if not settings_utils.is_general_setting_key(self._key):
        resolved_class = self._resolve_stack_component_setting_class(
            stack=stack
        )
    else:
        resolved_class = self._resolve_general_settings_class()

    return self._convert_settings(target_class=resolved_class)

step_configurations

Step configuration classes.

ArtifactConfiguration (PartialArtifactConfiguration) pydantic-model

Class representing a complete input/output artifact configuration.

Source code in zenml/config/step_configurations.py
class ArtifactConfiguration(PartialArtifactConfiguration):
    """Class representing a complete input/output artifact configuration.

    Unlike the parent `PartialArtifactConfiguration`, where
    `materializer_source` is optional and defaults to `None`, the field is
    required here.
    """

    # Redeclared as a plain `str` without default, which makes it mandatory.
    materializer_source: str

InputSpec (StrictBaseModel) pydantic-model

Step input specification.

Source code in zenml/config/step_configurations.py
class InputSpec(StrictBaseModel):
    """Step input specification.

    Holds a step name together with an output name; both are required.
    """

    # NOTE(review): presumably the name of the step whose output is consumed
    # as the input — confirm against the code that builds these specs.
    step_name: str
    # NOTE(review): presumably the name of that step's output — confirm.
    output_name: str

PartialArtifactConfiguration (StrictBaseModel) pydantic-model

Class representing a partial input/output artifact configuration.

Source code in zenml/config/step_configurations.py
class PartialArtifactConfiguration(StrictBaseModel):
    """Artifact configuration in which the materializer may still be unset."""

    materializer_source: Optional[str] = None

    @root_validator(pre=True)
    def _remove_deprecated_attributes(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Drops deprecated attributes before the model is validated.

        Args:
            values: The values dict used to instantiate the model.

        Returns:
            The same values dict with the deprecated keys removed.
        """
        # Attributes that are no longer part of the model. They are removed
        # in place; keys that are absent are simply skipped.
        for obsolete_key in ("artifact_source",):
            values.pop(obsolete_key, None)
        return values

PartialStepConfiguration (StepConfigurationUpdate) pydantic-model

Class representing a partial step configuration.

Source code in zenml/config/step_configurations.py
class PartialStepConfiguration(StepConfigurationUpdate):
    """Step configuration whose artifact configurations may be partial."""

    name: str
    caching_parameters: Mapping[str, Any] = {}
    inputs: Mapping[str, PartialArtifactConfiguration] = {}
    outputs: Mapping[str, PartialArtifactConfiguration] = {}

    @root_validator(pre=True)
    def _remove_deprecated_attributes(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Drops deprecated attributes before the model is validated.

        Args:
            values: The values dict used to instantiate the model.

        Returns:
            The same values dict with the deprecated keys removed.
        """
        # Attributes that are no longer part of the model. They are removed
        # in place; keys that are absent are simply skipped.
        for obsolete_key in ("docstring",):
            values.pop(obsolete_key, None)
        return values

Step (StrictBaseModel) pydantic-model

Class representing a ZenML step.

Source code in zenml/config/step_configurations.py
class Step(StrictBaseModel):
    """Class representing a ZenML step.

    Bundles the specification of a step with its configuration.
    """

    # The specification of the step.
    spec: StepSpec
    # The configuration of the step.
    config: StepConfiguration

StepConfiguration (PartialStepConfiguration) pydantic-model

Step configuration class.

Source code in zenml/config/step_configurations.py
class StepConfiguration(PartialStepConfiguration):
    """Step configuration with complete artifact configurations."""

    inputs: Mapping[str, ArtifactConfiguration] = {}
    outputs: Mapping[str, ArtifactConfiguration] = {}

    @property
    def resource_settings(self) -> "ResourceSettings":
        """Resource settings stored in this step configuration.

        Returns:
            The entry found in `settings` under `RESOURCE_SETTINGS_KEY`,
            parsed as `ResourceSettings`. An empty dict is parsed when no
            such entry exists.
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import with `zenml.config` — confirm before moving it.
        from zenml.config import ResourceSettings

        stored_settings: SettingsOrDict = self.settings.get(
            RESOURCE_SETTINGS_KEY, {}
        )
        return ResourceSettings.parse_obj(stored_settings)
resource_settings: ResourceSettings property readonly

Resource settings of this step configuration.

Returns:

Type Description
ResourceSettings

The resource settings of this step configuration.

StepConfigurationUpdate (StrictBaseModel) pydantic-model

Class for step configuration updates.

Source code in zenml/config/step_configurations.py
class StepConfigurationUpdate(StrictBaseModel):
    """Class for step configuration updates.

    Every field has a default value, so an update can be created from only
    the values that should be set.
    """

    # Optional scalar values; `None` is the default for each of them.
    name: Optional[str] = None
    enable_cache: Optional[bool] = None
    step_operator: Optional[str] = None
    experiment_tracker: Optional[str] = None
    # Free-form dictionaries keyed by string; all default to empty.
    parameters: Dict[str, Any] = {}
    # Maps settings keys to settings objects.
    settings: Dict[str, BaseSettings] = {}
    extra: Dict[str, Any] = {}

    # Artifact configurations keyed by string; the materializer source may
    # be unset in these partial configurations.
    outputs: Mapping[str, PartialArtifactConfiguration] = {}

StepSpec (StrictBaseModel) pydantic-model

Specification of a step.

Source code in zenml/config/step_configurations.py
class StepSpec(StrictBaseModel):
    """Specification of a step."""

    source: str
    upstream_steps: List[str]
    inputs: Dict[str, InputSpec] = {}

    @property
    def module_name(self) -> str:
        """The step module name.

        Returns:
            The part of `source` in front of its last dot.
        """
        # Unpacking into exactly two names keeps the failure for a source
        # that contains no dot at all.
        module_part, _class_part = self.source.rsplit(".", 1)
        return module_part

    @property
    def class_name(self) -> str:
        """The step class name.

        Returns:
            The part of `source` behind its last dot.
        """
        _module_part, class_part = self.source.rsplit(".", 1)
        return class_part

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        Two specs refer to the same step if the other object is a `StepSpec`
        with equal `upstream_steps` and, after removing internal version
        pins, one of the following holds for the two sources:
            - they are identical
            - one of them ends with the other

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step, False if
            it is a `StepSpec` referring to a different step and
            `NotImplemented` for any other type.
        """
        if not isinstance(other, StepSpec):
            return NotImplemented

        if self.upstream_steps != other.upstream_steps:
            return False

        # TODO: rethink this once we have pipeline versioning
        # The `inputs` are deliberately left out of the comparison for now:
        # including them would force users to re-register their pipeline
        # whenever they change an output or input name.

        # Older sources may carry an internal version pin, which is removed
        # for backwards compatibility before comparing.
        own_source = source_utils.remove_internal_version_pin(self.source)
        other_source = source_utils.remove_internal_version_pin(
            other.source
        )

        return (
            own_source == other_source
            or own_source.endswith(other_source)
            or other_source.endswith(own_source)
        )
class_name: str property readonly

The step class name.

Returns:

Type Description
str

The step class name.

module_name: str property readonly

The step module name.

Returns:

Type Description
str

The step module name.

__eq__(self, other) special

Returns whether the other object is referring to the same step.

This is the case if the other object is a StepSpec with the same upstream_steps and a source that meets one of the following conditions: (1) it is the same as the source of this step, or (2) it refers to the same absolute path as the source of this step.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same step.

Source code in zenml/config/step_configurations.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same step.

    Two specs refer to the same step if the other object is a `StepSpec`
    with equal `upstream_steps` and, after removing internal version pins,
    one of the following holds for the two sources:
        - they are identical
        - one of them ends with the other

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is referring to the same step, False if it
        is a `StepSpec` referring to a different step and `NotImplemented`
        for any other type.
    """
    if not isinstance(other, StepSpec):
        return NotImplemented

    if self.upstream_steps != other.upstream_steps:
        return False

    # TODO: rethink this once we have pipeline versioning
    # The `inputs` are deliberately left out of the comparison for now:
    # including them would force users to re-register their pipeline
    # whenever they change an output or input name.

    # Older sources may carry an internal version pin, which is removed for
    # backwards compatibility before comparing.
    own_source = source_utils.remove_internal_version_pin(self.source)
    other_source = source_utils.remove_internal_version_pin(other.source)

    return (
        own_source == other_source
        or own_source.endswith(other_source)
        or other_source.endswith(own_source)
    )

step_run_info

Step run info.

StepRunInfo (StrictBaseModel) pydantic-model

All information necessary to run a step.

Source code in zenml/config/step_run_info.py
class StepRunInfo(StrictBaseModel):
    """All information necessary to run a step.

    All fields are required.
    """

    # Identifiers of the step run and of the run it belongs to.
    step_run_id: UUID
    run_id: UUID
    run_name: str

    # Configuration of the step and of its pipeline.
    config: StepConfiguration
    pipeline: PipelineConfiguration

store_config

Functionality to support ZenML store configurations.

StoreConfiguration (BaseModel) pydantic-model

Generic store configuration.

The store configurations of concrete store implementations must inherit from this class and validate any extra attributes that are configured in addition to those defined in this class.

Attributes:

Name Type Description
type StoreType

The type of store backend.

url str

The URL of the store backend.

Source code in zenml/config/store_config.py
class StoreConfiguration(BaseModel):
    """Generic store configuration.

    Concrete store implementations must derive their configuration from this
    class and are responsible for validating any extra attributes that are
    configured on top of the ones declared here.

    Attributes:
        type: The type of store backend.
        url: The URL of the store backend.
    """

    type: StoreType
    url: str

    class Config:
        """Pydantic configuration class."""

        # Assignments are validated as well. This is required to combine
        # mutable and immutable attributes in one model.
        validate_assignment = True
        # Extra attributes are accepted by the base class; validating them
        # is left to the concrete classes.
        extra = "allow"
        # Attributes with a leading underscore are private, which makes them
        # mutable and keeps them out of the serialization.
        underscore_attrs_are_private = True

    @classmethod
    def copy_configuration(
        cls,
        config: "StoreConfiguration",
        config_path: str,
        load_config_path: Optional[PurePath] = None,
    ) -> "StoreConfiguration":
        """Create a copy of the store config using a different configuration path.

        The copy is meant to be loadable from a different configuration path
        or in a new environment, such as a container image. Files that
        accompany the store configuration (e.g. certificates) are expected
        to be copied to the new configuration path as well.

        This base implementation only returns `config.copy()`; neither
        `config_path` nor `load_config_path` is used here.

        Args:
            config: The store configuration to copy.
            config_path: new path where the configuration copy will be loaded
                from.
            load_config_path: absolute path that will be used to load the
                copied configuration. It may differ from `config_path` if the
                copy is loaded from a different environment, e.g. when the
                configuration is copied to a container image and loaded
                using a different absolute path. This will be reflected in
                the paths and URLs encoded in the copied configuration.

        Returns:
            A new store configuration object that reflects the new configuration
            path.
        """
        duplicate = config.copy()
        return duplicate

    @classmethod
    def supports_url_scheme(cls, url: str) -> bool:
        """Check if a URL scheme is supported by this store.

        The base class accepts every URL. Concrete store configuration
        classes should override this method with a real check.

        Args:
            url: The URL to check.

        Returns:
            True if the URL scheme is supported, False otherwise.
        """
        return True
Config

Pydantic configuration class.

Source code in zenml/config/store_config.py
class Config:
    """Pydantic configuration class."""

    # Extra attributes are accepted by the base class; validating them is
    # left to the concrete classes.
    extra = "allow"
    # Attributes with a leading underscore are private, which makes them
    # mutable and keeps them out of the serialization.
    underscore_attrs_are_private = True
    # Assignments are validated as well. This is required to combine mutable
    # and immutable attributes in one model.
    validate_assignment = True
copy_configuration(config, config_path, load_config_path=None) classmethod

Create a copy of the store config using a different configuration path.

This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.

The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).

Parameters:

Name Type Description Default
config StoreConfiguration

The store configuration to copy.

required
config_path str

new path where the configuration copy will be loaded from.

required
load_config_path Optional[pathlib.PurePath]

absolute path that will be used to load the copied configuration. This can be set to a value different from config_path if the configuration copy will be loaded from a different environment, e.g. when the configuration is copied to a container image and loaded using a different absolute path. This will be reflected in the paths and URLs encoded in the copied configuration.

None

Returns:

Type Description
StoreConfiguration

A new store configuration object that reflects the new configuration path.

Source code in zenml/config/store_config.py
@classmethod
def copy_configuration(
    cls,
    config: "StoreConfiguration",
    config_path: str,
    load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
    """Create a copy of the store config using a different configuration path.

    The copy is meant to be loadable from a different configuration path or
    in a new environment, such as a container image. Files that accompany
    the store configuration (e.g. certificates) are expected to be copied to
    the new configuration path as well.

    This implementation only returns `config.copy()`; neither `config_path`
    nor `load_config_path` is used here.

    Args:
        config: The store configuration to copy.
        config_path: new path where the configuration copy will be loaded
            from.
        load_config_path: absolute path that will be used to load the copied
            configuration. It may differ from `config_path` if the copy is
            loaded from a different environment, e.g. when the configuration
            is copied to a container image and loaded using a different
            absolute path. This will be reflected in the paths and URLs
            encoded in the copied configuration.

    Returns:
        A new store configuration object that reflects the new configuration
        path.
    """
    duplicate = config.copy()
    return duplicate
supports_url_scheme(url) classmethod

Check if a URL scheme is supported by this store.

Concrete store configuration classes should override this method to check if a URL scheme is supported by the store.

Parameters:

Name Type Description Default
url str

The URL to check.

required

Returns:

Type Description
bool

True if the URL scheme is supported, False otherwise.

Source code in zenml/config/store_config.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
    """Check if a URL scheme is supported by this store.

    This implementation does not inspect the URL and accepts every value.
    Concrete store configuration classes should override this method to
    check if a URL scheme is supported by the store.

    Args:
        url: The URL to check.

    Returns:
        True if the URL scheme is supported, False otherwise.
    """
    # Unconditional: the argument is not looked at here.
    return True

strict_base_model

Strict immutable pydantic model.

StrictBaseModel (BaseModel) pydantic-model

Immutable pydantic model which prevents extra attributes.

Source code in zenml/config/strict_base_model.py
class StrictBaseModel(BaseModel):
    """Immutable pydantic model which prevents extra attributes."""

    class Config:
        """Pydantic config class."""

        # Makes the model immutable.
        allow_mutation = False
        # Attributes that are not declared on the model are not accepted.
        extra = Extra.forbid
Config

Pydantic config class.

Source code in zenml/config/strict_base_model.py
class Config:
    """Pydantic config class."""

    # Makes the model immutable.
    allow_mutation = False
    # Attributes that are not declared on the model are not accepted.
    extra = Extra.forbid