Skip to content

Config

zenml.config special

The config module contains classes and functions that manage user-specific configuration.

ZenML's configuration is stored in a file called config.yaml, located in the user's configuration directory. (The exact location differs between operating systems.)

The GlobalConfiguration class is the main class in this module. It provides a Pydantic configuration object that is used to store and retrieve configuration. This GlobalConfiguration object handles the serialization and deserialization of the configuration options that are stored in the file in order to persist the configuration across sessions.

base_settings

Base class for all ZenML settings.

BaseSettings (SecretReferenceMixin) pydantic-model

Base class for settings.

The LEVEL class variable defines on which level the settings can be specified. By default, subclasses can be defined on both pipelines and steps.

Source code in zenml/config/base_settings.py
class BaseSettings(SecretReferenceMixin):
    """Common parent of all settings classes.

    Where a settings class may be configured is controlled by the `LEVEL`
    class variable. Unless a subclass overrides it, settings are accepted
    on pipelines as well as on steps.
    """

    # Both flags are set by default, i.e. pipeline level and step level.
    LEVEL: ClassVar[ConfigurationLevel] = (
        ConfigurationLevel.STEP | ConfigurationLevel.PIPELINE
    )

    class Config:
        """Pydantic configuration class."""

        # Extra attributes are accepted so that this base class can parse
        # dictionaries that belong to arbitrary subclasses.
        extra = Extra.allow
        # Public attributes cannot be reassigned after creation.
        allow_mutation = False
Config

Pydantic configuration class.

Source code in zenml/config/base_settings.py
class Config:
    """Pydantic configuration class."""

    # Extra attributes are accepted so that the owning class can parse
    # dictionaries that belong to arbitrary subclasses.
    extra = Extra.allow
    # Public attributes cannot be reassigned after creation.
    allow_mutation = False

ConfigurationLevel (IntFlag)

Settings configuration level.

Bit flag that determines at which level (step and/or pipeline) a BaseSettings subclass may be configured.

Source code in zenml/config/base_settings.py
class ConfigurationLevel(IntFlag):
    """Level on which settings may be configured.

    The members are bit flags, so they can be combined with `|` to state
    that a `BaseSettings` subclass is allowed on several levels.
    """

    # Explicit bit positions; these are the values `auto()` would assign
    # to the first and second member of an `IntFlag`.
    STEP = 1 << 0
    PIPELINE = 1 << 1

build_configuration

Build configuration class.

BuildConfiguration (BaseModel) pydantic-model

Configuration of Docker builds.

Attributes:

Name Type Description
key str

The key to store the build.

settings DockerSettings

Settings for the build.

step_name Optional[str]

Name of the step for which this image will be built.

entrypoint Optional[str]

Optional entrypoint for the image.

extra_files Dict[str, str]

Extra files to include in the Docker image.

Source code in zenml/config/build_configuration.py
class BuildConfiguration(BaseModel):
    """Describes a single Docker image build.

    Attributes:
        key: Key under which the build is stored.
        settings: Docker settings used for the build.
        step_name: Name of the step the image is built for, if any.
        entrypoint: Optional entrypoint of the image.
        extra_files: Additional files for the image. Keys and values are
            both hashed as text by `compute_settings_checksum`.
    """

    key: str
    settings: DockerSettings
    step_name: Optional[str] = None
    entrypoint: Optional[str] = None
    extra_files: Dict[str, str] = {}

    def compute_settings_checksum(
        self,
        stack: "Stack",
        code_repository: Optional["BaseCodeRepository"] = None,
    ) -> str:
        """Compute a checksum over everything that defines this build.

        The digest covers, in this order: the JSON form of the Docker
        settings, the entrypoint (only if set), every extra file entry and
        the gathered requirements.

        Args:
            stack: Stack that is passed on when gathering the requirements
                files, so that stack requirements can be taken into account.
            code_repository: Optional code repository. It is only passed on
                to the requirements lookup when `should_download_files`
                returns `True` for it.

        Returns:
            The hexadecimal MD5 digest.
        """
        digest = hashlib.md5()  # nosec
        digest.update(self.settings.json().encode())
        if self.entrypoint:
            digest.update(self.entrypoint.encode())

        # Feed both halves of every entry, in dictionary order.
        for entry in self.extra_files.items():
            for text in entry:
                digest.update(text.encode())

        from zenml.utils.pipeline_docker_image_builder import (
            PipelineDockerImageBuilder,
        )

        if self.should_download_files(code_repository=code_repository):
            repository_for_requirements = code_repository
        else:
            repository_for_requirements = None

        gathered = PipelineDockerImageBuilder.gather_requirements_files(
            docker_settings=self.settings,
            stack=stack,
            code_repository=repository_for_requirements,
            log=False,
        )
        # Only the middle element of each gathered triple is hashed.
        for _first, requirements, _last in gathered:
            digest.update(requirements.encode())

        return digest.hexdigest()

    def should_include_files(
        self,
        code_repository: Optional["BaseCodeRepository"],
    ) -> bool:
        """Decide whether source files are included in the image.

        Args:
            code_repository: Code repository that could be used to download
                the files inside the image instead.

        Returns:
            `True` if the source file mode is `INCLUDE`, or if it is
            `DOWNLOAD_OR_INCLUDE` and no code repository is available;
            `False` otherwise.
        """
        mode = self.settings.source_files
        if mode == SourceFileMode.INCLUDE:
            return True

        # Including is the fallback when downloading is not possible.
        return bool(
            mode == SourceFileMode.DOWNLOAD_OR_INCLUDE and not code_repository
        )

    def should_download_files(
        self,
        code_repository: Optional["BaseCodeRepository"],
    ) -> bool:
        """Decide whether source files are downloaded inside the image.

        Args:
            code_repository: Code repository that can be used to download
                the files inside the image.

        Returns:
            `True` if a code repository is given and the source file mode
            is `DOWNLOAD` or `DOWNLOAD_OR_INCLUDE`; `False` otherwise.
        """
        if code_repository:
            return self.settings.source_files in {
                SourceFileMode.DOWNLOAD,
                SourceFileMode.DOWNLOAD_OR_INCLUDE,
            }

        # Nothing can be downloaded without a code repository.
        return False
compute_settings_checksum(self, stack, code_repository=None)

Checksum for all build settings.

Parameters:

Name Type Description Default
stack Stack

The stack for which to compute the checksum. This is needed to gather the stack integration requirements in case the Docker settings specify to install them.

required
code_repository Optional[BaseCodeRepository]

Optional code repository that will be used to download files inside the image.

None

Returns:

Type Description
str

The checksum.

Source code in zenml/config/build_configuration.py
def compute_settings_checksum(
    self,
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Compute a checksum over everything that defines this build.

    The digest covers, in this order: the JSON form of the Docker
    settings, the entrypoint (only if set), every extra file entry and
    the gathered requirements.

    Args:
        stack: Stack that is passed on when gathering the requirements
            files, so that stack requirements can be taken into account.
        code_repository: Optional code repository. It is only passed on
            to the requirements lookup when `should_download_files`
            returns `True` for it.

    Returns:
        The hexadecimal MD5 digest.
    """
    digest = hashlib.md5()  # nosec
    digest.update(self.settings.json().encode())
    if self.entrypoint:
        digest.update(self.entrypoint.encode())

    # Feed both halves of every entry, in dictionary order.
    for entry in self.extra_files.items():
        for text in entry:
            digest.update(text.encode())

    from zenml.utils.pipeline_docker_image_builder import (
        PipelineDockerImageBuilder,
    )

    if self.should_download_files(code_repository=code_repository):
        repository_for_requirements = code_repository
    else:
        repository_for_requirements = None

    gathered = PipelineDockerImageBuilder.gather_requirements_files(
        docker_settings=self.settings,
        stack=stack,
        code_repository=repository_for_requirements,
        log=False,
    )
    # Only the middle element of each gathered triple is hashed.
    for _first, requirements, _last in gathered:
        digest.update(requirements.encode())

    return digest.hexdigest()
should_download_files(self, code_repository)

Whether files should be downloaded in the image.

Parameters:

Name Type Description Default
code_repository Optional[BaseCodeRepository]

Code repository that can be used to download files inside the image.

required

Returns:

Type Description
bool

Whether files should be downloaded in the image.

Source code in zenml/config/build_configuration.py
def should_download_files(
    self,
    code_repository: Optional["BaseCodeRepository"],
) -> bool:
    """Decide whether source files are downloaded inside the image.

    Args:
        code_repository: Code repository that can be used to download
            the files inside the image.

    Returns:
        `True` if a code repository is given and the source file mode
        is `DOWNLOAD` or `DOWNLOAD_OR_INCLUDE`; `False` otherwise.
    """
    if code_repository:
        return self.settings.source_files in {
            SourceFileMode.DOWNLOAD,
            SourceFileMode.DOWNLOAD_OR_INCLUDE,
        }

    # Nothing can be downloaded without a code repository.
    return False
should_include_files(self, code_repository)

Whether files should be included in the image.

Parameters:

Name Type Description Default
code_repository Optional[BaseCodeRepository]

Code repository that can be used to download files inside the image.

required

Returns:

Type Description
bool

Whether files should be included in the image.

Source code in zenml/config/build_configuration.py
def should_include_files(
    self,
    code_repository: Optional["BaseCodeRepository"],
) -> bool:
    """Decide whether source files are included in the image.

    Args:
        code_repository: Code repository that could be used to download
            the files inside the image instead.

    Returns:
        `True` if the source file mode is `INCLUDE`, or if it is
        `DOWNLOAD_OR_INCLUDE` and no code repository is available;
        `False` otherwise.
    """
    mode = self.settings.source_files
    if mode == SourceFileMode.INCLUDE:
        return True

    # Including is the fallback when downloading is not possible.
    return bool(
        mode == SourceFileMode.DOWNLOAD_OR_INCLUDE and not code_repository
    )

compiler

Class for compiling ZenML pipelines into a serializable format.

Compiler

Compiles ZenML pipelines to serializable representations.

Source code in zenml/config/compiler.py
class Compiler:
    """Compiles ZenML pipelines to serializable representations."""

    def compile(
        self,
        pipeline: "Pipeline",
        stack: "Stack",
        run_configuration: PipelineRunConfiguration,
    ) -> Tuple[PipelineDeploymentBase, PipelineSpec]:
        """Turn a pipeline into a serializable deployment and spec.

        Args:
            pipeline: The pipeline to compile. It is deep-copied first, so
                the object passed in is not modified.
            stack: The stack on which the pipeline will run.
            run_configuration: Run-level configuration that is applied to
                the copied pipeline and its steps.

        Returns:
            A tuple of the compiled pipeline deployment and pipeline spec.
        """
        logger.debug("Compiling pipeline `%s`.", pipeline.name)
        # Every run-level change below happens on a private copy, so the
        # caller's pipeline and step objects stay as they were.
        pipeline = copy.deepcopy(pipeline)
        self._apply_run_configuration(
            pipeline=pipeline, config=run_configuration
        )
        self._apply_stack_default_settings(pipeline=pipeline, stack=stack)

        custom_run_name = run_configuration.run_name
        if custom_run_name:
            self._verify_run_name(custom_run_name)

        pipeline_settings = self._filter_and_validate_settings(
            settings=pipeline.configuration.settings,
            configuration_level=ConfigurationLevel.PIPELINE,
            stack=stack,
        )
        with pipeline.__suppress_configure_warnings__():
            pipeline.configure(settings=pipeline_settings, merge=False)

        # Only settings that are also valid on step level are handed down.
        settings_to_passdown = {}
        for key, settings in pipeline_settings.items():
            if ConfigurationLevel.STEP in settings.LEVEL:
                settings_to_passdown[key] = settings

        # Compile the invocations in topologically sorted order.
        steps = {}
        for invocation_id, invocation in self._get_sorted_invocations(
            pipeline=pipeline
        ):
            steps[invocation_id] = self._compile_step_invocation(
                invocation=invocation,
                pipeline_settings=settings_to_passdown,
                pipeline_extra=pipeline.configuration.extra,
                stack=stack,
                step_config=run_configuration.steps.get(invocation_id),
                pipeline_failure_hook_source=pipeline.configuration.failure_hook_source,
                pipeline_success_hook_source=pipeline.configuration.success_hook_source,
            )

        self._ensure_required_stack_components_exist(stack=stack, steps=steps)

        # Fall back to the default template if no run name was configured.
        run_name = custom_run_name or self._get_default_run_name(
            pipeline_name=pipeline.name
        )

        client_version, server_version = get_zenml_versions()
        client_environment = get_run_environment_dict()

        deployment = PipelineDeploymentBase(
            run_name_template=run_name,
            pipeline_configuration=pipeline.configuration,
            step_configurations=steps,
            client_environment=client_environment,
            client_version=client_version,
            server_version=server_version,
        )

        pipeline_spec = self._compute_pipeline_spec(
            pipeline=pipeline,
            step_specs=[compiled_step.spec for compiled_step in steps.values()],
        )

        logger.debug("Compiled pipeline deployment: %s", deployment)
        logger.debug("Compiled pipeline spec: %s", pipeline_spec)

        return deployment, pipeline_spec

    def compile_spec(self, pipeline: "Pipeline") -> PipelineSpec:
        """Compile only the spec of a pipeline.

        Use this when the pipeline spec is needed but a full deployment,
        which requires stack information, is not.

        Args:
            pipeline: The pipeline to compile. It is deep-copied first, so
                the object passed in is not modified.

        Returns:
            The compiled pipeline spec.
        """
        logger.debug(
            "Compiling pipeline spec for pipeline `%s`.", pipeline.name
        )
        # Work on a copy so the caller's pipeline and steps stay untouched.
        pipeline = copy.deepcopy(pipeline)

        # Collect one step spec per invocation, in sorted order.
        step_specs = []
        for _, invocation in self._get_sorted_invocations(pipeline=pipeline):
            step_specs.append(self._get_step_spec(invocation=invocation))

        pipeline_spec = self._compute_pipeline_spec(
            pipeline=pipeline, step_specs=step_specs
        )
        logger.debug("Compiled pipeline spec: %s", pipeline_spec)
        return pipeline_spec

    def _apply_run_configuration(
        self, pipeline: "Pipeline", config: PipelineRunConfiguration
    ) -> None:
        """Applies run configurations to the pipeline and its steps.

        Args:
            pipeline: The pipeline to configure.
            config: The run configurations.

        Raises:
            KeyError: If the run configuration contains options for a
                non-existent step.
        """
        with pipeline.__suppress_configure_warnings__():
            pipeline.configure(
                enable_cache=config.enable_cache,
                enable_artifact_metadata=config.enable_artifact_metadata,
                enable_artifact_visualization=config.enable_artifact_visualization,
                enable_step_logs=config.enable_step_logs,
                settings=config.settings,
                extra=config.extra,
                model=config.model,
                parameters=config.parameters,
            )

        for invocation_id in config.steps:
            if invocation_id not in pipeline.invocations:
                raise KeyError(
                    f"Configuration for step {invocation_id} cannot be applied to any pipeline step. Make sure that all configured steps are present in your pipeline."
                )

        # Override `enable_cache` of all steps if set at run level
        if config.enable_cache is not None:
            for invocation in pipeline.invocations.values():
                invocation.step.configure(enable_cache=config.enable_cache)

        # Override `enable_artifact_metadata` of all steps if set at run level
        if config.enable_artifact_metadata is not None:
            for invocation in pipeline.invocations.values():
                invocation.step.configure(
                    enable_artifact_metadata=config.enable_artifact_metadata
                )

        # Override `enable_artifact_visualization` if set at run level
        if config.enable_artifact_visualization is not None:
            for invocation in pipeline.invocations.values():
                invocation.step.configure(
                    enable_artifact_visualization=config.enable_artifact_visualization
                )

        # Override `enable_step_logs` if set at run level
        if config.enable_step_logs is not None:
            for invocation in pipeline.invocations.values():
                invocation.step.configure(
                    enable_step_logs=config.enable_step_logs
                )

    def _apply_stack_default_settings(
        self, pipeline: "Pipeline", stack: "Stack"
    ) -> None:
        """Merge the default settings of a stack into a pipeline.

        Args:
            pipeline: The pipeline that receives the default settings.
            stack: The stack whose components may define default settings.
        """
        pipeline_settings = pipeline.configuration.settings

        for component in stack.components.values():
            if component.settings_class:
                settings_key = settings_utils.get_stack_component_setting_key(
                    component
                )
                resulting_settings = self._get_default_settings(component)

                if settings_key in pipeline_settings:
                    # Settings already on the pipeline are applied as an
                    # update on top of the component defaults.
                    resulting_settings = pydantic_utils.update_model(
                        resulting_settings,
                        update=pipeline_settings[settings_key],
                    )

                pipeline_settings[settings_key] = resulting_settings

        with pipeline.__suppress_configure_warnings__():
            pipeline.configure(settings=pipeline_settings, merge=False)

    def _get_default_settings(
        self,
        stack_component: "StackComponent",
    ) -> "BaseSettings":
        """Gets default settings configured on a stack component.

        Args:
            stack_component: The stack component for which to get the settings.

        Returns:
            The settings configured on the stack component.
        """
        assert stack_component.settings_class
        # Exclude additional config attributes that aren't part of the settings
        field_names = set(stack_component.settings_class.__fields__)
        default_settings = stack_component.settings_class.parse_obj(
            stack_component.config.dict(
                include=field_names, exclude_unset=True, exclude_defaults=True
            )
        )
        return default_settings

    @staticmethod
    def _verify_run_name(run_name: str) -> None:
        """Verifies that the run name contains only valid placeholders.

        Args:
            run_name: The run name to verify.

        Raises:
            ValueError: If the run name contains invalid placeholders.
        """
        valid_placeholder_names = {"date", "time"}
        placeholders = {
            v[1] for v in string.Formatter().parse(run_name) if v[1]
        }
        if not placeholders.issubset(valid_placeholder_names):
            raise ValueError(
                f"Invalid run name {run_name}. Only the placeholders "
                f"{valid_placeholder_names} are allowed in run names."
            )

    def _verify_upstream_steps(
        self, invocation: "StepInvocation", pipeline: "Pipeline"
    ) -> None:
        """Verifies the upstream steps for a step invocation.

        Args:
            invocation: The step invocation for which to verify the upstream
                steps.
            pipeline: The parent pipeline of the invocation.

        Raises:
            RuntimeError: If an upstream step is missing.
        """
        available_steps = set(pipeline.invocations)
        invalid_upstream_steps = invocation.upstream_steps - available_steps

        if invalid_upstream_steps:
            raise RuntimeError(
                f"Invalid upstream steps: {invalid_upstream_steps}. Available "
                f"steps in this pipeline: {available_steps}."
            )

    def _filter_and_validate_settings(
        self,
        settings: Dict[str, "BaseSettings"],
        configuration_level: ConfigurationLevel,
        stack: "Stack",
    ) -> Dict[str, "BaseSettings"]:
        """Filters and validates settings.

        Args:
            settings: The settings to check.
            configuration_level: The level on which these settings
                were configured.
            stack: The stack on which the pipeline will run.

        Raises:
            TypeError: If settings with an unsupported configuration
                level were specified.

        Returns:
            The filtered settings.
        """
        validated_settings = {}

        for key, settings_instance in settings.items():
            resolver = SettingsResolver(key=key, settings=settings_instance)
            try:
                settings_instance = resolver.resolve(stack=stack)
            except KeyError:
                logger.info(
                    "Not including stack component settings with key `%s`.",
                    key,
                )
                continue

            if configuration_level not in settings_instance.LEVEL:
                raise TypeError(
                    f"The settings class {settings_instance.__class__} can not "
                    f"be specified on a {configuration_level.name} level."
                )
            validated_settings[key] = settings_instance

        return validated_settings

    def _get_step_spec(
        self,
        invocation: "StepInvocation",
    ) -> StepSpec:
        """Build the spec of a step invocation.

        Args:
            invocation: The invocation to build the spec for.

        Returns:
            The step spec, with the upstream steps in sorted order.
        """
        # One input spec per input artifact, pointing at the invocation
        # and output that the artifact comes from.
        inputs = {}
        for key, artifact in invocation.input_artifacts.items():
            inputs[key] = InputSpec(
                step_name=artifact.invocation_id,
                output_name=artifact.output_name,
            )

        return StepSpec(
            source=invocation.step.resolve(),
            upstream_steps=sorted(invocation.upstream_steps),
            inputs=inputs,
            pipeline_parameter_name=invocation.id,
        )

    def _compile_step_invocation(
        self,
        invocation: "StepInvocation",
        pipeline_settings: Dict[str, "BaseSettings"],
        pipeline_extra: Dict[str, Any],
        stack: "Stack",
        step_config: Optional["StepConfigurationUpdate"],
        pipeline_failure_hook_source: Optional["Source"] = None,
        pipeline_success_hook_source: Optional["Source"] = None,
    ) -> Step:
        """Compile a single step invocation.

        Args:
            invocation: The step invocation to compile. It is deep-copied
                first, so the object passed in is not modified.
            pipeline_settings: Settings configured on the pipeline of the
                step.
            pipeline_extra: Extra values configured on the pipeline of the
                step.
            stack: The stack on which the pipeline will be run.
            step_config: Run configuration for the step, if any.
            pipeline_failure_hook_source: Source for the failure hook.
            pipeline_success_hook_source: Source for the success hook.

        Returns:
            The compiled step.
        """
        # The run configuration is exclusive to this invocation, so it is
        # applied to a copy of the invocation and its referenced step.
        invocation = copy.deepcopy(invocation)
        step = invocation.step

        if step_config:
            step._apply_configuration(
                step_config, runtime_parameters=invocation.parameters
            )

        step_spec = self._get_step_spec(invocation=invocation)

        # Read the step-level values before the first `configure` call
        # below, which runs with `merge=False`.
        step_level_values = dict(
            settings=self._filter_and_validate_settings(
                settings=step.configuration.settings,
                configuration_level=ConfigurationLevel.STEP,
                stack=stack,
            ),
            extra=step.configuration.extra,
            on_failure=step.configuration.failure_hook_source,
            on_success=step.configuration.success_hook_source,
        )

        # First the pipeline-level values, then the step-level values
        # merged on top of them.
        step.configure(
            settings=pipeline_settings,
            extra=pipeline_extra,
            on_failure=pipeline_failure_hook_source,
            on_success=pipeline_success_hook_source,
            merge=False,
        )
        step.configure(**step_level_values, merge=True)

        if step_config:
            parameters_to_ignore = set(step_config.parameters)
        else:
            parameters_to_ignore = set()

        return Step(
            spec=step_spec,
            config=invocation.finalize(
                parameters_to_ignore=parameters_to_ignore
            ),
        )

    @staticmethod
    def _get_default_run_name(pipeline_name: str) -> str:
        """Gets the default name for a pipeline run.

        Args:
            pipeline_name: Name of the pipeline which will be run.

        Returns:
            Run name.
        """
        return f"{pipeline_name}-{{date}}-{{time}}"

    def _get_sorted_invocations(
        self,
        pipeline: "Pipeline",
    ) -> List[Tuple[str, "StepInvocation"]]:
        """Return the step invocations of a pipeline in topological order.

        In the returned order, the invocations can be executed one after
        another without any conflicts.

        Args:
            pipeline: The pipeline of which to sort the invocations.

        Returns:
            A list of `(invocation id, invocation)` tuples in sorted order.
        """
        from zenml.orchestrators.dag_runner import reverse_dag
        from zenml.orchestrators.topsort import topsorted_layers

        # Map every invocation id to its upstream ids, verifying on the
        # way that all of these upstream steps exist.
        dag: Dict[str, List[str]] = {}
        for invocation_id, invocation in pipeline.invocations.items():
            self._verify_upstream_steps(
                invocation=invocation, pipeline=pipeline
            )
            dag[invocation_id] = list(invocation.upstream_steps)

        reversed_dag: Dict[str, List[str]] = reverse_dag(dag)
        layers = topsorted_layers(
            nodes=list(dag),
            get_node_id_fn=lambda node: node,
            get_parent_nodes=lambda node: dag[node],
            get_child_nodes=lambda node: reversed_dag[node],
        )

        # Flatten the layers first, then look up the invocations.
        sorted_ids: List[str] = []
        for layer in layers:
            sorted_ids.extend(layer)

        sorted_invocations: List[Tuple[str, "StepInvocation"]] = []
        for sorted_id in sorted_ids:
            sorted_invocations.append(
                (sorted_id, pipeline.invocations[sorted_id])
            )
        return sorted_invocations

    @staticmethod
    def _ensure_required_stack_components_exist(
        stack: "Stack", steps: Mapping[str, "Step"]
    ) -> None:
        """Ensures that the stack components required for each step exist.

        Args:
            stack: The stack on which the pipeline should be deployed.
            steps: The steps of the pipeline.

        Raises:
            StackValidationError: If a required stack component is missing.
        """
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )
        available_experiment_trackers = (
            {stack.experiment_tracker.name}
            if stack.experiment_tracker
            else set()
        )

        for name, step in steps.items():
            step_operator = step.config.step_operator
            if step_operator and step_operator not in available_step_operators:
                raise StackValidationError(
                    f"Step `{name}` requires step operator "
                    f"'{step_operator}' which is not configured in "
                    f"the stack '{stack.name}'. Available step operators: "
                    f"{available_step_operators}."
                )

            experiment_tracker = step.config.experiment_tracker
            if (
                experiment_tracker
                and experiment_tracker not in available_experiment_trackers
            ):
                raise StackValidationError(
                    f"Step `{name}` requires experiment tracker "
                    f"'{experiment_tracker}' which is not "
                    f"configured in the stack '{stack.name}'. Available "
                    f"experiment trackers: {available_experiment_trackers}."
                )

    @staticmethod
    def _compute_pipeline_spec(
        pipeline: "Pipeline", step_specs: List["StepSpec"]
    ) -> "PipelineSpec":
        """Build the pipeline spec from a pipeline and its step specs.

        Args:
            pipeline: The pipeline for which to compute the spec.
            step_specs: The step specs for the pipeline.

        Returns:
            The pipeline spec.

        Raises:
            ValueError: If the pipeline has no steps.
        """
        from zenml.pipelines import BasePipeline

        if not step_specs:
            raise ValueError(
                f"Pipeline '{pipeline.name}' cannot be compiled because it has "
                f"no steps. Please make sure that your steps are decorated "
                "with `@step` and that at least one step is called within the "
                "pipeline. For more information, see "
                "https://docs.zenml.io/user-guide/starter-guide."
            )

        spec_kwargs: Dict[str, Any]
        if isinstance(pipeline, BasePipeline):
            # Legacy pipelines are pinned to the older spec version.
            spec_kwargs = {"version": "0.3"}
        else:
            spec_kwargs = {
                "source": pipeline.resolve(),
                "parameters": pipeline._parameters,
            }

        return PipelineSpec(steps=step_specs, **spec_kwargs)
compile(self, pipeline, stack, run_configuration)

Compiles a ZenML pipeline to a serializable representation.

Parameters:

Name Type Description Default
pipeline Pipeline

The pipeline to compile.

required
stack Stack

The stack on which the pipeline will run.

required
run_configuration PipelineRunConfiguration

The run configuration for this pipeline.

required

Returns:

Type Description
Tuple[zenml.models.v2.core.pipeline_deployment.PipelineDeploymentBase, zenml.config.pipeline_spec.PipelineSpec]

The compiled pipeline deployment and spec

Source code in zenml/config/compiler.py
def compile(
    self,
    pipeline: "Pipeline",
    stack: "Stack",
    run_configuration: PipelineRunConfiguration,
) -> Tuple[PipelineDeploymentBase, PipelineSpec]:
    """Turns a ZenML pipeline into a serializable deployment and spec.

    The pipeline is deep-copied before the run configuration and the stack
    default settings are applied, so the object passed in by the caller is
    left untouched.

    Args:
        pipeline: The pipeline to compile.
        stack: The stack on which the pipeline will run.
        run_configuration: The run configuration for this pipeline.

    Returns:
        A tuple of the compiled pipeline deployment and the pipeline spec.
    """
    logger.debug("Compiling pipeline `%s`.", pipeline.name)

    # All run-level configuration is applied to a private copy, the
    # original pipeline and its step objects must not be modified
    pipeline = copy.deepcopy(pipeline)
    self._apply_run_configuration(
        pipeline=pipeline, config=run_configuration
    )
    self._apply_stack_default_settings(pipeline=pipeline, stack=stack)

    # A custom run name is only verified if one was actually configured
    if run_configuration.run_name:
        self._verify_run_name(run_configuration.run_name)

    validated_settings = self._filter_and_validate_settings(
        settings=pipeline.configuration.settings,
        configuration_level=ConfigurationLevel.PIPELINE,
        stack=stack,
    )
    with pipeline.__suppress_configure_warnings__():
        pipeline.configure(settings=validated_settings, merge=False)

    # Only settings that may also be defined on the step level are passed
    # down to the individual steps
    step_level_settings = {}
    for key, settings in validated_settings.items():
        if ConfigurationLevel.STEP in settings.LEVEL:
            step_level_settings[key] = settings

    compiled_steps = {}
    sorted_invocations = self._get_sorted_invocations(pipeline=pipeline)
    for invocation_id, invocation in sorted_invocations:
        compiled_steps[invocation_id] = self._compile_step_invocation(
            invocation=invocation,
            pipeline_settings=step_level_settings,
            pipeline_extra=pipeline.configuration.extra,
            stack=stack,
            step_config=run_configuration.steps.get(invocation_id),
            pipeline_failure_hook_source=pipeline.configuration.failure_hook_source,
            pipeline_success_hook_source=pipeline.configuration.success_hook_source,
        )

    self._ensure_required_stack_components_exist(
        stack=stack, steps=compiled_steps
    )

    # Fall back to the default run name if none was configured
    run_name = run_configuration.run_name
    if not run_name:
        run_name = self._get_default_run_name(pipeline_name=pipeline.name)

    client_version, server_version = get_zenml_versions()

    deployment = PipelineDeploymentBase(
        run_name_template=run_name,
        pipeline_configuration=pipeline.configuration,
        step_configurations=compiled_steps,
        client_environment=get_run_environment_dict(),
        client_version=client_version,
        server_version=server_version,
    )

    pipeline_spec = self._compute_pipeline_spec(
        pipeline=pipeline,
        step_specs=[step.spec for step in compiled_steps.values()],
    )

    logger.debug("Compiled pipeline deployment: %s", deployment)
    logger.debug("Compiled pipeline spec: %s", pipeline_spec)

    return deployment, pipeline_spec
compile_spec(self, pipeline)

Compiles a ZenML pipeline to a pipeline spec.

This method can be used when a pipeline spec is needed but the full deployment including stack information is not required.

Parameters:

Name Type Description Default
pipeline Pipeline

The pipeline to compile.

required

Returns:

Type Description
PipelineSpec

The compiled pipeline spec.

Source code in zenml/config/compiler.py
def compile_spec(self, pipeline: "Pipeline") -> PipelineSpec:
    """Computes only the pipeline spec of a ZenML pipeline.

    Use this instead of a full compilation when the pipeline spec is needed
    but the deployment, which includes stack information, is not.

    Args:
        pipeline: The pipeline to compile.

    Returns:
        The compiled pipeline spec.
    """
    logger.debug(
        "Compiling pipeline spec for pipeline `%s`.", pipeline.name
    )

    # The steps get connected on a private copy, the original pipeline and
    # its step objects must not be modified
    pipeline = copy.deepcopy(pipeline)

    step_specs = []
    for _, invocation in self._get_sorted_invocations(pipeline=pipeline):
        step_specs.append(self._get_step_spec(invocation=invocation))

    compiled_spec = self._compute_pipeline_spec(
        pipeline=pipeline, step_specs=step_specs
    )
    logger.debug("Compiled pipeline spec: %s", compiled_spec)
    return compiled_spec

get_zenml_versions()

Returns the version of ZenML on the client and server side.

Returns:

Type Description
Tuple[str, str]

The ZenML versions on the client and server side, respectively.

Source code in zenml/config/compiler.py
def get_zenml_versions() -> Tuple[str, str]:
    """Looks up the ZenML version of the client and of the server.

    The server version is read from the store info of the store that the
    client is connected to.

    Returns:
        A `(client_version, server_version)` tuple.
    """
    from zenml.client import Client

    store_info = Client().zen_store.get_store_info()
    server_version = store_info.version

    return __version__, server_version

constants

ZenML settings constants.

docker_settings

Docker settings.

DockerSettings (BaseSettings) pydantic-model

Settings for building Docker images to run ZenML pipelines.

Build process:
  • No dockerfile specified: If any of the options regarding requirements, environment variables or copying files require us to build an image, ZenML will build this image. Otherwise the parent_image will be used to run the pipeline.
  • dockerfile specified: ZenML will first build an image based on the specified Dockerfile. If any of the options regarding requirements, environment variables or copying files require an additional image built on top of that, ZenML will build a second image. If not, the image built from the specified Dockerfile will be used to run the pipeline.
Requirements installation order:

Depending on the configuration of this object, requirements will be installed in the following order (each step optional):
  • The packages installed in your local python environment
  • The packages specified via the requirements attribute
  • The packages specified via the required_integrations and potentially stack requirements
  • The packages specified via the required_hub_plugins attribute

Attributes:

Name Type Description
parent_image Optional[str]

Full name of the Docker image that should be used as the parent for the image that will be built. Defaults to a ZenML image built for the active Python and ZenML version.

Additional notes:
  • If you specify a custom image here, you need to make sure it has ZenML installed.
  • If this is a non-local image, the environment which is running the pipeline and building the Docker image needs to be able to pull this image.
  • If a custom dockerfile is specified for this settings object, this parent image will be ignored.

dockerfile Optional[str]

Path to a custom Dockerfile that should be built. Depending on the other values you specify in this object, the resulting image will be used directly to run your pipeline or ZenML will use it as a parent image to build on top of. See the general docstring of this class for more information.

Additional notes:
  • If you specify this, the parent_image attribute will be ignored.
  • If you specify this, the image built from this Dockerfile needs to have ZenML installed.

build_context_root Optional[str]

Build context root for the Docker build, only used when the dockerfile attribute is set. If this is left empty, the build context will only contain the Dockerfile.

build_options Dict[str, Any]

Additional options that will be passed unmodified to the Docker build call when building an image using the specified dockerfile. You can use this, for example, to specify build args or a target stage. See https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build for a full list of available options.

skip_build bool

If set to True, the parent image will be used directly to run the steps of your pipeline.

target_repository str

Name of the Docker repository to which the image should be pushed. This repository will be appended to the registry URI of the container registry of your stack and should therefore not include any registry.

python_package_installer PythonPackageInstaller

The package installer to use for python packages.

python_package_installer_args Dict[str, Any]

Arguments to pass to the python package installer.

replicate_local_python_environment Union[List[str], zenml.config.docker_settings.PythonEnvironmentExportMethod]

If not None, ZenML will use the specified method to generate a requirements file that replicates the packages installed in the currently running python environment. This requirements file will then be installed in the Docker image.

requirements Union[NoneType, str, List[str]]

Path to a requirements file or a list of required pip packages. During the image build, these requirements will be installed using pip. If you need to use a different tool to resolve and/or install your packages, please use a custom parent image or specify a custom dockerfile.

required_integrations List[str]

List of ZenML integrations that should be installed. All requirements for the specified integrations will be installed inside the Docker image.

required_hub_plugins List[str]

List of ZenML Hub plugins to install. Expected format: '(<author_username>/)<plugin_name>==<version>'. If no version is specified, the latest version is taken. The packages of required plugins and all their dependencies will be installed inside the Docker image.

install_stack_requirements bool

If True, ZenML will automatically detect if components of your active stack are part of a ZenML integration and install the corresponding requirements and apt packages. If you set this to False or use custom components in your stack, you need to make sure these get installed by specifying them in the requirements and apt_packages attributes.

apt_packages List[str]

APT packages to install inside the Docker image.

environment Dict[str, Any]

Dictionary of environment variables to set inside the Docker image.

dockerignore Optional[str]

Path to a dockerignore file to use when building the Docker image.

copy_files bool

DEPRECATED, use the source_files attribute instead.

copy_global_config bool

DEPRECATED/UNUSED.

user Optional[str]

If not None, will set the user, make it owner of the /app directory which contains all the user code and run the container entrypoint as this user.

source_files SourceFileMode

Defines how the user source files will be handled when building the Docker image.
  • INCLUDE: The files will be included in the Docker image.
  • DOWNLOAD: The files will be downloaded when running the image. If this is specified, the files must be inside a registered code repository and the repository must have no local changes, otherwise the build will fail.
  • DOWNLOAD_OR_INCLUDE: The files will be downloaded if they're inside a registered code repository and the repository has no local changes, otherwise they will be included in the image.
  • IGNORE: The files will not be included or downloaded in the image. If you use this option, you are responsible for ensuring that all the files needed to run your steps exist in the right place.

Source code in zenml/config/docker_settings.py
class DockerSettings(BaseSettings):
    """Settings that control how Docker images for ZenML pipelines are built.

    Build process:
    --------------
    * Without a `dockerfile`: ZenML only builds an image if one of the
    options regarding requirements, environment variables or copying files
    requires it. Otherwise the `parent_image` is used to run the pipeline.
    * With a `dockerfile`: ZenML first builds an image based on the
    specified Dockerfile. If one of the options regarding requirements,
    environment variables or copying files requires an additional image on
    top of that, ZenML builds a second image. If not, the image built from
    the specified Dockerfile is used to run the pipeline.

    Requirements installation order:
    --------------------------------
    Depending on the configuration of this object, requirements are
    installed in the following order (each step optional):
    - The packages installed in your local python environment
    - The packages specified via the `requirements` attribute
    - The packages specified via the `required_integrations` and potentially
      stack requirements
    - The packages specified via the `required_hub_plugins` attribute

    Attributes:
        parent_image: Full name of the Docker image that should be
            used as the parent for the image that will be built. Defaults to
            a ZenML image built for the active Python and ZenML version.

            Additional notes:
            * If you specify a custom image here, you need to make sure it has
            ZenML installed.
            * If this is a non-local image, the environment which is running
            the pipeline and building the Docker image needs to be able to pull
            this image.
            * If a custom `dockerfile` is specified for this settings
            object, this parent image will be ignored.
        dockerfile: Path to a custom Dockerfile that should be built. Depending
            on the other values you specify in this object, the resulting
            image will be used directly to run your pipeline or ZenML will use
            it as a parent image to build on top of. See the general docstring
            of this class for more information.

            Additional notes:
            * If you specify this, the `parent_image` attribute will be ignored.
            * If you specify this, the image built from this Dockerfile needs
            to have ZenML installed.
        build_context_root: Build context root for the Docker build, only used
            when the `dockerfile` attribute is set. If this is left empty, the
            build context will only contain the Dockerfile.
        build_options: Additional options that will be passed unmodified to the
            Docker build call when building an image using the specified
            `dockerfile`. You can use this, for example, to specify build
            args or a target stage. See
            https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build
            for a full list of available options.
        skip_build: If set to `True`, the parent image will be used directly to
            run the steps of your pipeline.
        target_repository: Name of the Docker repository to which the
            image should be pushed. This repository will be appended to the
            registry URI of the container registry of your stack and should
            therefore **not** include any registry.
        python_package_installer: The package installer to use for python
            packages.
        python_package_installer_args: Arguments to pass to the python package
            installer.
        replicate_local_python_environment: If not `None`, ZenML will use the
            specified method to generate a requirements file that replicates
            the packages installed in the currently running python environment.
            This requirements file will then be installed in the Docker image.
        requirements: Path to a requirements file or a list of required pip
            packages. During the image build, these requirements will be
            installed using pip. If you need to use a different tool to
            resolve and/or install your packages, please use a custom parent
            image or specify a custom `dockerfile`.
        required_integrations: List of ZenML integrations that should be
            installed. All requirements for the specified integrations will
            be installed inside the Docker image.
        required_hub_plugins: List of ZenML Hub plugins to install.
            Expected format: '(<author_username>/)<plugin_name>==<version>'.
            If no version is specified, the latest version is taken. The
            packages of required plugins and all their dependencies will be
            installed inside the Docker image.
        install_stack_requirements: If `True`, ZenML will automatically detect
            if components of your active stack are part of a ZenML integration
            and install the corresponding requirements and apt packages.
            If you set this to `False` or use custom components in your stack,
            you need to make sure these get installed by specifying them in
            the `requirements` and `apt_packages` attributes.
        apt_packages: APT packages to install inside the Docker image.
        environment: Dictionary of environment variables to set inside the
            Docker image.
        dockerignore: Path to a dockerignore file to use when building the
            Docker image.
        copy_files: DEPRECATED, use the `source_files` attribute instead.
        copy_global_config: DEPRECATED/UNUSED.
        user: If not `None`, will set the user, make it owner of the `/app`
            directory which contains all the user code and run the container
            entrypoint as this user.
        source_files: Defines how the user source files will be handled when
            building the Docker image.
            * INCLUDE: The files will be included in the Docker image.
            * DOWNLOAD: The files will be downloaded when running the image. If
              this is specified, the files must be inside a registered code
              repository and the repository must have no local changes,
              otherwise the build will fail.
            * DOWNLOAD_OR_INCLUDE: The files will be downloaded if they're
              inside a registered code repository and the repository has no
              local changes, otherwise they will be included in the image.
            * IGNORE: The files will not be included or downloaded in the image.
              If you use this option, you are responsible for ensuring that
              all the files needed to run your steps exist in the right place.
    """

    parent_image: Optional[str] = None
    dockerfile: Optional[str] = None
    build_context_root: Optional[str] = None
    build_options: Dict[str, Any] = {}
    skip_build: bool = False
    target_repository: str = "zenml"
    python_package_installer: PythonPackageInstaller = (
        PythonPackageInstaller.PIP
    )
    python_package_installer_args: Dict[str, Any] = {}
    replicate_local_python_environment: Optional[
        Union[List[str], PythonEnvironmentExportMethod]
    ] = None
    requirements: Union[None, str, List[str]] = None
    required_integrations: List[str] = []
    required_hub_plugins: List[str] = []
    install_stack_requirements: bool = True
    apt_packages: List[str] = []
    environment: Dict[str, Any] = {}
    dockerignore: Optional[str] = None
    copy_files: bool = True
    copy_global_config: bool = True
    user: Optional[str] = None

    source_files: SourceFileMode = SourceFileMode.DOWNLOAD_OR_INCLUDE

    _deprecation_validator = deprecation_utils.deprecate_pydantic_attributes(
        "copy_files", "copy_global_config"
    )

    @root_validator(pre=True)
    def _migrate_copy_files(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Translates the deprecated `copy_files` value to `source_files`.

        Nothing is changed if `copy_files` is missing or `None`. If both
        attributes are set, `source_files` wins and a warning is logged.

        Args:
            values: The settings values.

        Returns:
            The migrated settings values.
        """
        legacy_value = values.get("copy_files", None)

        if legacy_value is not None:
            if values.get("source_files", None):
                # The new attribute takes precedence over the deprecated one
                logger.warning(
                    "Both `copy_files` and `source_files` specified for the "
                    "DockerSettings, ignoring the `copy_files` value."
                )
            elif legacy_value is True:
                values["source_files"] = SourceFileMode.INCLUDE
            elif legacy_value is False:
                values["source_files"] = SourceFileMode.IGNORE

        return values

    @root_validator
    def _validate_skip_build(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Checks that a parent image exists whenever the build is skipped.

        Args:
            values: The settings values.

        Returns:
            The validated settings values.

        Raises:
            ValueError: If the build should be skipped but no parent image
                was specified.
        """
        build_is_skipped = values.get("skip_build", False)
        has_parent_image = bool(values.get("parent_image"))

        if build_is_skipped and not has_parent_image:
            raise ValueError(
                "Docker settings that specify `skip_build=True` must always "
                "contain a `parent_image`. This parent image will be used "
                "to run the steps of your pipeline directly without additional "
                "Docker builds on top of it."
            )

        return values

    class Config:
        """Pydantic model configuration."""

        # make public attributes read-only
        allow_mutation = False
        # reject attributes that are not declared on the model when it is
        # initialized
        extra = Extra.forbid
Config

Pydantic configuration class.

Source code in zenml/config/docker_settings.py
class Config:
    """Pydantic model configuration."""

    # reject attributes that are not declared on the model when it is
    # initialized
    extra = Extra.forbid
    # make public attributes read-only
    allow_mutation = False

PythonEnvironmentExportMethod (Enum)

Different methods to export the local Python environment.

Source code in zenml/config/docker_settings.py
class PythonEnvironmentExportMethod(Enum):
    """Supported ways of exporting the local Python environment."""

    PIP_FREEZE = "pip_freeze"
    POETRY_EXPORT = "poetry_export"

    @property
    def command(self) -> str:
        """Shell command that lists the locally installed python packages.

        Once written to a file, the output of the command must be usable as
        a requirements file for pip.

        Returns:
            Shell command.

        Raises:
            KeyError: If no command is defined for the enum member.
        """
        if self is PythonEnvironmentExportMethod.PIP_FREEZE:
            return "pip freeze"
        if self is PythonEnvironmentExportMethod.POETRY_EXPORT:
            return "poetry export --format=requirements.txt"
        # Mirrors a failed dictionary lookup for members without a command
        raise KeyError(self)

PythonPackageInstaller (Enum)

Different installers for python packages.

Source code in zenml/config/docker_settings.py
class PythonPackageInstaller(Enum):
    """Different installers for python packages.

    Used as the type of the `python_package_installer` attribute of
    `DockerSettings`, which defaults to `PIP`.
    """

    # The `pip` installer
    PIP = "pip"
    # The `uv` installer
    UV = "uv"

SourceFileMode (Enum)

Different methods to handle source files in Docker images.

Source code in zenml/config/docker_settings.py
class SourceFileMode(Enum):
    """Different methods to handle source files in Docker images.

    Used as the type of the `source_files` attribute of `DockerSettings`,
    which defaults to `DOWNLOAD_OR_INCLUDE`. The per-member notes below
    summarize the `DockerSettings` docstring.
    """

    # The files are included in the Docker image
    INCLUDE = "include"
    # The files are downloaded if they are inside a registered code
    # repository without local changes, otherwise they are included
    DOWNLOAD_OR_INCLUDE = "download_or_include"
    # The files are downloaded when running the image
    DOWNLOAD = "download"
    # The files are neither included nor downloaded
    IGNORE = "ignore"

global_config

Functionality to support ZenML GlobalConfiguration.

GlobalConfigMetaClass (ModelMetaclass)

Global configuration metaclass.

This metaclass is used to enforce a singleton instance of the GlobalConfiguration class with the following additional properties:

  • the GlobalConfiguration is initialized automatically on import with the default configuration, if no config file exists yet.
  • the GlobalConfiguration undergoes a schema migration if the version of the config file is older than the current version of the ZenML package.
  • a default store is set if no store is configured yet.
Source code in zenml/config/global_config.py
class GlobalConfigMetaClass(ModelMetaclass):
    """Metaclass that turns the GlobalConfiguration into a singleton.

    In addition to enforcing a single instance of the GlobalConfiguration
    class, this metaclass provides the following properties:

    * the GlobalConfiguration is initialized automatically on import with the
    default configuration, if no config file exists yet.
    * the GlobalConfiguration undergoes a schema migration if the version of the
    config file is older than the current version of the ZenML package.
    * a default store is set if no store is configured yet.
    """

    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Sets up the class without a singleton instance.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments
        """
        super().__init__(*args, **kwargs)
        # The instance is only created once the class is called
        cls._global_config: Optional["GlobalConfiguration"] = None

    def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
        """Returns the global config instance, creating it if necessary.

        Args:
            *args: positional arguments
            **kwargs: keyword arguments

        Returns:
            The global GlobalConfiguration instance.
        """
        if cls._global_config:
            return cls._global_config

        new_config = cast(
            "GlobalConfiguration", super().__call__(*args, **kwargs)
        )
        # Store the instance before the migration runs, so that it is
        # already available on the class while the migration executes
        cls._global_config = new_config
        new_config._migrate_config()
        return cls._global_config
__call__(cls, *args, **kwargs) special

Create or return the default global config instance.

Parameters:

Name Type Description Default
*args Any

positional arguments

()
**kwargs Any

keyword arguments

{}

Returns:

Type Description
GlobalConfiguration

The global GlobalConfiguration instance.

Source code in zenml/config/global_config.py
def __call__(cls, *args: Any, **kwargs: Any) -> "GlobalConfiguration":
    """Returns the global config instance, creating it if necessary.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments

    Returns:
        The global GlobalConfiguration instance.
    """
    if cls._global_config:
        return cls._global_config

    new_config = cast(
        "GlobalConfiguration", super().__call__(*args, **kwargs)
    )
    # Store the instance before the migration runs, so that it is already
    # available on the class while the migration executes
    cls._global_config = new_config
    new_config._migrate_config()
    return cls._global_config
__init__(cls, *args, **kwargs) special

Initialize a singleton class.

Parameters:

Name Type Description Default
*args Any

positional arguments

()
**kwargs Any

keyword arguments

{}
Source code in zenml/config/global_config.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Initialize a singleton class.

    Forwards all arguments to the parent metaclass and then resets the
    slot that holds the singleton instance.

    Args:
        *args: positional arguments
        **kwargs: keyword arguments
    """
    super().__init__(*args, **kwargs)
    # No instance exists yet; `__call__` creates and stores it on first use
    cls._global_config: Optional["GlobalConfiguration"] = None

GlobalConfiguration (BaseModel) pydantic-model

Stores global configuration options.

Configuration options are read from a config file, but can be overwritten by environment variables. See GlobalConfiguration.__getattribute__ for more details.

Attributes:

Name Type Description
user_id

Unique user id.

user_email

Email address associated with this client.

user_email_opt_in

Whether the user has opted in to email communication.

analytics_opt_in

If a user agreed to sending analytics or not.

version

Version of ZenML that was last used to create or update the global config.

store

Store configuration.

active_stack_id

The ID of the active stack.

active_workspace_name

The name of the active workspace.

jwt_secret_key

The secret key used to sign and verify JWT tokens.

Source code in zenml/config/global_config.py
class GlobalConfiguration(BaseModel, metaclass=GlobalConfigMetaClass):
    """Stores global configuration options.

    Configuration options are read from a config file, but can be overwritten
    by environment variables. See `GlobalConfiguration.__getattribute__` for
    more details.

    Attributes:
        user_id: Unique user id.
        user_email: Email address associated with this client.
        user_email_opt_in: Whether the user has opted in to email communication.
        analytics_opt_in: If a user agreed to sending analytics or not.
        version: Version of ZenML that was last used to create or update the
            global config.
        store: Store configuration.
        active_stack_id: The ID of the active stack.
        active_workspace_name: The name of the active workspace.
        jwt_secret_key: The secret key used to sign and verify JWT tokens.
            NOTE(review): no field with this name is declared on the class
            body; confirm whether this entry is stale.
    """

    # A fresh UUID is generated with `uuid.uuid4` when no value is supplied.
    user_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    user_email: Optional[str] = None
    user_email_opt_in: Optional[bool] = None
    analytics_opt_in: bool = True
    # Checked by the `version` validator defined on this class.
    version: Optional[str]
    store: Optional[StoreConfiguration]
    active_stack_id: Optional[uuid.UUID]
    active_workspace_name: Optional[str]

    # Underscore-prefixed attributes hold in-memory objects (the store and the
    # active workspace/stack models); assigning them does not rewrite the
    # config file (see `__setattr__`).
    _zen_store: Optional["BaseZenStore"] = None
    _active_workspace: Optional["WorkspaceResponse"] = None
    _active_stack: Optional["StackResponse"] = None

    def __init__(self, **data: Any) -> None:
        """Build the configuration from the config file plus overrides.

        Values read from the config file are loaded first and entries in
        `data` take precedence over them. If the config file does not exist
        after initialization, it is written.

        GlobalConfiguration is a singleton class: only one instance can exist.
        Calling this constructor multiple times will always yield the same
        instance.

        Args:
            data: Custom configuration options.
        """
        merged_values = {**self._read_config(), **data}
        super().__init__(**merged_values)

        config_file_missing = not fileio.exists(self._config_file)
        if config_file_missing:
            self._write_config()

    @classmethod
    def get_instance(cls) -> Optional["GlobalConfiguration"]:
        """Return the GlobalConfiguration singleton instance.

        Unlike calling the class, this does not create the instance; it only
        returns whatever is currently cached on the class.

        Returns:
            The GlobalConfiguration singleton instance or None, if the
            GlobalConfiguration hasn't been initialized yet.
        """
        return cls._global_config

    @classmethod
    def _reset_instance(
        cls, config: Optional["GlobalConfiguration"] = None
    ) -> None:
        """Replace the cached GlobalConfiguration singleton.

        This method is only meant for internal use and testing purposes.

        Args:
            config: The GlobalConfiguration instance to set as the global
                singleton. If None, the global GlobalConfiguration singleton is
                reset to an empty value.
        """
        cls._global_config = config
        if not config:
            return
        # Write the newly installed configuration out to its config file.
        config._write_config()

    @validator("version")
    def _validate_version(cls, v: Optional[str]) -> Optional[str]:
        """Validate the version attribute.

        Args:
            v: The version attribute value.

        Returns:
            The version attribute value.

        Raises:
            RuntimeError: If the version parsing fails.
        """
        if v is None:
            return v

        try:
            parsed_version = version.parse(v)
        except version.InvalidVersion as e:
            # Newer `packaging` releases raise `InvalidVersion` (a
            # `ValueError`) instead of returning a `LegacyVersion`. Convert
            # it so that the documented `RuntimeError` is raised either way.
            raise RuntimeError(
                f"Invalid version in global configuration: {v}."
            ) from e

        if not isinstance(parsed_version, version.Version):
            # Older `packaging` releases return a `LegacyVersion` when the
            # parsing fails. Check to make sure it's an actual `Version`
            # object which represents a valid version.
            raise RuntimeError(
                f"Invalid version in global configuration: {v}."
            )

        return v

    def __setattr__(self, key: str, value: Any) -> None:
        """Assign an attribute and persist public ones to the config file.

        Attributes whose name starts with an underscore are assigned without
        rewriting the config file.

        Args:
            key: The attribute name.
            value: The attribute value.
        """
        super().__setattr__(key, value)
        is_private = key.startswith("_")
        if not is_private:
            self._write_config()

    def __custom_getattribute__(self, key: str) -> Any:
        """Look up an attribute, preferring an environment variable override.

        If a value for this attribute was specified using an environment
        variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
        value can be parsed to the attribute type, the value from this
        environment variable is returned instead.

        Args:
            key: The attribute name.

        Returns:
            The attribute value.
        """
        stored_value = super().__getattribute__(key)
        is_config_field = (
            not key.startswith("_") and key in type(self).__fields__
        )
        if not is_config_field:
            return stored_value

        env_var_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
        try:
            raw_override = os.environ[env_var_name]
            # Assign the raw string temporarily so that Pydantic converts and
            # validates it against the field type.
            super().__setattr__(key, raw_override)
            override = super().__getattribute__(key)
            # Put the stored value back: the override must not be kept on the
            # instance permanently.
            super().__setattr__(key, stored_value)
        except (ValidationError, KeyError, TypeError):
            # Variable not set or not parseable: use the stored value.
            return stored_value
        return override

    if not TYPE_CHECKING:
        # Install the environment-aware lookup as `__getattribute__` only
        # outside of type checking. When defining __getattribute__, mypy
        # allows accessing non-existent attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    def _migrate_config(self) -> None:
        """Bring the stored configuration version up to the running version.

        Nothing is changed when the stored version equals the running ZenML
        version, or when it is higher (an error is logged in that case). In
        all other cases the `version` attribute is set to the running version.
        """
        running_version = version.parse(__version__)
        if self.version is not None:
            stored_version = version.parse(self.version)
            if stored_version > running_version:
                logger.error(
                    "The ZenML global configuration version (%s) is higher "
                    "than the version of ZenML currently being used (%s). "
                    "Read more about this issue and how to solve it here: "
                    "`https://docs.zenml.io/reference/global-settings#version-mismatch-downgrading`",
                    stored_version,
                    running_version,
                )
                # TODO [ENG-899]: Give more detailed instruction on how to
                #  resolve version mismatch.
                return

            if stored_version == running_version:
                return

            logger.info(
                "Migrating the ZenML global configuration from version %s "
                "to version %s...",
                stored_version,
                running_version,
            )
        else:
            logger.info(
                "Initializing the ZenML global configuration version to %s",
                running_version,
            )

        # Assigning a public attribute also rewrites the config file, which
        # persists the migration result.
        self.version = __version__

    def _read_config(self) -> Dict[str, Any]:
        """Load the configuration options stored on disk.

        An empty dictionary is returned if the config file doesn't exist yet,
        if it holds no content, or if its content is not a dictionary (a
        warning is logged in the last case).

        Returns:
            A dictionary containing the configuration options.
        """
        config_file = self._config_file
        loaded = (
            yaml_utils.read_yaml(config_file)
            if fileio.exists(config_file)
            else {}
        )

        if loaded is None:
            # This can happen for example if the config file is empty
            return {}

        if isinstance(loaded, dict):
            return loaded

        logger.warning(
            "The global configuration file is not a valid YAML file. "
            "Creating a new global configuration file."
        )
        return {}

    def _write_config(self) -> None:
        """Serialize the configuration and write it to the config file.

        Nothing is written when the ZenML server environment variable is set.
        """
        # We never write the configuration file in a ZenML server environment
        # because this is a long-running process and the global configuration
        # variables are supplied via environment variables.
        running_in_server = ENV_ZENML_SERVER in os.environ
        if running_in_server:
            logger.info(
                "Not writing the global configuration to disk in a ZenML "
                "server environment."
            )
            return

        config_file = self._config_file
        # Round-trip through JSON so that only plain, serializable values are
        # handed to the YAML writer; unset (None) options are left out.
        payload = json.loads(self.json(exclude_none=True))
        logger.debug(f"Writing config to {config_file}")

        config_file_exists = fileio.exists(config_file)
        if not config_file_exists:
            io_utils.create_dir_recursive_if_not_exists(self.config_directory)

        yaml_utils.write_yaml(config_file, payload)

    def _configure_store(
        self,
        config: StoreConfiguration,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Create the global zen store and make it the active one.

        This method creates and initializes the global store according to the
        supplied configuration, records its configuration on this object and
        then sanitizes and writes the global configuration.

        Args:
            config: The new store configuration to use.
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the store
                constructor.
        """
        from zenml.zen_stores.base_zen_store import BaseZenStore

        already_configured = self.store == config and self._zen_store
        if already_configured:
            # TODO: Do we actually need to create/initialize the store here
            #   or can we just return instead? We think this is just getting
            #   called for default registrations.
            BaseZenStore.create_store(
                config, skip_default_registrations, **kwargs
            )
            return

        # TODO: Revisit the flow regarding the registration of the default
        #  entities once the analytics v1 is removed.
        store = BaseZenStore.create_store(config, True, **kwargs)

        logger.debug(f"Configuring the global store to {store.config}")
        self.store = store.config
        self._zen_store = store

        if not skip_default_registrations:
            store._initialize_database()

        # Sanitize the global configuration to reflect the new store
        self._sanitize_config()
        self._write_config()

        # Make sure the directory for local stores exists.
        Path(self.local_stores_path).mkdir(parents=True, exist_ok=True)

    def _sanitize_config(self) -> None:
        """Validate and store the active workspace and stack.

        The zen store validates the currently configured workspace name and
        stack ID and returns the workspace and stack to use; both are then
        recorded on this configuration.
        """
        # If running in a ZenML server environment, the active stack and
        # workspace are not relevant
        if ENV_ZENML_SERVER in os.environ:
            return

        workspace, stack = self.zen_store.validate_active_config(
            self.active_workspace_name,
            self.active_stack_id,
            config_name="global",
        )
        self.active_workspace_name = workspace.name
        self._active_workspace = workspace
        self.set_active_stack(stack)

    @property
    def _config_file(self) -> str:
        """Path to the file where global configuration options are stored.

        The file is named `config.yaml` and is located directly inside
        `config_directory`.

        Returns:
            The path to the global configuration file.
        """
        return os.path.join(self.config_directory, "config.yaml")

    @property
    def config_directory(self) -> str:
        """Directory where the global configuration file is located.

        The value is obtained from `io_utils.get_global_config_directory()`
        on every access.

        Returns:
            The directory where the global configuration file is located.
        """
        return io_utils.get_global_config_directory()

    @property
    def local_stores_path(self) -> str:
        """Path where local stores information is stored.

        The `ENV_ZENML_LOCAL_STORES_PATH` environment variable takes
        precedence when set; otherwise the path is a subdirectory of the
        global config directory.

        Returns:
            The path where local stores information is stored.
        """
        override = os.environ.get(ENV_ZENML_LOCAL_STORES_PATH)
        if override is not None:
            return override

        return os.path.join(self.config_directory, LOCAL_STORES_DIRECTORY_NAME)

    def get_config_environment_vars(self) -> Dict[str, str]:
        """Express the global configuration as environment variables.

        Every set field except `store` is exported under the global config
        prefix. The store configuration, its secrets store and its backup
        secrets store are exported under their own prefixes.

        Returns:
            Environment variables dictionary.
        """
        environment_vars = {}

        for field_name in self.__fields__:
            if field_name == "store":
                # The store configuration uses its own environment variable
                # naming scheme
                continue

            field_value = getattr(self, field_name)
            if field_value is None:
                continue
            env_name = CONFIG_ENV_VAR_PREFIX + field_name.upper()
            environment_vars[env_name] = str(field_value)

        store_dict = self.store_configuration.dict(exclude_none=True)

        # The secrets store and backup secrets store configurations use their
        # own environment variables naming scheme
        secrets_store_dict = store_dict.pop("secrets_store", None) or {}
        backup_secrets_store_dict = (
            store_dict.pop("backup_secrets_store", None) or {}
        )

        # Never include the username and password in the env vars. Use
        # the API token instead.
        environment_vars.update(
            (ENV_ZENML_STORE_PREFIX + key.upper(), str(value))
            for key, value in store_dict.items()
            if key not in ("username", "password")
        )
        environment_vars.update(
            (ENV_ZENML_SECRETS_STORE_PREFIX + key.upper(), str(value))
            for key, value in secrets_store_dict.items()
        )
        environment_vars.update(
            (ENV_ZENML_BACKUP_SECRETS_STORE_PREFIX + key.upper(), str(value))
            for key, value in backup_secrets_store_dict.items()
        )

        return environment_vars

    def _get_store_configuration(
        self, baseline: Optional[StoreConfiguration] = None
    ) -> StoreConfiguration:
        """Get the store configuration.

        This method computes a store configuration starting from a baseline and
        applying the environment variables on top. If no baseline is provided,
        the following are used as a baseline:

        * the current store configuration, if it exists (e.g. if a store was
        configured in the global configuration file or explicitly set in the
        global configuration by calling `set_store`), or
        * the default store configuration, otherwise

        Environment variables are grouped by prefix (store, secrets store,
        backup secrets store); the part of the name after the prefix is
        lower-cased and used as the configuration key.

        Args:
            baseline: Optional baseline store configuration to use.

        Returns:
            The store configuration.
        """
        from zenml.zen_stores.base_zen_store import BaseZenStore

        # Step 1: Create a baseline store configuration

        if baseline is not None:
            # Use the provided baseline store configuration
            store = baseline
        elif self.store is not None:
            # Use the current store configuration as a baseline
            store = self.store
        else:
            # Start with the default store configuration as a baseline
            store = self.get_default_store()

        # Step 2: Replace or update the baseline store configuration with the
        # environment variables

        env_store_config: Dict[str, str] = {}
        env_secrets_store_config: Dict[str, str] = {}
        env_backup_secrets_store_config: Dict[str, str] = {}
        for k, v in os.environ.items():
            # Strip the matching prefix and lower-case the remainder to get
            # the configuration key.
            if k.startswith(ENV_ZENML_STORE_PREFIX):
                env_store_config[k[len(ENV_ZENML_STORE_PREFIX) :].lower()] = v
            elif k.startswith(ENV_ZENML_SECRETS_STORE_PREFIX):
                env_secrets_store_config[
                    k[len(ENV_ZENML_SECRETS_STORE_PREFIX) :].lower()
                ] = v
            elif k.startswith(ENV_ZENML_BACKUP_SECRETS_STORE_PREFIX):
                env_backup_secrets_store_config[
                    k[len(ENV_ZENML_BACKUP_SECRETS_STORE_PREFIX) :].lower()
                ] = v

        if len(env_store_config):
            # As a convenience, we also infer the store type from the URL if
            # not explicitly set in the environment variables.
            if "type" not in env_store_config and "url" in env_store_config:
                env_store_config["type"] = BaseZenStore.get_store_type(
                    env_store_config["url"]
                )

            # We distinguish between two cases here: the environment variables
            # are used to completely replace the store configuration (i.e. when
            # the store type or URL is set using the environment variables), or
            # they are only used to update the store configuration. In the first
            # case, we replace the baseline store configuration with the
            # environment variables. In the second case, we only merge the
            # environment variables into the baseline store config.

            if "type" in env_store_config:
                logger.debug(
                    "Using environment variables to configure the store"
                )
                store = StoreConfiguration(
                    **env_store_config,
                )
            else:
                logger.debug(
                    "Using environment variables to update the default store"
                )
                store = store.copy(update=env_store_config, deep=True)

        # Step 3: Replace or update the baseline secrets store configuration
        # with the environment variables. This only applies to SQL stores.

        # NOTE(review): when step 2 did not build a new `store` object, the
        # assignments below are made directly on the baseline object (the
        # `baseline` argument or `self.store`) — confirm this in-place
        # modification is intended.
        if store.type == StoreType.SQL:
            # We distinguish between two cases here: the environment
            # variables are used to completely replace the secrets store
            # configuration (i.e. when the secrets store type is set using
            # the environment variable), or they are only used to update the
            # store configuration. In the first case, we replace the
            # baseline secrets store configuration with the environment
            # variables. In the second case, we only merge the environment
            # variables into the baseline secrets store config (if any is
            # set).

            if len(env_secrets_store_config):
                if "type" in env_secrets_store_config:
                    logger.debug(
                        "Using environment variables to configure the secrets "
                        "store"
                    )
                    store.secrets_store = SecretsStoreConfiguration(
                        **env_secrets_store_config
                    )
                elif store.secrets_store:
                    logger.debug(
                        "Using environment variables to update the secrets "
                        "store"
                    )
                    store.secrets_store = store.secrets_store.copy(
                        update=env_secrets_store_config, deep=True
                    )

            # Same replace-or-merge logic, applied to the backup secrets
            # store.
            if len(env_backup_secrets_store_config):
                if "type" in env_backup_secrets_store_config:
                    logger.debug(
                        "Using environment variables to configure the backup "
                        "secrets store"
                    )
                    store.backup_secrets_store = SecretsStoreConfiguration(
                        **env_backup_secrets_store_config
                    )
                elif store.backup_secrets_store:
                    logger.debug(
                        "Using environment variables to update the backup "
                        "secrets store"
                    )
                    store.backup_secrets_store = (
                        store.backup_secrets_store.copy(
                            update=env_backup_secrets_store_config, deep=True
                        )
                    )

        return store

    @property
    def store_configuration(self) -> StoreConfiguration:
        """Get the current store configuration.

        Returns:
            The store configuration.
        """
        active_store = self._zen_store
        if active_store is None:
            # No store initialized yet: compute the configuration from the
            # global configuration.
            return self._get_store_configuration()

        # If the zen store is already initialized, we can get the store
        # configuration from there and disregard the global configuration.
        return active_store.config

    def get_default_store(self) -> StoreConfiguration:
        """Build the default store configuration.

        The store location is the default store directory inside the local
        stores path.

        Returns:
            The default SQLite store configuration.
        """
        from zenml.zen_stores.base_zen_store import BaseZenStore

        default_store_path = os.path.join(
            self.local_stores_path, DEFAULT_STORE_DIRECTORY_NAME
        )
        return BaseZenStore.get_default_store_config(path=default_store_path)

    def set_default_store(self) -> None:
        """Configure the global store from the default store configuration.

        Call this method to initialize or revert the store configuration to the
        default store.
        """
        baseline = self.get_default_store()
        # Apply the environment variables to the default store configuration
        default_store_cfg = self._get_store_configuration(baseline=baseline)
        self._configure_store(default_store_cfg)
        logger.debug("Using the default store for the global config.")

    def uses_default_store(self) -> bool:
        """Check if the global configuration uses the default store.

        The check compares the URL of the current store configuration with
        the URL of the default store configuration.

        Returns:
            `True` if the global configuration uses the default store.
        """
        current_url = self.store_configuration.url
        default_url = self.get_default_store().url
        return current_url == default_url

    def set_store(
        self,
        config: StoreConfiguration,
        skip_default_registrations: bool = False,
        **kwargs: Any,
    ) -> None:
        """Update the active store configuration.

        Call this method to validate and update the active store configuration.

        Args:
            config: The new store configuration to use.
            skip_default_registrations: If `True`, the creation of the default
                stack and user in the store will be skipped.
            **kwargs: Additional keyword arguments to pass to the store
                constructor.
        """
        # Apply the environment variables to the custom store configuration
        effective_config = self._get_store_configuration(baseline=config)
        self._configure_store(
            effective_config, skip_default_registrations, **kwargs
        )
        logger.info("Updated the global store configuration.")

    @property
    def zen_store(self) -> "BaseZenStore":
        """Initialize and/or return the global zen store.

        If the store hasn't been initialized yet, it is initialized when this
        property is first accessed according to the global store configuration.

        Returns:
            The current zen store.
        """
        store_missing = self._zen_store is None
        if store_missing:
            self._configure_store(self.store_configuration)

        initialized_store = self._zen_store
        assert initialized_store is not None
        return initialized_store

    def set_active_workspace(
        self, workspace: "WorkspaceResponse"
    ) -> "WorkspaceResponse":
        """Set the workspace for the local client.

        Stores the workspace name and model on this configuration and then
        re-validates the active workspace and stack.

        Args:
            workspace: The workspace to set active.

        Returns:
            The workspace that was set active.
        """
        self.active_workspace_name = workspace.name
        self._active_workspace = workspace
        # Sanitize the global configuration to reflect the new workspace
        self._sanitize_config()
        return workspace

    def set_active_stack(self, stack: "StackResponse") -> None:
        """Set the active stack for the local client.

        Stores the stack ID in the `active_stack_id` field and keeps the stack
        model in the private `_active_stack` attribute.

        Args:
            stack: The model of the stack to set active.
        """
        self.active_stack_id = stack.id
        self._active_stack = stack

    def get_active_workspace(self) -> "WorkspaceResponse":
        """Get a model of the active workspace for the local client.

        The cached workspace model is returned when one is available;
        otherwise the workspace is fetched from the zen store by name and set
        as the active workspace.

        Returns:
            The model of the active workspace.
        """
        # Resolve the name first, before looking at the cached model.
        workspace_name = self.get_active_workspace_name()

        cached_workspace = self._active_workspace
        if cached_workspace is not None:
            return cached_workspace

        return self.set_active_workspace(
            self.zen_store.get_workspace(workspace_name_or_id=workspace_name)
        )

    def get_active_workspace_name(self) -> str:
        """Get the name of the active workspace.

        If the active workspace doesn't exist yet, the ZenStore is reinitialized.

        Returns:
            The name of the active workspace.
        """
        if self.active_workspace_name is not None:
            return self.active_workspace_name

        # Accessing the store is expected to populate the active workspace
        # name; the assertion below checks that it did.
        _ = self.zen_store
        assert self.active_workspace_name is not None
        return self.active_workspace_name

    def get_active_stack_id(self) -> UUID:
        """Get the ID of the active stack.

        If the active stack doesn't exist yet, the ZenStore is reinitialized.

        Returns:
            The active stack ID.
        """
        if self.active_stack_id is not None:
            return self.active_stack_id

        # Accessing the store is expected to populate the active stack ID;
        # the assertion below checks that it did.
        _ = self.zen_store
        assert self.active_stack_id is not None
        return self.active_stack_id

    class Config:
        """Pydantic configuration class.

        Enables validation on assignment, accepts unknown attributes, treats
        underscore-prefixed attributes as private and registers a JSON
        encoder for `SecretStr` values.
        """

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes from configs of previous ZenML versions to
        # permit downgrading
        extra = "allow"
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True

        # This is needed to allow correct handling of SecretStr values during
        # serialization. The encoder emits the underlying secret value, or
        # None for a falsy SecretStr.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
config_directory: str property readonly

Directory where the global configuration file is located.

Returns:

Type Description
str

The directory where the global configuration file is located.

local_stores_path: str property readonly

Path where local stores information is stored.

Returns:

Type Description
str

The path where local stores information is stored.

store_configuration: StoreConfiguration property readonly

Get the current store configuration.

Returns:

Type Description
StoreConfiguration

The store configuration.

zen_store: BaseZenStore property readonly

Initialize and/or return the global zen store.

If the store hasn't been initialized yet, it is initialized when this property is first accessed according to the global store configuration.

Returns:

Type Description
BaseZenStore

The current zen store.

Config

Pydantic configuration class.

Source code in zenml/config/global_config.py
class Config:
    """Pydantic configuration class.

    Enables validation on assignment, accepts unknown attributes, treats
    underscore-prefixed attributes as private and registers a JSON encoder
    for `SecretStr` values.
    """

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes
    validate_assignment = True
    # Allow extra attributes from configs of previous ZenML versions to
    # permit downgrading
    extra = "allow"
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True

    # This is needed to allow correct handling of SecretStr values during
    # serialization. The encoder emits the underlying secret value, or None
    # for a falsy SecretStr.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }
__custom_getattribute__(self, key) special

Gets an attribute value for a specific key.

If a value for this attribute was specified using an environment variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME) and its value can be parsed to the attribute type, the value from this environment variable is returned instead.

Parameters:

Name Type Description Default
key str

The attribute name.

required

Returns:

Type Description
Any

The attribute value.

Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Look up an attribute, preferring an environment variable override.

    If a value for this attribute was specified using an environment
    variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
    value can be parsed to the attribute type, the value from this
    environment variable is returned instead.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    stored_value = super().__getattribute__(key)
    is_config_field = (
        not key.startswith("_") and key in type(self).__fields__
    )
    if not is_config_field:
        return stored_value

    env_var_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        raw_override = os.environ[env_var_name]
        # Assign the raw string temporarily so that Pydantic converts and
        # validates it against the field type.
        super().__setattr__(key, raw_override)
        override = super().__getattribute__(key)
        # Put the stored value back: the override must not be kept on the
        # instance permanently.
        super().__setattr__(key, stored_value)
    except (ValidationError, KeyError, TypeError):
        # Variable not set or not parseable: use the stored value.
        return stored_value
    return override
__getattribute__(self, key) special

Gets an attribute value for a specific key.

If a value for this attribute was specified using an environment variable called $(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME) and its value can be parsed to the attribute type, the value from this environment variable is returned instead.

Parameters:

Name Type Description Default
key str

The attribute name.

required

Returns:

Type Description
Any

The attribute value.

Source code in zenml/config/global_config.py
def __custom_getattribute__(self, key: str) -> Any:
    """Look up an attribute, preferring an environment variable override.

    If a value for this attribute was specified using an environment
    variable called `$(CONFIG_ENV_VAR_PREFIX)$(ATTRIBUTE_NAME)` and its
    value can be parsed to the attribute type, the value from this
    environment variable is returned instead.

    Args:
        key: The attribute name.

    Returns:
        The attribute value.
    """
    stored_value = super().__getattribute__(key)
    is_config_field = (
        not key.startswith("_") and key in type(self).__fields__
    )
    if not is_config_field:
        return stored_value

    env_var_name = f"{CONFIG_ENV_VAR_PREFIX}{key.upper()}"
    try:
        raw_override = os.environ[env_var_name]
        # Assign the raw string temporarily so that Pydantic converts and
        # validates it against the field type.
        super().__setattr__(key, raw_override)
        override = super().__getattribute__(key)
        # Put the stored value back: the override must not be kept on the
        # instance permanently.
        super().__setattr__(key, stored_value)
    except (ValidationError, KeyError, TypeError):
        # Variable not set or not parseable: use the stored value.
        return stored_value
    return override
__init__(self, **data) special

Initializes a GlobalConfiguration using values from the config file.

GlobalConfiguration is a singleton class: only one instance can exist. Calling this constructor multiple times will always yield the same instance.

Parameters:

Name Type Description Default
data Any

Custom configuration options.

{}
Source code in zenml/config/global_config.py
def __init__(self, **data: Any) -> None:
    """Builds the GlobalConfiguration from the config file plus overrides.

    GlobalConfiguration is a singleton class: only one instance can exist.
    Calling this constructor multiple times will always yield the same
    instance.

    Args:
        data: Custom configuration options. They take precedence over the
            values read from the config file.
    """
    # Start from the persisted values, then let explicit options win.
    merged_values = self._read_config()
    merged_values.update(data)
    super().__init__(**merged_values)

    # Create the config file on first use so later sessions can read it.
    config_file_missing = not fileio.exists(self._config_file)
    if config_file_missing:
        self._write_config()
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__setattr__(self, key, value) special

Sets an attribute and persists it in the global configuration.

Parameters:

Name Type Description Default
key str

The attribute name.

required
value Any

The attribute value.

required
Source code in zenml/config/global_config.py
def __setattr__(self, key: str, value: Any) -> None:
    """Assigns an attribute and writes public changes to the config file.

    Args:
        key: The attribute name.
        value: The attribute value.
    """
    super().__setattr__(key, value)

    # Only public attributes are persisted; names starting with an
    # underscore are set without writing the configuration.
    is_private = key.startswith("_")
    if not is_private:
        self._write_config()
get_active_stack_id(self)

Get the ID of the active stack.

If the active stack doesn't exist yet, the ZenStore is reinitialized.

Returns:

Type Description
UUID

The active stack ID.

Source code in zenml/config/global_config.py
def get_active_stack_id(self) -> UUID:
    """Return the ID of the active stack.

    If no active stack ID is set yet, the `zen_store` attribute is accessed
    first (per the original documentation this reinitializes the ZenStore),
    after which the ID must be available.

    Returns:
        The active stack ID.
    """
    if self.active_stack_id is not None:
        return self.active_stack_id

    # Touching the store is relied upon to populate the active stack ID.
    _ = self.zen_store
    assert self.active_stack_id is not None
    return self.active_stack_id
get_active_workspace(self)

Get a model of the active workspace for the local client.

Returns:

Type Description
WorkspaceResponse

The model of the active workspace.

Source code in zenml/config/global_config.py
def get_active_workspace(self) -> "WorkspaceResponse":
    """Return the model of the active workspace for the local client.

    The active workspace name is always resolved first. A previously stored
    workspace model is returned as is; otherwise the workspace is fetched
    from the store by name and set as the active workspace.

    Returns:
        The model of the active workspace.
    """
    active_name = self.get_active_workspace_name()

    cached_workspace = self._active_workspace
    if cached_workspace is not None:
        return cached_workspace

    fetched_workspace = self.zen_store.get_workspace(
        workspace_name_or_id=active_name,
    )
    return self.set_active_workspace(fetched_workspace)
get_active_workspace_name(self)

Get the name of the active workspace.

If the active workspace doesn't exist yet, the ZenStore is reinitialized.

Returns:

Type Description
str

The name of the active workspace.

Source code in zenml/config/global_config.py
def get_active_workspace_name(self) -> str:
    """Return the name of the active workspace.

    If no active workspace name is set yet, the `zen_store` attribute is
    accessed first (per the original documentation this reinitializes the
    ZenStore), after which the name must be available.

    Returns:
        The name of the active workspace.
    """
    if self.active_workspace_name is not None:
        return self.active_workspace_name

    # Touching the store is relied upon to populate the workspace name.
    _ = self.zen_store
    assert self.active_workspace_name is not None
    return self.active_workspace_name
get_config_environment_vars(self)

Convert the global configuration to environment variables.

Returns:

Type Description
Dict[str, str]

Environment variables dictionary.

Source code in zenml/config/global_config.py
def get_config_environment_vars(self) -> Dict[str, str]:
    """Render the global configuration as environment variables.

    Each model field except `store` with a non-`None` value becomes a
    `CONFIG_ENV_VAR_PREFIX`-prefixed variable. The store configuration, the
    secrets store and the backup secrets store are emitted under their own
    prefixes. All values are converted with `str()`.

    Returns:
        Environment variables dictionary.
    """
    env: Dict[str, str] = {}

    for field_name in self.__fields__:
        # The store configuration uses its own environment variable
        # naming scheme and is handled separately below.
        if field_name != "store":
            field_value = getattr(self, field_name)
            if field_value is not None:
                env[CONFIG_ENV_VAR_PREFIX + field_name.upper()] = str(
                    field_value
                )

    store_values = self.store_configuration.dict(exclude_none=True)

    # The secrets store and backup secrets store configurations use their
    # own environment variables naming scheme, so split them off first.
    secrets_store_values = store_values.pop("secrets_store", None) or {}
    backup_secrets_store_values = (
        store_values.pop("backup_secrets_store", None) or {}
    )

    # Never include the username and password in the env vars. Use the API
    # token instead.
    env.update(
        (ENV_ZENML_STORE_PREFIX + name.upper(), str(setting))
        for name, setting in store_values.items()
        if name not in ("username", "password")
    )
    env.update(
        (ENV_ZENML_SECRETS_STORE_PREFIX + name.upper(), str(setting))
        for name, setting in secrets_store_values.items()
    )
    env.update(
        (ENV_ZENML_BACKUP_SECRETS_STORE_PREFIX + name.upper(), str(setting))
        for name, setting in backup_secrets_store_values.items()
    )

    return env
get_default_store(self)

Get the default SQLite store configuration.

Returns:

Type Description
StoreConfiguration

The default SQLite store configuration.

Source code in zenml/config/global_config.py
def get_default_store(self) -> StoreConfiguration:
    """Build the default SQLite store configuration.

    The configuration points at the default store directory inside
    `local_stores_path`.

    Returns:
        The default SQLite store configuration.
    """
    from zenml.zen_stores.base_zen_store import BaseZenStore

    default_store_path = os.path.join(
        self.local_stores_path,
        DEFAULT_STORE_DIRECTORY_NAME,
    )
    return BaseZenStore.get_default_store_config(path=default_store_path)
get_instance() classmethod

Return the GlobalConfiguration singleton instance.

Returns:

Type Description
Optional[GlobalConfiguration]

The GlobalConfiguration singleton instance or None, if the GlobalConfiguration hasn't been initialized yet.

Source code in zenml/config/global_config.py
@classmethod
def get_instance(cls) -> Optional["GlobalConfiguration"]:
    """Look up the GlobalConfiguration singleton without creating it.

    Returns:
        The GlobalConfiguration singleton instance or None, if the
        GlobalConfiguration hasn't been initialized yet.
    """
    current_instance: Optional["GlobalConfiguration"] = cls._global_config
    return current_instance
set_active_stack(self, stack)

Set the active stack for the local client.

Parameters:

Name Type Description Default
stack StackResponse

The model of the stack to set active.

required
Source code in zenml/config/global_config.py
def set_active_stack(self, stack: "StackResponse") -> None:
    """Make the given stack the active stack of the local client.

    Stores the stack ID in the public `active_stack_id` attribute first and
    then keeps the stack model itself in `_active_stack`.

    Args:
        stack: The model of the stack to set active.
    """
    new_stack_id = stack.id
    self.active_stack_id = new_stack_id
    self._active_stack = stack
set_active_workspace(self, workspace)

Set the workspace for the local client.

Parameters:

Name Type Description Default
workspace WorkspaceResponse

The workspace to set active.

required

Returns:

Type Description
WorkspaceResponse

The workspace that was set active.

Source code in zenml/config/global_config.py
def set_active_workspace(
    self, workspace: "WorkspaceResponse"
) -> "WorkspaceResponse":
    """Make the given workspace the active one for the local client.

    Args:
        workspace: The workspace to set active.

    Returns:
        The workspace that was set active.
    """
    workspace_name = workspace.name
    self.active_workspace_name = workspace_name
    self._active_workspace = workspace

    # The rest of the global configuration has to be sanitized so that it
    # reflects the newly selected workspace.
    self._sanitize_config()
    return workspace
set_default_store(self)

Initializes and sets the default store configuration.

Call this method to initialize or revert the store configuration to the default store.

Source code in zenml/config/global_config.py
def set_default_store(self) -> None:
    """Configure the default store as the active store.

    Call this method to initialize or revert the store configuration to the
    default store.
    """
    # The default configuration is only the baseline: environment
    # variables are applied on top of it before it is used.
    baseline_config = self.get_default_store()
    effective_config = self._get_store_configuration(baseline=baseline_config)

    self._configure_store(effective_config)
    logger.debug("Using the default store for the global config.")
set_store(self, config, skip_default_registrations=False, **kwargs)

Update the active store configuration.

Call this method to validate and update the active store configuration.

Parameters:

Name Type Description Default
config StoreConfiguration

The new store configuration to use.

required
skip_default_registrations bool

If True, the creation of the default stack and user in the store will be skipped.

False
**kwargs Any

Additional keyword arguments to pass to the store constructor.

{}
Source code in zenml/config/global_config.py
def set_store(
    self,
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Validate and apply a new active store configuration.

    Args:
        config: The new store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the store
            constructor.
    """
    # The supplied configuration is only the baseline: environment
    # variables are applied on top of it before it is used.
    effective_config = self._get_store_configuration(baseline=config)

    self._configure_store(
        effective_config, skip_default_registrations, **kwargs
    )
    logger.info("Updated the global store configuration.")
uses_default_store(self)

Check if the global configuration uses the default store.

Returns:

Type Description
bool

True if the global configuration uses the default store.

Source code in zenml/config/global_config.py
def uses_default_store(self) -> bool:
    """Tell whether the active store is the default store.

    The check compares the URL of the current store configuration with the
    URL of the default store configuration.

    Returns:
        `True` if the global configuration uses the default store.
    """
    active_url = self.store_configuration.url
    default_url = self.get_default_store().url
    return active_url == default_url

pipeline_configurations

Pipeline configuration classes.

PipelineConfiguration (PipelineConfigurationUpdate) pydantic-model

Pipeline configuration class.

Source code in zenml/config/pipeline_configurations.py
class PipelineConfiguration(PipelineConfigurationUpdate):
    """Pipeline configuration class.

    Extends the update model with a mandatory, validated pipeline name.
    """

    name: str

    @validator("name")
    def ensure_pipeline_name_allowed(cls, name: str) -> str:
        """Rejects pipeline names that are reserved key words.

        Args:
            name: Name of the pipeline.

        Returns:
            The validated name of the pipeline.

        Raises:
            ValueError: If the name is not allowed.
        """
        if name not in DISALLOWED_PIPELINE_NAMES:
            return name

        raise ValueError(
            f"Pipeline name '{name}' is not allowed since '{name}' is a "
            "reserved key word. Please choose another name."
        )

    @property
    def docker_settings(self) -> "DockerSettings":
        """Docker settings of this pipeline.

        The value stored under the Docker settings key (or an empty dict if
        there is none) is parsed into a `DockerSettings` object on each
        access.

        Returns:
            The Docker settings of this pipeline.
        """
        # Imported here rather than at module level, as in the original.
        from zenml.config import DockerSettings

        raw_docker_settings: SettingsOrDict = self.settings.get(
            DOCKER_SETTINGS_KEY, {}
        )
        return DockerSettings.parse_obj(raw_docker_settings)
docker_settings: DockerSettings property readonly

Docker settings of this pipeline.

Returns:

Type Description
DockerSettings

The Docker settings of this pipeline.

ensure_pipeline_name_allowed(name) classmethod

Ensures the pipeline name is allowed.

Parameters:

Name Type Description Default
name str

Name of the pipeline.

required

Returns:

Type Description
str

The validated name of the pipeline.

Exceptions:

Type Description
ValueError

If the name is not allowed.

Source code in zenml/config/pipeline_configurations.py
@validator("name")
def ensure_pipeline_name_allowed(cls, name: str) -> str:
    """Rejects pipeline names that are reserved key words.

    Args:
        name: Name of the pipeline.

    Returns:
        The validated name of the pipeline.

    Raises:
        ValueError: If the name is not allowed.
    """
    if name not in DISALLOWED_PIPELINE_NAMES:
        return name

    raise ValueError(
        f"Pipeline name '{name}' is not allowed since '{name}' is a "
        "reserved key word. Please choose another name."
    )

PipelineConfigurationUpdate (StrictBaseModel) pydantic-model

Class for pipeline configuration updates.

Source code in zenml/config/pipeline_configurations.py
class PipelineConfigurationUpdate(StrictBaseModel):
    """Class for pipeline configuration updates.

    Every field declares a default (`None` or an empty dict), so no field
    is required when creating an instance.
    """

    # Optional boolean switches; `None` is the declared default.
    enable_cache: Optional[bool] = None
    enable_artifact_metadata: Optional[bool] = None
    enable_artifact_visualization: Optional[bool] = None
    enable_step_logs: Optional[bool] = None
    # Settings objects keyed by a string settings key.
    settings: Dict[str, BaseSettings] = {}
    # Free-form dictionary of additional values.
    extra: Dict[str, Any] = {}
    # Sources of the failure and success hooks.
    failure_hook_source: Optional[Source] = None
    success_hook_source: Optional[Source] = None
    model: Optional[Model] = None
    parameters: Optional[Dict[str, Any]] = None
    retry: Optional[StepRetryConfig] = None

    # NOTE(review): presumably registers a validator that converts the two
    # hook source fields into `Source` objects -- confirm against
    # `convert_source_validator`.
    _convert_source = convert_source_validator(
        "failure_hook_source", "success_hook_source"
    )

pipeline_run_configuration

Pipeline run configuration class.

PipelineRunConfiguration (StrictBaseModel, YAMLSerializationMixin) pydantic-model

Class for pipeline run configurations.

Source code in zenml/config/pipeline_run_configuration.py
class PipelineRunConfiguration(
    StrictBaseModel, pydantic_utils.YAMLSerializationMixin
):
    """Class for pipeline run configurations.

    Every field declares a default (`None` or an empty dict), so no field
    is required when creating an instance.
    """

    run_name: Optional[str] = None
    # Optional boolean switches; `None` is the declared default.
    enable_cache: Optional[bool] = None
    enable_artifact_metadata: Optional[bool] = None
    enable_artifact_visualization: Optional[bool] = None
    enable_step_logs: Optional[bool] = None
    schedule: Optional[Schedule] = None
    # Either a build object, a UUID (presumably the ID of an existing
    # build -- confirm) or nothing.
    build: Union[PipelineBuildBase, UUID, None] = None
    # Step configuration updates keyed by a string (presumably the step
    # name -- confirm against callers).
    steps: Dict[str, StepConfigurationUpdate] = {}
    # Settings objects keyed by a string settings key.
    settings: Dict[str, BaseSettings] = {}
    # Free-form dictionary of additional values.
    extra: Dict[str, Any] = {}
    model: Optional[Model] = None
    parameters: Optional[Dict[str, Any]] = None
    retry: Optional[StepRetryConfig] = None

pipeline_spec

Pipeline configuration classes.

PipelineSpec (StrictBaseModel) pydantic-model

Specification of a pipeline.

Source code in zenml/config/pipeline_spec.py
class PipelineSpec(StrictBaseModel):
    """Specification of a pipeline.

    Two specs compare equal when their steps are equal; the other fields do
    not take part in the comparison.
    """

    # Versions:
    # - 0.2: Legacy BasePipeline in release <=0.39.1, the upstream steps and
    #   inputs in the step specs refer to the step names, not the pipeline
    #   parameter names
    # - 0.3: Legacy BasePipeline in release >0.39.1, the upstream steps and
    #   inputs in the step specs refer to the pipeline parameter names
    # - 0.4: New Pipeline class, the upstream steps and
    #   inputs in the step specs refer to the pipeline parameter names
    version: str = "0.4"
    source: Optional[Source] = None
    parameters: Dict[str, Any] = {}
    steps: List[StepSpec]

    def __eq__(self, other: Any) -> bool:
        """Checks whether the other object refers to the same pipeline.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same pipeline.
            `NotImplemented` if the other object is not a `PipelineSpec`.
        """
        if not isinstance(other, PipelineSpec):
            return NotImplemented
        return self.steps == other.steps

    @property
    def json_with_string_sources(self) -> str:
        """JSON representation with sources replaced by their import path.

        Returns:
            The JSON representation.
        """
        from packaging import version

        spec_dict = self.dict()

        if self.source:
            spec_dict["source"] = self.source.import_path

        # Each step source is re-parsed from its dict form so that only
        # its import path ends up in the output.
        for step_entry in spec_dict["steps"]:
            step_source = Source.parse_obj(step_entry["source"])
            step_entry["source"] = step_source.import_path

        is_legacy_spec = version.parse(self.version) < version.parse("0.4")
        if is_legacy_spec:
            # Keep backwards compatibility with old pipeline versions
            spec_dict.pop("source")
            spec_dict.pop("parameters")

        return json.dumps(spec_dict, sort_keys=False, default=pydantic_encoder)
json_with_string_sources: str property readonly

JSON representation with sources replaced by their import path.

Returns:

Type Description
str

The JSON representation.

__eq__(self, other) special

Returns whether the other object is referring to the same pipeline.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same pipeline.

Source code in zenml/config/pipeline_spec.py
def __eq__(self, other: Any) -> bool:
    """Checks whether the other object refers to the same pipeline.

    Only the steps are compared.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is referring to the same pipeline.
        `NotImplemented` if the other object is not a `PipelineSpec`.
    """
    if not isinstance(other, PipelineSpec):
        return NotImplemented
    return self.steps == other.steps

resource_settings

Resource settings class used to specify resources for a step.

ByteUnit (Enum)

Enum for byte units.

Source code in zenml/config/resource_settings.py
class ByteUnit(Enum):
    """Enum for byte units.

    Covers the decimal units (KB ... PB, powers of 1000) and the binary
    units (KiB ... PiB, powers of 1024).
    """

    KB = "KB"
    KIB = "KiB"
    MB = "MB"
    MIB = "MiB"
    GB = "GB"
    GIB = "GiB"
    TB = "TB"
    TIB = "TiB"
    PB = "PB"
    PIB = "PiB"

    @property
    def byte_value(self) -> int:
        """Returns the amount of bytes that this unit represents.

        Returns:
            The byte value of this unit.
        """
        # Units listed from smallest to largest: the position in the tuple
        # (plus one) is the power of the base for that unit.
        decimal_order = ("KB", "MB", "GB", "TB", "PB")
        binary_order = ("KiB", "MiB", "GiB", "TiB", "PiB")

        if self.value in decimal_order:
            return 1000 ** (decimal_order.index(self.value) + 1)
        return 1024 ** (binary_order.index(self.value) + 1)

ResourceSettings (BaseSettings) pydantic-model

Hardware resource settings.

Attributes:

Name Type Description
cpu_count Optional[pydantic.types.PositiveFloat]

The amount of CPU cores that should be configured.

gpu_count Optional[pydantic.types.NonNegativeInt]

The amount of GPUs that should be configured.

memory Optional[str]

The amount of memory that should be configured.

Source code in zenml/config/resource_settings.py
class ResourceSettings(BaseSettings):
    """Hardware resource settings.

    Attributes:
        cpu_count: The amount of CPU cores that should be configured.
        gpu_count: The amount of GPUs that should be configured.
        memory: The amount of memory that should be configured, as a string
            that has to match `MEMORY_REGEX`.
    """

    cpu_count: Optional[PositiveFloat] = None
    gpu_count: Optional[NonNegativeInt] = None
    memory: Optional[str] = Field(regex=MEMORY_REGEX)

    @property
    def empty(self) -> bool:
        """Returns if this object is "empty" (=no values configured) or not.

        Returns:
            `True` if no values were configured, `False` otherwise.
        """
        # Only attributes that were explicitly set to something other than
        # `None` count as configured; if none remain, the object is empty.
        return not self.dict(exclude_unset=True, exclude_none=True)

    def get_memory(
        self, unit: Union[str, ByteUnit] = ByteUnit.GB
    ) -> Optional[float]:
        """Gets the memory configuration in a specific unit.

        Args:
            unit: The unit to which the memory should be converted. May be
                given as a `ByteUnit` or as its string value.

        Raises:
            ValueError: If the memory string is invalid.

        Returns:
            The memory configuration converted to the requested unit, or None
            if no memory was configured.
        """
        if not self.memory:
            return None

        target_unit = ByteUnit(unit) if isinstance(unit, str) else unit

        memory = self.memory
        for candidate in ByteUnit:
            suffix = candidate.value
            if not memory.endswith(suffix):
                continue
            # Everything in front of the unit suffix is the integer amount.
            amount = int(memory[: -len(suffix)])
            return amount * candidate.byte_value / target_unit.byte_value

        # Should never happen due to the regex validation
        raise ValueError(f"Unable to parse memory unit from '{memory}'.")

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False

        # prevent extra attributes during model initialization
        extra = Extra.forbid
empty: bool property readonly

Returns if this object is "empty" (=no values configured) or not.

Returns:

Type Description
bool

True if no values were configured, False otherwise.

Config

Pydantic configuration class.

Source code in zenml/config/resource_settings.py
class Config:
    """Pydantic configuration class.

    Makes instances immutable after creation and rejects fields that the
    model does not declare.
    """

    # public attributes are immutable
    allow_mutation = False

    # prevent extra attributes during model initialization
    extra = Extra.forbid
get_memory(self, unit=<ByteUnit.GB: 'GB'>)

Gets the memory configuration in a specific unit.

Parameters:

Name Type Description Default
unit Union[str, zenml.config.resource_settings.ByteUnit]

The unit to which the memory should be converted.

<ByteUnit.GB: 'GB'>

Exceptions:

Type Description
ValueError

If the memory string is invalid.

Returns:

Type Description
Optional[float]

The memory configuration converted to the requested unit, or None if no memory was configured.

Source code in zenml/config/resource_settings.py
def get_memory(
    self, unit: Union[str, ByteUnit] = ByteUnit.GB
) -> Optional[float]:
    """Gets the memory configuration in a specific unit.

    Args:
        unit: The unit to which the memory should be converted. May be
            given as a `ByteUnit` or as its string value.

    Raises:
        ValueError: If the memory string is invalid.

    Returns:
        The memory configuration converted to the requested unit, or None
        if no memory was configured.
    """
    if not self.memory:
        return None

    target_unit = ByteUnit(unit) if isinstance(unit, str) else unit

    memory = self.memory
    for candidate in ByteUnit:
        suffix = candidate.value
        if not memory.endswith(suffix):
            continue
        # Everything in front of the unit suffix is the integer amount.
        amount = int(memory[: -len(suffix)])
        return amount * candidate.byte_value / target_unit.byte_value

    # Should never happen due to the regex validation
    raise ValueError(f"Unable to parse memory unit from '{memory}'.")

retry_config

Retry configuration for a step.

StepRetryConfig (StrictBaseModel) pydantic-model

Retry configuration for a step.

Delay is an integer (specified in seconds).

Source code in zenml/config/retry_config.py
class StepRetryConfig(StrictBaseModel):
    """Retry configuration for a step.

    Delay is an integer (specified in seconds).
    """

    # Defaults to 1.
    max_retries: int = 1
    delay: int = 0  # in seconds
    # NOTE(review): how `backoff` is combined with `delay` is not visible
    # here -- confirm against the code that consumes this config.
    backoff: int = 0

schedule

Class for defining a pipeline schedule.

Schedule (BaseModel) pydantic-model

Class for defining a pipeline schedule.

Attributes:

Name Type Description
name Optional[str]

Optional name to give to the schedule. If not set, a default name will be generated based on the pipeline name and the current date and time.

cron_expression Optional[str]

Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval.

start_time Optional[datetime.datetime]

datetime object to indicate when to start the schedule.

end_time Optional[datetime.datetime]

datetime object to indicate when to end the schedule.

interval_second Optional[datetime.timedelta]

datetime timedelta indicating the seconds between two recurring runs for a periodic schedule.

catchup bool

Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterward. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill.

run_once_start_time Optional[datetime.datetime]

datetime object to indicate when to run the pipeline once. This is useful for one-off runs.

Source code in zenml/config/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    A schedule is valid if at least one of the following is configured: a
    cron expression, a start time together with an interval, or a run-once
    start time.

    Attributes:
        name: Optional name to give to the schedule. If not set, a default name
            will be generated based on the pipeline name and the current date
            and time.
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterward. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
        run_once_start_time: datetime object to indicate when to run the
            pipeline once. This is useful for one-off runs.
    """

    name: Optional[str] = None
    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False
    run_once_start_time: Optional[datetime.datetime] = None

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        A warning is logged when a cron expression is combined with a
        periodic schedule or with a run-once start time, because the
        outcome then depends on the orchestrator.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        # A periodic schedule needs both a start time and an interval.
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )
        run_once_starts_at = values.get("run_once_start_time")

        if cron_expression and periodic_schedule:
            # The message names the attribute exactly as it is declared on
            # this model (`interval_second`).
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behavior depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )
            return values
        elif cron_expression and run_once_starts_at:
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "a value for `run_once_start_time`. The resulting behavior "
                "depends on the concrete orchestrator implementation but will "
                "usually ignore the `run_once_start_time`."
            )
            return values
        elif cron_expression or periodic_schedule or run_once_starts_at:
            return values
        else:
            raise ValueError(
                "Either a cron expression, a start time and interval seconds "
                "or a run once start time "
                "need to be set for a valid schedule."
            )

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            Optional ISO-formatted string of the UTC start time.
        """
        if not self.start_time:
            return None

        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            Optional ISO-formatted string of the UTC end time.
        """
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC end time.

Returns:

Type Description
Optional[str]

Optional ISO-formatted string of the UTC end time.

utc_start_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC start time.

Returns:

Type Description
Optional[str]

Optional ISO-formatted string of the UTC start time.

secret_reference_mixin

Secret reference mixin implementation.

SecretReferenceMixin (BaseModel) pydantic-model

Mixin class for secret references in pydantic model attributes.

Source code in zenml/config/secret_reference_mixin.py
class SecretReferenceMixin(BaseModel):
    """Mixin class for secret references in pydantic model attributes.

    Attribute values may be given either directly or as secret references.
    Secret references are checked when the object is created and are
    resolved to the actual secret value when the attribute is accessed.
    """

    def __init__(
        self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
    ) -> None:
        """Ensures that secret references are only passed for valid fields.

        This method ensures that secret references are not passed for fields
        that explicitly prevent them or require pydantic validation.

        Args:
            warn_about_plain_text_secrets: If true, then warns about using
                plain-text secrets.
            **kwargs: Arguments to initialize this object.

        Raises:
            ValueError: If an attribute that requires custom pydantic validation
                or an attribute which explicitly disallows secret references
                is passed as a secret reference.
        """
        for key, value in kwargs.items():
            try:
                field = self.__class__.__fields__[key]
            except KeyError:
                # Value for a private attribute or non-existing field, this
                # will fail during the upcoming pydantic validation
                continue

            if value is None:
                continue

            if not secret_utils.is_secret_reference(value):
                # Plain value: nothing to reject, at most warn when the
                # field is marked as sensitive.
                if (
                    secret_utils.is_secret_field(field)
                    and warn_about_plain_text_secrets
                ):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}`. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in sensitive information as secrets. Check out the "
                        "documentation on how to configure values with secrets "
                        "here: https://docs.zenml.io/getting-started/deploying-zenml/manage-the-deployed-services/secret-management"
                    )
                continue

            # From here on the value is a secret reference.
            if secret_utils.is_clear_text_field(field):
                raise ValueError(
                    f"Passing the `{key}` attribute as a secret reference is "
                    "not allowed."
                )

            # Field validators would run on the reference string instead of
            # the resolved secret value, so such fields are rejected.
            requires_validation = field.pre_validators or field.post_validators
            if requires_validation:
                raise ValueError(
                    f"Passing the attribute `{key}` as a secret reference is "
                    "not allowed as additional validation is required for "
                    "this attribute."
                )

        super().__init__(**kwargs)

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value.
        """
        value = super().__getattribute__(key)

        if not secret_utils.is_secret_reference(value):
            return value

        # Imported lazily, only when a reference actually has to be resolved.
        from zenml.client import Client

        secret_ref = secret_utils.parse_secret_reference(value)

        # Try to resolve the secret using the secret store
        try:
            secret = Client().get_secret_by_name_and_scope(
                name=secret_ref.name,
            )
        except (KeyError, NotImplementedError) as e:
            # Chain the original error so the underlying lookup failure
            # stays visible as the explicit cause in tracebacks.
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key}: "
                f"The secret {secret_ref.name} does not exist."
            ) from e

        if secret_ref.key not in secret.values:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key}: "
                f"The secret {secret_ref.name} does not contain a value "
                f"for key {secret_ref.key}. Available keys: "
                f"{set(secret.values.keys())}."
            )

        return secret.secret_values[secret_ref.key]

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this object.

        Returns:
            The parsed secret references found among the attribute values
            of this object.
        """
        return {
            secret_utils.parse_secret_reference(v)
            for v in self.dict().values()
            if secret_utils.is_secret_reference(v)
        }
required_secrets: Set[zenml.utils.secret_utils.SecretReference] property readonly

All required secrets for this object.

Returns:

Type Description
Set[zenml.utils.secret_utils.SecretReference]

The required secrets of this object.

__custom_getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    # Imported lazily, only when a reference actually has to be resolved.
    from zenml.client import Client

    secret_ref = secret_utils.parse_secret_reference(value)

    # Try to resolve the secret using the secret store
    try:
        secret = Client().get_secret_by_name_and_scope(
            name=secret_ref.name,
        )
    except (KeyError, NotImplementedError) as e:
        # Chain the original error so the underlying lookup failure stays
        # visible as the explicit cause in tracebacks.
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not exist."
        ) from e

    if secret_ref.key not in secret.values:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not contain a value "
            f"for key {secret_ref.key}. Available keys: "
            f"{set(secret.values.keys())}."
        )

    return secret.secret_values[secret_ref.key]
__getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/config/secret_reference_mixin.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    # Imported lazily, only when a reference actually has to be resolved.
    from zenml.client import Client

    secret_ref = secret_utils.parse_secret_reference(value)

    # Try to resolve the secret using the secret store
    try:
        secret = Client().get_secret_by_name_and_scope(
            name=secret_ref.name,
        )
    except (KeyError, NotImplementedError) as e:
        # Chain the original error so the underlying lookup failure stays
        # visible as the explicit cause in tracebacks.
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not exist."
        ) from e

    if secret_ref.key not in secret.values:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key}: "
            f"The secret {secret_ref.name} does not contain a value "
            f"for key {secret_ref.key}. Available keys: "
            f"{set(secret.values.keys())}."
        )

    return secret.secret_values[secret_ref.key]
__init__(self, warn_about_plain_text_secrets=False, **kwargs) special

Ensures that secret references are only passed for valid fields.

This method ensures that secret references are not passed for fields that explicitly prevent them or require pydantic validation.

Parameters:

Name Type Description Default
warn_about_plain_text_secrets bool

If true, then warns about using plain-text secrets.

False
**kwargs Any

Arguments to initialize this object.

{}

Exceptions:

Type Description
ValueError

If an attribute that requires custom pydantic validation or an attribute which explicitly disallows secret references is passed as a secret reference.

Source code in zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Checks the passed attribute values before initializing the object.

    Secret references are rejected for fields that explicitly disallow
    them and for fields that have pydantic validators attached.

    Args:
        warn_about_plain_text_secrets: If true, logs a warning for every
            sensitive field that receives a plain-text value.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If a secret reference is passed for an attribute that
            explicitly disallows secret references or that requires custom
            pydantic validation.
    """
    for attr_name, attr_value in kwargs.items():
        field = self.__class__.__fields__.get(attr_name)

        # An unknown name (private attribute or non-existing field) is left
        # for the upcoming pydantic validation to reject; `None` values
        # need no checks at all.
        if field is None or attr_value is None:
            continue

        if secret_utils.is_secret_reference(attr_value):
            if secret_utils.is_clear_text_field(field):
                raise ValueError(
                    f"Passing the `{attr_name}` attribute as a secret reference is "
                    "not allowed."
                )

            if field.pre_validators or field.post_validators:
                raise ValueError(
                    f"Passing the attribute `{attr_name}` as a secret reference is "
                    "not allowed as additional validation is required for "
                    "this attribute."
                )
        elif (
            secret_utils.is_secret_field(field)
            and warn_about_plain_text_secrets
        ):
            logger.warning(
                "You specified a plain-text value for the sensitive "
                f"attribute `{attr_name}`. This is currently only a warning, "
                "but future versions of ZenML will require you to pass "
                "in sensitive information as secrets. Check out the "
                "documentation on how to configure values with secrets "
                "here: https://docs.zenml.io/getting-started/deploying-zenml/manage-the-deployed-services/secret-management"
            )

    super().__init__(**kwargs)

secrets_store_config

Functionality to support ZenML secrets store configurations.

SecretsStoreConfiguration (BaseModel) pydantic-model

Generic secrets store configuration.

The store configurations of concrete secrets store implementations must inherit from this class and validate any extra attributes that are configured in addition to those defined in this class.

Attributes:

Name Type Description
type SecretsStoreType

The type of store backend.

class_path Optional[str]

The Python class path of the store backend. Should point to a subclass of BaseSecretsStore. This is optional and only required if the store backend is not one of the built-in implementations.

Source code in zenml/config/secrets_store_config.py
class SecretsStoreConfiguration(BaseModel):
    """Generic secrets store configuration.

    The store configurations of concrete secrets store implementations must
    inherit from this class and validate any extra attributes that are
    configured in addition to those defined in this class.

    Attributes:
        type: The type of store backend.
        class_path: The Python class path of the store backend. Should point to
            a subclass of `BaseSecretsStore`. This is optional and only
            required if the store backend is not one of the built-in
            implementations.
    """

    type: SecretsStoreType
    class_path: Optional[str] = None

    @root_validator
    def validate_custom(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate that class_path is set for custom secrets stores.

        Args:
            values: Dict representing user-specified runtime settings.

        Returns:
            Validated settings.

        Raises:
            ValueError: If class_path is not set when using a custom secrets
                store, or if it is set for any other secrets store type.
        """
        store_type = values.get("type")
        # A field that failed its own validation is missing from `values`.
        # Pydantic already reports that failure, so skip the cross-field
        # check instead of crashing with a bare KeyError.
        if not store_type or "class_path" not in values:
            return values

        class_path = values["class_path"]
        if store_type == SecretsStoreType.CUSTOM:
            if class_path is None:
                raise ValueError(
                    "A class_path must be set when using a custom secrets "
                    "store implementation."
                )
        elif class_path is not None:
            raise ValueError(
                f"The class_path attribute is not supported for the "
                f"{store_type} secrets store type."
            )

        return values

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes to be set in the base class. The concrete
        # classes are responsible for validating the attributes.
        extra = "allow"
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
Config

Pydantic configuration class.

Source code in zenml/config/secrets_store_config.py
class Config:
    """Pydantic settings for the secrets store configuration model."""

    # Attributes with a leading underscore are treated as private: they stay
    # mutable and are left out of serialization.
    underscore_attrs_are_private = True
    # The base class accepts extra attributes; validating them is the job of
    # the concrete classes.
    extra = "allow"
    # Run validation on attribute assignment. This is needed to have a mix of
    # mutable and immutable attributes.
    validate_assignment = True
validate_custom(values) classmethod

Validate that class_path is set for custom secrets stores.

Parameters:

Name Type Description Default
values Dict[str, Any]

Dict representing user-specified runtime settings.

required

Returns:

Type Description
Dict[str, Any]

Validated settings.

Exceptions:

Type Description
ValueError

If class_path is not set when using a custom secrets store.

Source code in zenml/config/secrets_store_config.py
@root_validator
def validate_custom(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate that class_path is set for custom secrets stores.

    Args:
        values: Dict representing user-specified runtime settings.

    Returns:
        Validated settings.

    Raises:
        ValueError: If class_path is not set when using a custom secrets
            store, or if it is set for any other secrets store type.
    """
    store_type = values.get("type")
    # A field that failed its own validation is missing from `values`.
    # Pydantic already reports that failure, so skip the cross-field check
    # instead of crashing with a bare KeyError.
    if not store_type or "class_path" not in values:
        return values

    class_path = values["class_path"]
    if store_type == SecretsStoreType.CUSTOM:
        if class_path is None:
            raise ValueError(
                "A class_path must be set when using a custom secrets "
                "store implementation."
            )
    elif class_path is not None:
        raise ValueError(
            f"The class_path attribute is not supported for the "
            f"{store_type} secrets store type."
        )

    return values

server_config

Functionality to support ZenML GlobalConfiguration.

ServerConfiguration (BaseModel) pydantic-model

ZenML Server configuration attributes.

Attributes:

Name Type Description
deployment_type ServerDeploymentType

The type of ZenML server deployment that is running.

server_url Optional[str]

The URL where the ZenML server API is reachable. Must be configured for features that involve triggering workloads from the ZenML dashboard (e.g., running pipelines). If not specified, the clients will use the same URL used to connect them to the ZenML server.

dashboard_url Optional[str]

The URL where the ZenML dashboard is reachable. If not specified, the server_url value is used. This should be configured if the dashboard is served from a different URL than the ZenML server.

root_url_path str

The root URL path for the ZenML API and dashboard.

auth_scheme AuthScheme

The authentication scheme used by the ZenML server.

jwt_token_algorithm str

The algorithm used to sign and verify JWT tokens.

jwt_token_issuer Optional[str]

The issuer of the JWT tokens. If not specified, the issuer is set to the ZenML Server ID.

jwt_token_audience Optional[str]

The audience of the JWT tokens. If not specified, the audience is set to the ZenML Server ID.

jwt_token_leeway_seconds int

The leeway in seconds allowed when verifying the expiration time of JWT tokens.

jwt_token_expire_minutes Optional[int]

The expiration time of JWT tokens in minutes. If not specified, generated JWT tokens will not be set to expire.

jwt_secret_key str

The secret key used to sign and verify JWT tokens. If not specified, a random secret key is generated.

auth_cookie_name Optional[str]

The name of the http-only cookie used to store the JWT token. If not specified, the cookie name is set to a value computed from the ZenML server ID.

auth_cookie_domain Optional[str]

The domain of the http-only cookie used to store the JWT token. If not specified, the cookie will be valid for the domain where the ZenML server is running.

cors_allow_origins Optional[List[str]]

The origins allowed to make cross-origin requests to the ZenML server. If not specified, all origins are allowed.

max_failed_device_auth_attempts int

The maximum number of failed OAuth 2.0 device authentication attempts before the device is locked.

device_auth_timeout int

The timeout in seconds after which a pending OAuth 2.0 device authorization request expires.

device_auth_polling_interval int

The polling interval in seconds used to poll the OAuth 2.0 device authorization endpoint.

device_expiration_minutes Optional[int]

The time in minutes that an OAuth 2.0 device is allowed to be used to authenticate with the ZenML server. If not set or if jwt_token_expire_minutes is not set, the devices are allowed to be used indefinitely. This controls the expiration time of the JWT tokens issued to clients after they have authenticated with the ZenML server using an OAuth 2.0 device.

trusted_device_expiration_minutes Optional[int]

The time in minutes that a trusted OAuth 2.0 device is allowed to be used to authenticate with the ZenML server. If not set or if jwt_token_expire_minutes is not set, the devices are allowed to be used indefinitely. This controls the expiration time of the JWT tokens issued to clients after they have authenticated with the ZenML server using an OAuth 2.0 device that has been marked as trusted.

external_login_url Optional[str]

The login URL of an external authenticator service to use with the EXTERNAL authentication scheme.

external_user_info_url Optional[str]

The user info URL of an external authenticator service to use with the EXTERNAL authentication scheme.

external_cookie_name Optional[str]

The name of the http-only cookie used to store the bearer token used to authenticate with the external authenticator service. Must be specified if the EXTERNAL authentication scheme is used.

external_server_id Optional[uuid.UUID]

The ID of the ZenML server to use with the EXTERNAL authentication scheme. If not specified, the regular ZenML server ID is used.

metadata Dict[str, Any]

Additional metadata to be associated with the ZenML server.

rbac_implementation_source Optional[str]

Source pointing to a class implementing the RBAC interface defined by zenml.zen_server.rbac_interface.RBACInterface. If not specified, RBAC will not be enabled for this server.

feature_gate_implementation_source Optional[str]

Source pointing to a class implementing the feature gate interface defined by zenml.zen_server.feature_gate.feature_gate_interface.FeatureGateInterface. If not specified, feature usage will not be gated/tracked for this server.

workload_manager_implementation_source Optional[str]

Source pointing to a class implementing the workload management interface.

pipeline_run_auth_window int

The default time window in minutes for which a pipeline run action is allowed to authenticate with the ZenML server.

login_rate_limit_minute int

The number of login attempts allowed per minute.

login_rate_limit_day int

The number of login attempts allowed per day.

secure_headers_server Union[bool, str]

Custom value to be set in the Server HTTP header to identify the server. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Server header will be set to the default value (ZenML server ID). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Server header will not be included in responses.

secure_headers_hsts Union[bool, str]

The server header value to be set in the HTTP header Strict-Transport-Security. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Strict-Transport-Security header will be set to the default value (max-age=63072000; includeSubdomains). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Strict-Transport-Security header will not be included in responses.

secure_headers_xfo Union[bool, str]

The server header value to be set in the HTTP header X-Frame-Options. If not specified, or if set to one of the reserved values enabled, yes, true, on, the X-Frame-Options header will be set to the default value (SAMEORIGIN). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the X-Frame-Options header will not be included in responses.

secure_headers_xxp Union[bool, str]

The server header value to be set in the HTTP header X-XSS-Protection. If not specified, or if set to one of the reserved values enabled, yes, true, on, the X-XSS-Protection header will be set to the default value (0). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the X-XSS-Protection header will not be included in responses. NOTE: this header is deprecated and should always be set to 0. The Content-Security-Policy header should be used instead.

secure_headers_content Union[bool, str]

The server header value to be set in the HTTP header X-Content-Type-Options. If not specified, or if set to one of the reserved values enabled, yes, true, on, the X-Content-Type-Options header will be set to the default value (nosniff). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the X-Content-Type-Options header will not be included in responses.

secure_headers_csp Union[bool, str]

The server header value to be set in the HTTP header Content-Security-Policy. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Content-Security-Policy header will be set to a default value that is compatible with the ZenML dashboard. If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Content-Security-Policy header will not be included in responses.

secure_headers_referrer Union[bool, str]

The server header value to be set in the HTTP header Referrer-Policy. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Referrer-Policy header will be set to the default value (no-referrer-when-downgrade). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Referrer-Policy header will not be included in responses.

secure_headers_cache Union[bool, str]

The server header value to be set in the HTTP header Cache-Control. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Cache-Control header will be set to the default value (no-store, no-cache, must-revalidate). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Cache-Control header will not be included in responses.

secure_headers_permissions Union[bool, str]

The server header value to be set in the HTTP header Permissions-Policy. If not specified, or if set to one of the reserved values enabled, yes, true, on, the Permissions-Policy header will be set to the default value (accelerometer=(), camera=(), geolocation=(), gyroscope=(), magnetometer=(), microphone=(), payment=(), usb=()). If set to one of the reserved values disabled, no, none, false, off or to an empty string, the Permissions-Policy header will not be included in responses.

use_legacy_dashboard bool

Whether to use the legacy dashboard. If set to True, the legacy dashboard with the old UI is served. If set to False, the new dashboard is used.

server_name str

The name of the ZenML server. Used only during initial deployment. Can be changed later as a part of the server settings.

display_announcements bool

Whether to display announcements about ZenML in the dashboard. Used only during initial deployment. Can be changed later as a part of the server settings.

display_updates bool

Whether to display notifications about ZenML updates in the dashboard. Used only during initial deployment. Can be changed later as a part of the server settings.

auto_activate bool

Whether to automatically activate the server and create a default admin user account with an empty password during the initial deployment.

Source code in zenml/config/server_config.py
class ServerConfiguration(BaseModel):
    """ZenML Server configuration attributes.

    Attributes:
        deployment_type: The type of ZenML server deployment that is running.
        server_url: The URL where the ZenML server API is reachable. Must be
            configured for features that involve triggering workloads from the
            ZenML dashboard (e.g., running pipelines). If not specified, the
            clients will use the same URL used to connect them to the ZenML
            server.
        dashboard_url: The URL where the ZenML dashboard is reachable.
            If not specified, the `server_url` value is used. This should be
            configured if the dashboard is served from a different URL than the
            ZenML server.
        root_url_path: The root URL path for the ZenML API and dashboard.
        auth_scheme: The authentication scheme used by the ZenML server.
        jwt_token_algorithm: The algorithm used to sign and verify JWT tokens.
        jwt_token_issuer: The issuer of the JWT tokens. If not specified, the
            issuer is set to the ZenML Server ID.
        jwt_token_audience: The audience of the JWT tokens. If not specified,
            the audience is set to the ZenML Server ID.
        jwt_token_leeway_seconds: The leeway in seconds allowed when verifying
            the expiration time of JWT tokens.
        jwt_token_expire_minutes: The expiration time of JWT tokens in minutes.
            If not specified, generated JWT tokens will not be set to expire.
        jwt_secret_key: The secret key used to sign and verify JWT tokens. If
            not specified, a random secret key is generated.
        auth_cookie_name: The name of the http-only cookie used to store the JWT
            token. If not specified, the cookie name is set to a value computed
            from the ZenML server ID.
        auth_cookie_domain: The domain of the http-only cookie used to store the
            JWT token. If not specified, the cookie will be valid for the
            domain where the ZenML server is running.
        cors_allow_origins: The origins allowed to make cross-origin requests
            to the ZenML server. Can be supplied either as a list of origins
            or as a single comma-separated string. If not specified, all
            origins are allowed.
        max_failed_device_auth_attempts: The maximum number of failed OAuth 2.0
            device authentication attempts before the device is locked.
        device_auth_timeout: The timeout in seconds after which a pending OAuth
            2.0 device authorization request expires.
        device_auth_polling_interval: The polling interval in seconds used to
            poll the OAuth 2.0 device authorization endpoint.
        device_expiration_minutes: The time in minutes that an OAuth 2.0 device is
            allowed to be used to authenticate with the ZenML server. If not
            set or if `jwt_token_expire_minutes` is not set, the devices are
            allowed to be used indefinitely. This controls the expiration time
            of the JWT tokens issued to clients after they have authenticated
            with the ZenML server using an OAuth 2.0 device.
        trusted_device_expiration_minutes: The time in minutes that a trusted OAuth 2.0
            device is allowed to be used to authenticate with the ZenML server.
            If not set or if `jwt_token_expire_minutes` is not set, the devices
            are allowed to be used indefinitely. This controls the expiration
            time of the JWT tokens issued to clients after they have
            authenticated with the ZenML server using an OAuth 2.0 device
            that has been marked as trusted.
        external_login_url: The login URL of an external authenticator service
            to use with the `EXTERNAL` authentication scheme.
        external_user_info_url: The user info URL of an external authenticator
            service to use with the `EXTERNAL` authentication scheme.
        external_cookie_name: The name of the http-only cookie used to store the
            bearer token used to authenticate with the external authenticator
            service. Must be specified if the `EXTERNAL` authentication scheme
            is used.
        external_server_id: The ID of the ZenML server to use with the
            `EXTERNAL` authentication scheme. If not specified, the regular
            ZenML server ID is used.
        metadata: Additional metadata to be associated with the ZenML server.
            Can be supplied either as a dictionary or as a JSON string.
        rbac_implementation_source: Source pointing to a class implementing
            the RBAC interface defined by
            `zenml.zen_server.rbac_interface.RBACInterface`. If not specified,
            RBAC will not be enabled for this server.
        feature_gate_implementation_source: Source pointing to a class
            implementing the feature gate interface defined by
            `zenml.zen_server.feature_gate.feature_gate_interface.FeatureGateInterface`.
            If not specified, feature usage will not be gated/tracked for this
            server.
        workload_manager_implementation_source: Source pointing to a class
            implementing the workload management interface.
        pipeline_run_auth_window: The default time window in minutes for which
            a pipeline run action is allowed to authenticate with the ZenML
            server.
        rate_limit_enabled: Whether rate limiting is enabled on the server.
            Disabled by default.
        login_rate_limit_minute: The number of login attempts allowed per minute.
        login_rate_limit_day: The number of login attempts allowed per day.
        secure_headers_server: Custom value to be set in the `Server` HTTP
            header to identify the server. If not specified, or if set to one of
            the reserved values `enabled`, `yes`, `true`, `on`, the `Server`
            header will be set to the default value (ZenML server ID). If set to
            one of the reserved values `disabled`, `no`, `none`, `false`, `off`
            or to an empty string, the `Server` header will not be included in
            responses.
        secure_headers_hsts: The server header value to be set in the HTTP
            header `Strict-Transport-Security`. If not specified, or if set to
            one of the reserved values `enabled`, `yes`, `true`, `on`, the
            `Strict-Transport-Security` header will be set to the default value
            (`max-age=63072000; includeSubdomains`). If set to one of
            the reserved values `disabled`, `no`, `none`, `false`, `off` or to
            an empty string, the `Strict-Transport-Security` header will not be
            included in responses.
        secure_headers_xfo: The server header value to be set in the HTTP
            header `X-Frame-Options`. If not specified, or if set to one of the
            reserved values `enabled`, `yes`, `true`, `on`, the `X-Frame-Options`
            header will be set to the default value (`SAMEORIGIN`). If set to
            one of the reserved values `disabled`, `no`, `none`, `false`, `off`
            or to an empty string, the `X-Frame-Options` header will not be
            included in responses.
        secure_headers_xxp: The server header value to be set in the HTTP
            header `X-XSS-Protection`. If not specified, or if set to one of the
            reserved values `enabled`, `yes`, `true`, `on`, the `X-XSS-Protection`
            header will be set to the default value (`0`). If set to one of the
            reserved values `disabled`, `no`, `none`, `false`, `off` or
            to an empty string, the `X-XSS-Protection` header will not be
            included in responses. NOTE: this header is deprecated and should
            always be set to `0`. The `Content-Security-Policy` header should be
            used instead.
        secure_headers_content: The server header value to be set in the HTTP
            header `X-Content-Type-Options`. If not specified, or if set to one
            of the reserved values `enabled`, `yes`, `true`, `on`, the
            `X-Content-Type-Options` header will be set to the default value
            (`nosniff`). If set to one of the reserved values `disabled`, `no`,
            `none`, `false`, `off` or to an empty string, the
            `X-Content-Type-Options` header will not be included in responses.
        secure_headers_csp: The server header value to be set in the HTTP
            header `Content-Security-Policy`. If not specified, or if set to one
            of the reserved values `enabled`, `yes`, `true`, `on`, the
            `Content-Security-Policy` header will be set to a default value
            that is compatible with the ZenML dashboard. If set to one of the
            reserved values `disabled`, `no`, `none`, `false`, `off` or to an
            empty string, the `Content-Security-Policy` header will not be
            included in responses.
        secure_headers_referrer: The server header value to be set in the HTTP
            header `Referrer-Policy`. If not specified, or if set to one of the
            reserved values `enabled`, `yes`, `true`, `on`, the `Referrer-Policy`
            header will be set to the default value
            (`no-referrer-when-downgrade`). If set to one of the reserved values
            `disabled`, `no`, `none`, `false`, `off` or to an empty string, the
            `Referrer-Policy` header will not be included in responses.
        secure_headers_cache: The server header value to be set in the HTTP
            header `Cache-Control`. If not specified, or if set to one of the
            reserved values `enabled`, `yes`, `true`, `on`, the `Cache-Control`
            header will be set to the default value
            (`no-store, no-cache, must-revalidate`). If set to one of the
            reserved values `disabled`, `no`, `none`, `false`, `off` or to an
            empty string, the `Cache-Control` header will not be included in
            responses.
        secure_headers_permissions: The server header value to be set in the
            HTTP header `Permissions-Policy`. If not specified, or if set to one
            of the reserved values `enabled`, `yes`, `true`, `on`, the
            `Permissions-Policy` header will be set to the default value
            (`accelerometer=(), camera=(), geolocation=(), gyroscope=(),
            magnetometer=(), microphone=(), payment=(), usb=()`). If set to
            one of the reserved values `disabled`, `no`, `none`, `false`, `off`
            or to an empty string, the `Permissions-Policy` header will not be
            included in responses.
        use_legacy_dashboard: Whether to use the legacy dashboard. If set to
            `True`, the legacy dashboard with the old UI is served. If set to
            `False`, the new dashboard is served.
        server_name: The name of the ZenML server. Used only during initial
            deployment. Can be changed later as a part of the server settings.
        display_announcements: Whether to display announcements about ZenML in
            the dashboard. Used only during initial deployment. Can be changed
            later as a part of the server settings.
        display_updates: Whether to display notifications about ZenML updates in
            the dashboard. Used only during initial deployment. Can be changed
            later as a part of the server settings.
        auto_activate: Whether to automatically activate the server and create a
            default admin user account with an empty password during the initial
            deployment.
    """

    deployment_type: ServerDeploymentType = ServerDeploymentType.OTHER
    server_url: Optional[str] = None
    dashboard_url: Optional[str] = None
    root_url_path: str = ""
    metadata: Dict[str, Any] = {}
    auth_scheme: AuthScheme = AuthScheme.OAUTH2_PASSWORD_BEARER
    jwt_token_algorithm: str = DEFAULT_ZENML_JWT_TOKEN_ALGORITHM
    jwt_token_issuer: Optional[str] = None
    jwt_token_audience: Optional[str] = None
    jwt_token_leeway_seconds: int = DEFAULT_ZENML_JWT_TOKEN_LEEWAY
    jwt_token_expire_minutes: Optional[int] = None
    jwt_secret_key: str = Field(default_factory=generate_jwt_secret_key)
    auth_cookie_name: Optional[str] = None
    auth_cookie_domain: Optional[str] = None
    cors_allow_origins: Optional[List[str]] = None
    max_failed_device_auth_attempts: int = (
        DEFAULT_ZENML_SERVER_MAX_DEVICE_AUTH_ATTEMPTS
    )
    device_auth_timeout: int = DEFAULT_ZENML_SERVER_DEVICE_AUTH_TIMEOUT
    device_auth_polling_interval: int = (
        DEFAULT_ZENML_SERVER_DEVICE_AUTH_POLLING
    )
    device_expiration_minutes: Optional[int] = None
    trusted_device_expiration_minutes: Optional[int] = None

    external_login_url: Optional[str] = None
    external_user_info_url: Optional[str] = None
    external_cookie_name: Optional[str] = None
    external_server_id: Optional[UUID] = None

    rbac_implementation_source: Optional[str] = None
    feature_gate_implementation_source: Optional[str] = None
    workload_manager_implementation_source: Optional[str] = None
    pipeline_run_auth_window: int = (
        DEFAULT_ZENML_SERVER_PIPELINE_RUN_AUTH_WINDOW
    )

    rate_limit_enabled: bool = False
    login_rate_limit_minute: int = DEFAULT_ZENML_SERVER_LOGIN_RATE_LIMIT_MINUTE
    login_rate_limit_day: int = DEFAULT_ZENML_SERVER_LOGIN_RATE_LIMIT_DAY

    secure_headers_server: Union[bool, str] = True
    secure_headers_hsts: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_HSTS
    )
    secure_headers_xfo: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_XFO
    )
    secure_headers_xxp: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_XXP
    )
    secure_headers_content: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_CONTENT
    )
    secure_headers_csp: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_CSP
    )
    secure_headers_referrer: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_REFERRER
    )
    secure_headers_cache: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_CACHE
    )
    secure_headers_permissions: Union[bool, str] = (
        DEFAULT_ZENML_SERVER_SECURE_HEADERS_PERMISSIONS
    )
    use_legacy_dashboard: bool = DEFAULT_ZENML_SERVER_USE_LEGACY_DASHBOARD

    server_name: str = DEFAULT_ZENML_SERVER_NAME
    display_announcements: bool = True
    display_updates: bool = True
    auto_activate: bool = False

    # Lazily populated cache for the `deployment_id` property.
    _deployment_id: Optional[UUID] = None

    @root_validator(pre=True)
    def _validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and normalize the raw server configuration values.

        This runs before field validation and performs the following steps:

        * checks that the external authenticator options are set when the
          `EXTERNAL` authentication scheme is used
        * converts a comma-separated `cors_allow_origins` string into a list
          (falling back to allowing all origins if nothing is configured)
        * parses `metadata` from a JSON string into a dictionary
        * maps the reserved string values of the `secure_headers_*` options
          to `False` (header disabled) or to the field default (header
          enabled)

        Args:
            values: The server configuration values.

        Returns:
            The validated server configuration values.

        Raises:
            ValueError: If the server configuration is invalid.
        """
        if values.get("auth_scheme") == AuthScheme.EXTERNAL:
            # If the authentication scheme is set to `EXTERNAL`, the
            # external authenticator URLs must be specified.
            if not values.get("external_login_url") or not values.get(
                "external_user_info_url"
            ):
                raise ValueError(
                    "The external login and user info authenticator "
                    "URLs must be specified when using the EXTERNAL "
                    "authentication scheme."
                )

            # If the authentication scheme is set to `EXTERNAL`, the
            # external cookie name must be specified.
            if not values.get("external_cookie_name"):
                raise ValueError(
                    "The external cookie name must be specified when "
                    "using the EXTERNAL authentication scheme."
                )

        cors_allow_origins = values.get("cors_allow_origins")
        if not cors_allow_origins:
            # Nothing configured: allow all origins.
            values["cors_allow_origins"] = ["*"]
        elif isinstance(cors_allow_origins, str):
            # Comma-separated string (e.g. coming from an environment
            # variable). Whitespace around the separators is dropped so that
            # values like "https://a.com, https://b.com" work as expected.
            values["cors_allow_origins"] = [
                origin.strip() for origin in cors_allow_origins.split(",")
            ]
        # Any other value (e.g. an already parsed list of origins) is left
        # untouched and validated by the regular field validation.

        # if metadata is a string, convert it to a dictionary
        if isinstance(values.get("metadata"), str):
            try:
                values["metadata"] = json.loads(values["metadata"])
            except json.JSONDecodeError as e:
                raise ValueError(
                    f"The server metadata is not a valid JSON string: {e}"
                ) from e

        # If one of the secure headers options is set to a reserved string
        # value, translate it: "disabled"-like values switch the header off,
        # "enabled"-like values revert the option to its default value.
        disabled_values = ("disabled", "no", "none", "false", "off", "")
        enabled_values = ("enabled", "yes", "true", "on")
        # Iterate over a snapshot because entries are deleted from `values`.
        for k, v in list(values.items()):
            if not (k.startswith("secure_headers_") and isinstance(v, str)):
                continue

            reserved = v.lower()
            if reserved in disabled_values:
                values[k] = False
            elif reserved in enabled_values:
                # Revert to the default value if the header is enabled
                del values[k]

        return values

    @property
    def deployment_id(self) -> UUID:
        """Get the ZenML server deployment ID.

        The ID is fetched from the store of the global configuration on first
        access and cached on the instance afterwards.

        Returns:
            The ZenML server deployment ID.
        """
        # Imported locally rather than at module level.
        from zenml.config.global_config import GlobalConfiguration

        if self._deployment_id:
            return self._deployment_id

        self._deployment_id = (
            GlobalConfiguration().zen_store.get_deployment_id()
        )

        return self._deployment_id

    @property
    def rbac_enabled(self) -> bool:
        """Whether RBAC is enabled on the server or not.

        Returns:
            Whether RBAC is enabled on the server or not.
        """
        return self.rbac_implementation_source is not None

    @property
    def feature_gate_enabled(self) -> bool:
        """Whether feature gating is enabled on the server or not.

        Returns:
            Whether feature gating is enabled on the server or not.
        """
        return self.feature_gate_implementation_source is not None

    @property
    def workload_manager_enabled(self) -> bool:
        """Whether workload management is enabled on the server or not.

        Returns:
            Whether workload management is enabled on the server or not.
        """
        return self.workload_manager_implementation_source is not None

    def get_jwt_token_issuer(self) -> str:
        """Get the JWT token issuer.

        If not configured, the issuer is set to the ZenML Server ID.

        Returns:
            The JWT token issuer.
        """
        if self.jwt_token_issuer:
            return self.jwt_token_issuer

        self.jwt_token_issuer = str(self.deployment_id)

        return self.jwt_token_issuer

    def get_jwt_token_audience(self) -> str:
        """Get the JWT token audience.

        If not configured, the audience is set to the ZenML Server ID.

        Returns:
            The JWT token audience.
        """
        if self.jwt_token_audience:
            return self.jwt_token_audience

        self.jwt_token_audience = str(self.deployment_id)

        return self.jwt_token_audience

    def get_auth_cookie_name(self) -> str:
        """Get the authentication cookie name.

        If not configured, the cookie name is set to a value computed from the
        ZenML server ID.

        Returns:
            The authentication cookie name.
        """
        if self.auth_cookie_name:
            return self.auth_cookie_name

        self.auth_cookie_name = f"zenml-server-{self.deployment_id}"

        return self.auth_cookie_name

    def get_external_server_id(self) -> UUID:
        """Get the external server ID.

        If not configured, the regular ZenML server ID is used.

        Returns:
            The external server ID.
        """
        if self.external_server_id:
            return self.external_server_id

        self.external_server_id = self.deployment_id

        return self.external_server_id

    @classmethod
    def get_server_config(cls) -> "ServerConfiguration":
        """Get the server configuration.

        The configuration is built from all non-empty environment variables
        whose name starts with `ENV_ZENML_SERVER_PREFIX`. The prefix is
        removed and the remainder is lower-cased to obtain the attribute name.

        Returns:
            The server configuration.
        """
        env_server_config: Dict[str, Any] = {}
        for k, v in os.environ.items():
            # Empty values are treated as if the variable was not set.
            if v == "":
                continue
            if k.startswith(ENV_ZENML_SERVER_PREFIX):
                env_server_config[
                    k[len(ENV_ZENML_SERVER_PREFIX) :].lower()
                ] = v

        return ServerConfiguration(**env_server_config)

    class Config:
        """Pydantic configuration class."""

        # Allow extra attributes from configs of previous ZenML versions to
        # permit downgrading
        extra = "allow"
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True

        # This is needed to allow correct handling of SecretStr values during
        # serialization.
        json_encoders = {
            SecretStr: lambda v: v.get_secret_value() if v else None
        }
deployment_id: UUID property readonly

Get the ZenML server deployment ID.

Returns:

Type Description
UUID

The ZenML server deployment ID.

feature_gate_enabled: bool property readonly

Whether feature gating is enabled on the server or not.

Returns:

Type Description
bool

Whether feature gating is enabled on the server or not.

rbac_enabled: bool property readonly

Whether RBAC is enabled on the server or not.

Returns:

Type Description
bool

Whether RBAC is enabled on the server or not.

workload_manager_enabled: bool property readonly

Whether workload management is enabled on the server or not.

Returns:

Type Description
bool

Whether workload management is enabled on the server or not.

Config

Pydantic configuration class.

Source code in zenml/config/server_config.py
class Config:
    """Pydantic configuration class."""

    # Attributes with a leading underscore are treated as private: they stay
    # mutable and are left out of serialization.
    underscore_attrs_are_private = True

    # Needed so that SecretStr values are handled correctly during
    # serialization.
    json_encoders = {
        SecretStr: lambda v: v.get_secret_value() if v else None
    }

    # Extra attributes are allowed so that configs written by other ZenML
    # versions can still be loaded, which permits downgrading.
    extra = "allow"
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_auth_cookie_name(self)

Get the authentication cookie name.

If not configured, the cookie name is set to a value computed from the ZenML server ID.

Returns:

Type Description
str

The authentication cookie name.

Source code in zenml/config/server_config.py
def get_auth_cookie_name(self) -> str:
    """Return the name of the authentication cookie.

    When no cookie name is configured, a name derived from the ZenML server
    ID is stored on the configuration and returned.

    Returns:
        The authentication cookie name.
    """
    if not self.auth_cookie_name:
        # Fall back to a name computed from the server ID and remember it.
        self.auth_cookie_name = f"zenml-server-{self.deployment_id}"

    return self.auth_cookie_name
get_external_server_id(self)

Get the external server ID.

If not configured, the regular ZenML server ID is used.

Returns:

Type Description
UUID

The external server ID.

Source code in zenml/config/server_config.py
def get_external_server_id(self) -> UUID:
    """Return the server ID used with the external authenticator.

    When no external server ID is configured, the regular ZenML server ID is
    stored on the configuration and returned.

    Returns:
        The external server ID.
    """
    if not self.external_server_id:
        # Fall back to the regular server ID and remember it.
        self.external_server_id = self.deployment_id

    return self.external_server_id
get_jwt_token_audience(self)

Get the JWT token audience.

If not configured, the audience is set to the ZenML Server ID.

Returns:

Type Description
str

The JWT token audience.

Source code in zenml/config/server_config.py
def get_jwt_token_audience(self) -> str:
    """Return the audience of the JWT tokens.

    When no audience is configured, the ZenML Server ID (as a string) is
    stored on the configuration and returned.

    Returns:
        The JWT token audience.
    """
    if not self.jwt_token_audience:
        # Fall back to the server ID and remember it.
        self.jwt_token_audience = str(self.deployment_id)

    return self.jwt_token_audience
get_jwt_token_issuer(self)

Get the JWT token issuer.

If not configured, the issuer is set to the ZenML Server ID.

Returns:

Type Description
str

The JWT token issuer.

Source code in zenml/config/server_config.py
def get_jwt_token_issuer(self) -> str:
    """Return the issuer of the JWT tokens.

    When no issuer is configured, the ZenML Server ID (as a string) is
    stored on the configuration and returned.

    Returns:
        The JWT token issuer.
    """
    if not self.jwt_token_issuer:
        # Fall back to the server ID and remember it.
        self.jwt_token_issuer = str(self.deployment_id)

    return self.jwt_token_issuer
get_server_config() classmethod

Get the server configuration.

Returns:

Type Description
ServerConfiguration

The server configuration.

Source code in zenml/config/server_config.py
@classmethod
def get_server_config(cls) -> "ServerConfiguration":
    """Build the server configuration from the environment.

    Every non-empty environment variable whose name starts with
    `ENV_ZENML_SERVER_PREFIX` is used: the prefix is removed and the rest of
    the name is lower-cased to obtain the configuration attribute name.

    Returns:
        The server configuration.
    """
    prefix_length = len(ENV_ZENML_SERVER_PREFIX)
    env_server_config: Dict[str, Any] = {
        name[prefix_length:].lower(): value
        for name, value in os.environ.items()
        # Empty values are treated as if the variable was not set.
        if value != "" and name.startswith(ENV_ZENML_SERVER_PREFIX)
    }

    return ServerConfiguration(**env_server_config)

generate_jwt_secret_key()

Generate a random JWT secret key.

This key is used to sign and verify generated JWT tokens.

Returns:

Type Description
str

A random JWT secret key.

Source code in zenml/config/server_config.py
def generate_jwt_secret_key() -> str:
    """Create a fresh random secret key for JWT tokens.

    The key is used to sign and verify generated JWT tokens.

    Returns:
        A random JWT secret key.
    """
    # 32 random bytes, rendered as a hexadecimal string.
    key_size_bytes = 32
    return token_hex(key_size_bytes)

settings_resolver

Class for resolving settings.

SettingsResolver

Class for resolving settings.

This class converts a BaseSettings instance to the correct subclass depending on the key for which these settings were specified.

Source code in zenml/config/settings_resolver.py
class SettingsResolver:
    """Class for resolving settings.

    This class converts a `BaseSettings` instance to the correct subclass
    depending on the key for which these settings were specified.
    """

    def __init__(self, key: str, settings: "BaseSettings"):
        """Checks if the settings key is valid.

        Args:
            key: Settings key.
            settings: The settings.

        Raises:
            ValueError: If the settings key is invalid.
        """
        if not settings_utils.is_valid_setting_key(key):
            raise ValueError(
                f"Invalid setting key `{key}`. Setting keys can either refer "
                "to general settings (available keys: "
                f"{set(settings_utils.get_general_settings())}) or stack "
                "component specific settings. Stack component specific keys "
                "are of the format "
                "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
            )

        self._key = key
        self._settings = settings

    def resolve(self, stack: "Stack") -> "BaseSettings":
        """Resolves settings for the given stack.

        The target settings class is looked up among the general settings if
        the key is a general settings key, and among the setting classes of
        the stack otherwise. The stored settings are then converted to that
        class.

        Args:
            stack: The stack for which to resolve the settings.

        Returns:
            The resolved settings.
        """
        if settings_utils.is_general_setting_key(self._key):
            target_class = self._resolve_general_settings_class()
        else:
            target_class = self._resolve_stack_component_setting_class(
                stack=stack
            )

        return self._convert_settings(target_class=target_class)

    def _resolve_general_settings_class(
        self,
    ) -> Type["BaseSettings"]:
        """Resolves the class of general settings.

        Returns:
            The settings class registered for the key.
        """
        return settings_utils.get_general_settings()[self._key]

    def _resolve_stack_component_setting_class(
        self, stack: "Stack"
    ) -> Type["BaseSettings"]:
        """Resolves the stack component settings class with the given stack.

        Args:
            stack: The stack to use for resolving.

        Raises:
            KeyError: If the stack contains no settings for the key.

        Returns:
            The settings class that the stack provides for the key.
        """
        settings_class = stack.setting_classes.get(self._key)
        if not settings_class:
            raise KeyError(
                f"Failed to resolve settings for key {self._key}: "
                "No settings for this key exist in the stack. "
                "Available settings: "
                f"{set(stack.setting_classes)}"
            )

        return settings_class

    def _convert_settings(self, target_class: Type["T"]) -> "T":
        """Converts the settings to their correct class.

        Args:
            target_class: The correct settings class.

        Raises:
            SettingsResolvingError: If the conversion failed.

        Returns:
            The converted settings.
        """
        settings_dict = self._settings.dict()
        try:
            return target_class(**settings_dict)
        except ValidationError as e:
            # Chain the validation error explicitly so the reason for the
            # failed conversion is reported as the direct cause.
            raise SettingsResolvingError(
                f"Failed to convert settings `{settings_dict}` to expected "
                f"class {target_class}."
            ) from e
__init__(self, key, settings) special

Checks if the settings key is valid.

Parameters:

Name Type Description Default
key str

Settings key.

required
settings BaseSettings

The settings.

required

Exceptions:

Type Description
ValueError

If the settings key is invalid.

Source code in zenml/config/settings_resolver.py
def __init__(self, key: str, settings: "BaseSettings"):
    """Store the key and settings after validating the key.

    Args:
        key: Settings key.
        settings: The settings.

    Raises:
        ValueError: If the settings key is invalid.
    """
    key_is_valid = settings_utils.is_valid_setting_key(key)
    if not key_is_valid:
        # Only look up the general settings when they are needed for the
        # error message.
        general_keys = set(settings_utils.get_general_settings())
        message = (
            f"Invalid setting key `{key}`. Setting keys can either refer "
            "to general settings (available keys: "
            f"{general_keys}) or stack "
            "component specific settings. Stack component specific keys "
            "are of the format "
            "`<STACK_COMPONENT_TYPE>.<STACK_COMPONENT_FLAVOR>`."
        )
        raise ValueError(message)

    self._key = key
    self._settings = settings
resolve(self, stack)

Resolves settings for the given stack.

Parameters:

Name Type Description Default
stack Stack

The stack for which to resolve the settings.

required

Returns:

Type Description
BaseSettings

The resolved settings.

Source code in zenml/config/settings_resolver.py
def resolve(self, stack: "Stack") -> "BaseSettings":
    """Resolve the stored settings for the given stack.

    Args:
        stack: The stack for which to resolve the settings.

    Returns:
        The resolved settings.
    """
    is_general_key = settings_utils.is_general_setting_key(self._key)
    # General keys are resolved without the stack; all other keys are
    # resolved using the given stack.
    target_class = (
        self._resolve_general_settings_class()
        if is_general_key
        else self._resolve_stack_component_setting_class(stack=stack)
    )

    return self._convert_settings(target_class=target_class)

source

Source classes.

CodeRepositorySource (Source) pydantic-model

Source representing an object from a code repository.

Attributes:

Name Type Description
repository_id UUID

The code repository ID.

commit str

The commit.

subdirectory str

The subdirectory of the source root inside the code repository.

Source code in zenml/config/source.py
class CodeRepositorySource(Source):
    """Source representing an object from a code repository.

    Attributes:
        repository_id: The code repository ID.
        commit: The commit.
        subdirectory: The subdirectory of the source root inside the code
            repository.
    """

    repository_id: UUID
    commit: str
    subdirectory: str
    type: SourceType = SourceType.CODE_REPOSITORY

    @validator("type")
    def _validate_type(cls, value: SourceType) -> SourceType:
        """Ensure the source type is `CODE_REPOSITORY`.

        Args:
            value: The source type.

        Raises:
            ValueError: If the source type is not `CODE_REPOSITORY`.

        Returns:
            The source type.
        """
        # This subclass only ever represents code repository sources.
        if value == SourceType.CODE_REPOSITORY:
            return value

        raise ValueError("Invalid source type.")

DistributionPackageSource (Source) pydantic-model

Source representing an object from a distribution package.

Attributes:

Name Type Description
package_name str

Name of the package.

version Optional[str]

The package version.

Source code in zenml/config/source.py
class DistributionPackageSource(Source):
    """Source representing an object from a distribution package.

    Attributes:
        package_name: Name of the package.
        version: The package version.
    """

    package_name: str
    version: Optional[str] = None
    type: SourceType = SourceType.DISTRIBUTION_PACKAGE

    @validator("type")
    def _validate_type(cls, value: SourceType) -> SourceType:
        """Ensure the source type is `DISTRIBUTION_PACKAGE`.

        Args:
            value: The source type.

        Raises:
            ValueError: If the source type is not `DISTRIBUTION_PACKAGE`.

        Returns:
            The source type.
        """
        # This subclass only ever represents distribution package sources.
        if value == SourceType.DISTRIBUTION_PACKAGE:
            return value

        raise ValueError("Invalid source type.")

Source (BaseModel) pydantic-model

Source specification.

A source specifies a module name as well as an optional attribute of that module. These values can be used to import the module and get the value of the attribute inside the module.

Examples:

The source Source(module="zenml.config.source", attribute="Source") references the class that this docstring is describing. This class is defined in the zenml.config.source module and the name of the attribute is the class name Source.

Attributes:

Name Type Description
module str

The module name.

attribute Optional[str]

Optional name of the attribute inside the module.

type SourceType

The type of the source.

Source code in zenml/config/source.py
class Source(BaseModel):
    """Source specification.

    A source names a module and, optionally, an attribute of that module.
    Together they can be used to import the module and look up the
    attribute inside it.

    Example:
        The source `Source(module="zenml.config.source", attribute="Source")`
        references the class that this docstring is describing. This class is
        defined in the `zenml.config.source` module and the name of the
        attribute is the class name `Source`.

    Attributes:
        module: The module name.
        attribute: Optional name of the attribute inside the module.
        type: The type of the source.
    """

    module: str
    attribute: Optional[str] = None
    type: SourceType

    @classmethod
    def from_import_path(
        cls, import_path: str, is_module_path: bool = False
    ) -> "Source":
        """Creates a source from an import path.

        The returned source always has the type `SourceType.UNKNOWN`.

        Args:
            import_path: The import path.
            is_module_path: If the import path points to a module or not.

        Raises:
            ValueError: If the import path is empty.

        Returns:
            The source.
        """
        if not import_path:
            raise ValueError(
                "Invalid empty import path. The import path needs to refer "
                "to a Python module and an optional attribute of that module."
            )

        # Remove internal version pins for backwards compatibility:
        # everything from the first "@" onwards is dropped.
        import_path = import_path.partition("@")[0]

        # Default to a pure module path; only split off the last dotted
        # component as the attribute when the caller did not declare the
        # path to be a module and there is something to split.
        module, attribute = import_path, None
        if not is_module_path and "." in import_path:
            module, attribute = import_path.rsplit(".", maxsplit=1)

        return Source(
            module=module, attribute=attribute, type=SourceType.UNKNOWN
        )

    @property
    def import_path(self) -> str:
        """The import path of the source.

        Returns:
            `<module>.<attribute>` if an attribute is set, otherwise just
            the module name.
        """
        if not self.attribute:
            return self.module

        return f"{self.module}.{self.attribute}"

    @property
    def is_internal(self) -> bool:
        """If the source is internal (=from the zenml package).

        Returns:
            True if the source is internal, False otherwise
        """
        # Only sources of unknown or internal type can be internal; for
        # those, the top-level package of the module decides.
        if self.type in {SourceType.UNKNOWN, SourceType.INTERNAL}:
            top_level_package = self.module.split(".", maxsplit=1)[0]
            return top_level_package == "zenml"

        return False

    @property
    def is_module_source(self) -> bool:
        """If the source is a module source.

        Returns:
            True if no attribute is set, False otherwise.
        """
        return self.attribute is None

    class Config:
        """Pydantic config class."""

        extra = Extra.allow
import_path: str property readonly

The import path of the source.

Returns:

Type Description
str

The import path of the source.

is_internal: bool property readonly

If the source is internal (=from the zenml package).

Returns:

Type Description
bool

True if the source is internal, False otherwise

is_module_source: bool property readonly

If the source is a module source.

Returns:

Type Description
bool

If the source is a module source.

Config

Pydantic config class.

Source code in zenml/config/source.py
class Config:
    """Pydantic config class."""

    # Permit attributes that are not declared as fields on the model.
    extra = Extra.allow
from_import_path(import_path, is_module_path=False) classmethod

Creates a source from an import path.

Parameters:

Name Type Description Default
import_path str

The import path.

required
is_module_path bool

If the import path points to a module or not.

False

Exceptions:

Type Description
ValueError

If the import path is empty.

Returns:

Type Description
Source

The source.

Source code in zenml/config/source.py
@classmethod
def from_import_path(
    cls, import_path: str, is_module_path: bool = False
) -> "Source":
    """Creates a source from an import path.

    The returned source always has the type `SourceType.UNKNOWN`.

    Args:
        import_path: The import path.
        is_module_path: If the import path points to a module or not.

    Raises:
        ValueError: If the import path is empty.

    Returns:
        The source.
    """
    if not import_path:
        raise ValueError(
            "Invalid empty import path. The import path needs to refer "
            "to a Python module and an optional attribute of that module."
        )

    # Remove internal version pins for backwards compatibility:
    # everything from the first "@" onwards is dropped.
    import_path = import_path.partition("@")[0]

    # Default to a pure module path; only split off the last dotted
    # component as the attribute when the caller did not declare the
    # path to be a module and there is something to split.
    module, attribute = import_path, None
    if not is_module_path and "." in import_path:
        module, attribute = import_path.rsplit(".", maxsplit=1)

    return Source(
        module=module, attribute=attribute, type=SourceType.UNKNOWN
    )

SourceType (Enum)

Enum representing different types of sources.

Source code in zenml/config/source.py
class SourceType(Enum):
    """Enum representing different types of sources.

    Each member's value is the lowercase form of its name.
    """

    USER = "user"
    BUILTIN = "builtin"
    INTERNAL = "internal"
    # The only type accepted by `DistributionPackageSource`.
    DISTRIBUTION_PACKAGE = "distribution_package"
    # The only type accepted by `CodeRepositorySource`.
    CODE_REPOSITORY = "code_repository"
    # Assigned by `Source.from_import_path`, which builds a source from a
    # plain import path string.
    UNKNOWN = "unknown"

convert_source_validator(*attributes)

Function to convert pydantic fields containing legacy class paths.

In older versions, sources (sometimes also called class paths) like zenml.materializers.BuiltInMaterializer were stored as strings in our configuration classes. These strings got replaced by a separate class, and this function returns a validator to convert those old strings to the new classes.

Parameters:

Name Type Description Default
*attributes str

List of attributes to convert.

()

Returns:

Type Description
AnyClassMethod

Pydantic validator class method to be used on BaseModel subclasses to convert source fields.

Source code in zenml/config/source.py
def convert_source_validator(*attributes: str) -> "AnyClassMethod":
    """Function to convert pydantic fields containing legacy class paths.

    In older versions, sources (sometimes also called class paths) like
    `zenml.materializers.BuiltInMaterializer` were stored as strings in our
    configuration classes. These strings got replaced by a separate class, and
    this function returns a validator to convert those old strings to the new
    classes.

    Args:
        *attributes: List of attributes to convert.

    Returns:
        Pydantic validator class method to be used on BaseModel subclasses
        to convert source fields.
    """

    # `pre=True` so the conversion sees the raw input value;
    # `allow_reuse=True` because this inner function is registered on
    # several model classes.
    @validator(*attributes, pre=True, allow_reuse=True)
    def _convert_source(
        cls: Type[BaseModel], value: Union[Source, str, None]
    ) -> Optional[Source]:
        """Converts an old source string to a source object.

        Args:
            cls: The class on which the attributes are defined.
            value: Source string or object.

        Returns:
            The converted source. Non-string values are returned unchanged.
        """
        if not isinstance(value, str):
            return value

        return Source.from_import_path(value)

    return _convert_source

step_configurations

Step configuration classes.

ArtifactConfiguration (PartialArtifactConfiguration) pydantic-model

Class representing a complete input/output artifact configuration.

Source code in zenml/config/step_configurations.py
class ArtifactConfiguration(PartialArtifactConfiguration):
    """Class representing a complete input/output artifact configuration.

    Unlike the partial configuration it extends, `materializer_source` is
    required here.
    """

    materializer_source: Tuple[Source, ...]

    @validator("materializer_source", pre=True)
    def _convert_source(
        cls, value: Union[Source, Dict[str, Any], str, Tuple[Source, ...]]
    ) -> Tuple[Source, ...]:
        """Converts old source strings to tuples of source objects.

        A single string, dict or `Source` is wrapped into a one-element
        tuple. Any other value is returned unchanged.

        Args:
            value: Source string or object.

        Returns:
            The converted source.
        """
        if isinstance(value, str):
            return (Source.from_import_path(value),)

        if isinstance(value, dict):
            return (Source.parse_obj(value),)

        if isinstance(value, Source):
            return (value,)

        return value

InputSpec (StrictBaseModel) pydantic-model

Step input specification.

Source code in zenml/config/step_configurations.py
class InputSpec(StrictBaseModel):
    """Step input specification."""

    # NOTE(review): presumably the name of the step that produces this
    # input -- confirm against the callers.
    step_name: str
    # NOTE(review): presumably the name of that step's output which is
    # consumed as this input -- confirm against the callers.
    output_name: str

PartialArtifactConfiguration (StrictBaseModel) pydantic-model

Class representing a partial input/output artifact configuration.

Source code in zenml/config/step_configurations.py
class PartialArtifactConfiguration(StrictBaseModel):
    """Class representing a partial input/output artifact configuration.

    Both fields are optional.
    """

    materializer_source: Optional[Tuple[Source, ...]] = None
    # TODO: This could be moved to the `PipelineDeployment` as it's the same
    # for all steps/outputs
    default_materializer_source: Optional[Source] = None

    @root_validator(pre=True)
    def _remove_deprecated_attributes(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Removes deprecated attributes from the values dict.

        Args:
            values: The values dict used to instantiate the model.

        Returns:
            The values dict without deprecated attributes.
        """
        # Dropped before validation so inputs that still carry these keys
        # are not rejected as extra attributes.
        for attribute_name in ("artifact_source",):
            values.pop(attribute_name, None)

        return values

    @validator("materializer_source", pre=True)
    def _convert_source(
        cls,
        value: Union[None, Source, Dict[str, Any], str, Tuple[Source, ...]],
    ) -> Optional[Tuple[Source, ...]]:
        """Converts old source strings to tuples of source objects.

        A single string, dict or `Source` is wrapped into a one-element
        tuple. Any other value (including `None`) is returned unchanged.

        Args:
            value: Source string or object.

        Returns:
            The converted source.
        """
        if isinstance(value, str):
            return (Source.from_import_path(value),)

        if isinstance(value, dict):
            return (Source.parse_obj(value),)

        if isinstance(value, Source):
            return (value,)

        return value

PartialStepConfiguration (StepConfigurationUpdate) pydantic-model

Class representing a partial step configuration.

Source code in zenml/config/step_configurations.py
class PartialStepConfiguration(StepConfigurationUpdate):
    """Class representing a partial step configuration.

    Extends the configuration update with a required `name` and additional
    mappings.
    """

    name: str
    caching_parameters: Mapping[str, Any] = {}
    external_input_artifacts: Mapping[str, ExternalArtifactConfiguration] = {}
    model_artifacts_or_metadata: Mapping[str, ModelVersionDataLazyLoader] = {}
    client_lazy_loaders: Mapping[str, ClientLazyLoader] = {}
    outputs: Mapping[str, PartialArtifactConfiguration] = {}

    # Override the deprecation validator as we do not want to deprecate the
    # `name` attribute on this class.
    _deprecation_validator = deprecation_utils.deprecate_pydantic_attributes()

    @root_validator(pre=True)
    def _remove_deprecated_attributes(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Removes deprecated attributes from the values dict.

        Args:
            values: The values dict used to instantiate the model.

        Returns:
            The values dict without deprecated attributes.
        """
        # Dropped before validation so inputs that still carry these keys
        # are not rejected as extra attributes.
        for attribute_name in ("docstring", "inputs"):
            values.pop(attribute_name, None)

        return values

Step (StrictBaseModel) pydantic-model

Class representing a ZenML step.

Source code in zenml/config/step_configurations.py
class Step(StrictBaseModel):
    """Class representing a ZenML step.

    Bundles a step's specification with its configuration.
    """

    spec: StepSpec
    config: StepConfiguration

StepConfiguration (PartialStepConfiguration) pydantic-model

Step configuration class.

Source code in zenml/config/step_configurations.py
class StepConfiguration(PartialStepConfiguration):
    """Step configuration class.

    Narrows `outputs` to complete artifact configurations and exposes typed
    accessors for the resource and Docker settings.
    """

    outputs: Mapping[str, ArtifactConfiguration] = {}

    @property
    def resource_settings(self) -> "ResourceSettings":
        """Resource settings of this step configuration.

        Returns:
            The resource settings of this step configuration.
        """
        from zenml.config import ResourceSettings

        # Fall back to an empty dict when no resource settings are
        # configured, so the settings class is parsed from no input.
        return ResourceSettings.parse_obj(
            self.settings.get(RESOURCE_SETTINGS_KEY, {})
        )

    @property
    def docker_settings(self) -> "DockerSettings":
        """Docker settings of this step configuration.

        Returns:
            The Docker settings of this step configuration.
        """
        from zenml.config import DockerSettings

        # Fall back to an empty dict when no Docker settings are
        # configured, so the settings class is parsed from no input.
        return DockerSettings.parse_obj(
            self.settings.get(DOCKER_SETTINGS_KEY, {})
        )
docker_settings: DockerSettings property readonly

Docker settings of this step configuration.

Returns:

Type Description
DockerSettings

The Docker settings of this step configuration.

resource_settings: ResourceSettings property readonly

Resource settings of this step configuration.

Returns:

Type Description
ResourceSettings

The resource settings of this step configuration.

StepConfigurationUpdate (StrictBaseModel) pydantic-model

Class for step configuration updates.

Source code in zenml/config/step_configurations.py
class StepConfigurationUpdate(StrictBaseModel):
    """Class for step configuration updates.

    Every field is optional or has an empty default, so an instance can
    carry only the values that should be updated.
    """

    name: Optional[str] = None
    enable_cache: Optional[bool] = None
    enable_artifact_metadata: Optional[bool] = None
    enable_artifact_visualization: Optional[bool] = None
    enable_step_logs: Optional[bool] = None
    step_operator: Optional[str] = None
    experiment_tracker: Optional[str] = None
    parameters: Dict[str, Any] = {}
    settings: Dict[str, BaseSettings] = {}
    extra: Dict[str, Any] = {}
    failure_hook_source: Optional[Source] = None
    success_hook_source: Optional[Source] = None
    model: Optional[Model] = None
    retry: Optional[StepRetryConfig] = None

    outputs: Mapping[str, PartialArtifactConfiguration] = {}

    # Converts legacy string values of the hook source fields into `Source`
    # objects.
    _convert_source = convert_source_validator(
        "failure_hook_source", "success_hook_source"
    )
    # Registers `name` with the deprecation helper.
    _deprecation_validator = deprecation_utils.deprecate_pydantic_attributes(
        "name"
    )

StepSpec (StrictBaseModel) pydantic-model

Specification of a step.

Source code in zenml/config/step_configurations.py
class StepSpec(StrictBaseModel):
    """Specification of a step."""

    source: Source
    upstream_steps: List[str]
    inputs: Dict[str, InputSpec] = {}
    # The default value is to ensure compatibility with specs of version <0.2
    pipeline_parameter_name: str = ""

    _convert_source = convert_source_validator("source")

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step.

        This is the case if the other object is a `StepSpec` with the same
        `upstream_steps`, `inputs` and `pipeline_parameter_name`, and a
        `source` with the same import path as the `source` of this step.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same step.
            `NotImplemented` if the other object is not a `StepSpec`.
        """
        if not isinstance(other, StepSpec):
            return NotImplemented

        # Sources are compared by import path only.
        return (
            self.upstream_steps == other.upstream_steps
            and self.inputs == other.inputs
            and self.pipeline_parameter_name == other.pipeline_parameter_name
            and self.source.import_path == other.source.import_path
        )
__eq__(self, other) special

Returns whether the other object is referring to the same step.

This is the case if the other object is a StepSpec with the same upstream_steps and a source that meets one of the following conditions: (1) it is the same as the source of this step, or (2) it refers to the same absolute path as the source of this step.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same step.

Source code in zenml/config/step_configurations.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same step.

    This is the case if the other object is a `StepSpec` with the same
    `upstream_steps`, `inputs` and `pipeline_parameter_name`, and a
    `source` with the same import path as the `source` of this step.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is referring to the same step.
        `NotImplemented` if the other object is not a `StepSpec`.
    """
    if not isinstance(other, StepSpec):
        return NotImplemented

    # Sources are compared by import path only.
    return (
        self.upstream_steps == other.upstream_steps
        and self.inputs == other.inputs
        and self.pipeline_parameter_name == other.pipeline_parameter_name
        and self.source.import_path == other.source.import_path
    )

step_run_info

Step run info.

StepRunInfo (StrictBaseModel) pydantic-model

All information necessary to run a step.

Source code in zenml/config/step_run_info.py
class StepRunInfo(StrictBaseModel):
    """All information necessary to run a step."""

    step_run_id: UUID
    run_id: UUID
    run_name: str
    pipeline_step_name: str

    config: StepConfiguration
    pipeline: PipelineConfiguration

    def get_image(self, key: str) -> str:
        """Gets the Docker image for the given key.

        The pipeline run is fetched through the client using `run_id`, and
        the image is looked up on the build attached to that run.

        Args:
            key: The key for which to get the image.

        Raises:
            RuntimeError: If the run does not have an associated build.

        Returns:
            The image name or digest.
        """
        from zenml.client import Client

        run = Client().get_pipeline_run(self.run_id)
        if run.build:
            return run.build.get_image(
                component_key=key, step=self.pipeline_step_name
            )

        raise RuntimeError(
            f"Missing build for run {run.id}. This is probably because "
            "the build was manually deleted."
        )
get_image(self, key)

Gets the Docker image for the given key.

Parameters:

Name Type Description Default
key str

The key for which to get the image.

required

Exceptions:

Type Description
RuntimeError

If the run does not have an associated build.

Returns:

Type Description
str

The image name or digest.

Source code in zenml/config/step_run_info.py
def get_image(self, key: str) -> str:
    """Gets the Docker image for the given key.

    The pipeline run is fetched through the client using `run_id`, and the
    image is looked up on the build attached to that run.

    Args:
        key: The key for which to get the image.

    Raises:
        RuntimeError: If the run does not have an associated build.

    Returns:
        The image name or digest.
    """
    from zenml.client import Client

    run = Client().get_pipeline_run(self.run_id)
    if run.build:
        return run.build.get_image(
            component_key=key, step=self.pipeline_step_name
        )

    raise RuntimeError(
        f"Missing build for run {run.id}. This is probably because "
        "the build was manually deleted."
    )

store_config

Functionality to support ZenML store configurations.

StoreConfiguration (BaseModel) pydantic-model

Generic store configuration.

The store configurations of concrete store implementations must inherit from this class and validate any extra attributes that are configured in addition to those defined in this class.

Attributes:

Name Type Description
type StoreType

The type of store backend.

url str

The URL of the store backend.

secrets_store Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration]

The configuration of the secrets store to use to store secrets. If not set, secrets management is disabled.

backup_secrets_store Optional[zenml.config.secrets_store_config.SecretsStoreConfiguration]

The configuration of the secrets store to use to store backups of secrets. If not set, backup and restore of secrets are disabled.

Source code in zenml/config/store_config.py
class StoreConfiguration(BaseModel):
    """Generic store configuration.

    The store configurations of concrete store implementations must inherit from
    this class and validate any extra attributes that are configured in addition
    to those defined in this class.

    Attributes:
        type: The type of store backend.
        url: The URL of the store backend.
        secrets_store: The configuration of the secrets store to use to store
            secrets. If not set, secrets management is disabled.
        backup_secrets_store: The configuration of the secrets store to use to
            store backups of secrets. If not set, backup and restore of secrets
            are disabled.
    """

    type: StoreType
    url: str
    secrets_store: Optional[SecretsStoreConfiguration] = None
    backup_secrets_store: Optional[SecretsStoreConfiguration] = None

    @classmethod
    def supports_url_scheme(cls, url: str) -> bool:
        """Check if a URL scheme is supported by this store.

        This base implementation accepts every URL. Concrete store
        configuration classes should override this method to check if a
        URL scheme is supported by the store.

        Args:
            url: The URL to check.

        Returns:
            True if the URL scheme is supported, False otherwise.
        """
        return True

    @root_validator(pre=True)
    def validate_secrets_store(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the secrets store configuration.

        Args:
            values: The values of the store configuration.

        Returns:
            The values of the store configuration.
        """
        configured_store = values.get("secrets_store")

        # Remove the legacy REST secrets store configuration since it is no
        # longer supported/needed. Only raw dict input is inspected here.
        if (
            isinstance(configured_store, dict)
            and configured_store.get("type") == "rest"
        ):
            del values["secrets_store"]

        return values

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes to be set in the base class. The concrete
        # classes are responsible for validating the attributes.
        extra = "allow"
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
Config

Pydantic configuration class.

Source code in zenml/config/store_config.py
class Config:
    """Pydantic configuration class.

    Enables validation on assignment, permits extra attributes and treats
    underscore-prefixed attributes as private.
    """

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes.
    validate_assignment = True
    # Allow extra attributes to be set in the base class. The concrete
    # classes are responsible for validating the attributes.
    extra = "allow"
    # All attributes with a leading underscore are private and therefore
    # are mutable and not included in serialization.
    underscore_attrs_are_private = True
supports_url_scheme(url) classmethod

Check if a URL scheme is supported by this store.

Concrete store configuration classes should override this method to check if a URL scheme is supported by the store.

Parameters:

Name Type Description Default
url str

The URL to check.

required

Returns:

Type Description
bool

True if the URL scheme is supported, False otherwise.

Source code in zenml/config/store_config.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
    """Check if a URL scheme is supported by this store.

    This base implementation accepts every URL. Concrete store
    configuration classes should override this method to check if a URL
    scheme is supported by the store.

    Args:
        url: The URL to check.

    Returns:
        True if the URL scheme is supported, False otherwise.
    """
    return True
validate_secrets_store(values) classmethod

Validate the secrets store configuration.

Parameters:

Name Type Description Default
values Dict[str, Any]

The values of the store configuration.

required

Returns:

Type Description
Dict[str, Any]

The values of the store configuration.

Source code in zenml/config/store_config.py
@root_validator(pre=True)
def validate_secrets_store(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the secrets store configuration.

    Args:
        values: The values of the store configuration.

    Returns:
        The values of the store configuration.
    """
    configured_store = values.get("secrets_store")

    # Remove the legacy REST secrets store configuration since it is no
    # longer supported/needed. Only raw dict input is inspected here.
    if (
        isinstance(configured_store, dict)
        and configured_store.get("type") == "rest"
    ):
        del values["secrets_store"]

    return values

strict_base_model

Strict immutable pydantic model.

StrictBaseModel (BaseModel) pydantic-model

Immutable pydantic model which prevents extra attributes.

Source code in zenml/config/strict_base_model.py
class StrictBaseModel(BaseModel):
    """Immutable pydantic model which prevents extra attributes."""

    class Config:
        """Pydantic config class."""

        # Makes instances immutable.
        allow_mutation = False
        # Rejects attributes that are not declared on the model.
        extra = Extra.forbid
Config

Pydantic config class.

Source code in zenml/config/strict_base_model.py
class Config:
    """Pydantic config class."""

    # Makes instances immutable.
    allow_mutation = False
    # Rejects attributes that are not declared on the model.
    extra = Extra.forbid