Skip to content

Kubernetes

zenml.integrations.kubernetes

Kubernetes integration for Kubernetes-native orchestration.

The Kubernetes integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubernetes orchestrator with the CLI tool.

Attributes

KUBERNETES = 'kubernetes' module-attribute

KUBERNETES_ORCHESTRATOR_FLAVOR = 'kubernetes' module-attribute

KUBERNETES_STEP_OPERATOR_FLAVOR = 'kubernetes' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service

Optional[ServiceConnectorRequirements]

connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
175
176
177
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
179
180
181
182
183
184
185
186
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
188
189
190
191
192
193
194
195
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

KubernetesIntegration

Bases: Integration

Definition of Kubernetes integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Kubernetes integration.

Returns:

Type Description
List[Type[Flavor]]

List of new stack component flavors.

Source code in src/zenml/integrations/kubernetes/__init__.py
38
39
40
41
42
43
44
45
46
47
48
49
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Kubernetes integration.

    Returns:
        List of new stack component flavors.
    """
    from zenml.integrations.kubernetes.flavors import (
        KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor
    )

    return [KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor]

Modules

flavors

Kubernetes integration flavors.

Classes
KubernetesOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubernetesOrchestratorSettings

Configuration for the Kubernetes orchestrator.

Attributes:

Name Type Description
incluster bool

If True, the orchestrator will run the pipeline inside the same cluster in which it itself is running. This requires the client to run in a Kubernetes pod itself. If set, the kubernetes_context config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_context Optional[str]

Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory.

kubernetes_namespace str

Name of the Kubernetes namespace to be used. If not provided, zenml namespace will be used.

local bool

If True, the orchestrator will assume it is connected to a local kubernetes cluster and will perform additional validations and operations to allow using the orchestrator in combination with other local stack components that store data in the local filesystem (i.e. it will mount the local stores directory into the pipeline containers).

skip_local_validations bool

If True, the local validations will be skipped.

parallel_step_startup_waiting_period Optional[float]

How long to wait in between starting parallel steps. This can be used to distribute server load when running pipelines with a huge amount of parallel steps.

pass_zenml_token_as_secret bool

If True, the ZenML token will be passed as a Kubernetes secret to the pods. For this to work, the Kubernetes client must have permissions to create secrets in the namespace.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronous or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronous or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client side caching.

KubernetesOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubernetes orchestrator flavor.

Attributes
config_class: Type[KubernetesOrchestratorConfig] property

Returns KubernetesOrchestratorConfig config class.

Returns:

Type Description
Type[KubernetesOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service

Optional[ServiceConnectorRequirements]

connector is required for this flavor.

KubernetesOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, the client running a pipeline using this orchestrator waits until all steps finish running. If False, the client returns immediately and the pipeline is executed asynchronously. Defaults to True.

timeout int

How many seconds to wait for synchronous runs. 0 means to wait for an unlimited duration.

service_account_name Optional[str]

Name of the service account to use for the orchestrator pod. If not provided, a new service account with "edit" permissions will be created.

step_pod_service_account_name Optional[str]

Name of the service account to use for the step pods. If not provided, the default service account will be used.

privileged bool

If the container should be run in privileged mode.

pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to pods executing the steps.

orchestrator_pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to the pod which is launching the actual steps.

pod_startup_timeout int

The maximum time to wait for a pending step pod to start (in seconds).

pod_failure_max_retries int

The maximum number of times to retry a step pod if the step Kubernetes pod fails to start

pod_failure_retry_delay int

The delay in seconds between pod failure retries and pod startup retries (in seconds)

pod_failure_backoff float

The backoff factor for pod failure retries and pod startup retries.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
KubernetesStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, KubernetesStepOperatorSettings

Configuration for the Kubernetes step operator.

Attributes:

Name Type Description
kubernetes_namespace str

Name of the Kubernetes namespace to be used.

incluster bool

If True, the step operator will run the pipeline inside the same cluster in which the orchestrator is running. For this to work, the pod running the orchestrator needs permissions to create new pods. If set, the kubernetes_context config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_context Optional[str]

Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

KubernetesStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Kubernetes step operator flavor.

Attributes
config_class: Type[KubernetesStepOperatorConfig] property

Returns KubernetesStepOperatorConfig config class.

Returns:

Type Description
Type[KubernetesStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service

Optional[ServiceConnectorRequirements]

connector is required for this flavor.

KubernetesStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes step operator.

Attributes:

Name Type Description
pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to pods executing the steps.

service_account_name Optional[str]

Name of the service account to use for the pod.

privileged bool

If the container should be run in privileged mode.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
kubernetes_orchestrator_flavor

Kubernetes orchestrator flavor.

Classes
KubernetesOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubernetesOrchestratorSettings

Configuration for the Kubernetes orchestrator.

Attributes:

Name Type Description
incluster bool

If True, the orchestrator will run the pipeline inside the same cluster in which it itself is running. This requires the client to run in a Kubernetes pod itself. If set, the kubernetes_context config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_context Optional[str]

Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory.

kubernetes_namespace str

Name of the Kubernetes namespace to be used. If not provided, zenml namespace will be used.

local bool

If True, the orchestrator will assume it is connected to a local kubernetes cluster and will perform additional validations and operations to allow using the orchestrator in combination with other local stack components that store data in the local filesystem (i.e. it will mount the local stores directory into the pipeline containers).

skip_local_validations bool

If True, the local validations will be skipped.

parallel_step_startup_waiting_period Optional[float]

How long to wait in between starting parallel steps. This can be used to distribute server load when running pipelines with a huge amount of parallel steps.

pass_zenml_token_as_secret bool

If True, the ZenML token will be passed as a Kubernetes secret to the pods. For this to work, the Kubernetes client must have permissions to create secrets in the namespace.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronous or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronous or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client side caching.

KubernetesOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubernetes orchestrator flavor.

Attributes
config_class: Type[KubernetesOrchestratorConfig] property

Returns KubernetesOrchestratorConfig config class.

Returns:

Type Description
Type[KubernetesOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service

Optional[ServiceConnectorRequirements]

connector is required for this flavor.

KubernetesOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, the client running a pipeline using this orchestrator waits until all steps finish running. If False, the client returns immediately and the pipeline is executed asynchronously. Defaults to True.

timeout int

How many seconds to wait for synchronous runs. 0 means to wait for an unlimited duration.

service_account_name Optional[str]

Name of the service account to use for the orchestrator pod. If not provided, a new service account with "edit" permissions will be created.

step_pod_service_account_name Optional[str]

Name of the service account to use for the step pods. If not provided, the default service account will be used.

privileged bool

If the container should be run in privileged mode.

pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to pods executing the steps.

orchestrator_pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to the pod which is launching the actual steps.

pod_startup_timeout int

The maximum time to wait for a pending step pod to start (in seconds).

pod_failure_max_retries int

The maximum number of times to retry a step pod if the step Kubernetes pod fails to start

pod_failure_retry_delay int

The delay in seconds between pod failure retries and pod startup retries (in seconds)

pod_failure_backoff float

The backoff factor for pod failure retries and pod startup retries.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
kubernetes_step_operator_flavor

Kubernetes step operator flavor.

Classes
KubernetesStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, KubernetesStepOperatorSettings

Configuration for the Kubernetes step operator.

Attributes:

Name Type Description
kubernetes_namespace str

Name of the Kubernetes namespace to be used.

incluster bool

If True, the step operator will run the pipeline inside the same cluster in which the orchestrator is running. For this to work, the pod running the orchestrator needs permissions to create new pods. If set, the kubernetes_context config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_context Optional[str]

Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory.

Source code in src/zenml/stack/stack_component.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

KubernetesStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Kubernetes step operator flavor.

Attributes
config_class: Type[KubernetesStepOperatorConfig] property

Returns KubernetesStepOperatorConfig config class.

Returns:

Type Description
Type[KubernetesStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service

Optional[ServiceConnectorRequirements]

connector is required for this flavor.

KubernetesStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes step operator.

Attributes:

Name Type Description
pod_settings Optional[KubernetesPodSettings]

Pod settings to apply to pods executing the steps.

service_account_name Optional[str]

Name of the service account to use for the pod.

privileged bool

If the container should be run in privileged mode.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)

orchestrators

Kubernetes-native orchestration.

Classes
KubernetesOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator for running ZenML pipelines using native Kubernetes.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesOrchestratorConfig property

Returns the KubernetesOrchestratorConfig config.

Returns:

Type Description
KubernetesOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Defines the validator that checks whether the stack is valid.

Returns:

Type Description
Optional[StackValidator]

Stack validator.

Functions
apply_default_resource_requests(memory: str, cpu: Optional[str] = None, pod_settings: Optional[KubernetesPodSettings] = None) -> KubernetesPodSettings classmethod

Applies default resource requests to a pod settings object.

Parameters:

Name Type Description Default
memory str

The memory resource request.

required
cpu Optional[str]

The CPU resource request.

None
pod_settings Optional[KubernetesPodSettings]

The pod settings to update. A new one will be created if not provided.

None

Returns:

Type Description
KubernetesPodSettings

The new or updated pod settings.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
@classmethod
def apply_default_resource_requests(
    cls,
    memory: str,
    cpu: Optional[str] = None,
    pod_settings: Optional[KubernetesPodSettings] = None,
) -> KubernetesPodSettings:
    """Applies default resource requests to a pod settings object.

    Args:
        memory: The memory resource request.
        cpu: The CPU resource request.
        pod_settings: The pod settings to update. A new one will be created
            if not provided.

    Returns:
        The new or updated pod settings.
    """
    resources = {
        "requests": {"memory": memory},
    }
    if cpu:
        resources["requests"]["cpu"] = cpu
    if not pod_settings:
        pod_settings = KubernetesPodSettings(resources=resources)
    elif not pod_settings.resources:
        # We can't update the pod settings in place (because it's a frozen
        # pydantic model), so we have to create a new one.
        pod_settings = KubernetesPodSettings(
            **pod_settings.model_dump(exclude_unset=True),
            resources=resources,
        )
    else:
        set_requests = pod_settings.resources.get("requests", {})
        resources["requests"].update(set_requests)
        pod_settings.resources["requests"] = resources["requests"]

    return pod_settings
get_kube_client(incluster: Optional[bool] = None) -> k8s_client.ApiClient

Getter for the Kubernetes API client.

Parameters:

Name Type Description Default
incluster Optional[bool]

Whether to use the in-cluster config or not. Overrides the incluster setting in the config.

None

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

if the Kubernetes connector behaves unexpectedly.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Getter for the Kubernetes API client.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
get_kubernetes_contexts() -> Tuple[List[str], str]

Get list of configured Kubernetes contexts and the active context.

Raises:

Type Description
RuntimeError

if the Kubernetes configuration cannot be loaded.

Returns:

Name Type Description
context_name List[str]

List of configured Kubernetes contexts

active_context_name str

Name of the active Kubernetes context.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """Get list of configured Kubernetes contexts and the active context.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        context_name: List of configured Kubernetes contexts
        active_context_name: Name of the active Kubernetes context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        )
get_token_secret_name(deployment_id: UUID) -> str

Returns the name of the secret that contains the ZenML token.

Parameters:

Name Type Description Default
deployment_id UUID

The ID of the deployment.

required

Returns:

Type Description
str

The name of the secret that contains the ZenML token.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
380
381
382
383
384
385
386
387
388
389
def get_token_secret_name(self, deployment_id: UUID) -> str:
    """Returns the name of the secret that contains the ZenML token.

    Args:
        deployment_id: The ID of the deployment.

    Returns:
        The name of the secret that contains the ZenML token.
    """
    return f"zenml-token-{deployment_id}"
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Runs the pipeline in Kubernetes.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If the Kubernetes orchestrator is not configured.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Runs the pipeline in Kubernetes.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If the Kubernetes orchestrator is not configured.
    """
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Kubernetes orchestrator, ignoring resource "
                "configuration for step %s.",
                step_name,
            )

    pipeline_name = deployment.pipeline_configuration.name

    # We already make sure the orchestrator run name has the correct length
    # to make sure we don't cut off the randomized suffix later when
    # sanitizing the pod name. This avoids any pod naming collisions.
    max_length = kube_utils.calculate_max_pod_name_length_for_namespace(
        namespace=self.config.kubernetes_namespace
    )
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name, max_length=max_length
    )
    pod_name = kube_utils.sanitize_pod_name(
        orchestrator_run_name, namespace=self.config.kubernetes_namespace
    )

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    # Build entrypoint command and args for the orchestrator pod.
    # This will internally also build the command/args for all step pods.
    command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
    args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
        run_name=orchestrator_run_name,
        deployment_id=deployment.id,
        kubernetes_namespace=self.config.kubernetes_namespace,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )

    # Authorize pod to run Kubernetes commands inside the cluster.
    service_account_name = self._get_service_account_name(settings)

    # We set some default minimum resource requests for the orchestrator pod
    # here if the user has not specified any, because the orchestrator pod
    # takes up some memory resources itself and, if not specified, the pod
    # will be scheduled on any node regardless of available memory and risk
    # negatively impacting or even crashing the node due to memory pressure.
    orchestrator_pod_settings = self.apply_default_resource_requests(
        memory="400Mi",
        cpu="100m",
        pod_settings=settings.orchestrator_pod_settings,
    )

    if self.config.pass_zenml_token_as_secret:
        secret_name = self.get_token_secret_name(deployment.id)
        token = environment.pop("ZENML_STORE_API_TOKEN")
        kube_utils.create_or_update_secret(
            core_api=self._k8s_core_api,
            namespace=self.config.kubernetes_namespace,
            secret_name=secret_name,
            data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token},
        )
        orchestrator_pod_settings.env.append(
            {
                "name": "ZENML_STORE_API_TOKEN",
                "valueFrom": {
                    "secretKeyRef": {
                        "name": secret_name,
                        "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                    }
                },
            }
        )

    # Schedule as CRON job if CRON schedule is given.
    if deployment.schedule:
        if not deployment.schedule.cron_expression:
            raise RuntimeError(
                "The Kubernetes orchestrator only supports scheduling via "
                "CRON jobs, but the run was configured with a manual "
                "schedule. Use `Schedule(cron_expression=...)` instead."
            )
        cron_expression = deployment.schedule.cron_expression
        cron_job_manifest = build_cron_job_manifest(
            cron_expression=cron_expression,
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            service_account_name=service_account_name,
            privileged=False,
            pod_settings=orchestrator_pod_settings,
            env=environment,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_batch_api.create_namespaced_cron_job(
            body=cron_job_manifest,
            namespace=self.config.kubernetes_namespace,
        )
        logger.info(
            f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
            f'`"{cron_expression}"`.'
        )
        return
    else:
        # Create and run the orchestrator pod.
        pod_manifest = build_pod_manifest(
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            privileged=False,
            pod_settings=orchestrator_pod_settings,
            service_account_name=service_account_name,
            env=environment,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_core_api.create_namespaced_pod(
            namespace=self.config.kubernetes_namespace,
            body=pod_manifest,
        )

        # Wait for the orchestrator pod to finish and stream logs.
        if settings.synchronous:
            logger.info("Waiting for Kubernetes orchestrator pod...")
            kube_utils.wait_pod(
                kube_client_fn=self.get_kube_client,
                pod_name=pod_name,
                namespace=self.config.kubernetes_namespace,
                exit_condition_lambda=kube_utils.pod_is_done,
                timeout_sec=settings.timeout,
                stream_logs=True,
            )
        else:
            logger.info(
                f"Orchestration started asynchronously in pod "
                f"`{self.config.kubernetes_namespace}:{pod_name}`. "
                f"Run the following command to inspect the logs: "
                f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
            )
Modules
kube_utils

Utilities for Kubernetes related functions.

Internal interface: no backwards compatibility guarantees. Adjusted from https://github.com/tensorflow/tfx/blob/master/tfx/utils/kube_utils.py.

Classes
PodPhase

Bases: Enum

Phase of the Kubernetes pod.

Pod phases are defined in https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.

Functions
calculate_max_pod_name_length_for_namespace(namespace: str) -> int

Calculate the max pod length for a certain namespace.

Parameters:

Name Type Description Default
namespace str

The namespace in which the pod will be created.

required

Returns:

Type Description
int

The maximum pod name length.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def calculate_max_pod_name_length_for_namespace(namespace: str) -> int:
    """Calculate the max pod length for a certain namespace.

    Args:
        namespace: The namespace in which the pod will be created.

    Returns:
        The maximum pod name length.
    """
    # Kubernetes allows Pod names to have 253 characters. However, when
    # creating a pod they try to create a log file which is called
    # <NAMESPACE>_<POD_NAME>_<UUID>, which adds additional characters and
    # runs into filesystem limitations for filename lengths (255). We therefore
    # subtract the length of a UUID (36), the two underscores and the
    # namespace length from the max filename length.
    return 255 - 38 - len(namespace)
create_edit_service_account(core_api: k8s_client.CoreV1Api, rbac_api: k8s_client.RbacAuthorizationV1Api, service_account_name: str, namespace: str, role_binding_name: str = 'zenml-edit') -> None

Create a new Kubernetes service account with "edit" rights.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
rbac_api RbacAuthorizationV1Api

Client of Rbac Authorization V1 API of Kubernetes API.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

required
role_binding_name str

Name of the role binding. Defaults to "zenml-edit".

'zenml-edit'
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def create_edit_service_account(
    core_api: k8s_client.CoreV1Api,
    rbac_api: k8s_client.RbacAuthorizationV1Api,
    service_account_name: str,
    namespace: str,
    role_binding_name: str = "zenml-edit",
) -> None:
    """Create a new Kubernetes service account with "edit" rights.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        rbac_api: Client of Rbac Authorization V1 API of Kubernetes API.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".
        role_binding_name: Name of the role binding. Defaults to "zenml-edit".
    """
    rb_manifest = build_role_binding_manifest_for_service_account(
        name=role_binding_name,
        role_name="edit",
        service_account_name=service_account_name,
        namespace=namespace,
    )
    _if_not_exists(rbac_api.create_namespaced_role_binding)(
        namespace=namespace, body=rb_manifest
    )

    sa_manifest = build_service_account_manifest(
        name=service_account_name, namespace=namespace
    )
    _if_not_exists(core_api.create_namespaced_service_account)(
        namespace=namespace,
        body=sa_manifest,
    )
create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None

Create a Kubernetes namespace.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

Kubernetes namespace. Defaults to "default".

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
369
370
371
372
373
374
375
376
377
def create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None:
    """Create a Kubernetes namespace.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: Kubernetes namespace. Defaults to "default".
    """
    manifest = build_namespace_manifest(namespace)
    _if_not_exists(core_api.create_namespace)(body=manifest)
create_or_update_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Create a Kubernetes secret if it doesn't exist, or update it if it does.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to create or update the secret.

required
secret_name str

The name of the secret to create or update.

required
data Dict[str, Optional[str]]

The secret data. If the value is None, the key will be removed from the secret.

required

Raises:

Type Description
ApiException

If the secret creation failed for any reason other than the secret already existing.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
def create_or_update_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Create a Kubernetes secret if it doesn't exist, or update it if it does.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to create or update the secret.
        secret_name: The name of the secret to create or update.
        data: The secret data. If the value is None, the key will be removed
            from the secret.

    Raises:
        ApiException: If the secret creation failed for any reason other than
            the secret already existing.
    """
    try:
        create_secret(core_api, namespace, secret_name, data)
    except ApiException as e:
        if e.status != 409:
            raise
        update_secret(core_api, namespace, secret_name, data)
create_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Create a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to create the secret.

required
secret_name str

The name of the secret to create.

required
data Dict[str, Optional[str]]

The secret data.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def create_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Create a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to create the secret.
        secret_name: The name of the secret to create.
        data: The secret data.
    """
    core_api.create_namespaced_secret(
        namespace=namespace,
        body=build_secret_manifest(name=secret_name, data=data),
    )
delete_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str) -> None

Delete a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to delete the secret.

required
secret_name str

The name of the secret to delete.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
def delete_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
) -> None:
    """Delete a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to delete the secret.
        secret_name: The name of the secret to delete.
    """
    core_api.delete_namespaced_secret(
        name=secret_name,
        namespace=namespace,
    )
get_pod(core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str) -> Optional[k8s_client.V1Pod]

Get a pod from Kubernetes metadata API.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of CoreV1Api of Kubernetes API.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required

Raises:

Type Description
RuntimeError

When it sees unexpected errors from Kubernetes API.

Returns:

Type Description
Optional[V1Pod]

The found pod object. None if it's not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
def get_pod(
    core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> Optional[k8s_client.V1Pod]:
    """Get a pod from Kubernetes metadata API.

    Args:
        core_api: Client of `CoreV1Api` of Kubernetes API.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.

    Raises:
        RuntimeError: When it sees unexpected errors from Kubernetes API.

    Returns:
        The found pod object. None if it's not found.
    """
    try:
        return core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            return None
        raise RuntimeError from e
is_inside_kubernetes() -> bool

Check whether we are inside a Kubernetes cluster or on a remote host.

Returns:

Type Description
bool

True if inside a Kubernetes cluster, else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
69
70
71
72
73
74
75
76
77
78
79
def is_inside_kubernetes() -> bool:
    """Check whether we are inside a Kubernetes cluster or on a remote host.

    Returns:
        True if inside a Kubernetes cluster, else False.
    """
    try:
        k8s_config.load_incluster_config()
        return True
    except k8s_config.ConfigException:
        return False
load_kube_config(incluster: bool = False, context: Optional[str] = None) -> None

Load the Kubernetes client config.

Parameters:

Name Type Description Default
incluster bool

Whether to load the in-cluster config.

False
context Optional[str]

Name of the Kubernetes context. If not provided, uses the currently active context. Will be ignored if incluster is True.

None
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def load_kube_config(
    incluster: bool = False, context: Optional[str] = None
) -> None:
    """Load the Kubernetes client config.

    Args:
        incluster: Whether to load the in-cluster config.
        context: Name of the Kubernetes context. If not provided, uses the
            currently active context. Will be ignored if `incluster` is True.
    """
    if incluster:
        k8s_config.load_incluster_config()
    else:
        k8s_config.load_kube_config(context=context)
pod_failed(pod: k8s_client.V1Pod) -> bool

Check if pod status is 'Failed'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Failed' else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
168
169
170
171
172
173
174
175
176
177
def pod_failed(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Failed'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Failed' else False.
    """
    return pod.status.phase == PodPhase.FAILED.value  # type: ignore[no-any-return]
pod_is_done(pod: k8s_client.V1Pod) -> bool

Check if pod status is 'Succeeded'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Succeeded' else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
180
181
182
183
184
185
186
187
188
189
def pod_is_done(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Succeeded'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Succeeded' else False.
    """
    return pod.status.phase == PodPhase.SUCCEEDED.value  # type: ignore[no-any-return]
pod_is_not_pending(pod: k8s_client.V1Pod) -> bool

Check if pod status is not 'Pending'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

False if the pod status is 'Pending' else True.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
156
157
158
159
160
161
162
163
164
165
def pod_is_not_pending(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is not 'Pending'.

    Args:
        pod: Kubernetes pod.

    Returns:
        False if the pod status is 'Pending' else True.
    """
    return pod.status.phase != PodPhase.PENDING.value  # type: ignore[no-any-return]
sanitize_label(label: str) -> str

Sanitize a label for a Kubernetes resource.

Parameters:

Name Type Description Default
label str

The label to sanitize.

required

Returns:

Type Description
str

The sanitized label.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def sanitize_label(label: str) -> str:
    """Sanitize a label for a Kubernetes resource.

    Args:
        label: The label to sanitize.

    Returns:
        The sanitized label.
    """
    # https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#rfc-1035-label-names
    label = re.sub(r"[^a-z0-9-]", "-", label.lower())
    label = re.sub(r"^[-]+", "", label)
    label = re.sub(r"[-]+$", "", label)
    label = re.sub(r"[-]+", "-", label)

    return label[:63]
sanitize_pod_name(pod_name: str, namespace: str) -> str

Sanitize pod names so they conform to Kubernetes pod naming convention.

Parameters:

Name Type Description Default
pod_name str

Arbitrary input pod name.

required
namespace str

Namespace in which the Pod will be created.

required

Returns:

Type Description
str

Sanitized pod name.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def sanitize_pod_name(pod_name: str, namespace: str) -> str:
    """Sanitize pod names so they conform to Kubernetes pod naming convention.

    Args:
        pod_name: Arbitrary input pod name.
        namespace: Namespace in which the Pod will be created.

    Returns:
        Sanitized pod name.
    """
    # https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-subdomain-names
    pod_name = re.sub(r"[^a-z0-9-]", "-", pod_name.lower())
    pod_name = re.sub(r"^[-]+", "", pod_name)
    pod_name = re.sub(r"[-]+$", "", pod_name)
    pod_name = re.sub(r"[-]+", "-", pod_name)

    allowed_length = calculate_max_pod_name_length_for_namespace(
        namespace=namespace
    )
    return pod_name[:allowed_length]
update_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Update a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to update the secret.

required
secret_name str

The name of the secret to update.

required
data Dict[str, Optional[str]]

The secret data. If the value is None, the key will be removed from the secret.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
def update_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Update a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to update the secret.
        secret_name: The name of the secret to update.
        data: The secret data. If the value is None, the key will be removed
            from the secret.
    """
    core_api.patch_namespaced_secret(
        namespace=namespace,
        name=secret_name,
        body=build_secret_manifest(name=secret_name, data=data),
    )
wait_pod(kube_client_fn: Callable[[], k8s_client.ApiClient], pod_name: str, namespace: str, exit_condition_lambda: Callable[[k8s_client.V1Pod], bool], timeout_sec: int = 0, exponential_backoff: bool = False, stream_logs: bool = False) -> k8s_client.V1Pod

Wait for a pod to meet an exit condition.

Parameters:

Name Type Description Default
kube_client_fn Callable[[], ApiClient]

the kube client fn is a function that is called periodically and is used to get a CoreV1Api client for the Kubernetes API. It should cache the client to avoid unnecessary overhead but should also instantiate a new client if the previous one is using credentials that are about to expire.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required
exit_condition_lambda Callable[[V1Pod], bool]

A lambda which will be called periodically to wait for a pod to exit. The function returns True to exit.

required
timeout_sec int

Timeout in seconds to wait for pod to reach exit condition, or 0 to wait for an unlimited duration. Defaults to unlimited.

0
exponential_backoff bool

Whether to use exponential back off for polling. Defaults to False.

False
stream_logs bool

Whether to stream the pod logs to zenml.logger.info(). Defaults to False.

False

Raises:

Type Description
RuntimeError

when the function times out.

Returns:

Type Description
V1Pod

The pod object which meets the exit condition.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
def wait_pod(
    kube_client_fn: Callable[[], k8s_client.ApiClient],
    pod_name: str,
    namespace: str,
    exit_condition_lambda: Callable[[k8s_client.V1Pod], bool],
    timeout_sec: int = 0,
    exponential_backoff: bool = False,
    stream_logs: bool = False,
) -> k8s_client.V1Pod:
    """Wait for a pod to meet an exit condition.

    Args:
        kube_client_fn: the kube client fn is a function that is called
            periodically and is used to get a `CoreV1Api` client for
            the Kubernetes API. It should cache the client to avoid
            unnecessary overhead but should also instantiate a new client if
            the previous one is using credentials that are about to expire.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.
        exit_condition_lambda: A lambda
            which will be called periodically to wait for a pod to exit. The
            function returns True to exit.
        timeout_sec: Timeout in seconds to wait for pod to reach exit
            condition, or 0 to wait for an unlimited duration.
            Defaults to unlimited.
        exponential_backoff: Whether to use exponential back off for polling.
            Defaults to False.
        stream_logs: Whether to stream the pod logs to
            `zenml.logger.info()`. Defaults to False.

    Raises:
        RuntimeError: when the function times out.

    Returns:
        The pod object which meets the exit condition.
    """
    start_time = utc_now()

    # Link to exponential back-off algorithm used here:
    # https://cloud.google.com/storage/docs/exponential-backoff
    backoff_interval = 1
    maximum_backoff = 32

    logged_lines = 0

    while True:
        kube_client = kube_client_fn()
        core_api = k8s_client.CoreV1Api(kube_client)

        resp = get_pod(core_api, pod_name, namespace)

        if resp is None:
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` not found.")

        # Stream logs to `zenml.logger.info()`.
        # TODO: can we do this without parsing all logs every time?
        if stream_logs and pod_is_not_pending(resp):
            response = core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                _preload_content=False,
            )
            raw_data = response.data
            decoded_log = raw_data.decode("utf-8", errors="replace")
            logs = decoded_log.splitlines()
            if len(logs) > logged_lines:
                for line in logs[logged_lines:]:
                    logger.info(line)
                logged_lines = len(logs)

        # Raise an error if the pod failed.
        if pod_failed(resp):
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` failed.")

        # Check if pod is in desired state (e.g. finished / running / ...).
        if exit_condition_lambda(resp):
            return resp

        # Check if wait timed out.
        elapse_time = utc_now() - start_time
        if elapse_time.seconds >= timeout_sec and timeout_sec != 0:
            raise RuntimeError(
                f"Waiting for pod `{namespace}:{pod_name}` timed out after "
                f"{timeout_sec} seconds."
            )

        # Wait (using exponential backoff).
        time.sleep(backoff_interval)
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2
kubernetes_orchestrator

Kubernetes-native orchestrator.

Classes
KubernetesOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator for running ZenML pipelines using native Kubernetes.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesOrchestratorConfig property

Returns the KubernetesOrchestratorConfig config.

Returns:

Type Description
KubernetesOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Defines the validator that checks whether the stack is valid.

Returns:

Type Description
Optional[StackValidator]

Stack validator.

Functions
apply_default_resource_requests(memory: str, cpu: Optional[str] = None, pod_settings: Optional[KubernetesPodSettings] = None) -> KubernetesPodSettings classmethod

Applies default resource requests to a pod settings object.

Parameters:

Name Type Description Default
memory str

The memory resource request.

required
cpu Optional[str]

The CPU resource request.

None
pod_settings Optional[KubernetesPodSettings]

The pod settings to update. A new one will be created if not provided.

None

Returns:

Type Description
KubernetesPodSettings

The new or updated pod settings.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
@classmethod
def apply_default_resource_requests(
    cls,
    memory: str,
    cpu: Optional[str] = None,
    pod_settings: Optional[KubernetesPodSettings] = None,
) -> KubernetesPodSettings:
    """Applies default resource requests to a pod settings object.

    Args:
        memory: The memory resource request.
        cpu: The CPU resource request.
        pod_settings: The pod settings to update. A new one will be created
            if not provided.

    Returns:
        The new or updated pod settings.
    """
    resources = {
        "requests": {"memory": memory},
    }
    if cpu:
        resources["requests"]["cpu"] = cpu
    if not pod_settings:
        pod_settings = KubernetesPodSettings(resources=resources)
    elif not pod_settings.resources:
        # We can't update the pod settings in place (because it's a frozen
        # pydantic model), so we have to create a new one.
        pod_settings = KubernetesPodSettings(
            **pod_settings.model_dump(exclude_unset=True),
            resources=resources,
        )
    else:
        set_requests = pod_settings.resources.get("requests", {})
        resources["requests"].update(set_requests)
        pod_settings.resources["requests"] = resources["requests"]

    return pod_settings
get_kube_client(incluster: Optional[bool] = None) -> k8s_client.ApiClient

Getter for the Kubernetes API client.

Parameters:

Name Type Description Default
incluster Optional[bool]

Whether to use the in-cluster config or not. Overrides the incluster setting in the config.

None

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

if the Kubernetes connector behaves unexpectedly.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Getter for the Kubernetes API client.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
get_kubernetes_contexts() -> Tuple[List[str], str]

Get list of configured Kubernetes contexts and the active context.

Raises:

Type Description
RuntimeError

if the Kubernetes configuration cannot be loaded.

Returns:

Name Type Description
context_name List[str]

List of configured Kubernetes contexts

active_context_name str

Name of the active Kubernetes context.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """Get list of configured Kubernetes contexts and the active context.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        context_name: List of configured Kubernetes contexts
        active_context_name: Name of the active Kubernetes context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        )
get_token_secret_name(deployment_id: UUID) -> str

Returns the name of the secret that contains the ZenML token.

Parameters:

Name Type Description Default
deployment_id UUID

The ID of the deployment.

required

Returns:

Type Description
str

The name of the secret that contains the ZenML token.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
380
381
382
383
384
385
386
387
388
389
def get_token_secret_name(self, deployment_id: UUID) -> str:
    """Returns the name of the secret that contains the ZenML token.

    Args:
        deployment_id: The ID of the deployment.

    Returns:
        The name of the secret that contains the ZenML token.
    """
    return f"zenml-token-{deployment_id}"
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Runs the pipeline in Kubernetes.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If the Kubernetes orchestrator is not configured.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Runs the pipeline in Kubernetes.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If the Kubernetes orchestrator is not configured.
    """
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Kubernetes orchestrator, ignoring resource "
                "configuration for step %s.",
                step_name,
            )

    pipeline_name = deployment.pipeline_configuration.name

    # We already make sure the orchestrator run name has the correct length
    # to make sure we don't cut off the randomized suffix later when
    # sanitizing the pod name. This avoids any pod naming collisions.
    max_length = kube_utils.calculate_max_pod_name_length_for_namespace(
        namespace=self.config.kubernetes_namespace
    )
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name, max_length=max_length
    )
    pod_name = kube_utils.sanitize_pod_name(
        orchestrator_run_name, namespace=self.config.kubernetes_namespace
    )

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    # Build entrypoint command and args for the orchestrator pod.
    # This will internally also build the command/args for all step pods.
    command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
    args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
        run_name=orchestrator_run_name,
        deployment_id=deployment.id,
        kubernetes_namespace=self.config.kubernetes_namespace,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )

    # Authorize pod to run Kubernetes commands inside the cluster.
    service_account_name = self._get_service_account_name(settings)

    # We set some default minimum resource requests for the orchestrator pod
    # here if the user has not specified any, because the orchestrator pod
    # takes up some memory resources itself and, if not specified, the pod
    # will be scheduled on any node regardless of available memory and risk
    # negatively impacting or even crashing the node due to memory pressure.
    orchestrator_pod_settings = self.apply_default_resource_requests(
        memory="400Mi",
        cpu="100m",
        pod_settings=settings.orchestrator_pod_settings,
    )

    if self.config.pass_zenml_token_as_secret:
        secret_name = self.get_token_secret_name(deployment.id)
        token = environment.pop("ZENML_STORE_API_TOKEN")
        kube_utils.create_or_update_secret(
            core_api=self._k8s_core_api,
            namespace=self.config.kubernetes_namespace,
            secret_name=secret_name,
            data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token},
        )
        orchestrator_pod_settings.env.append(
            {
                "name": "ZENML_STORE_API_TOKEN",
                "valueFrom": {
                    "secretKeyRef": {
                        "name": secret_name,
                        "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                    }
                },
            }
        )

    # Schedule as CRON job if CRON schedule is given.
    if deployment.schedule:
        if not deployment.schedule.cron_expression:
            raise RuntimeError(
                "The Kubernetes orchestrator only supports scheduling via "
                "CRON jobs, but the run was configured with a manual "
                "schedule. Use `Schedule(cron_expression=...)` instead."
            )
        cron_expression = deployment.schedule.cron_expression
        cron_job_manifest = build_cron_job_manifest(
            cron_expression=cron_expression,
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            service_account_name=service_account_name,
            privileged=False,
            pod_settings=orchestrator_pod_settings,
            env=environment,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_batch_api.create_namespaced_cron_job(
            body=cron_job_manifest,
            namespace=self.config.kubernetes_namespace,
        )
        logger.info(
            f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
            f'`"{cron_expression}"`.'
        )
        return
    else:
        # Create and run the orchestrator pod.
        pod_manifest = build_pod_manifest(
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            privileged=False,
            pod_settings=orchestrator_pod_settings,
            service_account_name=service_account_name,
            env=environment,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_core_api.create_namespaced_pod(
            namespace=self.config.kubernetes_namespace,
            body=pod_manifest,
        )

        # Wait for the orchestrator pod to finish and stream logs.
        if settings.synchronous:
            logger.info("Waiting for Kubernetes orchestrator pod...")
            kube_utils.wait_pod(
                kube_client_fn=self.get_kube_client,
                pod_name=pod_name,
                namespace=self.config.kubernetes_namespace,
                exit_condition_lambda=kube_utils.pod_is_done,
                timeout_sec=settings.timeout,
                stream_logs=True,
            )
        else:
            logger.info(
                f"Orchestration started asynchronously in pod "
                f"`{self.config.kubernetes_namespace}:{pod_name}`. "
                f"Run the following command to inspect the logs: "
                f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
            )
Functions Modules
kubernetes_orchestrator_entrypoint

Entrypoint of the Kubernetes master/orchestrator pod.

Classes Functions
main() -> None

Entrypoint of the k8s master/orchestrator pod.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def main() -> None:
    """Entrypoint of the k8s master/orchestrator pod."""
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Kubernetes orchestrator pod started.")

    # Parse / extract args.
    args = parse_args()

    orchestrator_run_id = socket.gethostname()

    client = Client()

    deployment_config = client.get_deployment(args.deployment_id)

    pipeline_dag = {
        step_name: step.spec.upstream_steps
        for step_name, step in deployment_config.step_configurations.items()
    }
    step_command = StepEntrypointConfiguration.get_entrypoint_command()

    active_stack = client.active_stack
    mount_local_stores = active_stack.orchestrator.config.is_local

    # Get a Kubernetes client from the active Kubernetes orchestrator, but
    # override the `incluster` setting to `True` since we are running inside
    # the Kubernetes cluster.
    orchestrator = active_stack.orchestrator
    assert isinstance(orchestrator, KubernetesOrchestrator)
    kube_client = orchestrator.get_kube_client(incluster=True)
    core_api = k8s_client.CoreV1Api(kube_client)

    env = get_config_environment_vars()
    env[ENV_ZENML_KUBERNETES_RUN_ID] = orchestrator_run_id

    def run_step_on_kubernetes(step_name: str) -> None:
        """Run a pipeline step in a separate Kubernetes pod.

        Args:
            step_name: Name of the step.

        Raises:
            Exception: If the pod fails to start.
            TimeoutError: If the pod is still in a pending state after the
                maximum wait time has elapsed.
        """
        # Define Kubernetes pod name.
        pod_name = f"{orchestrator_run_id}-{step_name}"
        pod_name = kube_utils.sanitize_pod_name(
            pod_name, namespace=args.kubernetes_namespace
        )

        image = KubernetesOrchestrator.get_image(
            deployment=deployment_config, step_name=step_name
        )
        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name, deployment_id=deployment_config.id
        )

        step_config = deployment_config.step_configurations[step_name].config

        kubernetes_settings = step_config.settings.get(
            "orchestrator.kubernetes", None
        )

        orchestrator_settings = {}
        if kubernetes_settings is not None:
            orchestrator_settings = kubernetes_settings.model_dump()

        settings = KubernetesOrchestratorSettings.model_validate(
            orchestrator_settings
        )

        # We set some default minimum memory resource requests for the step pod
        # here if the user has not specified any, because the step pod takes up
        # some memory resources itself and, if not specified, the pod will be
        # scheduled on any node regardless of available memory and risk
        # negatively impacting or even crashing the node due to memory pressure.
        pod_settings = KubernetesOrchestrator.apply_default_resource_requests(
            memory="400Mi",
            pod_settings=settings.pod_settings,
        )

        if orchestrator.config.pass_zenml_token_as_secret:
            env.pop("ZENML_STORE_API_TOKEN", None)
            secret_name = orchestrator.get_token_secret_name(
                deployment_config.id
            )
            pod_settings.env.append(
                {
                    "name": "ZENML_STORE_API_TOKEN",
                    "valueFrom": {
                        "secretKeyRef": {
                            "name": secret_name,
                            "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                        }
                    },
                }
            )

        # Define Kubernetes pod manifest.
        pod_manifest = build_pod_manifest(
            pod_name=pod_name,
            run_name=args.run_name,
            pipeline_name=deployment_config.pipeline_configuration.name,
            image_name=image,
            command=step_command,
            args=step_args,
            env=env,
            privileged=settings.privileged,
            pod_settings=pod_settings,
            service_account_name=settings.step_pod_service_account_name
            or settings.service_account_name,
            mount_local_stores=mount_local_stores,
        )

        retries = 0
        max_retries = settings.pod_failure_max_retries
        delay: float = settings.pod_failure_retry_delay
        backoff = settings.pod_failure_backoff

        while retries < max_retries:
            try:
                # Create and run pod.
                core_api.create_namespaced_pod(
                    namespace=args.kubernetes_namespace,
                    body=pod_manifest,
                )
                break
            except Exception as e:
                retries += 1
                if retries < max_retries:
                    logger.debug(
                        f"Pod for step `{step_name}` failed to start: {e}"
                    )
                    logger.error(
                        f"Failed to create pod for step `{step_name}`. "
                        f"Retrying in {delay} seconds..."
                    )
                    time.sleep(delay)
                    delay *= backoff
                else:
                    logger.error(
                        f"Failed to create pod for step `{step_name}` after "
                        f"{max_retries} retries. Exiting."
                    )
                    raise

        # Wait for pod to start
        max_wait = settings.pod_startup_timeout
        total_wait: float = 0
        delay = settings.pod_failure_retry_delay
        while True:
            pod = kube_utils.get_pod(
                core_api, pod_name, args.kubernetes_namespace
            )
            if not pod or kube_utils.pod_is_not_pending(pod):
                break
            if total_wait >= max_wait:
                # Have to delete the pending pod so it doesn't start running
                # later on.
                try:
                    core_api.delete_namespaced_pod(
                        name=pod_name,
                        namespace=args.kubernetes_namespace,
                    )
                except Exception:
                    pass
                raise TimeoutError(
                    f"Pod for step `{step_name}` is still in a pending state "
                    f"after {total_wait} seconds. Exiting."
                )

            if total_wait + delay > max_wait:
                delay = max_wait - total_wait
            total_wait += delay
            time.sleep(delay)
            delay *= backoff

        # Wait for pod to finish.
        logger.info(f"Waiting for pod of step `{step_name}` to finish...")
        try:
            kube_utils.wait_pod(
                kube_client_fn=lambda: orchestrator.get_kube_client(
                    incluster=True
                ),
                pod_name=pod_name,
                namespace=args.kubernetes_namespace,
                exit_condition_lambda=kube_utils.pod_is_done,
                stream_logs=True,
            )

            logger.info(f"Pod for step `{step_name}` completed.")
        except Exception:
            logger.error(f"Pod for step `{step_name}` failed.")

            raise

    def finalize_run(node_states: Dict[str, NodeStatus]) -> None:
        """Finalize the run.

        Args:
            node_states: The states of the nodes.
        """
        try:
            # Some steps may have failed because the pods could not be created.
            # We need to check for this and mark the step run as failed if so.

            # Fetch the pipeline run using any means possible.
            list_args: Dict[str, Any] = {}
            if args.run_id:
                # For a run triggered outside of a schedule, we can use the
                # placeholder run ID to find the pipeline run.
                list_args = dict(id=UUID(args.run_id))
            else:
                # For a run triggered by a schedule, we can only use the
                # orchestrator run ID to find the pipeline run.
                list_args = dict(orchestrator_run_id=orchestrator_run_id)

            pipeline_runs = client.list_pipeline_runs(
                hydrate=True,
                project=deployment_config.project.id,
                deployment_id=deployment_config.id,
                **list_args,
            )
            if not len(pipeline_runs):
                # No pipeline run found, so we can't mark any step runs as failed.
                return

            pipeline_run = pipeline_runs[0]
            pipeline_failed = False

            for step_name, node_state in node_states.items():
                if node_state != NodeStatus.FAILED:
                    continue

                pipeline_failed = True

                # If steps failed for any reason, we need to mark the step run as
                # failed, if it exists and it wasn't already in a final state.

                step_run = pipeline_run.steps.get(step_name)

                # Try to update the step run status, if it exists and is in
                # a transient state.
                if step_run and step_run.status in {
                    ExecutionStatus.INITIALIZING,
                    ExecutionStatus.RUNNING,
                }:
                    publish_utils.publish_failed_step_run(step_run.id)

            # If any steps failed and the pipeline run is still in a transient
            # state, we need to mark it as failed.
            if pipeline_failed and pipeline_run.status in {
                ExecutionStatus.INITIALIZING,
                ExecutionStatus.RUNNING,
            }:
                publish_utils.publish_failed_pipeline_run(pipeline_run.id)
        except AuthorizationException:
            # If a step of the pipeline failed or all of them completed
            # successfully, the pipeline run will be finished and the API token
            # will be invalidated. We catch this exception and do nothing here,
            # as the pipeline run status will already have been published.
            pass

    parallel_node_startup_waiting_period = (
        orchestrator.config.parallel_step_startup_waiting_period or 0.0
    )
    try:
        ThreadedDagRunner(
            dag=pipeline_dag,
            run_fn=run_step_on_kubernetes,
            finalize_fn=finalize_run,
            parallel_node_startup_waiting_period=parallel_node_startup_waiting_period,
        ).run()
        logger.info("Orchestration pod completed.")
    finally:
        if (
            orchestrator.config.pass_zenml_token_as_secret
            and deployment_config.schedule is None
        ):
            secret_name = orchestrator.get_token_secret_name(
                deployment_config.id
            )
            try:
                kube_utils.delete_secret(
                    core_api=core_api,
                    namespace=args.kubernetes_namespace,
                    secret_name=secret_name,
                )
            except k8s_client.rest.ApiException as e:
                logger.error(f"Error cleaning up secret {secret_name}: {e}")
parse_args() -> argparse.Namespace

Parse entrypoint arguments.

Returns:

Type Description
Namespace

Parsed args.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
50
51
52
53
54
55
56
57
58
59
60
61
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    parser.add_argument("--kubernetes_namespace", type=str, required=True)
    parser.add_argument("--run_id", type=str, required=False)
    return parser.parse_args()
Modules
kubernetes_orchestrator_entrypoint_configuration

Entrypoint configuration for the Kubernetes master/orchestrator pod.

Classes
KubernetesOrchestratorEntrypointConfiguration

Entrypoint configuration for the k8s master/orchestrator pod.

Functions
get_entrypoint_arguments(run_name: str, deployment_id: UUID, kubernetes_namespace: str, run_id: Optional[UUID] = None) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
run_name str

Name of the ZenML run.

required
deployment_id UUID

ID of the deployment.

required
kubernetes_namespace str

Name of the Kubernetes namespace.

required
run_id Optional[UUID]

Optional ID of the pipeline run. Not set for scheduled runs.

None

Returns:

Type Description
List[str]

List of entrypoint arguments.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
    kubernetes_namespace: str,
    run_id: Optional["UUID"] = None,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.
        kubernetes_namespace: Name of the Kubernetes namespace.
        run_id: Optional ID of the pipeline run. Not set for scheduled runs.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
        f"--{NAMESPACE_OPTION}",
        kubernetes_namespace,
    ]

    if run_id:
        args.append(f"--{RUN_ID_OPTION}")
        args.append(str(run_id))

    return args
get_entrypoint_command() -> List[str] classmethod

Returns a command that runs the entrypoint module.

Returns:

Type Description
List[str]

Entrypoint command.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
44
45
46
47
48
49
50
51
52
53
54
55
56
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() -> Set[str] classmethod

Gets all the options required for running this entrypoint.

Returns:

Type Description
Set[str]

Entrypoint options.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
30
31
32
33
34
35
36
37
38
39
40
41
42
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
        NAMESPACE_OPTION,
    }
    return options
manifest_utils

Utility functions for building manifests for k8s pods.

Classes Functions
add_local_stores_mount(pod_spec: k8s_client.V1PodSpec) -> None

Makes changes in place to the configuration of the pod spec.

Configures mounted volumes for stack components that write to a local path.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

The pod spec to update.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def add_local_stores_mount(
    pod_spec: k8s_client.V1PodSpec,
) -> None:
    """Makes changes in place to the configuration of the pod spec.

    Configures mounted volumes for stack components that write to a local
    path.

    Args:
        pod_spec: The pod spec to update.
    """
    assert len(pod_spec.containers) == 1
    container_spec: k8s_client.V1Container = pod_spec.containers[0]

    stack = Client().active_stack

    stack.check_local_paths()

    local_stores_path = GlobalConfiguration().local_stores_path

    host_path = k8s_client.V1HostPathVolumeSource(
        path=local_stores_path, type="Directory"
    )

    pod_spec.volumes = pod_spec.volumes or []
    pod_spec.volumes.append(
        k8s_client.V1Volume(
            name="local-stores",
            host_path=host_path,
        )
    )
    container_spec.volume_mounts = container_spec.volume_mounts or []
    container_spec.volume_mounts.append(
        k8s_client.V1VolumeMount(
            name="local-stores",
            mount_path=local_stores_path,
        )
    )

    if sys.platform == "win32":
        # File permissions are not checked on Windows. This if clause
        # prevents mypy from complaining about unused 'type: ignore'
        # statements
        pass
    else:
        # Run KFP containers in the context of the local UID/GID
        # to ensure that the local stores can be shared
        # with the local pipeline runs.
        pod_spec.security_context = k8s_client.V1SecurityContext(
            run_as_user=os.getuid(),
            run_as_group=os.getgid(),
        )

    container_spec.env = container_spec.env or []
    container_spec.env.append(
        k8s_client.V1EnvVar(
            name=ENV_ZENML_LOCAL_STORES_PATH,
            value=local_stores_path,
        )
    )
add_pod_settings(pod_spec: k8s_client.V1PodSpec, settings: KubernetesPodSettings) -> None

Updates pod spec fields in place if passed in orchestrator settings.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

Pod spec to update.

required
settings KubernetesPodSettings

Pod settings to apply.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def add_pod_settings(
    pod_spec: k8s_client.V1PodSpec,
    settings: KubernetesPodSettings,
) -> None:
    """Updates pod `spec` fields in place if passed in orchestrator settings.

    Args:
        pod_spec: Pod spec to update.
        settings: Pod settings to apply.
    """
    if settings.node_selectors:
        pod_spec.node_selector = settings.node_selectors

    if settings.affinity:
        pod_spec.affinity = settings.affinity

    if settings.tolerations:
        pod_spec.tolerations = settings.tolerations

    for container in pod_spec.containers:
        assert isinstance(container, k8s_client.V1Container)
        container._resources = settings.resources
        if settings.volume_mounts:
            if container.volume_mounts:
                container.volume_mounts.extend(settings.volume_mounts)
            else:
                container.volume_mounts = settings.volume_mounts

        if settings.env:
            if container.env:
                container.env.extend(settings.env)
            else:
                container.env = settings.env

        if settings.env_from:
            if container.env_from:
                container.env_from.extend(settings.env_from)
            else:
                container.env_from = settings.env_from

    if settings.volumes:
        if pod_spec.volumes:
            pod_spec.volumes.extend(settings.volumes)
        else:
            pod_spec.volumes = settings.volumes

    if settings.host_ipc:
        pod_spec.host_ipc = settings.host_ipc
build_cron_job_manifest(cron_expression: str, pod_name: str, run_name: str, pipeline_name: str, image_name: str, command: List[str], args: List[str], privileged: bool, pod_settings: Optional[KubernetesPodSettings] = None, service_account_name: Optional[str] = None, env: Optional[Dict[str, str]] = None, mount_local_stores: bool = False) -> k8s_client.V1CronJob

Create a manifest for launching a pod as scheduled CRON job.

Parameters:

Name Type Description Default
cron_expression str

CRON job schedule expression, e.g. " * * * ".

required
pod_name str

Name of the pod.

required
run_name str

Name of the ZenML run.

required
pipeline_name str

Name of the ZenML pipeline.

required
image_name str

Name of the Docker image.

required
command List[str]

Command to execute the entrypoint in the pod.

required
args List[str]

Arguments provided to the entrypoint command.

required
privileged bool

Whether to run the container in privileged mode.

required
pod_settings Optional[KubernetesPodSettings]

Optional settings for the pod.

None
service_account_name Optional[str]

Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster.

None
env Optional[Dict[str, str]]

Environment variables to set.

None
mount_local_stores bool

Whether to mount the local stores path inside the pod.

False

Returns:

Type Description
V1CronJob

CRON job manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def build_cron_job_manifest(
    cron_expression: str,
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    privileged: bool,
    pod_settings: Optional[KubernetesPodSettings] = None,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1CronJob:
    """Create a manifest for launching a pod as scheduled CRON job.

    Args:
        cron_expression: CRON job schedule expression, e.g. "* * * * *".
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        privileged: Whether to run the container in privileged mode.
        pod_settings: Optional settings for the pod.
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        CRON job manifest.
    """
    pod_manifest = build_pod_manifest(
        pod_name=pod_name,
        run_name=run_name,
        pipeline_name=pipeline_name,
        image_name=image_name,
        command=command,
        args=args,
        privileged=privileged,
        pod_settings=pod_settings,
        service_account_name=service_account_name,
        env=env,
        mount_local_stores=mount_local_stores,
    )

    job_spec = k8s_client.V1CronJobSpec(
        schedule=cron_expression,
        job_template=k8s_client.V1JobTemplateSpec(
            metadata=pod_manifest.metadata,
            spec=k8s_client.V1JobSpec(
                template=k8s_client.V1PodTemplateSpec(
                    metadata=pod_manifest.metadata,
                    spec=pod_manifest.spec,
                ),
            ),
        ),
    )

    job_manifest = k8s_client.V1CronJob(
        kind="CronJob",
        api_version="batch/v1",
        metadata=pod_manifest.metadata,
        spec=job_spec,
    )

    return job_manifest
build_namespace_manifest(namespace: str) -> Dict[str, Any]

Build the manifest for a new namespace.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace.

required

Returns:

Type Description
Dict[str, Any]

Manifest of the new namespace.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def build_namespace_manifest(namespace: str) -> Dict[str, Any]:
    """Build the manifest for a new namespace.

    Args:
        namespace: Kubernetes namespace.

    Returns:
        Manifest of the new namespace.
    """
    return {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": namespace,
        },
    }
build_pod_manifest(pod_name: str, run_name: str, pipeline_name: str, image_name: str, command: List[str], args: List[str], privileged: bool, pod_settings: Optional[KubernetesPodSettings] = None, service_account_name: Optional[str] = None, env: Optional[Dict[str, str]] = None, mount_local_stores: bool = False) -> k8s_client.V1Pod

Build a Kubernetes pod manifest for a ZenML run or step.

Parameters:

Name Type Description Default
pod_name str

Name of the pod.

required
run_name str

Name of the ZenML run.

required
pipeline_name str

Name of the ZenML pipeline.

required
image_name str

Name of the Docker image.

required
command List[str]

Command to execute the entrypoint in the pod.

required
args List[str]

Arguments provided to the entrypoint command.

required
privileged bool

Whether to run the container in privileged mode.

required
pod_settings Optional[KubernetesPodSettings]

Optional settings for the pod.

None
service_account_name Optional[str]

Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster.

None
env Optional[Dict[str, str]]

Environment variables to set.

None
mount_local_stores bool

Whether to mount the local stores path inside the pod.

False

Returns:

Type Description
V1Pod

Pod manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def build_pod_manifest(
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    privileged: bool,
    pod_settings: Optional[KubernetesPodSettings] = None,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1Pod:
    """Build a Kubernetes pod manifest for a ZenML run or step.

    Args:
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        privileged: Whether to run the container in privileged mode.
        pod_settings: Optional settings for the pod.
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        Pod manifest.
    """
    env = env.copy() if env else {}
    env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")

    security_context = k8s_client.V1SecurityContext(privileged=privileged)
    container_spec = k8s_client.V1Container(
        name="main",
        image=image_name,
        command=command,
        args=args,
        env=[
            k8s_client.V1EnvVar(name=name, value=value)
            for name, value in env.items()
        ],
        security_context=security_context,
    )
    image_pull_secrets = []
    if pod_settings:
        image_pull_secrets = [
            k8s_client.V1LocalObjectReference(name=name)
            for name in pod_settings.image_pull_secrets
        ]

    pod_spec = k8s_client.V1PodSpec(
        containers=[container_spec],
        restart_policy="Never",
        image_pull_secrets=image_pull_secrets,
    )

    if service_account_name is not None:
        pod_spec.service_account_name = service_account_name

    labels = {}

    if pod_settings:
        add_pod_settings(pod_spec, pod_settings)

        # Add pod_settings.labels to the labels
        if pod_settings.labels:
            labels.update(pod_settings.labels)

    # Add run_name and pipeline_name to the labels
    labels.update(
        {
            "run": kube_utils.sanitize_label(run_name),
            "pipeline": kube_utils.sanitize_label(pipeline_name),
        }
    )

    pod_metadata = k8s_client.V1ObjectMeta(
        name=pod_name,
        labels=labels,
    )

    if pod_settings and pod_settings.annotations:
        pod_metadata.annotations = pod_settings.annotations

    pod_manifest = k8s_client.V1Pod(
        kind="Pod",
        api_version="v1",
        metadata=pod_metadata,
        spec=pod_spec,
    )

    if mount_local_stores:
        add_local_stores_mount(pod_spec)

    return pod_manifest
build_role_binding_manifest_for_service_account(name: str, role_name: str, service_account_name: str, namespace: str = 'default') -> Dict[str, Any]

Build a manifest for a role binding of a service account.

Parameters:

Name Type Description Default
name str

Name of the cluster role binding.

required
role_name str

Name of the role.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a cluster role binding of a service account.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def build_role_binding_manifest_for_service_account(
    name: str,
    role_name: str,
    service_account_name: str,
    namespace: str = "default",
) -> Dict[str, Any]:
    """Build a manifest for a role binding of a service account.

    Args:
        name: Name of the cluster role binding.
        role_name: Name of the role.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a cluster role binding of a service account.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name},
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account_name,
                "namespace": namespace,
            }
        ],
        "roleRef": {
            "kind": "ClusterRole",
            "name": role_name,
            "apiGroup": "rbac.authorization.k8s.io",
        },
    }
build_secret_manifest(name: str, data: Mapping[str, Optional[str]], secret_type: str = 'Opaque') -> Dict[str, Any]

Builds a Kubernetes secret manifest.

Parameters:

Name Type Description Default
name str

Name of the secret.

required
data Mapping[str, Optional[str]]

The secret data.

required
secret_type str

The secret type.

'Opaque'

Returns:

Type Description
Dict[str, Any]

The secret manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def build_secret_manifest(
    name: str,
    data: Mapping[str, Optional[str]],
    secret_type: str = "Opaque",
) -> Dict[str, Any]:
    """Builds a Kubernetes secret manifest.

    Args:
        name: Name of the secret.
        data: The secret data.
        secret_type: The secret type.

    Returns:
        The secret manifest.
    """
    encoded_data = {
        key: base64.b64encode(value.encode()).decode() if value else None
        for key, value in data.items()
    }

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {
            "name": name,
        },
        "type": secret_type,
        "data": encoded_data,
    }
build_service_account_manifest(name: str, namespace: str = 'default') -> Dict[str, Any]

Build the manifest for a service account.

Parameters:

Name Type Description Default
name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a service account.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def build_service_account_manifest(
    name: str, namespace: str = "default"
) -> Dict[str, Any]:
    """Build the manifest for a service account.

    Args:
        name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a service account.
    """
    return {
        "apiVersion": "v1",
        "metadata": {
            "name": name,
            "namespace": namespace,
        },
    }
Modules

pod_settings

Kubernetes pod settings.

Classes
KubernetesPodSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Kubernetes Pod settings.

Attributes:

Name Type Description
node_selectors Dict[str, str]

Node selectors to apply to the pod.

affinity Dict[str, Any]

Affinity to apply to the pod.

tolerations List[Dict[str, Any]]

Tolerations to apply to the pod.

resources Dict[str, Dict[str, str]]

Resource requests and limits for the pod.

annotations Dict[str, str]

Annotations to apply to the pod metadata.

volumes List[Dict[str, Any]]

Volumes to mount in the pod.

volume_mounts List[Dict[str, Any]]

Volume mounts to apply to the pod containers.

host_ipc bool

Whether to enable host IPC for the pod.

image_pull_secrets List[str]

Image pull secrets to use for the pod.

labels Dict[str, str]

Labels to apply to the pod.

env List[Dict[str, Any]]

Environment variables to apply to the container.

env_from List[Dict[str, Any]]

Environment variables to apply to the container.

Source code in src/zenml/config/secret_reference_mixin.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules

serialization_utils

Kubernetes serialization utils.

Functions
deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any

Deserializes a Kubernetes model.

Parameters:

Name Type Description Default
data Dict[str, Any]

The model data.

required
class_name str

Name of the Kubernetes model class.

required

Raises:

Type Description
KeyError

If the data contains values for an invalid attribute.

Returns:

Type Description
Any

The deserialized model.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any:
    """Deserializes a Kubernetes model.

    Args:
        data: The model data.
        class_name: Name of the Kubernetes model class.

    Raises:
        KeyError: If the data contains values for an invalid attribute.

    Returns:
        The deserialized model.
    """
    model_class = get_model_class(class_name=class_name)
    assert hasattr(model_class, "openapi_types")
    assert hasattr(model_class, "attribute_map")
    # Mapping of the attribute name of the model class to the attribute type
    type_mapping = cast(Dict[str, str], model_class.openapi_types)
    reverse_attribute_mapping = cast(Dict[str, str], model_class.attribute_map)
    # Mapping of the serialized key to the attribute name of the model class
    attribute_mapping = {
        value: key for key, value in reverse_attribute_mapping.items()
    }

    deserialized_attributes: Dict[str, Any] = {}

    for key, value in data.items():
        if key not in attribute_mapping:
            raise KeyError(
                f"Got value for attribute {key} which is not one of the "
                f"available attributes {set(attribute_mapping)}."
            )

        attribute_name = attribute_mapping[key]
        attribute_class = type_mapping[attribute_name]

        if not value:
            deserialized_attributes[attribute_name] = value
        elif attribute_class.startswith("list["):
            match = re.fullmatch(r"list\[(.*)\]", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_list(
                value, class_name=inner_class
            )
        elif attribute_class.startswith("dict("):
            match = re.fullmatch(r"dict\(([^,]*), (.*)\)", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_dict(
                value, class_name=inner_class
            )
        elif is_model_class(attribute_class):
            deserialized_attributes[attribute_name] = (
                deserialize_kubernetes_model(value, attribute_class)
            )
        else:
            deserialized_attributes[attribute_name] = value

    return model_class(**deserialized_attributes)
get_model_class(class_name: str) -> Type[Any]

Gets a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to get.

required

Raises:

Type Description
TypeError

If no Kubernetes model class exists for this name.

Returns:

Type Description
Type[Any]

The model class.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def get_model_class(class_name: str) -> Type[Any]:
    """Gets a Kubernetes model class.

    Args:
        class_name: Name of the class to get.

    Raises:
        TypeError: If no Kubernetes model class exists for this name.

    Returns:
        The model class.
    """
    import kubernetes.client.models

    class_ = getattr(kubernetes.client.models, class_name, None)

    if not class_:
        raise TypeError(
            f"Unable to find kubernetes model class with name {class_name}."
        )

    assert isinstance(class_, type)
    return class_
is_model_class(class_name: str) -> bool

Checks whether the given class name is a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to check.

required

Returns:

Type Description
bool

If the given class name is a Kubernetes model class.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
152
153
154
155
156
157
158
159
160
161
162
163
def is_model_class(class_name: str) -> bool:
    """Checks whether the given class name is a Kubernetes model class.

    Args:
        class_name: Name of the class to check.

    Returns:
        If the given class name is a Kubernetes model class.
    """
    import kubernetes.client.models

    return hasattr(kubernetes.client.models, class_name)
serialize_kubernetes_model(model: Any) -> Dict[str, Any]

Serializes a Kubernetes model.

Parameters:

Name Type Description Default
model Any

The model to serialize.

required

Raises:

Type Description
TypeError

If the model is not a Kubernetes model.

Returns:

Type Description
Dict[str, Any]

The serialized model.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def serialize_kubernetes_model(model: Any) -> Dict[str, Any]:
    """Serializes a Kubernetes model.

    Args:
        model: The model to serialize.

    Raises:
        TypeError: If the model is not a Kubernetes model.

    Returns:
        The serialized model.
    """
    if not is_model_class(model.__class__.__name__):
        raise TypeError(f"Unable to serialize non-kubernetes model {model}.")

    assert hasattr(model, "attribute_map")
    attribute_mapping = cast(Dict[str, str], model.attribute_map)

    model_attributes = {
        serialized_attribute_name: getattr(model, attribute_name)
        for attribute_name, serialized_attribute_name in attribute_mapping.items()
    }
    return _serialize_dict(model_attributes)

service_connectors

Kubernetes Service Connector.

Classes
KubernetesServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Kubernetes service connector.

Source code in src/zenml/service_connectors/service_connector.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Modules
kubernetes_service_connector

Kubernetes Service Connector.

The Kubernetes Service Connector implements various authentication methods for Kubernetes clusters.

Classes
KubernetesAuthenticationMethods

Bases: StrEnum

Kubernetes Authentication methods.

KubernetesBaseConfig

Bases: KubernetesServerConfig

Kubernetes basic config.

KubernetesServerConfig

Bases: KubernetesServerCredentials

Kubernetes server config.

KubernetesServerCredentials

Bases: AuthenticationConfig

Kubernetes server authentication config.

KubernetesServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Kubernetes service connector.

Source code in src/zenml/service_connectors/service_connector.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
KubernetesTokenConfig

Bases: KubernetesBaseConfig, KubernetesTokenCredentials

Kubernetes token config.

KubernetesTokenCredentials

Bases: AuthenticationConfig

Kubernetes token authentication config.

KubernetesUserPasswordConfig

Bases: KubernetesBaseConfig, KubernetesUserPasswordCredentials

Kubernetes user/pass config.

KubernetesUserPasswordCredentials

Bases: AuthenticationConfig

Kubernetes user/pass authentication config.

Functions

step_operators

Kubernetes step operator.

Classes
KubernetesStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run on Kubernetes.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesStepOperatorConfig property

Returns the KubernetesStepOperatorConfig config.

Returns:

Type Description
KubernetesStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container

Optional[StackValidator]

registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_kube_client() -> k8s_client.ApiClient

Get the Kubernetes API client.

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

If the service connector returns an unexpected client.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def get_kube_client(self) -> k8s_client.ApiClient:
    """Get the Kubernetes API client.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: If the service connector returns an unexpected client.
    """
    if self.config.incluster:
        kube_utils.load_kube_config(incluster=True)
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on Kubernetes.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on Kubernetes.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(
        KubernetesStepOperatorSettings, self.get_settings(info)
    )
    image_name = info.get_image(
        key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY
    )

    pod_name = f"{info.run_name}_{info.pipeline_step_name}"
    pod_name = kube_utils.sanitize_pod_name(
        pod_name, namespace=self.config.kubernetes_namespace
    )

    command = entrypoint_command[:3]
    args = entrypoint_command[3:]

    # Create and run the orchestrator pod.
    pod_manifest = build_pod_manifest(
        run_name=info.run_name,
        pod_name=pod_name,
        pipeline_name=info.pipeline.name,
        image_name=image_name,
        command=command,
        args=args,
        privileged=settings.privileged,
        service_account_name=settings.service_account_name,
        pod_settings=settings.pod_settings,
        env=environment,
        mount_local_stores=False,
    )

    self._k8s_core_api.create_namespaced_pod(
        namespace=self.config.kubernetes_namespace,
        body=pod_manifest,
    )

    logger.info(
        "Waiting for pod of step `%s` to start...", info.pipeline_step_name
    )
    kube_utils.wait_pod(
        kube_client_fn=self.get_kube_client,
        pod_name=pod_name,
        namespace=self.config.kubernetes_namespace,
        exit_condition_lambda=kube_utils.pod_is_done,
        stream_logs=True,
    )
    logger.info("Pod of step `%s` completed.", info.pipeline_step_name)
Modules
kubernetes_step_operator

Kubernetes step operator implementation.

Classes
KubernetesStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run on Kubernetes.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesStepOperatorConfig property

Returns the KubernetesStepOperatorConfig config.

Returns:

Type Description
KubernetesStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container

Optional[StackValidator]

registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_kube_client() -> k8s_client.ApiClient

Get the Kubernetes API client.

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

If the service connector returns an unexpected client.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def get_kube_client(self) -> k8s_client.ApiClient:
    """Get the Kubernetes API client.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: If the service connector returns an unexpected client.
    """
    if self.config.incluster:
        kube_utils.load_kube_config(incluster=True)
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on Kubernetes.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on Kubernetes.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(
        KubernetesStepOperatorSettings, self.get_settings(info)
    )
    image_name = info.get_image(
        key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY
    )

    pod_name = f"{info.run_name}_{info.pipeline_step_name}"
    pod_name = kube_utils.sanitize_pod_name(
        pod_name, namespace=self.config.kubernetes_namespace
    )

    command = entrypoint_command[:3]
    args = entrypoint_command[3:]

    # Create and run the orchestrator pod.
    pod_manifest = build_pod_manifest(
        run_name=info.run_name,
        pod_name=pod_name,
        pipeline_name=info.pipeline.name,
        image_name=image_name,
        command=command,
        args=args,
        privileged=settings.privileged,
        service_account_name=settings.service_account_name,
        pod_settings=settings.pod_settings,
        env=environment,
        mount_local_stores=False,
    )

    self._k8s_core_api.create_namespaced_pod(
        namespace=self.config.kubernetes_namespace,
        body=pod_manifest,
    )

    logger.info(
        "Waiting for pod of step `%s` to start...", info.pipeline_step_name
    )
    kube_utils.wait_pod(
        kube_client_fn=self.get_kube_client,
        pod_name=pod_name,
        namespace=self.config.kubernetes_namespace,
        exit_condition_lambda=kube_utils.pod_is_done,
        stream_logs=True,
    )
    logger.info("Pod of step `%s` completed.", info.pipeline_step_name)
Functions Modules