Kubernetes

zenml.integrations.kubernetes

Kubernetes integration for Kubernetes-native orchestration.

The Kubernetes integration sub-module provides a Kubernetes-native alternative to the local orchestrator. You can enable it by registering the Kubernetes orchestrator with the ZenML CLI.

Attributes

KUBERNETES = 'kubernetes' module-attribute

KUBERNETES_ORCHESTRATOR_FLAVOR = 'kubernetes' module-attribute

KUBERNETES_STEP_OPERATOR_FLAVOR = 'kubernetes' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
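The dynamic loading in `from_model` comes down to importing a class from its dotted source path and raising a helpful error when that fails. A minimal, self-contained sketch of that pattern (independent of ZenML's `source_utils`; the helper name is our own):

```python
import importlib


def load_class(dotted_path: str) -> type:
    """Import a class from a dotted path like `package.module.ClassName`."""
    module_path, class_name = dotted_path.rsplit(".", maxsplit=1)
    try:
        module = importlib.import_module(module_path)
    except ImportError as err:
        # Surface the original error so users can tell a missing
        # package apart from a typo in the source path.
        raise ImportError(f"Couldn't import `{dotted_path}`: {err}") from err
    return getattr(module, class_name)


# Instantiate the loaded class, as `from_model` does with the flavor source:
counter_cls = load_class("collections.Counter")
print(counter_cls("aab"))  # Counter({'a': 2, 'b': 1})
```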
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
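The URL construction above is pure string manipulation: the pluralized component type and the flavor name get their underscores replaced by hyphens, then are joined to a version-dependent base. A standalone sketch of the same logic:

```python
def default_docs_url(
    component_type_plural: str,
    flavor_name: str,
    version: str,
    is_latest: bool,
) -> str:
    """Rebuild a docs URL the way `generate_default_docs_url` does."""
    component = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")
    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{version}"
    return f"{base}/stack-components/{component}/{name}"


print(default_docs_url("step_operators", "kubernetes", "0.0.0", is_latest=True))
# https://docs.zenml.io/stack-components/step-operators/kubernetes
```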
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        # Get the config class name to point to the specific class
        config_class_name = self.config_class.__name__

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}"
            f"#zenml.integrations.{integration}.flavors.{config_class_name}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for requirement in cls.get_requirements():
        parsed_requirement = Requirement(requirement)

        if not requirement_installed(parsed_requirement):
            logger.debug(
                "Requirement '%s' for integration '%s' is not installed "
                "or installed with the wrong version.",
                requirement,
                cls.NAME,
            )
            return False

        dependencies = get_dependencies(parsed_requirement)

        for dependency in dependencies:
            if not requirement_installed(dependency):
                logger.debug(
                    "Requirement '%s' for integration '%s' is not "
                    "installed or installed with the wrong version.",
                    dependency,
                    cls.NAME,
                )
                return False

    logger.debug(
        f"Integration '{cls.NAME}' is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
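The per-requirement check relies on helpers like `requirement_installed`; a self-contained sketch of that idea using `packaging` and `importlib.metadata` (our own approximation, not ZenML's exact implementation):

```python
from importlib import metadata

from packaging.requirements import Requirement


def requirement_installed(requirement: Requirement) -> bool:
    """Return True if the package is installed with a matching version."""
    try:
        installed_version = metadata.version(requirement.name)
    except metadata.PackageNotFoundError:
        return False
    # Compare the installed version against the requirement's specifier,
    # e.g. `>=21.7,<26` for `kubernetes>=21.7,<26`.
    return requirement.specifier.contains(installed_version, prereleases=True)


print(requirement_installed(Requirement("packaging>=20.0")))        # True
print(requirement_installed(Requirement("no-such-package-zzz")))    # False
```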
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
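The prefix-based filtering above can be expressed more compactly; the logic is equivalent:

```python
from typing import List


def filter_uninstall_requirements(
    requirements: List[str], ignored_prefixes: List[str]
) -> List[str]:
    """Drop requirement specs that start with an ignored prefix."""
    return [
        req
        for req in requirements
        if not any(req.startswith(prefix) for prefix in ignored_prefixes)
    ]


print(filter_uninstall_requirements(
    ["kubernetes>=21.7", "kfp==1.8.22"], ignored_prefixes=["kfp"]
))  # ['kubernetes>=21.7']
```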
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

KubernetesIntegration

Bases: Integration

Definition of Kubernetes integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Kubernetes integration.

Returns:

Type Description
List[Type[Flavor]]

List of new stack component flavors.

Source code in src/zenml/integrations/kubernetes/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Kubernetes integration.

    Returns:
        List of new stack component flavors.
    """
    from zenml.integrations.kubernetes.flavors import (
        KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor
    )

    return [KubernetesOrchestratorFlavor, KubernetesStepOperatorFlavor]
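Custom integrations follow the same shape: subclass `Integration` and override `flavors()`. A minimal, self-contained sketch with stand-in base classes (the real ones live in `zenml.integrations.integration` and `zenml.stack`; all names below are hypothetical):

```python
from typing import List, Type


class Flavor:  # stand-in for zenml.stack.Flavor
    pass


class Integration:  # stand-in for zenml.integrations.integration.Integration
    NAME = "base"
    REQUIREMENTS: List[str] = []

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        return []


class MyOrchestratorFlavor(Flavor):
    pass


class MyIntegration(Integration):
    NAME = "my_integration"
    REQUIREMENTS = ["some-package>=1.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        # Real integrations import their flavor classes inside this method,
        # so heavy dependencies are only loaded when the integration is used.
        return [MyOrchestratorFlavor]


assert MyIntegration.flavors() == [MyOrchestratorFlavor]
```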

Modules

constants

Kubernetes orchestrator constants.

flavors

Kubernetes integration flavors.

Classes
KubernetesOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubernetesOrchestratorSettings

Configuration for the Kubernetes orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
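The `{{secret_name.key}}` reference format described in the docstring can be detected with a small regex; here is a sketch of the idea behind `secret_utils.is_secret_reference` (the pattern is our approximation, not ZenML's exact one):

```python
import re

# Matches values of the form `{{secret_name.key}}`, optionally with
# whitespace inside the braces.
_SECRET_REF_PATTERN = re.compile(r"\{\{\s*\S+?\.\S+?\s*\}\}")


def is_secret_reference(value: object) -> bool:
    """Check whether a config value looks like a secret reference."""
    return isinstance(value, str) and bool(
        _SECRET_REF_PATTERN.fullmatch(value.strip())
    )


print(is_secret_reference("{{ my_secret.api_token }}"))  # True
print(is_secret_reference("plain-text-token"))           # False
```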
Attributes
handles_step_retries: bool property

Whether the orchestrator handles step retries.

Returns:

Type Description
bool

Whether the orchestrator handles step retries.

is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

supports_client_side_caching: bool property

Whether the orchestrator supports client side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client side caching.

KubernetesOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubernetes orchestrator flavor.

Attributes
config_class: Type[KubernetesOrchestratorConfig] property

Returns KubernetesOrchestratorConfig config class.

Returns:

Type Description
Type[KubernetesOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

KubernetesOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes orchestrator.

Configuration options for how pipelines are executed on Kubernetes clusters. Field descriptions are defined inline using Field() descriptors.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
KubernetesStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, KubernetesStepOperatorSettings

Configuration for the Kubernetes step operator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

KubernetesStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Kubernetes step operator flavor.

Attributes
config_class: Type[KubernetesStepOperatorConfig] property

Returns KubernetesStepOperatorConfig config class.

Returns:

Type Description
Type[KubernetesStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

KubernetesStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes step operator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
kubernetes_orchestrator_flavor

Kubernetes orchestrator flavor.

Classes
KubernetesOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubernetesOrchestratorSettings

Configuration for the Kubernetes orchestrator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
handles_step_retries: bool property

Whether the orchestrator handles step retries.

Returns:

Type Description
bool

Whether the orchestrator handles step retries.

is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously.

supports_client_side_caching: bool property

Whether the orchestrator supports client-side caching.

Returns:

Type Description
bool

Whether the orchestrator supports client-side caching.

KubernetesOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubernetes orchestrator flavor.

Attributes
config_class: Type[KubernetesOrchestratorConfig] property

Returns KubernetesOrchestratorConfig config class.

Returns:

Type Description
Type[KubernetesOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

KubernetesOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes orchestrator.

Configuration options for how pipelines are executed on Kubernetes clusters. Field descriptions are defined inline using Field() descriptors.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
kubernetes_step_operator_flavor

Kubernetes step operator flavor.

Classes
KubernetesStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, KubernetesStepOperatorSettings

Configuration for the Kubernetes step operator.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

KubernetesStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Kubernetes step operator flavor.

Attributes
config_class: Type[KubernetesStepOperatorConfig] property

Returns KubernetesStepOperatorConfig config class.

Returns:

Type Description
Type[KubernetesStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

KubernetesStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubernetes step operator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules

orchestrators

Kubernetes-native orchestration.

Classes
KubernetesOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator for running ZenML pipelines using native Kubernetes.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesOrchestratorConfig property

Returns the KubernetesOrchestratorConfig config.

Returns:

Type Description
KubernetesOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Defines the validator that checks whether the stack is valid.

Returns:

Type Description
Optional[StackValidator]

Stack validator.

Functions
delete_schedule(schedule: ScheduleResponse) -> None

Deletes a schedule.

Parameters:

Name Type Description Default
schedule ScheduleResponse

The schedule to delete.

required

Raises:

Type Description
RuntimeError

If the cron job name is not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def delete_schedule(self, schedule: "ScheduleResponse") -> None:
    """Deletes a schedule.

    Args:
        schedule: The schedule to delete.

    Raises:
        RuntimeError: If the cron job name is not found.
    """
    cron_job_name = schedule.run_metadata.get(
        KUBERNETES_CRON_JOB_METADATA_KEY
    )
    if not cron_job_name:
        raise RuntimeError("Unable to find cron job name for schedule.")

    self._k8s_batch_api.delete_namespaced_cron_job(
        name=cron_job_name,
        namespace=self.config.kubernetes_namespace,
    )
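The method above looks up the CronJob name in the schedule's run metadata and deletes it via the batch API. A stdlib sketch of that pattern, using a stub in place of `kubernetes.client.BatchV1Api` (the metadata key name here is an assumption; the real constant lives in the ZenML integration):

```python
from typing import Dict, Optional

# Hypothetical metadata key standing in for KUBERNETES_CRON_JOB_METADATA_KEY.
CRON_JOB_METADATA_KEY = "kubernetes_cron_job"


class StubBatchApi:
    """Stands in for kubernetes.client.BatchV1Api in this sketch."""

    def __init__(self) -> None:
        self.deleted = []

    def delete_namespaced_cron_job(self, name: str, namespace: str) -> None:
        self.deleted.append((name, namespace))


def delete_schedule(
    run_metadata: Dict[str, str], batch_api: StubBatchApi, namespace: str
) -> None:
    """Delete the CronJob recorded in the schedule's run metadata."""
    cron_job_name: Optional[str] = run_metadata.get(CRON_JOB_METADATA_KEY)
    if not cron_job_name:
        raise RuntimeError("Unable to find cron job name for schedule.")
    batch_api.delete_namespaced_cron_job(
        name=cron_job_name, namespace=namespace
    )
```

With a real cluster, the same call shape goes through the Kubernetes batch API client instead of the stub.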
fetch_status(run: PipelineRunResponse, include_steps: bool = False) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The run that was executed by this orchestrator.

required
include_steps bool

If True, also fetch the status of individual steps.

False

Returns:

Type Description
Optional[ExecutionStatus]

A tuple of (pipeline_status, step_statuses).

Optional[Dict[str, ExecutionStatus]]

If include_steps is False, step_statuses will be None.

Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

If include_steps is True, step_statuses will be a dict (possibly empty).

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def fetch_status(
    self, run: "PipelineRunResponse", include_steps: bool = False
) -> Tuple[
    Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]
]:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.
        include_steps: If True, also fetch the status of individual steps.

    Returns:
        A tuple of (pipeline_status, step_statuses).
        If include_steps is False, step_statuses will be None.
        If include_steps is True, step_statuses will be a dict (possibly empty).
    """
    pipeline_status = None
    include_run_status = not run.status.is_finished

    label_selector = f"run_id={kube_utils.sanitize_label(str(run.id))}"
    try:
        job_list = kube_utils.list_jobs(
            batch_api=self._k8s_batch_api,
            namespace=self.config.kubernetes_namespace,
            label_selector=label_selector,
        )
    except Exception as e:
        logger.warning(f"Failed to list jobs for run {run.id}: {e}")
        return None, None

    step_statuses = {}
    # Only fetch steps if we really need them
    steps_dict = run.steps if include_steps else {}

    for job in job_list.items:
        if not job.metadata or not job.metadata.annotations:
            continue

        is_orchestrator_job = (
            ORCHESTRATOR_ANNOTATION_KEY in job.metadata.annotations
        )
        if is_orchestrator_job:
            if include_run_status:
                pipeline_status = self._map_job_status_to_execution_status(
                    job
                )
            continue

        step_name = job.metadata.annotations.get(
            STEP_NAME_ANNOTATION_KEY, None
        )
        if not include_steps or not step_name:
            continue

        step_response = steps_dict.get(step_name, None)

        if step_response is None:
            continue

        # If the step is already in a finished state, skip
        if step_response and step_response.status.is_finished:
            continue

        execution_status = self._map_job_status_to_execution_status(job)
        if execution_status is not None:
            step_statuses[step_name] = execution_status

    return pipeline_status, step_statuses
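The core of `fetch_status` is splitting the jobs labeled with the run id into one orchestrator job (pipeline status) and per-step jobs (step statuses), keyed on annotations. A sketch of that split over plain dicts, assuming hypothetical annotation key names in place of the real constants:

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical annotation keys mirroring ORCHESTRATOR_ANNOTATION_KEY and
# STEP_NAME_ANNOTATION_KEY in the code above.
ORCHESTRATOR_KEY = "zenml/orchestrator"
STEP_NAME_KEY = "zenml/step-name"


def split_statuses(
    jobs: List[dict],
) -> Tuple[Optional[str], Dict[str, str]]:
    """Split job statuses into (pipeline_status, per-step statuses)."""
    pipeline_status: Optional[str] = None
    step_statuses: Dict[str, str] = {}
    for job in jobs:
        annotations = job.get("annotations") or {}
        if ORCHESTRATOR_KEY in annotations:
            # The single orchestrator job determines the pipeline status.
            pipeline_status = job["status"]
            continue
        step_name = annotations.get(STEP_NAME_KEY)
        if step_name:
            step_statuses[step_name] = job["status"]
    return pipeline_status, step_statuses
```

The real method additionally skips steps that are already finished and maps Kubernetes job conditions to ZenML `ExecutionStatus` values.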
get_kube_client(incluster: Optional[bool] = None) -> k8s_client.ApiClient

Getter for the Kubernetes API client.

Parameters:

Name Type Description Default
incluster Optional[bool]

Whether to use the in-cluster config or not. Overrides the incluster setting in the config.

None

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

if the Kubernetes connector behaves unexpectedly.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Getter for the Kubernetes API client.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
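Stripped of the Kubernetes specifics, `get_kube_client` is a cache-with-refresh pattern: return the cached client unless it has expired, otherwise rebuild it. A stdlib sketch of just that logic (the class and method names are illustrative, not ZenML API):

```python
from typing import Callable, Optional


class CachingClientGetter:
    """Caches a client object and rebuilds it once marked expired.

    A sketch of the refresh logic in `get_kube_client`; the real method
    builds a kubernetes ApiClient from kubeconfig or a service connector.
    """

    def __init__(self, factory: Callable[[], object]) -> None:
        self._factory = factory
        self._client: Optional[object] = None
        self._expired = False

    def mark_expired(self) -> None:
        """Analogous to the connector credentials expiring."""
        self._expired = True

    def get(self) -> object:
        if self._client is not None and not self._expired:
            return self._client  # reuse the cached client
        self._client = self._factory()  # rebuild on first use or expiry
        self._expired = False
        return self._client
```

This mirrors why the real method checks `self._k8s_client and not self.connector_has_expired()` before reconnecting.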
get_kubernetes_contexts() -> Tuple[List[str], str]

Get list of configured Kubernetes contexts and the active context.

Raises:

Type Description
RuntimeError

if the Kubernetes configuration cannot be loaded.

Returns:

Name Type Description
context_name List[str]

List of configured Kubernetes contexts

active_context_name str

Name of the active Kubernetes context.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """Get list of configured Kubernetes contexts and the active context.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        context_name: List of configured Kubernetes contexts
        active_context_name: Name of the active Kubernetes context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
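The context extraction above can be mimicked on a kubeconfig-shaped dictionary without the Kubernetes client library. A sketch under the assumption that the input follows the standard kubeconfig layout (`contexts` list, `current-context` key):

```python
from typing import List, Tuple


def contexts_from_kubeconfig(config: dict) -> Tuple[List[str], str]:
    """Mimic get_kubernetes_contexts() on a kubeconfig-shaped dict."""
    contexts = config.get("contexts") or []
    if not contexts:
        raise RuntimeError("Could not load the Kubernetes configuration")
    names = [c["name"] for c in contexts]
    # Fall back to the first context if no active one is recorded.
    return names, config.get("current-context", names[0])
```

The real method delegates the file loading and parsing to `k8s_config.list_kube_config_contexts()`.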
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        )
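The run id lookup is a plain environment-variable read with a clearer error on absence. A self-contained sketch (the variable name below is an assumption; the code above uses the `ENV_ZENML_KUBERNETES_RUN_ID` constant, whose value is not shown here):

```python
import os

# Assumed value of ENV_ZENML_KUBERNETES_RUN_ID for illustration only.
ENV_RUN_ID = "ZENML_KUBERNETES_RUN_ID"


def get_orchestrator_run_id() -> str:
    """Read the active run id injected into the pod environment."""
    try:
        return os.environ[ENV_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_RUN_ID}."
        )
```

Re-raising as `RuntimeError` turns a bare `KeyError` into a message that names the missing variable.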
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    return {
        METADATA_ORCHESTRATOR_RUN_ID: self.get_orchestrator_run_id(),
    }
get_token_secret_name(deployment_id: UUID) -> str

Returns the name of the secret that contains the ZenML token.

Parameters:

Name Type Description Default
deployment_id UUID

The ID of the deployment.

required

Returns:

Type Description
str

The name of the secret that contains the ZenML token.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_token_secret_name(self, deployment_id: UUID) -> str:
    """Returns the name of the secret that contains the ZenML token.

    Args:
        deployment_id: The ID of the deployment.

    Returns:
        The name of the secret that contains the ZenML token.
    """
    return f"zenml-token-{deployment_id}"
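Because the secret name is derived only from the deployment ID, it is deterministic: the same deployment always maps to the same secret, which lets the orchestrator find and clean it up later. A standalone version of the same mapping:

```python
from uuid import UUID


def get_token_secret_name(deployment_id: UUID) -> str:
    """Deterministic per-deployment secret name, as in the method above."""
    return f"zenml-token-{deployment_id}"
```

Kubernetes Secret names must be valid DNS subdomains; the lowercase hex-and-hyphen form of a UUID satisfies that constraint.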
should_build_pipeline_image(deployment: PipelineDeploymentBase) -> bool

Whether to always build the pipeline image.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment.

required

Returns:

Type Description
bool

Whether to always build the pipeline image.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def should_build_pipeline_image(
    self, deployment: "PipelineDeploymentBase"
) -> bool:
    """Whether to always build the pipeline image.

    Args:
        deployment: The pipeline deployment.

    Returns:
        Whether to always build the pipeline image.
    """
    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )
    return settings.always_build_pipeline_image
submit_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submits a pipeline to the orchestrator.

This method should only submit the pipeline and not wait for it to complete. If the orchestrator is configured to wait for the pipeline run to complete, a function that waits for the pipeline run to complete can be passed as part of the submission result.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to submit.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment. These don't need to be set if running locally.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If a schedule without cron expression is given.

Exception

If the orchestrator pod fails to start.

Returns:

Type Description
Optional[SubmissionResult]

Optional submission result.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def submit_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submits a pipeline to the orchestrator.

    This method should only submit the pipeline and not wait for it to
    complete. If the orchestrator is configured to wait for the pipeline run
    to complete, a function that waits for the pipeline run to complete can
    be passed as part of the submission result.

    Args:
        deployment: The pipeline deployment to submit.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. These don't need to be set if running locally.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a schedule without cron expression is given.
        Exception: If the orchestrator pod fails to start.

    Returns:
        Optional submission result.
    """
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Kubernetes orchestrator, ignoring resource "
                "configuration for step %s.",
                step_name,
            )

        if retry_config := step.config.retry:
            if retry_config.delay or retry_config.backoff:
                logger.warning(
                    "Specifying retry delay or backoff is not supported "
                    "for the Kubernetes orchestrator."
                )

    pipeline_name = deployment.pipeline_configuration.name
    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    # Build entrypoint command and args for the orchestrator pod.
    # This will internally also build the command/args for all step pods.
    command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
    args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
        deployment_id=deployment.id,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    # Authorize pod to run Kubernetes commands inside the cluster.
    service_account_name = self._get_service_account_name(settings)

    # We set some default minimum resource requests for the orchestrator pod
    # here if the user has not specified any, because the orchestrator pod
    # takes up some memory resources itself and, if not specified, the pod
    # will be scheduled on any node regardless of available memory and risk
    # negatively impacting or even crashing the node due to memory pressure.
    orchestrator_pod_settings = kube_utils.apply_default_resource_requests(
        memory="400Mi",
        cpu="100m",
        pod_settings=settings.orchestrator_pod_settings,
    )

    if self.config.pass_zenml_token_as_secret:
        secret_name = self.get_token_secret_name(deployment.id)
        token = environment.pop("ZENML_STORE_API_TOKEN")
        kube_utils.create_or_update_secret(
            core_api=self._k8s_core_api,
            namespace=self.config.kubernetes_namespace,
            secret_name=secret_name,
            data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token},
        )
        orchestrator_pod_settings.env.append(
            {
                "name": "ZENML_STORE_API_TOKEN",
                "valueFrom": {
                    "secretKeyRef": {
                        "name": secret_name,
                        "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                    }
                },
            }
        )

    orchestrator_pod_labels = {
        "pipeline": kube_utils.sanitize_label(pipeline_name),
    }

    if placeholder_run:
        orchestrator_pod_labels["run_id"] = kube_utils.sanitize_label(
            str(placeholder_run.id)
        )
        orchestrator_pod_labels["run_name"] = kube_utils.sanitize_label(
            placeholder_run.name
        )

    pod_manifest = build_pod_manifest(
        pod_name=None,
        image_name=image,
        command=command,
        args=args,
        privileged=False,
        pod_settings=orchestrator_pod_settings,
        service_account_name=service_account_name,
        env=environment,
        labels=orchestrator_pod_labels,
        mount_local_stores=self.config.is_local,
        termination_grace_period_seconds=settings.pod_stop_grace_period,
    )

    pod_failure_policy = settings.pod_failure_policy or {
        # These rules are applied sequentially. This means any failure in
        # the main container will count towards the max retries. Any other
        # disruption will not count towards the max retries.
        "rules": [
            # If the main container fails, we count it towards the max
            # retries.
            {
                "action": "Count",
                "onExitCodes": {
                    "containerName": "main",
                    "operator": "NotIn",
                    "values": [0],
                },
            },
            # If the pod is interrupted at any other time, we don't count
            # it as a retry
            {
                "action": "Ignore",
                "onPodConditions": [
                    {
                        "type": "DisruptionTarget",
                        "status": "True",
                    }
                ],
            },
        ]
    }

    job_name = settings.job_name_prefix or ""
    random_prefix = "".join(random.choices("0123456789abcdef", k=8))
    job_name += (
        f"-{random_prefix}-{deployment.pipeline_configuration.name}"
    )
    # The job name will be used as a label on the pods, so we need to make
    # sure it doesn't exceed the label length limit
    job_name = kube_utils.sanitize_label(job_name)

    job_manifest = build_job_manifest(
        job_name=job_name,
        pod_template=pod_template_manifest_from_pod(pod_manifest),
        backoff_limit=settings.orchestrator_job_backoff_limit,
        ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
        active_deadline_seconds=settings.active_deadline_seconds,
        pod_failure_policy=pod_failure_policy,
        labels=orchestrator_pod_labels,
        annotations={
            ORCHESTRATOR_ANNOTATION_KEY: str(self.id),
        },
    )

    if deployment.schedule:
        if not deployment.schedule.cron_expression:
            raise RuntimeError(
                "The Kubernetes orchestrator only supports scheduling via "
                "CRON jobs, but the run was configured with a manual "
                "schedule. Use `Schedule(cron_expression=...)` instead."
            )
        cron_expression = deployment.schedule.cron_expression
        cron_job_manifest = build_cron_job_manifest(
            cron_expression=cron_expression,
            job_template=job_template_manifest_from_job(job_manifest),
            successful_jobs_history_limit=settings.successful_jobs_history_limit,
            failed_jobs_history_limit=settings.failed_jobs_history_limit,
        )

        cron_job = self._k8s_batch_api.create_namespaced_cron_job(
            body=cron_job_manifest,
            namespace=self.config.kubernetes_namespace,
        )
        logger.info(
            f"Created Kubernetes CronJob `{cron_job.metadata.name}` "
            f"with CRON expression `{cron_expression}`."
        )
        return SubmissionResult(
            metadata={
                KUBERNETES_CRON_JOB_METADATA_KEY: cron_job.metadata.name,
            }
        )
    else:
        try:
            kube_utils.create_job(
                batch_api=self._k8s_batch_api,
                namespace=self.config.kubernetes_namespace,
                job_manifest=job_manifest,
            )
        except Exception as e:
            if self.config.pass_zenml_token_as_secret:
                secret_name = self.get_token_secret_name(deployment.id)
                try:
                    kube_utils.delete_secret(
                        core_api=self._k8s_core_api,
                        namespace=self.config.kubernetes_namespace,
                        secret_name=secret_name,
                    )
                except Exception as cleanup_error:
                    logger.error(
                        "Error cleaning up secret %s: %s",
                        secret_name,
                        cleanup_error,
                    )
            raise e

        if settings.synchronous:

            def _wait_for_run_to_finish() -> None:
                logger.info("Waiting for orchestrator job to finish...")
                kube_utils.wait_for_job_to_finish(
                    batch_api=self._k8s_batch_api,
                    core_api=self._k8s_core_api,
                    namespace=self.config.kubernetes_namespace,
                    job_name=job_name,
                    backoff_interval=settings.job_monitoring_interval,
                    fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
                    stream_logs=True,
                )

            return SubmissionResult(
                wait_for_completion=_wait_for_run_to_finish,
            )
        else:
            logger.info(
                f"Orchestrator job `{job_name}` started. "
                f"Run the following command to inspect the logs: "
                f"`kubectl -n {self.config.kubernetes_namespace} logs "
                f"job/{job_name}`"
            )
            return None
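The job-name construction above (optional prefix, random hex suffix, pipeline name, label sanitization) can be sketched in isolation. Note that `sanitize_label` below is a simplified stand-in for `kube_utils.sanitize_label`, not the actual implementation:

```python
import random
import re

def sanitize_label(value: str) -> str:
    # Simplified stand-in for kube_utils.sanitize_label: Kubernetes label
    # values are limited to 63 characters and a restricted alphabet, and
    # must start and end with an alphanumeric character.
    value = re.sub(r"[^a-z0-9-]+", "-", value.lower()).strip("-")
    return value[:63].rstrip("-")

def build_job_name(prefix: str, pipeline_name: str) -> str:
    # Mirrors the logic above: prefix, 8 random hex characters, pipeline
    # name, all squeezed through label sanitization so the result can be
    # used as a label on the pods.
    random_prefix = "".join(random.choices("0123456789abcdef", k=8))
    return sanitize_label(f"{prefix}-{random_prefix}-{pipeline_name}")
```

Because the sanitized job name doubles as a pod label, the 63-character label limit is what caps its length.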
update_schedule(schedule: ScheduleResponse, update: ScheduleUpdate) -> None

Updates a schedule.

Parameters:

Name Type Description Default
schedule ScheduleResponse

The schedule to update.

required
update ScheduleUpdate

The update to apply to the schedule.

required

Raises:

Type Description
RuntimeError

If the cron job name is not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def update_schedule(
    self, schedule: "ScheduleResponse", update: ScheduleUpdate
) -> None:
    """Updates a schedule.

    Args:
        schedule: The schedule to update.
        update: The update to apply to the schedule.

    Raises:
        RuntimeError: If the cron job name is not found.
    """
    cron_job_name = schedule.run_metadata.get(
        KUBERNETES_CRON_JOB_METADATA_KEY
    )
    if not cron_job_name:
        raise RuntimeError("Unable to find cron job name for schedule.")

    if update.cron_expression:
        self._k8s_batch_api.patch_namespaced_cron_job(
            name=cron_job_name,
            namespace=self.config.kubernetes_namespace,
            body={"spec": {"schedule": update.cron_expression}},
        )
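The patch flow can be exercised against a stand-in batch API (the real code uses `kubernetes.client.BatchV1Api`); the class and names below are illustrative only:

```python
from typing import Optional

class FakeBatchApi:
    # Records patch calls instead of talking to a cluster.
    def __init__(self):
        self.patches = []

    def patch_namespaced_cron_job(self, name, namespace, body):
        self.patches.append((name, namespace, body))

def update_cron_expression(
    batch_api, cron_job_name: str, namespace: str, cron_expression: Optional[str]
) -> None:
    # Same shape as the patch above: a partial body that only touches
    # spec.schedule, leaving the rest of the CronJob untouched.
    if cron_expression:
        batch_api.patch_namespaced_cron_job(
            name=cron_job_name,
            namespace=namespace,
            body={"spec": {"schedule": cron_expression}},
        )
```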
Modules
dag_runner

DAG runner.

Classes
DagRunner(nodes: List[Node], node_startup_function: Callable[[Node], NodeStatus], node_monitoring_function: Callable[[Node], NodeStatus], node_stop_function: Optional[Callable[[Node], None]] = None, interrupt_function: Optional[Callable[[], Optional[InterruptMode]]] = None, monitoring_interval: float = 1.0, monitoring_delay: float = 0.0, interrupt_check_interval: float = 1.0, max_parallelism: Optional[int] = None)

DAG runner.

This class does the orchestration of running the nodes of a DAG. It runs two loops in separate threads:

The main thread:

- checks if any nodes should be skipped or are ready to run, in which case the node is added to the startup queue
- creates a worker thread to start the node and executes it in a thread pool if there are nodes in the startup queue and the maximum parallelism is not reached
- periodically checks if the DAG should be interrupted

The monitoring thread:

- monitors the running nodes and updates their status

Initialize the DAG runner.

Parameters:

Name Type Description Default
nodes List[Node]

The nodes of the DAG.

required
node_startup_function Callable[[Node], NodeStatus]

The function to start a node.

required
node_monitoring_function Callable[[Node], NodeStatus]

The function to monitor a node.

required
node_stop_function Optional[Callable[[Node], None]]

The function to stop a node.

None
interrupt_function Optional[Callable[[], Optional[InterruptMode]]]

Will be periodically called to check if the DAG should be interrupted.

None
monitoring_interval float

The interval in which the nodes are monitored.

1.0
monitoring_delay float

The delay in seconds to wait between monitoring different nodes.

0.0
interrupt_check_interval float

The interval in which the interrupt function is called.

1.0
max_parallelism Optional[int]

The maximum number of nodes to run in parallel.

None
Source code in src/zenml/integrations/kubernetes/orchestrators/dag_runner.py
def __init__(
    self,
    nodes: List[Node],
    node_startup_function: Callable[[Node], NodeStatus],
    node_monitoring_function: Callable[[Node], NodeStatus],
    node_stop_function: Optional[Callable[[Node], None]] = None,
    interrupt_function: Optional[
        Callable[[], Optional[InterruptMode]]
    ] = None,
    monitoring_interval: float = 1.0,
    monitoring_delay: float = 0.0,
    interrupt_check_interval: float = 1.0,
    max_parallelism: Optional[int] = None,
) -> None:
    """Initialize the DAG runner.

    Args:
        nodes: The nodes of the DAG.
        node_startup_function: The function to start a node.
        node_monitoring_function: The function to monitor a node.
        node_stop_function: The function to stop a node.
        interrupt_function: Will be periodically called to check if the
            DAG should be interrupted.
        monitoring_interval: The interval in which the nodes are monitored.
        monitoring_delay: The delay in seconds to wait between monitoring
            different nodes.
        interrupt_check_interval: The interval in which the interrupt
            function is called.
        max_parallelism: The maximum number of nodes to run in parallel.
    """
    self.nodes = {node.id: node for node in nodes}
    self.startup_queue: queue.Queue[Node] = queue.Queue()
    self.node_startup_function = node_startup_function
    self.node_monitoring_function = node_monitoring_function
    self.node_stop_function = node_stop_function
    self.interrupt_function = interrupt_function
    self.monitoring_thread = threading.Thread(
        name="DagRunner-Monitoring-Loop",
        target=self._monitoring_loop,
        daemon=True,
    )
    self.monitoring_interval = monitoring_interval
    self.monitoring_delay = monitoring_delay
    self.interrupt_check_interval = interrupt_check_interval
    self.max_parallelism = max_parallelism
    self.shutdown_event = threading.Event()
    self.startup_executor = ThreadPoolExecutor(
        max_workers=10, thread_name_prefix="DagRunner-Startup"
    )
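The readiness/skip decision the main loop applies to each pending node can be sketched with plain Python stand-ins for `Node` and its statuses (these are illustrative, not the actual zenml models):

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

@dataclass
class Node:
    id: str
    upstream: List[str] = field(default_factory=list)
    status: str = "pending"  # pending | running | succeeded | failed | skipped

def process(nodes: List[Node]) -> Tuple[List[str], List[str]]:
    """Return ids of nodes ready to start, marking unreachable nodes skipped."""
    by_id: Dict[str, Node] = {n.id: n for n in nodes}
    ready, skipped = [], []
    for node in nodes:
        if node.status != "pending":
            continue
        parents = [by_id[u] for u in node.upstream]
        if any(p.status in ("failed", "skipped") for p in parents):
            # An upstream failure propagates: this node can never run.
            node.status = "skipped"
            skipped.append(node.id)
        elif all(p.status == "succeeded" for p in parents):
            # All upstream nodes finished successfully: queue for startup.
            ready.append(node.id)
    return ready, skipped
```

In the real runner, ready nodes go into the startup queue and are started by the thread pool, subject to `max_parallelism`.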
Attributes
active_nodes: List[Node] property

Active nodes.

Active nodes are nodes that are either running or starting.

Returns:

Type Description
List[Node]

Active nodes.

running_nodes: List[Node] property

Running nodes.

Returns:

Type Description
List[Node]

Running nodes.

Functions
run() -> Dict[str, NodeStatus]

Run the DAG.

Returns:

Type Description
Dict[str, NodeStatus]

The final node states.

Source code in src/zenml/integrations/kubernetes/orchestrators/dag_runner.py
def run(self) -> Dict[str, NodeStatus]:
    """Run the DAG.

    Returns:
        The final node states.
    """
    self._initialize_startup_queue()

    self.monitoring_thread.start()

    interrupt_mode = None
    last_interrupt_check = time.time()

    while True:
        if self.interrupt_function is not None:
            if (
                time.time() - last_interrupt_check
                >= self.interrupt_check_interval
            ):
                if interrupt_mode := self.interrupt_function():
                    logger.warning("DAG execution interrupted.")
                    break
                last_interrupt_check = time.time()

        is_finished = self._process_nodes()
        if is_finished:
            break

        time.sleep(0.5)

    self.shutdown_event.set()
    if interrupt_mode == InterruptMode.FORCE:
        # If a force interrupt was requested, we stop all running nodes.
        self._stop_all_nodes()

    self.monitoring_thread.join()

    node_statuses = {
        node_id: node.status for node_id, node in self.nodes.items()
    }
    logger.debug("Finished with node statuses: %s", node_statuses)

    return node_statuses
InterruptMode

Bases: StrEnum

Interrupt mode.

Node

Bases: BaseModel

DAG node.

Attributes
is_finished: bool property

Whether the node is finished.

Returns:

Type Description
bool

Whether the node is finished.

NodeStatus

Bases: StrEnum

Status of a DAG node.

Functions
kube_utils

Utilities for Kubernetes related functions.

Internal interface: no backwards compatibility guarantees. Adjusted from https://github.com/tensorflow/tfx/blob/master/tfx/utils/kube_utils.py.

Classes
JobStatus

Bases: Enum

Status of a Kubernetes job.

PatchedFailurePolicyRule

Bases: V1PodFailurePolicyRule

Patched failure policy rule.

Attributes
on_pod_conditions property writable

On pod conditions.

Returns:

Type Description

On pod conditions.

PodPhase

Bases: Enum

Phase of the Kubernetes pod.

Pod phases are defined in https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.

Functions
apply_default_resource_requests(memory: str, cpu: Optional[str] = None, pod_settings: Optional[KubernetesPodSettings] = None) -> KubernetesPodSettings

Applies default resource requests to a pod settings object.

Parameters:

Name Type Description Default
memory str

The memory resource request.

required
cpu Optional[str]

The CPU resource request.

None
pod_settings Optional[KubernetesPodSettings]

The pod settings to update. A new one will be created if not provided.

None

Returns:

Type Description
KubernetesPodSettings

The new or updated pod settings.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def apply_default_resource_requests(
    memory: str,
    cpu: Optional[str] = None,
    pod_settings: Optional[KubernetesPodSettings] = None,
) -> KubernetesPodSettings:
    """Applies default resource requests to a pod settings object.

    Args:
        memory: The memory resource request.
        cpu: The CPU resource request.
        pod_settings: The pod settings to update. A new one will be created
            if not provided.

    Returns:
        The new or updated pod settings.
    """
    resources = {
        "requests": {"memory": memory},
    }
    if cpu:
        resources["requests"]["cpu"] = cpu
    if not pod_settings:
        pod_settings = KubernetesPodSettings(resources=resources)
    elif not pod_settings.resources:
        # We can't update the pod settings in place (because it's a frozen
        # pydantic model), so we have to create a new one.
        pod_settings = KubernetesPodSettings(
            **pod_settings.model_dump(exclude_unset=True),
            resources=resources,
        )
    else:
        set_requests = pod_settings.resources.get("requests", {})
        resources["requests"].update(set_requests)
        pod_settings.resources["requests"] = resources["requests"]

    return pod_settings
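The merge semantics above boil down to: default requests are only applied for keys the user did not set themselves. A minimal sketch of that rule, assuming plain dicts instead of `KubernetesPodSettings`:

```python
def merge_requests(defaults: dict, user_requests: dict) -> dict:
    # Start from the defaults, then overlay the user's requests so that
    # any key the user specified wins over the default value.
    merged = dict(defaults)
    merged.update(user_requests)
    return merged
```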
check_job_status(batch_api: k8s_client.BatchV1Api, core_api: k8s_client.CoreV1Api, namespace: str, job_name: str, fail_on_container_waiting_reasons: Optional[List[str]] = None, container_name: Optional[str] = None) -> Tuple[JobStatus, Optional[str]]

Check the status of a job.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes BatchV1Api client.

required
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
job_name str

Name of the job for which to wait.

required
fail_on_container_waiting_reasons Optional[List[str]]

List of container waiting reasons that will cause the job to fail.

None
container_name Optional[str]

Name of the container to check for failure.

None

Returns:

Type Description
Tuple[JobStatus, Optional[str]]

The status of the job and an error message if the job failed.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def check_job_status(
    batch_api: k8s_client.BatchV1Api,
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    job_name: str,
    fail_on_container_waiting_reasons: Optional[List[str]] = None,
    container_name: Optional[str] = None,
) -> Tuple[JobStatus, Optional[str]]:
    """Check the status of a job.

    Args:
        batch_api: Kubernetes BatchV1Api client.
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        job_name: Name of the job for which to wait.
        fail_on_container_waiting_reasons: List of container waiting reasons
            that will cause the job to fail.
        container_name: Name of the container to check for failure.

    Returns:
        The status of the job and an error message if the job failed.
    """
    job: k8s_client.V1Job = retry_on_api_exception(
        batch_api.read_namespaced_job
    )(name=job_name, namespace=namespace)

    if job.status.conditions:
        for condition in job.status.conditions:
            if condition.type == "Complete" and condition.status == "True":
                return JobStatus.SUCCEEDED, None
            if condition.type == "Failed" and condition.status == "True":
                error_message = condition.message or "Unknown"
                container_failure_reason = None
                try:
                    pods = core_api.list_namespaced_pod(
                        label_selector=f"job-name={job_name}",
                        namespace=namespace,
                    ).items
                    # Sort pods by creation timestamp, oldest first
                    pods.sort(
                        key=lambda pod: pod.metadata.creation_timestamp,
                    )
                    if pods:
                        if (
                            termination_reason
                            := get_container_termination_reason(
                                pods[-1], container_name or "main"
                            )
                        ):
                            exit_code, reason = termination_reason
                            if exit_code != 0:
                                container_failure_reason = (
                                    f"{reason}, exit_code={exit_code}"
                                )
                except Exception:
                    pass

                if container_failure_reason:
                    error_message += f" (container failure reason: {container_failure_reason})"

                return JobStatus.FAILED, error_message

    if fail_on_container_waiting_reasons:
        pod_list: k8s_client.V1PodList = retry_on_api_exception(
            core_api.list_namespaced_pod
        )(
            namespace=namespace,
            label_selector=f"job-name={job_name}",
            field_selector="status.phase=Pending",
        )
        for pod in pod_list.items:
            container_state = get_container_status(
                pod, container_name or "main"
            )

            if (
                container_state
                and (waiting_state := container_state.waiting)
                and waiting_state.reason in fail_on_container_waiting_reasons
            ):
                retry_on_api_exception(batch_api.delete_namespaced_job)(
                    name=job_name,
                    namespace=namespace,
                    propagation_policy="Foreground",
                )
                error_message = (
                    f"Detected container in state `{waiting_state.reason}`"
                )
                return JobStatus.FAILED, error_message

    return JobStatus.RUNNING, None
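The condition handling above reduces to a small pure function: a `Complete`/`Failed` condition with status `"True"` decides the outcome, otherwise the job counts as still running. `Condition` below is a stand-in for the `kubernetes` client's `V1JobCondition`:

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class Condition:
    type: str
    status: str
    message: Optional[str] = None

def job_status(conditions: List[Condition]) -> Tuple[str, Optional[str]]:
    # Scan the job's conditions for a terminal state; fall through to
    # RUNNING if none is present.
    for c in conditions or []:
        if c.type == "Complete" and c.status == "True":
            return "SUCCEEDED", None
        if c.type == "Failed" and c.status == "True":
            return "FAILED", c.message or "Unknown"
    return "RUNNING", None
```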
create_and_wait_for_pod_to_start(core_api: k8s_client.CoreV1Api, pod_display_name: str, pod_name: str, pod_manifest: k8s_client.V1Pod, namespace: str, startup_max_retries: int, startup_failure_delay: float, startup_failure_backoff: float, startup_timeout: float) -> None

Create a pod and wait for it to reach a desired state.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
pod_display_name str

The display name of the pod to use in logs.

required
pod_name str

The name of the pod to create.

required
pod_manifest V1Pod

The manifest of the pod to create.

required
namespace str

The namespace in which to create the pod.

required
startup_max_retries int

The maximum number of retries for the pod startup.

required
startup_failure_delay float

The delay between retries for the pod startup.

required
startup_failure_backoff float

The backoff factor for the pod startup.

required
startup_timeout float

The maximum time to wait for the pod to start.

required

Raises:

Type Description
TimeoutError

If the pod is still in a pending state after the maximum wait time has elapsed.

Exception

If the pod fails to start after the maximum number of retries.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_and_wait_for_pod_to_start(
    core_api: k8s_client.CoreV1Api,
    pod_display_name: str,
    pod_name: str,
    pod_manifest: k8s_client.V1Pod,
    namespace: str,
    startup_max_retries: int,
    startup_failure_delay: float,
    startup_failure_backoff: float,
    startup_timeout: float,
) -> None:
    """Create a pod and wait for it to reach a desired state.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        pod_display_name: The display name of the pod to use in logs.
        pod_name: The name of the pod to create.
        pod_manifest: The manifest of the pod to create.
        namespace: The namespace in which to create the pod.
        startup_max_retries: The maximum number of retries for the pod startup.
        startup_failure_delay: The delay between retries for the pod startup.
        startup_failure_backoff: The backoff factor for the pod startup.
        startup_timeout: The maximum time to wait for the pod to start.

    Raises:
        TimeoutError: If the pod is still in a pending state after the maximum
            wait time has elapsed.
        Exception: If the pod fails to start after the maximum number of
            retries.
    """
    retries = 0

    while retries < startup_max_retries:
        try:
            # Create and run pod.
            core_api.create_namespaced_pod(
                namespace=namespace,
                body=pod_manifest,
            )
            break
        except Exception as e:
            retries += 1
            if retries < startup_max_retries:
                logger.debug(f"The {pod_display_name} failed to start: {e}")
                message = ""
                try:
                    if isinstance(e, ApiException) and e.body:
                        exception_body = json.loads(e.body)
                        message = exception_body.get("message", "")
                except Exception:
                    pass
                logger.error(
                    f"Failed to create {pod_display_name}. "
                    f"Retrying in {startup_failure_delay} seconds..."
                    + (f"\nReason: {message}" if message else "")
                )
                time.sleep(startup_failure_delay)
                startup_failure_delay *= startup_failure_backoff
            else:
                logger.error(
                    f"Failed to create {pod_display_name} after "
                    f"{startup_max_retries} retries. Exiting."
                )
                raise

    # Wait for pod to start
    logger.info(f"Waiting for {pod_display_name} to start...")
    max_wait = startup_timeout
    total_wait: float = 0
    delay = startup_failure_delay
    while True:
        pod = get_pod(
            core_api=core_api,
            pod_name=pod_name,
            namespace=namespace,
        )
        if not pod or pod_is_not_pending(pod):
            break
        if total_wait >= max_wait:
            # Have to delete the pending pod so it doesn't start running
            # later on.
            try:
                core_api.delete_namespaced_pod(
                    name=pod_name,
                    namespace=namespace,
                )
            except Exception:
                pass
            raise TimeoutError(
                f"The {pod_display_name} is still in a pending state "
                f"after {total_wait} seconds. Exiting."
            )

        if total_wait + delay > max_wait:
            delay = max_wait - total_wait
        total_wait += delay
        time.sleep(delay)
        delay *= startup_failure_backoff
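The capped exponential backoff used while waiting for the pod to leave the `Pending` phase can be sketched as a pure delay schedule (a simplification: the real loop polls the pod between sleeps and exits early once it is no longer pending):

```python
from typing import List

def backoff_delays(initial: float, backoff: float, max_wait: float) -> List[float]:
    # Each sleep grows by the backoff factor; the final sleep is clipped so
    # the total waiting time never exceeds max_wait.
    delays: List[float] = []
    total, delay = 0.0, initial
    while total < max_wait:
        if total + delay > max_wait:
            delay = max_wait - total
        delays.append(delay)
        total += delay
        delay *= backoff
    return delays
```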
create_config_map(core_api: k8s_client.CoreV1Api, namespace: str, name: str, data: Dict[str, str]) -> None

Create a Kubernetes config map.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
name str

Name of the config map to create.

required
data Dict[str, str]

Data to store in the config map.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_config_map(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    name: str,
    data: Dict[str, str],
) -> None:
    """Create a Kubernetes config map.

    Args:
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        name: Name of the config map to create.
        data: Data to store in the config map.
    """
    retry_on_api_exception(core_api.create_namespaced_config_map)(
        namespace=namespace,
        body=k8s_client.V1ConfigMap(metadata={"name": name}, data=data),
    )
create_edit_service_account(core_api: k8s_client.CoreV1Api, rbac_api: k8s_client.RbacAuthorizationV1Api, service_account_name: str, namespace: str, role_binding_name: str = 'zenml-edit') -> None

Create a new Kubernetes service account with "edit" rights.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
rbac_api RbacAuthorizationV1Api

Client of Rbac Authorization V1 API of Kubernetes API.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace.

required
role_binding_name str

Name of the role binding. Defaults to "zenml-edit".

'zenml-edit'
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_edit_service_account(
    core_api: k8s_client.CoreV1Api,
    rbac_api: k8s_client.RbacAuthorizationV1Api,
    service_account_name: str,
    namespace: str,
    role_binding_name: str = "zenml-edit",
) -> None:
    """Create a new Kubernetes service account with "edit" rights.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        rbac_api: Client of Rbac Authorization V1 API of Kubernetes API.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace.
        role_binding_name: Name of the role binding. Defaults to "zenml-edit".
    """
    rb_manifest = build_role_binding_manifest_for_service_account(
        name=role_binding_name,
        role_name="edit",
        service_account_name=service_account_name,
        namespace=namespace,
    )
    _if_not_exists(rbac_api.create_namespaced_role_binding)(
        namespace=namespace, body=rb_manifest
    )

    sa_manifest = build_service_account_manifest(
        name=service_account_name, namespace=namespace
    )
    _if_not_exists(core_api.create_namespaced_service_account)(
        namespace=namespace,
        body=sa_manifest,
    )
create_job(batch_api: k8s_client.BatchV1Api, namespace: str, job_manifest: k8s_client.V1Job) -> None

Create a Kubernetes job.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes batch api.

required
namespace str

Kubernetes namespace.

required
job_manifest V1Job

The manifest of the job to create.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_job(
    batch_api: k8s_client.BatchV1Api,
    namespace: str,
    job_manifest: k8s_client.V1Job,
) -> None:
    """Create a Kubernetes job.

    Args:
        batch_api: Kubernetes batch api.
        namespace: Kubernetes namespace.
        job_manifest: The manifest of the job to create.
    """
    retry_on_api_exception(batch_api.create_namespaced_job)(
        namespace=namespace,
        body=job_manifest,
    )
create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None

Create a Kubernetes namespace.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

Kubernetes namespace.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None:
    """Create a Kubernetes namespace.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: Kubernetes namespace.
    """
    manifest = build_namespace_manifest(namespace)
    _if_not_exists(core_api.create_namespace)(body=manifest)
create_or_update_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Create a Kubernetes secret if it doesn't exist, or update it if it does.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to create or update the secret.

required
secret_name str

The name of the secret to create or update.

required
data Dict[str, Optional[str]]

The secret data. If the value is None, the key will be removed from the secret.

required

Raises:

Type Description
ApiException

If the secret creation failed for any reason other than the secret already existing.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_or_update_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Create a Kubernetes secret if it doesn't exist, or update it if it does.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to create or update the secret.
        secret_name: The name of the secret to create or update.
        data: The secret data. If the value is None, the key will be removed
            from the secret.

    Raises:
        ApiException: If the secret creation failed for any reason other than
            the secret already existing.
    """
    try:
        create_secret(core_api, namespace, secret_name, data)
    except ApiException as e:
        if e.status != 409:
            raise
        update_secret(core_api, namespace, secret_name, data)
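The create-then-update-on-conflict pattern above can be sketched without a live cluster. This is an illustrative stand-in, not the real API: `ApiError` and the in-memory `store` are hypothetical substitutes for `kubernetes.client.ApiException` and the Kubernetes secret store, but the 409 handling and the "None removes the key" semantics mirror the helper.

```python
class ApiError(Exception):
    """Stand-in for kubernetes.client.ApiException."""
    def __init__(self, status):
        self.status = status

store = {}  # in-memory stand-in for the cluster's secrets

def create(name, data):
    if name in store:
        raise ApiError(409)  # Conflict: secret already exists
    store[name] = dict(data)

def update(name, data):
    for key, value in data.items():
        if value is None:
            store[name].pop(key, None)  # a None value removes the key
        else:
            store[name][key] = value

def create_or_update(name, data):
    try:
        create(name, data)
    except ApiError as e:
        if e.status != 409:
            raise  # only a 409 conflict falls through to update
        update(name, data)

create_or_update("creds", {"token": "abc"})
create_or_update("creds", {"token": "xyz", "old": None})
```

After the second call the secret holds only the updated `token`; the `old` key, mapped to `None`, is removed.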
create_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Create a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to create the secret.

required
secret_name str

The name of the secret to create.

required
data Dict[str, Optional[str]]

The secret data.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Create a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to create the secret.
        secret_name: The name of the secret to create.
        data: The secret data.
    """
    core_api.create_namespaced_secret(
        namespace=namespace,
        body=build_secret_manifest(name=secret_name, data=data),
    )
delete_config_map(core_api: k8s_client.CoreV1Api, namespace: str, name: str) -> None

Delete a Kubernetes config map.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
name str

Name of the config map to delete.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def delete_config_map(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    name: str,
) -> None:
    """Delete a Kubernetes config map.

    Args:
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        name: Name of the config map to delete.
    """
    retry_on_api_exception(core_api.delete_namespaced_config_map)(
        namespace=namespace,
        name=name,
    )
delete_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str) -> None

Delete a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to delete the secret.

required
secret_name str

The name of the secret to delete.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def delete_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
) -> None:
    """Delete a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to delete the secret.
        secret_name: The name of the secret to delete.
    """
    core_api.delete_namespaced_secret(
        name=secret_name,
        namespace=namespace,
    )
get_config_map(core_api: k8s_client.CoreV1Api, namespace: str, name: str) -> k8s_client.V1ConfigMap

Get a Kubernetes config map.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
name str

Name of the config map to get.

required

Returns:

Type Description
V1ConfigMap

The config map.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_config_map(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    name: str,
) -> k8s_client.V1ConfigMap:
    """Get a Kubernetes config map.

    Args:
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        name: Name of the config map to get.

    Returns:
        The config map.
    """
    return retry_on_api_exception(core_api.read_namespaced_config_map)(
        namespace=namespace,
        name=name,
    )
get_container_status(pod: k8s_client.V1Pod, container_name: str) -> Optional[k8s_client.V1ContainerState]

Get the status of a container.

Parameters:

Name Type Description Default
pod V1Pod

The pod to get the container status for.

required
container_name str

The container name.

required

Returns:

Type Description
Optional[V1ContainerState]

The container status.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_container_status(
    pod: k8s_client.V1Pod, container_name: str
) -> Optional[k8s_client.V1ContainerState]:
    """Get the status of a container.

    Args:
        pod: The pod to get the container status for.
        container_name: The container name.

    Returns:
        The container status.
    """
    if not pod.status or not pod.status.container_statuses:
        return None

    for container_status in pod.status.container_statuses:
        if container_status.name == container_name:
            return container_status.state

    return None
get_container_termination_reason(pod: k8s_client.V1Pod, container_name: str) -> Optional[Tuple[int, str]]

Get the termination reason for a container.

Parameters:

Name Type Description Default
pod V1Pod

The pod to get the termination reason for.

required
container_name str

The container name.

required

Returns:

Type Description
Optional[Tuple[int, str]]

The exit code and termination reason for the container.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_container_termination_reason(
    pod: k8s_client.V1Pod, container_name: str
) -> Optional[Tuple[int, str]]:
    """Get the termination reason for a container.

    Args:
        pod: The pod to get the termination reason for.
        container_name: The container name.

    Returns:
        The exit code and termination reason for the container.
    """
    container_state = get_container_status(pod, container_name)
    if not container_state or not container_state.terminated:
        return None

    return (
        container_state.terminated.exit_code,
        container_state.terminated.reason or "Unknown",
    )
get_job(batch_api: k8s_client.BatchV1Api, namespace: str, job_name: str) -> k8s_client.V1Job

Get a job by name.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes batch api.

required
namespace str

Kubernetes namespace.

required
job_name str

The name of the job to get.

required

Returns:

Type Description
V1Job

The job.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_job(
    batch_api: k8s_client.BatchV1Api,
    namespace: str,
    job_name: str,
) -> k8s_client.V1Job:
    """Get a job by name.

    Args:
        batch_api: Kubernetes batch api.
        namespace: Kubernetes namespace.
        job_name: The name of the job to get.

    Returns:
        The job.
    """
    return retry_on_api_exception(batch_api.read_namespaced_job)(
        name=job_name, namespace=namespace
    )
get_parent_job_name(core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str) -> Optional[str]

Get the name of the job that created a pod.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
pod_name str

Name of the pod.

required
namespace str

Kubernetes namespace.

required

Returns:

Type Description
Optional[str]

The name of the job that created the pod, or None if the pod is not

Optional[str]

associated with a job.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_parent_job_name(
    core_api: k8s_client.CoreV1Api,
    pod_name: str,
    namespace: str,
) -> Optional[str]:
    """Get the name of the job that created a pod.

    Args:
        core_api: Kubernetes CoreV1Api client.
        pod_name: Name of the pod.
        namespace: Kubernetes namespace.

    Returns:
        The name of the job that created the pod, or None if the pod is not
        associated with a job.
    """
    pod = get_pod(core_api, pod_name=pod_name, namespace=namespace)
    if (
        pod
        and pod.metadata
        and pod.metadata.labels
        and (job_name := pod.metadata.labels.get("job-name", None))
    ):
        return cast(str, job_name)

    return None
get_pod(core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str) -> Optional[k8s_client.V1Pod]

Get a pod from Kubernetes metadata API.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of CoreV1Api of Kubernetes API.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required

Raises:

Type Description
RuntimeError

When it sees unexpected errors from Kubernetes API.

Returns:

Type Description
Optional[V1Pod]

The found pod object. None if it's not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_pod(
    core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> Optional[k8s_client.V1Pod]:
    """Get a pod from Kubernetes metadata API.

    Args:
        core_api: Client of `CoreV1Api` of Kubernetes API.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.

    Raises:
        RuntimeError: When it sees unexpected errors from Kubernetes API.

    Returns:
        The found pod object. None if it's not found.
    """
    try:
        return retry_on_api_exception(core_api.read_namespaced_pod)(
            name=pod_name, namespace=namespace
        )
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            return None
        raise RuntimeError from e
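The "404 means absent, anything else is an error" pattern above generalizes beyond pods. A minimal sketch with hypothetical stand-ins (`ApiError` and the `registry` dict replace the real API client and cluster):

```python
class ApiError(Exception):
    """Stand-in for kubernetes.client.rest.ApiException."""
    def __init__(self, status):
        self.status = status

registry = {"step-pod": {"phase": "Running"}}  # stand-in cluster state

def read_pod(name):
    if name not in registry:
        raise ApiError(404)
    return registry[name]

def get_pod_or_none(name):
    # Mirrors get_pod: a 404 becomes None, other API errors propagate
    # wrapped in RuntimeError.
    try:
        return read_pod(name)
    except ApiError as e:
        if e.status == 404:
            return None
        raise RuntimeError from e

found = get_pod_or_none("step-pod")
missing = get_pod_or_none("no-such-pod")
```

Callers can then branch on `None` instead of wrapping every lookup in exception handling.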
get_pod_owner_references(core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str) -> List[k8s_client.V1OwnerReference]

Get owner references for a pod.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
pod_name str

Name of the pod.

required
namespace str

Kubernetes namespace.

required

Returns:

Type Description
List[V1OwnerReference]

List of owner references.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_pod_owner_references(
    core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> List[k8s_client.V1OwnerReference]:
    """Get owner references for a pod.

    Args:
        core_api: Kubernetes CoreV1Api client.
        pod_name: Name of the pod.
        namespace: Kubernetes namespace.

    Returns:
        List of owner references.
    """
    pod = get_pod(core_api=core_api, pod_name=pod_name, namespace=namespace)

    if not pod or not pod.metadata or not pod.metadata.owner_references:
        return []

    return cast(
        List[k8s_client.V1OwnerReference], pod.metadata.owner_references
    )
is_inside_kubernetes() -> bool

Check whether we are inside a Kubernetes cluster or on a remote host.

Returns:

Type Description
bool

True if inside a Kubernetes cluster, else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def is_inside_kubernetes() -> bool:
    """Check whether we are inside a Kubernetes cluster or on a remote host.

    Returns:
        True if inside a Kubernetes cluster, else False.
    """
    try:
        k8s_config.load_incluster_config()
        return True
    except k8s_config.ConfigException:
        return False
is_step_job(job: k8s_client.V1Job) -> bool

Check if a job is a step job.

Parameters:

Name Type Description Default
job V1Job

The job to check.

required

Returns:

Type Description
bool

Whether the job is a step job.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def is_step_job(job: k8s_client.V1Job) -> bool:
    """Check if a job is a step job.

    Args:
        job: The job to check.

    Returns:
        Whether the job is a step job.
    """
    if not job.metadata or not job.metadata.annotations:
        return False

    return STEP_NAME_ANNOTATION_KEY in job.metadata.annotations
list_jobs(batch_api: k8s_client.BatchV1Api, namespace: str, label_selector: Optional[str] = None) -> k8s_client.V1JobList

List jobs in a namespace.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes batch api.

required
namespace str

Kubernetes namespace.

required
label_selector Optional[str]

The label selector to use.

None

Returns:

Type Description
V1JobList

The job list.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def list_jobs(
    batch_api: k8s_client.BatchV1Api,
    namespace: str,
    label_selector: Optional[str] = None,
) -> k8s_client.V1JobList:
    """List jobs in a namespace.

    Args:
        batch_api: Kubernetes batch api.
        namespace: Kubernetes namespace.
        label_selector: The label selector to use.

    Returns:
        The job list.
    """
    return retry_on_api_exception(batch_api.list_namespaced_job)(
        namespace=namespace,
        label_selector=label_selector,
    )
load_kube_config(incluster: bool = False, context: Optional[str] = None) -> None

Load the Kubernetes client config.

Parameters:

Name Type Description Default
incluster bool

Whether to load the in-cluster config.

False
context Optional[str]

Name of the Kubernetes context. If not provided, uses the currently active context. Will be ignored if incluster is True.

None
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def load_kube_config(
    incluster: bool = False, context: Optional[str] = None
) -> None:
    """Load the Kubernetes client config.

    Args:
        incluster: Whether to load the in-cluster config.
        context: Name of the Kubernetes context. If not provided, uses the
            currently active context. Will be ignored if `incluster` is True.
    """
    if incluster:
        k8s_config.load_incluster_config()
    else:
        k8s_config.load_kube_config(context=context)
pod_failed(pod: k8s_client.V1Pod) -> bool

Check if pod status is 'Failed'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Failed' else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_failed(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Failed'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Failed' else False.
    """
    return pod.status.phase == PodPhase.FAILED.value  # type: ignore[no-any-return]
pod_is_done(pod: k8s_client.V1Pod) -> bool

Check if pod status is 'Succeeded'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Succeeded' else False.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_done(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Succeeded'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Succeeded' else False.
    """
    return pod.status.phase == PodPhase.SUCCEEDED.value  # type: ignore[no-any-return]
pod_is_not_pending(pod: k8s_client.V1Pod) -> bool

Check if pod status is not 'Pending'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

False if the pod status is 'Pending' else True.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_not_pending(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is not 'Pending'.

    Args:
        pod: Kubernetes pod.

    Returns:
        False if the pod status is 'Pending' else True.
    """
    return pod.status.phase != PodPhase.PENDING.value  # type: ignore[no-any-return]
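The three phase predicates (`pod_failed`, `pod_is_done`, `pod_is_not_pending`) reduce to string comparisons against the pod's `status.phase`. A sketch with stand-in pod objects; the phase strings match the Kubernetes `PodPhase` values, but the `NS` objects are hypothetical substitutes for `V1Pod`:

```python
from types import SimpleNamespace as NS

def pod_failed(pod):
    return pod.status.phase == "Failed"

def pod_is_done(pod):
    return pod.status.phase == "Succeeded"

def pod_is_not_pending(pod):
    return pod.status.phase != "Pending"

succeeded = NS(status=NS(phase="Succeeded"))
pending = NS(status=NS(phase="Pending"))
```

Note that `pod_is_not_pending` is true for every non-pending phase, including `Failed`, which is why the startup loop earlier in this module treats it as "the pod has been scheduled", not "the pod is healthy".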
retry_on_api_exception(func: Callable[..., R], max_retries: int = 3, delay: float = 1, backoff: float = 1, fail_on_status_codes: Tuple[int, ...] = (404,)) -> Callable[..., R]

Retry a function on API exceptions.

Parameters:

Name Type Description Default
func Callable[..., R]

The function to retry.

required
max_retries int

The maximum number of retries.

3
delay float

The delay between retries.

1
backoff float

The backoff factor.

1
fail_on_status_codes Tuple[int, ...]

The status codes to fail on immediately.

(404,)

Returns:

Type Description
Callable[..., R]

The wrapped function with retry logic.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def retry_on_api_exception(
    func: Callable[..., R],
    max_retries: int = 3,
    delay: float = 1,
    backoff: float = 1,
    fail_on_status_codes: Tuple[int, ...] = (404,),
) -> Callable[..., R]:
    """Retry a function on API exceptions.

    Args:
        func: The function to retry.
        max_retries: The maximum number of retries.
        delay: The delay between retries.
        backoff: The backoff factor.
        fail_on_status_codes: The status codes to fail on immediately.

    Returns:
        The wrapped function with retry logic.
    """

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> R:
        _delay = delay
        retries = 0
        while retries <= max_retries:
            try:
                return func(*args, **kwargs)
            except ApiException as e:
                if e.status in fail_on_status_codes:
                    raise

                retries += 1
                if retries <= max_retries:
                    logger.warning("Error calling %s: %s.", func.__name__, e)
                    time.sleep(_delay)
                    _delay *= backoff
                else:
                    raise

        raise RuntimeError(
            f"Failed to call {func.__name__} after {max_retries} retries."
        )

    return wrapper
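The retry wrapper can be demonstrated end to end without a cluster. This self-contained re-creation swaps the generic `ApiError` stub for `kubernetes.client.ApiException` but keeps the same control flow: fail fast on listed status codes, otherwise retry with a configurable backoff.

```python
import functools
import time

class ApiError(Exception):
    """Stand-in for kubernetes.client.ApiException."""
    def __init__(self, status):
        self.status = status

def retry_on_error(func, max_retries=3, delay=0.0, backoff=1,
                   fail_on_status_codes=(404,)):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        _delay = delay
        retries = 0
        while retries <= max_retries:
            try:
                return func(*args, **kwargs)
            except ApiError as e:
                if e.status in fail_on_status_codes:
                    raise  # e.g. 404: retrying will not help
                retries += 1
                if retries > max_retries:
                    raise
                time.sleep(_delay)
                _delay *= backoff
        raise RuntimeError(f"Failed after {max_retries} retries.")
    return wrapper

calls = {"n": 0}

def flaky():
    # Fails twice with a transient 500, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ApiError(500)
    return "ok"

result = retry_on_error(flaky)()
```

With the default of three retries, two transient failures are absorbed and the third attempt returns normally; a 404 would have been re-raised on the first attempt.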
sanitize_label(label: str) -> str

Sanitize a label for a Kubernetes resource.

Parameters:

Name Type Description Default
label str

The label to sanitize.

required

Returns:

Type Description
str

The sanitized label.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def sanitize_label(label: str) -> str:
    """Sanitize a label for a Kubernetes resource.

    Args:
        label: The label to sanitize.

    Returns:
        The sanitized label.
    """
    # https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#rfc-1035-label-names
    label = re.sub(r"[^a-z0-9-]", "-", label.lower())
    label = re.sub(r"^[-]+", "", label)
    label = re.sub(r"[-]+", "-", label)
    label = label[:63]
    # Remove trailing dashes after truncation to make sure we end with an
    # alphanumeric character
    label = re.sub(r"[-]+$", "", label)

    return label
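The sanitization pipeline above is pure string manipulation, so it can be run as-is. Here it is applied to a sample label (the function body reproduces the source verbatim):

```python
import re

def sanitize_label(label):
    # Map to lowercase alphanumerics and dashes per RFC 1035 label rules.
    label = re.sub(r"[^a-z0-9-]", "-", label.lower())
    label = re.sub(r"^[-]+", "", label)   # no leading dashes
    label = re.sub(r"[-]+", "-", label)   # collapse dash runs
    label = label[:63]                    # 63-character length limit
    label = re.sub(r"[-]+$", "", label)   # no trailing dashes
    return label

clean = sanitize_label("My Pipeline_Run!")
```

Spaces, underscores, and punctuation all become single dashes, and the trailing dash left by the `!` is stripped, yielding a valid label.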
update_config_map(core_api: k8s_client.CoreV1Api, namespace: str, name: str, data: Dict[str, str]) -> None

Update a Kubernetes config map.

Parameters:

Name Type Description Default
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
name str

Name of the config map to update.

required
data Dict[str, str]

Data to store in the config map.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def update_config_map(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    name: str,
    data: Dict[str, str],
) -> None:
    """Update a Kubernetes config map.

    Args:
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        name: Name of the config map to update.
        data: Data to store in the config map.
    """
    retry_on_api_exception(core_api.patch_namespaced_config_map)(
        namespace=namespace,
        name=name,
        body=k8s_client.V1ConfigMap(data=data),
    )
update_job(batch_api: k8s_client.BatchV1Api, namespace: str, job_name: str, annotations: Dict[str, str]) -> k8s_client.V1Job

Update a job.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes batch api.

required
namespace str

Kubernetes namespace.

required
job_name str

The name of the job to update.

required
annotations Dict[str, str]

The annotations to update.

required

Returns:

Type Description
V1Job

The updated job.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def update_job(
    batch_api: k8s_client.BatchV1Api,
    namespace: str,
    job_name: str,
    annotations: Dict[str, str],
) -> k8s_client.V1Job:
    """Update a job.

    Args:
        batch_api: Kubernetes batch api.
        namespace: Kubernetes namespace.
        job_name: The name of the job to update.
        annotations: The annotations to update.

    Returns:
        The updated job.
    """
    return retry_on_api_exception(batch_api.patch_namespaced_job)(
        name=job_name,
        namespace=namespace,
        body={"metadata": {"annotations": annotations}},
    )
update_secret(core_api: k8s_client.CoreV1Api, namespace: str, secret_name: str, data: Dict[str, Optional[str]]) -> None

Update a Kubernetes secret.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

The namespace in which to update the secret.

required
secret_name str

The name of the secret to update.

required
data Dict[str, Optional[str]]

The secret data. If the value is None, the key will be removed from the secret.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def update_secret(
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    secret_name: str,
    data: Dict[str, Optional[str]],
) -> None:
    """Update a Kubernetes secret.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: The namespace in which to update the secret.
        secret_name: The name of the secret to update.
        data: The secret data. If the value is None, the key will be removed
            from the secret.
    """
    core_api.patch_namespaced_secret(
        namespace=namespace,
        name=secret_name,
        body=build_secret_manifest(name=secret_name, data=data),
    )
wait_for_job_to_finish(batch_api: k8s_client.BatchV1Api, core_api: k8s_client.CoreV1Api, namespace: str, job_name: str, backoff_interval: float = 1, maximum_backoff: float = 32, exponential_backoff: bool = False, fail_on_container_waiting_reasons: Optional[List[str]] = None, stream_logs: bool = True, container_name: Optional[str] = None) -> None

Wait for a job to finish.

Parameters:

Name Type Description Default
batch_api BatchV1Api

Kubernetes BatchV1Api client.

required
core_api CoreV1Api

Kubernetes CoreV1Api client.

required
namespace str

Kubernetes namespace.

required
job_name str

Name of the job for which to wait.

required
backoff_interval float

The interval to wait between polling the job status.

1
maximum_backoff float

The maximum interval to wait between polling the job status.

32
exponential_backoff bool

Whether to use exponential backoff.

False
fail_on_container_waiting_reasons Optional[List[str]]

List of container waiting reasons that will cause the job to fail.

None
stream_logs bool

Whether to stream the job logs.

True
container_name Optional[str]

Name of the container to stream logs from.

None

Raises:

Type Description
RuntimeError

If the job failed or timed out.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def wait_for_job_to_finish(
    batch_api: k8s_client.BatchV1Api,
    core_api: k8s_client.CoreV1Api,
    namespace: str,
    job_name: str,
    backoff_interval: float = 1,
    maximum_backoff: float = 32,
    exponential_backoff: bool = False,
    fail_on_container_waiting_reasons: Optional[List[str]] = None,
    stream_logs: bool = True,
    container_name: Optional[str] = None,
) -> None:
    """Wait for a job to finish.

    Args:
        batch_api: Kubernetes BatchV1Api client.
        core_api: Kubernetes CoreV1Api client.
        namespace: Kubernetes namespace.
        job_name: Name of the job for which to wait.
        backoff_interval: The interval to wait between polling the job status.
        maximum_backoff: The maximum interval to wait between polling the job
            status.
        exponential_backoff: Whether to use exponential backoff.
        fail_on_container_waiting_reasons: List of container waiting reasons
            that will cause the job to fail.
        stream_logs: Whether to stream the job logs.
        container_name: Name of the container to stream logs from.

    Raises:
        RuntimeError: If the job failed or timed out.
    """
    logged_lines_per_pod: Dict[str, int] = defaultdict(int)
    finished_pods = set()

    while True:
        job: k8s_client.V1Job = retry_on_api_exception(
            batch_api.read_namespaced_job
        )(name=job_name, namespace=namespace)

        if job.status.conditions:
            for condition in job.status.conditions:
                if condition.type == "Complete" and condition.status == "True":
                    return
                if condition.type == "Failed" and condition.status == "True":
                    raise RuntimeError(
                        f"Job `{namespace}:{job_name}` failed: "
                        f"{condition.message}"
                    )

        if fail_on_container_waiting_reasons:
            pod_list: k8s_client.V1PodList = retry_on_api_exception(
                core_api.list_namespaced_pod
            )(
                namespace=namespace,
                label_selector=f"job-name={job_name}",
                field_selector="status.phase=Pending",
            )
            for pod in pod_list.items:
                container_state = get_container_status(
                    pod, container_name or "main"
                )

                if (
                    container_state
                    and (waiting_state := container_state.waiting)
                    and waiting_state.reason
                    in fail_on_container_waiting_reasons
                ):
                    retry_on_api_exception(batch_api.delete_namespaced_job)(
                        name=job_name,
                        namespace=namespace,
                        propagation_policy="Foreground",
                    )
                    raise RuntimeError(
                        f"Job `{namespace}:{job_name}` failed: "
                        f"Detected container in state "
                        f"{waiting_state.reason}"
                    )

        if stream_logs:
            try:
                pod_list = core_api.list_namespaced_pod(
                    namespace=namespace,
                    label_selector=f"job-name={job_name}",
                )
            except ApiException as e:
                logger.error("Error fetching pods: %s.", e)
                # Use an empty V1PodList so the loop below safely no-ops
                # (a plain list has no `.items` attribute).
                pod_list = k8s_client.V1PodList(items=[])
            else:
                # Sort pods by creation timestamp, oldest first
                pod_list.items.sort(
                    key=lambda pod: pod.metadata.creation_timestamp,
                )

            for pod in pod_list.items:
                pod_name = pod.metadata.name
                pod_status = pod.status.phase

                if pod_name in finished_pods:
                    # We've already streamed all logs for this pod, so we can
                    # skip it.
                    continue

                if pod_status == PodPhase.PENDING.value:
                    # The pod is still pending, so we can't stream logs for it
                    # yet.
                    continue

                if pod_status in [
                    PodPhase.SUCCEEDED.value,
                    PodPhase.FAILED.value,
                ]:
                    finished_pods.add(pod_name)

                containers = pod.spec.containers
                if not container_name:
                    container_name = containers[0].name

                try:
                    response = core_api.read_namespaced_pod_log(
                        name=pod_name,
                        namespace=namespace,
                        container=container_name,
                        _preload_content=False,
                    )
                except ApiException as e:
                    logger.error("Error reading pod logs: %s.", e)
                else:
                    raw_data = response.data
                    decoded_log = raw_data.decode("utf-8", errors="replace")
                    logs = decoded_log.splitlines()
                    logged_lines = logged_lines_per_pod[pod_name]
                    if len(logs) > logged_lines:
                        for line in logs[logged_lines:]:
                            logger.info(line)
                        logged_lines_per_pod[pod_name] = len(logs)

        time.sleep(backoff_interval)
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2
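
The polling cadence at the end of the loop above can be sketched in isolation. This is a pure-Python illustration, not part of `kube_utils`: with `exponential_backoff=True`, the sleep interval doubles after each poll until it reaches `maximum_backoff` (defaults from the signature: 1s start, 32s cap).

```python
def backoff_schedule(
    polls: int,
    backoff_interval: float = 1,
    maximum_backoff: float = 32,
    exponential_backoff: bool = True,
) -> list:
    """Return the sleep intervals used for the first `polls` polls."""
    intervals = []
    for _ in range(polls):
        intervals.append(backoff_interval)
        # Same doubling rule as in wait_for_job_to_finish above.
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2
    return intervals

print(backoff_schedule(7))  # -> [1, 2, 4, 8, 16, 32, 32]
```

With `exponential_backoff=False` (the default), the job status is simply polled at a fixed `backoff_interval`.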
wait_pod(kube_client_fn: Callable[[], k8s_client.ApiClient], pod_name: str, namespace: str, exit_condition_lambda: Callable[[k8s_client.V1Pod], bool], timeout_sec: int = 0, exponential_backoff: bool = False, stream_logs: bool = False) -> k8s_client.V1Pod

Wait for a pod to meet an exit condition.

Parameters:

Name Type Description Default
kube_client_fn Callable[[], ApiClient]

A function that is called periodically and is used to get a CoreV1Api client for the Kubernetes API. It should cache the client to avoid unnecessary overhead, but should also instantiate a new client if the previous one uses credentials that are about to expire.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required
exit_condition_lambda Callable[[V1Pod], bool]

A lambda which will be called periodically to wait for a pod to exit. The function returns True to exit.

required
timeout_sec int

Timeout in seconds to wait for pod to reach exit condition, or 0 to wait for an unlimited duration. Defaults to unlimited.

0
exponential_backoff bool

Whether to use exponential back off for polling. Defaults to False.

False
stream_logs bool

Whether to stream the pod logs to zenml.logger.info(). Defaults to False.

False

Raises:

Type Description
RuntimeError

when the function times out.

Returns:

Type Description
V1Pod

The pod object which meets the exit condition.

Source code in src/zenml/integrations/kubernetes/orchestrators/kube_utils.py
def wait_pod(
    kube_client_fn: Callable[[], k8s_client.ApiClient],
    pod_name: str,
    namespace: str,
    exit_condition_lambda: Callable[[k8s_client.V1Pod], bool],
    timeout_sec: int = 0,
    exponential_backoff: bool = False,
    stream_logs: bool = False,
) -> k8s_client.V1Pod:
    """Wait for a pod to meet an exit condition.

    Args:
        kube_client_fn: the kube client fn is a function that is called
            periodically and is used to get a `CoreV1Api` client for
            the Kubernetes API. It should cache the client to avoid
            unnecessary overhead but should also instantiate a new client if
            the previous one is using credentials that are about to expire.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.
        exit_condition_lambda: A lambda
            which will be called periodically to wait for a pod to exit. The
            function returns True to exit.
        timeout_sec: Timeout in seconds to wait for pod to reach exit
            condition, or 0 to wait for an unlimited duration.
            Defaults to unlimited.
        exponential_backoff: Whether to use exponential back off for polling.
            Defaults to False.
        stream_logs: Whether to stream the pod logs to
            `zenml.logger.info()`. Defaults to False.

    Raises:
        RuntimeError: when the function times out.

    Returns:
        The pod object which meets the exit condition.
    """
    start_time = utc_now()

    # Link to exponential back-off algorithm used here:
    # https://cloud.google.com/storage/docs/exponential-backoff
    backoff_interval = 1
    maximum_backoff = 32

    logged_lines = 0

    while True:
        kube_client = kube_client_fn()
        core_api = k8s_client.CoreV1Api(kube_client)

        resp = get_pod(core_api, pod_name, namespace)

        if resp is None:
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` not found.")

        # Stream logs to `zenml.logger.info()`.
        # TODO: can we do this without parsing all logs every time?
        if stream_logs and pod_is_not_pending(resp):
            try:
                response = core_api.read_namespaced_pod_log(
                    name=pod_name,
                    namespace=namespace,
                    _preload_content=False,
                )
            except ApiException as e:
                logger.error(f"Error reading pod logs: {e}. Retrying...")
            else:
                raw_data = response.data
                decoded_log = raw_data.decode("utf-8", errors="replace")
                logs = decoded_log.splitlines()
                if len(logs) > logged_lines:
                    for line in logs[logged_lines:]:
                        logger.info(line)
                    logged_lines = len(logs)

        # Raise an error if the pod failed.
        if pod_failed(resp):
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` failed.")

        # Check if pod is in desired state (e.g. finished / running / ...).
        if exit_condition_lambda(resp):
            return resp

        # Check if wait timed out.
        elapse_time = utc_now() - start_time
        if elapse_time.seconds >= timeout_sec and timeout_sec != 0:
            raise RuntimeError(
                f"Waiting for pod `{namespace}:{pod_name}` timed out after "
                f"{timeout_sec} seconds."
            )

        # Wait (using exponential backoff).
        time.sleep(backoff_interval)
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2
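
As a hedged sketch, an `exit_condition_lambda` passed to `wait_pod` could simply check for a terminal pod phase. Real callers receive a `k8s_client.V1Pod`; `SimpleNamespace` stands in for it here, and `pod_is_done` is a hypothetical helper, not part of `kube_utils`.

```python
from types import SimpleNamespace

TERMINAL_PHASES = {"Succeeded", "Failed"}

def pod_is_done(pod) -> bool:
    """Return True once the pod reports a terminal phase."""
    return bool(pod.status and pod.status.phase in TERMINAL_PHASES)

# Stand-ins for V1Pod objects as returned by the Kubernetes API.
succeeded = SimpleNamespace(status=SimpleNamespace(phase="Succeeded"))
running = SimpleNamespace(status=SimpleNamespace(phase="Running"))
print(pod_is_done(succeeded), pod_is_done(running))  # -> True False
```

Passed as `exit_condition_lambda=pod_is_done`, polling would stop as soon as the pod succeeds; a failure is already raised earlier in the loop via `pod_failed`.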
kubernetes_orchestrator

Kubernetes-native orchestrator.

Classes
KubernetesOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator for running ZenML pipelines using native Kubernetes.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesOrchestratorConfig property

Returns the KubernetesOrchestratorConfig config.

Returns:

Type Description
KubernetesOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Defines the validator that checks whether the stack is valid.

Returns:

Type Description
Optional[StackValidator]

Stack validator.

Functions
delete_schedule(schedule: ScheduleResponse) -> None

Deletes a schedule.

Parameters:

Name Type Description Default
schedule ScheduleResponse

The schedule to delete.

required

Raises:

Type Description
RuntimeError

If the cron job name is not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def delete_schedule(self, schedule: "ScheduleResponse") -> None:
    """Deletes a schedule.

    Args:
        schedule: The schedule to delete.

    Raises:
        RuntimeError: If the cron job name is not found.
    """
    cron_job_name = schedule.run_metadata.get(
        KUBERNETES_CRON_JOB_METADATA_KEY
    )
    if not cron_job_name:
        raise RuntimeError("Unable to find cron job name for schedule.")

    self._k8s_batch_api.delete_namespaced_cron_job(
        name=cron_job_name,
        namespace=self.config.kubernetes_namespace,
    )
fetch_status(run: PipelineRunResponse, include_steps: bool = False) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The run that was executed by this orchestrator.

required
include_steps bool

If True, also fetch the status of individual steps.

False

Returns:

Type Description
Optional[ExecutionStatus]

A tuple of (pipeline_status, step_statuses).

Optional[Dict[str, ExecutionStatus]]

If include_steps is False, step_statuses will be None.

Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]

If include_steps is True, step_statuses will be a dict (possibly empty).

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def fetch_status(
    self, run: "PipelineRunResponse", include_steps: bool = False
) -> Tuple[
    Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]
]:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.
        include_steps: If True, also fetch the status of individual steps.

    Returns:
        A tuple of (pipeline_status, step_statuses).
        If include_steps is False, step_statuses will be None.
        If include_steps is True, step_statuses will be a dict (possibly empty).
    """
    pipeline_status = None
    include_run_status = not run.status.is_finished

    label_selector = f"run_id={kube_utils.sanitize_label(str(run.id))}"
    try:
        job_list = kube_utils.list_jobs(
            batch_api=self._k8s_batch_api,
            namespace=self.config.kubernetes_namespace,
            label_selector=label_selector,
        )
    except Exception as e:
        logger.warning(f"Failed to list jobs for run {run.id}: {e}")
        return None, None

    step_statuses = {}
    # Only fetch steps if we really need them
    steps_dict = run.steps if include_steps else {}

    for job in job_list.items:
        if not job.metadata or not job.metadata.annotations:
            continue

        is_orchestrator_job = (
            ORCHESTRATOR_ANNOTATION_KEY in job.metadata.annotations
        )
        if is_orchestrator_job:
            if include_run_status:
                pipeline_status = self._map_job_status_to_execution_status(
                    job
                )
            continue

        step_name = job.metadata.annotations.get(
            STEP_NAME_ANNOTATION_KEY, None
        )
        if not include_steps or not step_name:
            continue

        step_response = steps_dict.get(step_name, None)

        if step_response is None:
            continue

        # If the step is already in a finished state, skip
        if step_response and step_response.status.is_finished:
            continue

        execution_status = self._map_job_status_to_execution_status(job)
        if execution_status is not None:
            step_statuses[step_name] = execution_status

    return pipeline_status, step_statuses
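
The label selector used above is built from the sanitized run ID. The snippet below is a hypothetical approximation of `kube_utils.sanitize_label` (the real implementation lives elsewhere and may differ): Kubernetes label values are limited to 63 characters of alphanumerics, `-`, `_` and `.`, and must start and end with an alphanumeric character.

```python
import re

def sanitize_label_sketch(value: str) -> str:
    """Approximate a Kubernetes-safe label value (illustrative only)."""
    # Lowercase and replace disallowed characters with hyphens.
    value = re.sub(r"[^a-z0-9\-_.]", "-", value.lower())
    # Enforce the 63-character limit, then trim non-alphanumeric edges.
    return value[:63].strip("-_.")

run_id = "6f4bdb55-4917-4344-a1a5-9d1f7b4e5c3a"
print(f"run_id={sanitize_label_sketch(run_id)}")
```

A lowercase UUID already satisfies the label syntax, so the selector is effectively `run_id=<run-uuid>`, which matches the jobs created for that pipeline run.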
get_kube_client(incluster: Optional[bool] = None) -> k8s_client.ApiClient

Getter for the Kubernetes API client.

Parameters:

Name Type Description Default
incluster Optional[bool]

Whether to use the in-cluster config or not. Overrides the incluster setting in the config.

None

Returns:

Type Description
ApiClient

The Kubernetes API client.

Raises:

Type Description
RuntimeError

if the Kubernetes connector behaves unexpectedly.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Getter for the Kubernetes API client.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
get_kubernetes_contexts() -> Tuple[List[str], str]

Get list of configured Kubernetes contexts and the active context.

Raises:

Type Description
RuntimeError

if the Kubernetes configuration cannot be loaded.

Returns:

Name Type Description
context_names List[str]

List of configured Kubernetes contexts.

active_context_name str

Name of the active Kubernetes context.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """Get list of configured Kubernetes contexts and the active context.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        context_names: List of configured Kubernetes contexts.
        active_context_name: Name of the active Kubernetes context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    return {
        METADATA_ORCHESTRATOR_RUN_ID: self.get_orchestrator_run_id(),
    }
get_token_secret_name(deployment_id: UUID) -> str

Returns the name of the secret that contains the ZenML token.

Parameters:

Name Type Description Default
deployment_id UUID

The ID of the deployment.

required

Returns:

Type Description
str

The name of the secret that contains the ZenML token.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_token_secret_name(self, deployment_id: UUID) -> str:
    """Returns the name of the secret that contains the ZenML token.

    Args:
        deployment_id: The ID of the deployment.

    Returns:
        The name of the secret that contains the ZenML token.
    """
    return f"zenml-token-{deployment_id}"
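
For illustration, the secret name follows the pattern shown in the source above: `zenml-token-<deployment_id>`. The standalone helper below mirrors that one-liner.

```python
from uuid import UUID

def token_secret_name(deployment_id: UUID) -> str:
    """Derive the ZenML token secret name from a deployment ID."""
    return f"zenml-token-{deployment_id}"

dep_id = UUID("12345678-1234-5678-1234-567812345678")
print(token_secret_name(dep_id))
# -> zenml-token-12345678-1234-5678-1234-567812345678
```

This deterministic naming lets `submit_pipeline` create or update the secret for a deployment and later reference it from the orchestrator pod's `secretKeyRef` without storing any extra state.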
should_build_pipeline_image(deployment: PipelineDeploymentBase) -> bool

Whether to always build the pipeline image.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment.

required

Returns:

Type Description
bool

Whether to always build the pipeline image.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def should_build_pipeline_image(
    self, deployment: "PipelineDeploymentBase"
) -> bool:
    """Whether to always build the pipeline image.

    Args:
        deployment: The pipeline deployment.

    Returns:
        Whether to always build the pipeline image.
    """
    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )
    return settings.always_build_pipeline_image
submit_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Optional[SubmissionResult]

Submits a pipeline to the orchestrator.

This method should only submit the pipeline and not wait for it to complete. If the orchestrator is configured to wait for the pipeline run to complete, a function that waits for the pipeline run to complete can be passed as part of the submission result.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to submit.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment. These don't need to be set if running locally.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If a schedule without cron expression is given.

Exception

If the orchestrator pod fails to start.

Returns:

Type Description
Optional[SubmissionResult]

Optional submission result.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def submit_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Optional[SubmissionResult]:
    """Submits a pipeline to the orchestrator.

    This method should only submit the pipeline and not wait for it to
    complete. If the orchestrator is configured to wait for the pipeline run
    to complete, a function that waits for the pipeline run to complete can
    be passed as part of the submission result.

    Args:
        deployment: The pipeline deployment to submit.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment. These don't need to be set if running locally.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If a schedule without cron expression is given.
        Exception: If the orchestrator pod fails to start.

    Returns:
        Optional submission result.
    """
    for step_name, step in deployment.step_configurations.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Kubernetes orchestrator, ignoring resource "
                "configuration for step %s.",
                step_name,
            )

        if retry_config := step.config.retry:
            if retry_config.delay or retry_config.backoff:
                logger.warning(
                    "Specifying retry delay or backoff is not supported "
                    "for the Kubernetes orchestrator."
                )

    pipeline_name = deployment.pipeline_configuration.name
    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    # Build entrypoint command and args for the orchestrator pod.
    # This will internally also build the command/args for all step pods.
    command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
    args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
        deployment_id=deployment.id,
        run_id=placeholder_run.id if placeholder_run else None,
    )

    # Authorize pod to run Kubernetes commands inside the cluster.
    service_account_name = self._get_service_account_name(settings)

    # We set some default minimum resource requests for the orchestrator pod
    # here if the user has not specified any, because the orchestrator pod
    # takes up some memory resources itself and, if not specified, the pod
    # will be scheduled on any node regardless of available memory and risk
    # negatively impacting or even crashing the node due to memory pressure.
    orchestrator_pod_settings = kube_utils.apply_default_resource_requests(
        memory="400Mi",
        cpu="100m",
        pod_settings=settings.orchestrator_pod_settings,
    )

    if self.config.pass_zenml_token_as_secret:
        secret_name = self.get_token_secret_name(deployment.id)
        token = environment.pop("ZENML_STORE_API_TOKEN")
        kube_utils.create_or_update_secret(
            core_api=self._k8s_core_api,
            namespace=self.config.kubernetes_namespace,
            secret_name=secret_name,
            data={KUBERNETES_SECRET_TOKEN_KEY_NAME: token},
        )
        orchestrator_pod_settings.env.append(
            {
                "name": "ZENML_STORE_API_TOKEN",
                "valueFrom": {
                    "secretKeyRef": {
                        "name": secret_name,
                        "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                    }
                },
            }
        )

    orchestrator_pod_labels = {
        "pipeline": kube_utils.sanitize_label(pipeline_name),
    }

    if placeholder_run:
        orchestrator_pod_labels["run_id"] = kube_utils.sanitize_label(
            str(placeholder_run.id)
        )
        orchestrator_pod_labels["run_name"] = kube_utils.sanitize_label(
            placeholder_run.name
        )

    pod_manifest = build_pod_manifest(
        pod_name=None,
        image_name=image,
        command=command,
        args=args,
        privileged=False,
        pod_settings=orchestrator_pod_settings,
        service_account_name=service_account_name,
        env=environment,
        labels=orchestrator_pod_labels,
        mount_local_stores=self.config.is_local,
        termination_grace_period_seconds=settings.pod_stop_grace_period,
    )

    pod_failure_policy = settings.pod_failure_policy or {
        # These rules are applied sequentially. This means any failure in
        # the main container will count towards the max retries. Any other
        # disruption will not count towards the max retries.
        "rules": [
            # If the main container fails, we count it towards the max
            # retries.
            {
                "action": "Count",
                "onExitCodes": {
                    "containerName": "main",
                    "operator": "NotIn",
                    "values": [0],
                },
            },
            # If the pod is interrupted at any other time, we don't count
            # it as a retry
            {
                "action": "Ignore",
                "onPodConditions": [
                    {
                        "type": "DisruptionTarget",
                        "status": "True",
                    }
                ],
            },
        ]
    }

    job_name = settings.job_name_prefix or ""
    random_prefix = "".join(random.choices("0123456789abcdef", k=8))
    job_name += (
        f"-{random_prefix}-{deployment.pipeline_configuration.name}"
    )
    # The job name will be used as a label on the pods, so we need to make
    # sure it doesn't exceed the label length limit
    job_name = kube_utils.sanitize_label(job_name)

    job_manifest = build_job_manifest(
        job_name=job_name,
        pod_template=pod_template_manifest_from_pod(pod_manifest),
        backoff_limit=settings.orchestrator_job_backoff_limit,
        ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
        active_deadline_seconds=settings.active_deadline_seconds,
        pod_failure_policy=pod_failure_policy,
        labels=orchestrator_pod_labels,
        annotations={
            ORCHESTRATOR_ANNOTATION_KEY: str(self.id),
        },
    )

    if deployment.schedule:
        if not deployment.schedule.cron_expression:
            raise RuntimeError(
                "The Kubernetes orchestrator only supports scheduling via "
                "CRON jobs, but the run was configured with a manual "
                "schedule. Use `Schedule(cron_expression=...)` instead."
            )
        cron_expression = deployment.schedule.cron_expression
        cron_job_manifest = build_cron_job_manifest(
            cron_expression=cron_expression,
            job_template=job_template_manifest_from_job(job_manifest),
            successful_jobs_history_limit=settings.successful_jobs_history_limit,
            failed_jobs_history_limit=settings.failed_jobs_history_limit,
        )

        cron_job = self._k8s_batch_api.create_namespaced_cron_job(
            body=cron_job_manifest,
            namespace=self.config.kubernetes_namespace,
        )
        logger.info(
            f"Created Kubernetes CronJob `{cron_job.metadata.name}` "
            f"with CRON expression `{cron_expression}`."
        )
        return SubmissionResult(
            metadata={
                KUBERNETES_CRON_JOB_METADATA_KEY: cron_job.metadata.name,
            }
        )
    else:
        try:
            kube_utils.create_job(
                batch_api=self._k8s_batch_api,
                namespace=self.config.kubernetes_namespace,
                job_manifest=job_manifest,
            )
        except Exception as e:
            if self.config.pass_zenml_token_as_secret:
                secret_name = self.get_token_secret_name(deployment.id)
                try:
                    kube_utils.delete_secret(
                        core_api=self._k8s_core_api,
                        namespace=self.config.kubernetes_namespace,
                        secret_name=secret_name,
                    )
                except Exception as cleanup_error:
                    logger.error(
                        "Error cleaning up secret %s: %s",
                        secret_name,
                        cleanup_error,
                    )
            raise e

        if settings.synchronous:

            def _wait_for_run_to_finish() -> None:
                logger.info("Waiting for orchestrator job to finish...")
                kube_utils.wait_for_job_to_finish(
                    batch_api=self._k8s_batch_api,
                    core_api=self._k8s_core_api,
                    namespace=self.config.kubernetes_namespace,
                    job_name=job_name,
                    backoff_interval=settings.job_monitoring_interval,
                    fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
                    stream_logs=True,
                )

            return SubmissionResult(
                wait_for_completion=_wait_for_run_to_finish,
            )
        else:
            logger.info(
                f"Orchestrator job `{job_name}` started. "
                f"Run the following command to inspect the logs: "
                f"`kubectl -n {self.config.kubernetes_namespace} logs "
                f"job/{job_name}`"
            )
            return None
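The job name built above concatenates an optional user prefix, eight random hex characters, and the pipeline name, then sanitizes the result because it is reused as a label on the job's pods and Kubernetes label values are limited to 63 characters. A stand-alone sketch of that naming scheme (the `sanitize_label` here is a simplified stand-in, not ZenML's `kube_utils.sanitize_label`):

```python
import random
import re

def sanitize_label(value: str) -> str:
    # Simplified stand-in: Kubernetes label values are at most 63
    # characters and must start and end with an alphanumeric character.
    value = re.sub(r"[^a-z0-9]+", "-", value.lower())
    return value[:63].strip("-")

def build_job_name(prefix: str, pipeline_name: str) -> str:
    # Optional user prefix, 8 hex characters of randomness, then the
    # pipeline name, sanitized for use as a label on the job's pods.
    random_part = "".join(random.choices("0123456789abcdef", k=8))
    return sanitize_label(f"{prefix}-{random_part}-{pipeline_name}")

name = build_job_name("zenml", "My Training Pipeline!")
print(name)  # e.g. a lowercase, label-safe name under 63 characters
```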
update_schedule(schedule: ScheduleResponse, update: ScheduleUpdate) -> None

Updates a schedule.

Parameters:

Name Type Description Default
schedule ScheduleResponse

The schedule to update.

required
update ScheduleUpdate

The update to apply to the schedule.

required

Raises:

Type Description
RuntimeError

If the cron job name is not found.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def update_schedule(
    self, schedule: "ScheduleResponse", update: ScheduleUpdate
) -> None:
    """Updates a schedule.

    Args:
        schedule: The schedule to update.
        update: The update to apply to the schedule.

    Raises:
        RuntimeError: If the cron job name is not found.
    """
    cron_job_name = schedule.run_metadata.get(
        KUBERNETES_CRON_JOB_METADATA_KEY
    )
    if not cron_job_name:
        raise RuntimeError("Unable to find cron job name for schedule.")

    if update.cron_expression:
        self._k8s_batch_api.patch_namespaced_cron_job(
            name=cron_job_name,
            namespace=self.config.kubernetes_namespace,
            body={"spec": {"schedule": update.cron_expression}},
        )
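`update_schedule` only patches the CronJob when `update.cron_expression` is set; Kubernetes itself rejects malformed expressions at patch time. A rough client-side sanity check one could run beforehand (a hypothetical helper, not something ZenML or Kubernetes provides):

```python
def looks_like_cron(expression: str) -> bool:
    # Standard cron expressions have five whitespace-separated fields:
    # minute, hour, day-of-month, month, day-of-week. This only checks
    # the field count, not the value ranges.
    return len(expression.split()) == 5

print(looks_like_cron("*/5 * * * *"))  # True
print(looks_like_cron("every 5 min"))  # False
```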
Functions

Modules
kubernetes_orchestrator_entrypoint

Entrypoint of the Kubernetes master/orchestrator pod.

Classes

Functions
main() -> None

Entrypoint of the k8s master/orchestrator pod.

Raises:

Type Description
RuntimeError

If the orchestrator pod is not associated with a job.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the k8s master/orchestrator pod.

    Raises:
        RuntimeError: If the orchestrator pod is not associated with a job.
    """
    logger.info("Orchestrator pod started.")

    args = parse_args()

    orchestrator_pod_name = socket.gethostname()

    client = Client()
    deployment = client.get_deployment(args.deployment_id)
    active_stack = client.active_stack
    orchestrator = active_stack.orchestrator
    assert isinstance(orchestrator, KubernetesOrchestrator)
    namespace = orchestrator.config.kubernetes_namespace

    pipeline_settings = cast(
        KubernetesOrchestratorSettings,
        orchestrator.get_settings(deployment),
    )

    # Get a Kubernetes client from the active Kubernetes orchestrator, but
    # override the `incluster` setting to `True` since we are running inside
    # the Kubernetes cluster.
    api_client_config = orchestrator.get_kube_client(
        incluster=True
    ).configuration
    api_client_config.connection_pool_maxsize = (
        pipeline_settings.max_parallelism
    )
    kube_client = k8s_client.ApiClient(api_client_config)
    core_api = k8s_client.CoreV1Api(kube_client)
    batch_api = k8s_client.BatchV1Api(kube_client)

    job_name = kube_utils.get_parent_job_name(
        core_api=core_api,
        pod_name=orchestrator_pod_name,
        namespace=namespace,
    )
    if not job_name:
        raise RuntimeError("Failed to fetch job name for orchestrator pod.")

    run_id, orchestrator_run_id = _get_orchestrator_job_state(
        batch_api=batch_api,
        namespace=namespace,
        job_name=job_name,
    )
    existing_logs_response = None

    if run_id and orchestrator_run_id:
        logger.info("Continuing existing run `%s`.", run_id)
        pipeline_run = client.get_pipeline_run(run_id)
        nodes = _reconstruct_nodes(
            deployment=deployment,
            pipeline_run=pipeline_run,
            namespace=namespace,
            batch_api=batch_api,
        )
        logger.debug("Reconstructed nodes: %s", nodes)

        # Continue logging to the same log file if it exists
        for log_response in pipeline_run.log_collection or []:
            if log_response.source == "orchestrator":
                existing_logs_response = log_response
                break
    else:
        orchestrator_run_id = orchestrator_pod_name
        if args.run_id:
            pipeline_run = client.get_pipeline_run(args.run_id)
        else:
            pipeline_run = create_placeholder_run(
                deployment=deployment,
                orchestrator_run_id=orchestrator_run_id,
            )

        # Store in the job annotations so we can continue the run if the pod
        # is restarted
        kube_utils.update_job(
            batch_api=batch_api,
            namespace=namespace,
            job_name=job_name,
            annotations={
                RUN_ID_ANNOTATION_KEY: str(pipeline_run.id),
                ORCHESTRATOR_RUN_ID_ANNOTATION_KEY: orchestrator_run_id,
            },
        )
        nodes = [
            Node(id=step_name, upstream_nodes=step.spec.upstream_steps)
            for step_name, step in deployment.step_configurations.items()
        ]

    logs_context = setup_orchestrator_logging(
        run_id=pipeline_run.id,
        deployment=deployment,
        logs_response=existing_logs_response,
    )

    with logs_context:
        step_command = StepEntrypointConfiguration.get_entrypoint_command()
        mount_local_stores = active_stack.orchestrator.config.is_local

        env = get_config_environment_vars()
        env[ENV_ZENML_KUBERNETES_RUN_ID] = orchestrator_run_id

        try:
            owner_references = kube_utils.get_pod_owner_references(
                core_api=core_api,
                pod_name=orchestrator_pod_name,
                namespace=namespace,
            )
        except Exception as e:
            logger.warning(f"Failed to get pod owner references: {str(e)}")
            owner_references = []
        else:
            # Make sure none of the owner references are marked as
            # controllers of the created pods, since a controller
            # reference would interfere with the garbage collection logic.
            for owner_reference in owner_references:
                owner_reference.controller = False

        step_run_request_factory = StepRunRequestFactory(
            deployment=deployment,
            pipeline_run=pipeline_run,
            stack=active_stack,
        )
        step_runs = {}

        base_labels = {
            "run_id": kube_utils.sanitize_label(str(pipeline_run.id)),
            "run_name": kube_utils.sanitize_label(str(pipeline_run.name)),
            "pipeline": kube_utils.sanitize_label(
                deployment.pipeline_configuration.name
            ),
        }

        def _cache_step_run_if_possible(step_name: str) -> bool:
            if not step_run_request_factory.has_caching_enabled(step_name):
                return False

            step_run_request = step_run_request_factory.create_request(
                step_name
            )
            try:
                step_run_request_factory.populate_request(step_run_request)
            except Exception as e:
                logger.error(
                    f"Failed to populate step run request for step {step_name}: {e}"
                )
                return False

            if step_run_request.status == ExecutionStatus.CACHED:
                step_run = publish_cached_step_run(
                    step_run_request, pipeline_run
                )
                step_runs[step_name] = step_run
                logger.info("Using cached version of step `%s`.", step_name)
                return True

            return False

        startup_lock = threading.Lock()
        last_startup_time: float = 0.0

        def start_step_job(node: Node) -> NodeStatus:
            """Run a pipeline step in a separate Kubernetes pod.

            Args:
                node: The node to start.

            Returns:
                The status of the node.
            """
            step_name = node.id
            step_config = deployment.step_configurations[step_name].config
            settings = step_config.settings.get(
                "orchestrator.kubernetes", None
            )
            settings = KubernetesOrchestratorSettings.model_validate(
                settings.model_dump() if settings else {}
            )
            if not pipeline_settings.prevent_orchestrator_pod_caching:
                if _cache_step_run_if_possible(step_name):
                    return NodeStatus.COMPLETED

            step_labels = base_labels.copy()
            step_labels["step_name"] = kube_utils.sanitize_label(step_name)
            step_annotations = {
                STEP_NAME_ANNOTATION_KEY: step_name,
            }

            image = KubernetesOrchestrator.get_image(
                deployment=deployment, step_name=step_name
            )
            step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name, deployment_id=deployment.id
            )

            # We set some default minimum memory resource requests for the
            # step pod here if the user has not specified any, because the
            # step pod takes up some memory itself and, without a request,
            # would be scheduled on any node regardless of available
            # memory, risking a negative impact on the node or even
            # crashing it due to memory pressure.
            pod_settings = kube_utils.apply_default_resource_requests(
                memory="400Mi",
                pod_settings=settings.pod_settings,
            )

            if orchestrator.config.pass_zenml_token_as_secret:
                env.pop("ZENML_STORE_API_TOKEN", None)
                secret_name = orchestrator.get_token_secret_name(deployment.id)
                pod_settings.env.append(
                    {
                        "name": "ZENML_STORE_API_TOKEN",
                        "valueFrom": {
                            "secretKeyRef": {
                                "name": secret_name,
                                "key": KUBERNETES_SECRET_TOKEN_KEY_NAME,
                            }
                        },
                    }
                )

            pod_manifest = build_pod_manifest(
                pod_name=None,
                image_name=image,
                command=step_command,
                args=step_args,
                env=env,
                privileged=settings.privileged,
                pod_settings=pod_settings,
                service_account_name=settings.step_pod_service_account_name
                or settings.service_account_name,
                mount_local_stores=mount_local_stores,
                termination_grace_period_seconds=settings.pod_stop_grace_period,
                labels=step_labels,
            )

            retry_config = step_config.retry
            backoff_limit = (
                retry_config.max_retries if retry_config else 0
            ) + settings.backoff_limit_margin

            pod_failure_policy = settings.pod_failure_policy or {
                # These rules are applied sequentially. This means any failure in
                # the main container will count towards the max retries. Any other
                # disruption will not count towards the max retries.
                "rules": [
                    # If the main container fails, we count it towards the max
                    # retries.
                    {
                        "action": "Count",
                        "onExitCodes": {
                            "containerName": "main",
                            "operator": "NotIn",
                            "values": [0],
                        },
                    },
                    # If the pod is interrupted at any other time, we don't count
                    # it as a retry
                    {
                        "action": "Ignore",
                        "onPodConditions": [
                            {
                                "type": "DisruptionTarget",
                            }
                        ],
                    },
                ]
            }

            job_name = settings.job_name_prefix or ""
            random_prefix = "".join(random.choices("0123456789abcdef", k=8))
            job_name += f"-{random_prefix}-{step_name}-{deployment.pipeline_configuration.name}"
            # The job name will be used as a label on the pods, so we need to make
            # sure it doesn't exceed the label length limit
            job_name = kube_utils.sanitize_label(job_name)

            job_manifest = build_job_manifest(
                job_name=job_name,
                pod_template=pod_template_manifest_from_pod(pod_manifest),
                backoff_limit=backoff_limit,
                ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
                active_deadline_seconds=settings.active_deadline_seconds,
                pod_failure_policy=pod_failure_policy,
                owner_references=owner_references,
                labels=step_labels,
                annotations=step_annotations,
            )

            if (
                startup_interval
                := orchestrator.config.parallel_step_startup_waiting_period
            ):
                nonlocal last_startup_time

                with startup_lock:
                    now = time.time()
                    time_since_last_startup = now - last_startup_time
                    sleep_time = startup_interval - time_since_last_startup
                    if sleep_time > 0:
                        logger.debug(
                            f"Sleeping for {sleep_time} seconds before "
                            f"starting job for step {step_name}."
                        )
                        time.sleep(sleep_time)
                    last_startup_time = now

            kube_utils.create_job(
                batch_api=batch_api,
                namespace=namespace,
                job_manifest=job_manifest,
            )

            node.metadata["job_name"] = job_name

            return NodeStatus.RUNNING

        def check_job_status(node: Node) -> NodeStatus:
            """Check the status of a job.

            Args:
                node: The node to check.

            Returns:
                The status of the node.
            """
            step_name = node.id
            job_name = node.metadata.get("job_name", None)
            if not job_name:
                logger.error(
                    "Missing job name to monitor step `%s`.", step_name
                )
                return NodeStatus.FAILED

            step_config = deployment.step_configurations[step_name].config
            settings = step_config.settings.get(
                "orchestrator.kubernetes", None
            )
            settings = KubernetesOrchestratorSettings.model_validate(
                settings.model_dump() if settings else {}
            )
            status, error_message = kube_utils.check_job_status(
                batch_api=batch_api,
                core_api=core_api,
                namespace=namespace,
                job_name=job_name,
                fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
            )
            if status == kube_utils.JobStatus.SUCCEEDED:
                return NodeStatus.COMPLETED
            elif status == kube_utils.JobStatus.FAILED:
                logger.error(
                    "Job for step `%s` failed: %s",
                    step_name,
                    error_message,
                )
                return NodeStatus.FAILED
            else:
                return NodeStatus.RUNNING

        def should_interrupt_execution() -> Optional[InterruptMode]:
            """Check if the DAG execution should be interrupted.

            Returns:
                If the DAG execution should be interrupted.
            """
            try:
                run = client.get_pipeline_run(
                    name_id_or_prefix=pipeline_run.id,
                    project=pipeline_run.project_id,
                    hydrate=False,  # We only need status, not full hydration
                )

                if run.status in [
                    ExecutionStatus.STOPPING,
                    ExecutionStatus.STOPPED,
                ]:
                    logger.info(
                        "Stopping DAG execution because pipeline run is in "
                        "`%s` state.",
                        run.status,
                    )
                    return InterruptMode.GRACEFUL
            except Exception as e:
                logger.warning(
                    "Failed to check pipeline cancellation status: %s", e
                )

            return None

        try:
            nodes_statuses = DagRunner(
                nodes=nodes,
                node_startup_function=start_step_job,
                node_monitoring_function=check_job_status,
                interrupt_function=should_interrupt_execution,
                monitoring_interval=pipeline_settings.job_monitoring_interval,
                monitoring_delay=pipeline_settings.job_monitoring_delay,
                interrupt_check_interval=pipeline_settings.interrupt_check_interval,
                max_parallelism=pipeline_settings.max_parallelism,
            ).run()
        finally:
            if (
                orchestrator.config.pass_zenml_token_as_secret
                and deployment.schedule is None
            ):
                secret_name = orchestrator.get_token_secret_name(deployment.id)
                try:
                    kube_utils.delete_secret(
                        core_api=core_api,
                        namespace=namespace,
                        secret_name=secret_name,
                    )
                except ApiException as e:
                    logger.error(
                        f"Error cleaning up secret {secret_name}: {e}"
                    )

        try:
            pipeline_failed = False
            failed_step_names = [
                step_name
                for step_name, node_state in nodes_statuses.items()
                if node_state == NodeStatus.FAILED
            ]
            skipped_step_names = [
                step_name
                for step_name, node_state in nodes_statuses.items()
                if node_state == NodeStatus.SKIPPED
            ]

            if failed_step_names:
                logger.error(
                    "The following steps failed: %s",
                    ", ".join(failed_step_names),
                )
            if skipped_step_names:
                logger.error(
                    "The following steps were skipped because some of their "
                    "upstream steps failed: %s",
                    ", ".join(skipped_step_names),
                )

            step_runs = fetch_step_runs_by_names(
                step_run_names=failed_step_names, pipeline_run=pipeline_run
            )

            for step_name, node_state in nodes_statuses.items():
                if node_state != NodeStatus.FAILED:
                    continue

                pipeline_failed = True

                if step_run := step_runs.get(step_name, None):
                    # Try to update the step run status, if it exists and is in
                    # a transient state.
                    if step_run and step_run.status in {
                        ExecutionStatus.INITIALIZING,
                        ExecutionStatus.RUNNING,
                    }:
                        publish_utils.publish_failed_step_run(step_run.id)

            # If any steps failed and the pipeline run is still in a transient
            # state, we need to mark it as failed.
            if pipeline_failed and pipeline_run.status in {
                ExecutionStatus.INITIALIZING,
                ExecutionStatus.RUNNING,
            }:
                publish_utils.publish_failed_pipeline_run(pipeline_run.id)
        except AuthorizationException:
            # If a step of the pipeline failed or all of them completed
            # successfully, the pipeline run will be finished and the API token
            # will be invalidated. We catch this exception and do nothing here,
            # as the pipeline run status will already have been published.
            pass

        logger.info("Orchestrator pod finished.")
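`main` builds one `Node` per step, each listing its upstream steps, and hands them to a `DagRunner` that starts a step's job only once all of its upstream jobs have completed. The dependency-ordering idea can be sketched with Kahn's algorithm over simplified stand-in nodes (these are not ZenML's `Node`/`DagRunner` classes, which also handle parallelism, monitoring, and interrupts):

```python
from collections import deque

class Node:
    def __init__(self, id, upstream_nodes=()):
        self.id = id
        self.upstream_nodes = list(upstream_nodes)

def topological_order(nodes):
    # Kahn's algorithm: repeatedly run a node whose upstream set is
    # empty, then remove it from the upstream sets of the others.
    remaining = {n.id: set(n.upstream_nodes) for n in nodes}
    order = []
    ready = deque(sorted(i for i, deps in remaining.items() if not deps))
    while ready:
        node_id = ready.popleft()
        order.append(node_id)
        for other, deps in remaining.items():
            if node_id in deps:
                deps.discard(node_id)
                if not deps and other not in order and other not in ready:
                    ready.append(other)
    if len(order) != len(nodes):
        raise RuntimeError("Cycle detected in step dependencies.")
    return order

nodes = [
    Node("load"),
    Node("train", upstream_nodes=["load"]),
    Node("evaluate", upstream_nodes=["train", "load"]),
]
print(topological_order(nodes))  # ['load', 'train', 'evaluate']
```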
parse_args() -> argparse.Namespace

Parse entrypoint arguments.

Returns:

Type Description
Namespace

Parsed args.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--deployment_id", type=str, required=True)
    parser.add_argument("--run_id", type=str, required=False)
    return parser.parse_args()
Modules
kubernetes_orchestrator_entrypoint_configuration

Entrypoint configuration for the Kubernetes master/orchestrator pod.

Classes
KubernetesOrchestratorEntrypointConfiguration

Entrypoint configuration for the k8s master/orchestrator pod.

Functions
get_entrypoint_arguments(deployment_id: UUID, run_id: Optional[UUID] = None) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
deployment_id UUID

ID of the deployment.

required
run_id Optional[UUID]

Optional ID of the pipeline run. Not set for scheduled runs.

None

Returns:

Type Description
List[str]

List of entrypoint arguments.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    deployment_id: "UUID",
    run_id: Optional["UUID"] = None,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        deployment_id: ID of the deployment.
        run_id: Optional ID of the pipeline run. Not set for scheduled runs.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
    ]

    if run_id:
        args.append(f"--{RUN_ID_OPTION}")
        args.append(str(run_id))

    return args
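A stand-alone sketch of the argument list this produces, assuming the option constants resolve to `deployment_id` and `run_id` (the flags that `parse_args` in the entrypoint module accepts):

```python
from typing import List, Optional
from uuid import UUID

def get_entrypoint_arguments(
    deployment_id: UUID, run_id: Optional[UUID] = None
) -> List[str]:
    # Mirrors the classmethod above with the option constants inlined;
    # run_id is omitted entirely for scheduled runs.
    args = ["--deployment_id", str(deployment_id)]
    if run_id:
        args += ["--run_id", str(run_id)]
    return args

print(get_entrypoint_arguments(UUID(int=1)))
```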
get_entrypoint_command() -> List[str] classmethod

Returns a command that runs the entrypoint module.

Returns:

Type Description
List[str]

Entrypoint command.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() -> Set[str] classmethod

Gets all the options required for running this entrypoint.

Returns:

Type Description
Set[str]

Entrypoint options.

Source code in src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        DEPLOYMENT_ID_OPTION,
    }
    return options
manifest_utils

Utility functions for building manifests for k8s pods.

Classes

Functions
add_local_stores_mount(pod_spec: k8s_client.V1PodSpec) -> None

Makes changes in place to the configuration of the pod spec.

Configures mounted volumes for stack components that write to a local path.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

The pod spec to update.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_local_stores_mount(
    pod_spec: k8s_client.V1PodSpec,
) -> None:
    """Makes changes in place to the configuration of the pod spec.

    Configures mounted volumes for stack components that write to a local
    path.

    Args:
        pod_spec: The pod spec to update.
    """
    assert len(pod_spec.containers) == 1
    container_spec: k8s_client.V1Container = pod_spec.containers[0]

    stack = Client().active_stack

    stack.check_local_paths()

    local_stores_path = GlobalConfiguration().local_stores_path

    host_path = k8s_client.V1HostPathVolumeSource(
        path=local_stores_path, type="Directory"
    )

    pod_spec.volumes = pod_spec.volumes or []
    pod_spec.volumes.append(
        k8s_client.V1Volume(
            name="local-stores",
            host_path=host_path,
        )
    )
    container_spec.volume_mounts = container_spec.volume_mounts or []
    container_spec.volume_mounts.append(
        k8s_client.V1VolumeMount(
            name="local-stores",
            mount_path=local_stores_path,
        )
    )

    if sys.platform == "win32":
        # File permissions are not checked on Windows. This if clause
        # prevents mypy from complaining about unused 'type: ignore'
        # statements
        pass
    else:
        # Run KFP containers in the context of the local UID/GID
        # to ensure that the local stores can be shared
        # with the local pipeline runs.
        pod_spec.security_context = k8s_client.V1SecurityContext(
            run_as_user=os.getuid(),
            run_as_group=os.getgid(),
        )

    container_spec.env = container_spec.env or []
    container_spec.env.append(
        k8s_client.V1EnvVar(
            name=ENV_ZENML_LOCAL_STORES_PATH,
            value=local_stores_path,
        )
    )
add_pod_settings(pod_spec: k8s_client.V1PodSpec, settings: KubernetesPodSettings) -> None

Updates pod spec fields in place based on the provided orchestrator settings.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

Pod spec to update.

required
settings KubernetesPodSettings

Pod settings to apply.

required
Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_pod_settings(
    pod_spec: k8s_client.V1PodSpec,
    settings: KubernetesPodSettings,
) -> None:
    """Updates pod `spec` fields in place if passed in orchestrator settings.

    Args:
        pod_spec: Pod spec to update.
        settings: Pod settings to apply.
    """
    if settings.node_selectors:
        pod_spec.node_selector = settings.node_selectors

    if settings.affinity:
        pod_spec.affinity = settings.affinity

    if settings.tolerations:
        pod_spec.tolerations = settings.tolerations

    for container in pod_spec.containers:
        assert isinstance(container, k8s_client.V1Container)
        container._resources = settings.resources
        if settings.volume_mounts:
            if container.volume_mounts:
                container.volume_mounts.extend(settings.volume_mounts)
            else:
                container.volume_mounts = settings.volume_mounts

        if settings.env:
            if container.env:
                container.env.extend(settings.env)
            else:
                container.env = settings.env

        if settings.env_from:
            if container.env_from:
                container.env_from.extend(settings.env_from)
            else:
                container.env_from = settings.env_from

    if settings.volumes:
        if pod_spec.volumes:
            pod_spec.volumes.extend(settings.volumes)
        else:
            pod_spec.volumes = settings.volumes

    if settings.host_ipc:
        pod_spec.host_ipc = settings.host_ipc

    if settings.scheduler_name:
        pod_spec.scheduler_name = settings.scheduler_name

    for key, value in settings.additional_pod_spec_args.items():
        if not hasattr(pod_spec, key):
            logger.warning(f"Ignoring invalid Pod Spec argument `{key}`.")
        else:
            if value is None:
                continue

            existing_value = getattr(pod_spec, key)
            if isinstance(existing_value, list):
                existing_value.extend(value)
            elif isinstance(existing_value, dict):
                existing_value.update(value)
            else:
                setattr(pod_spec, key, value)
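The `additional_pod_spec_args` loop above merges values by type: lists are extended, dicts are updated, and any other value overwrites the existing attribute. A minimal sketch of that merge on a stand-in pod spec (`FakePodSpec` and `apply_additional_args` are invented for illustration; the real function also logs a warning for unknown attributes):

```python
class FakePodSpec:
    """Stand-in for k8s_client.V1PodSpec with a few representative fields."""

    def __init__(self):
        self.tolerations = [{"key": "existing"}]
        self.node_selector = {"zone": "a"}
        self.priority_class_name = None


def apply_additional_args(pod_spec, extra):
    """Merge extra args into the pod spec using the same semantics as above."""
    for key, value in extra.items():
        if not hasattr(pod_spec, key) or value is None:
            continue  # unknown attribute (warned about in the real code) or no-op
        existing = getattr(pod_spec, key)
        if isinstance(existing, list):
            existing.extend(value)      # lists accumulate
        elif isinstance(existing, dict):
            existing.update(value)      # dicts merge key-wise
        else:
            setattr(pod_spec, key, value)  # scalars are overwritten


spec = FakePodSpec()
apply_additional_args(spec, {
    "tolerations": [{"key": "extra"}],
    "node_selector": {"gpu": "true"},
    "priority_class_name": "high",
    "not_a_field": 1,  # silently skipped in this sketch
})
```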
build_cron_job_manifest(job_template: k8s_client.V1JobTemplateSpec, cron_expression: str, successful_jobs_history_limit: Optional[int] = None, failed_jobs_history_limit: Optional[int] = None) -> k8s_client.V1CronJob

Build a Kubernetes cron job manifest.

Parameters:

Name Type Description Default
job_template V1JobTemplateSpec

The job template to use for the cron job.

required
cron_expression str

The cron expression to use for the cron job.

required
successful_jobs_history_limit Optional[int]

The number of successful jobs to keep.

None
failed_jobs_history_limit Optional[int]

The number of failed jobs to keep.

None

Returns:

Type Description
V1CronJob

The Kubernetes cron job manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cron_job_manifest(
    job_template: k8s_client.V1JobTemplateSpec,
    cron_expression: str,
    successful_jobs_history_limit: Optional[int] = None,
    failed_jobs_history_limit: Optional[int] = None,
) -> k8s_client.V1CronJob:
    """Build a Kubernetes cron job manifest.

    Args:
        job_template: The job template to use for the cron job.
        cron_expression: The cron expression to use for the cron job.
        successful_jobs_history_limit: The number of successful jobs to keep.
        failed_jobs_history_limit: The number of failed jobs to keep.

    Returns:
        The Kubernetes cron job manifest.
    """
    spec = k8s_client.V1CronJobSpec(
        schedule=cron_expression,
        successful_jobs_history_limit=successful_jobs_history_limit,
        failed_jobs_history_limit=failed_jobs_history_limit,
        job_template=job_template,
    )

    return k8s_client.V1CronJob(
        kind="CronJob",
        api_version="batch/v1",
        metadata=job_template.metadata,
        spec=spec,
    )
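For reference, the same CronJob shape expressed as a plain dictionary; the real function returns `kubernetes` client model objects, and the camelCase keys below are the standard Kubernetes wire names for the snake_case arguments above.

```python
def build_cron_job_dict(job_template, cron_expression,
                        successful_jobs_history_limit=None,
                        failed_jobs_history_limit=None):
    """Dict-based sketch of the CronJob manifest built above."""
    spec = {
        "schedule": cron_expression,
        "jobTemplate": job_template,
    }
    # Only include the history limits when explicitly set, mirroring the
    # Optional[int] = None defaults of the real function.
    if successful_jobs_history_limit is not None:
        spec["successfulJobsHistoryLimit"] = successful_jobs_history_limit
    if failed_jobs_history_limit is not None:
        spec["failedJobsHistoryLimit"] = failed_jobs_history_limit
    return {"apiVersion": "batch/v1", "kind": "CronJob", "spec": spec}


manifest = build_cron_job_dict(
    job_template={"spec": {}},
    cron_expression="*/5 * * * *",
    successful_jobs_history_limit=3,
)
```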
build_job_manifest(job_name: str, pod_template: k8s_client.V1PodTemplateSpec, backoff_limit: Optional[int] = None, ttl_seconds_after_finished: Optional[int] = None, labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None, active_deadline_seconds: Optional[int] = None, pod_failure_policy: Optional[Dict[str, Any]] = None, owner_references: Optional[List[k8s_client.V1OwnerReference]] = None) -> k8s_client.V1Job

Build a Kubernetes job manifest.

Parameters:

Name Type Description Default
job_name str

Name of the job.

required
pod_template V1PodTemplateSpec

The pod template to use for the job.

required
backoff_limit Optional[int]

The backoff limit for the job.

None
ttl_seconds_after_finished Optional[int]

The TTL seconds after finished for the job.

None
labels Optional[Dict[str, str]]

The labels to use for the job.

None
annotations Optional[Dict[str, str]]

The annotations to use for the job.

None
active_deadline_seconds Optional[int]

The active deadline seconds for the job.

None
pod_failure_policy Optional[Dict[str, Any]]

The pod failure policy for the job.

None
owner_references Optional[List[V1OwnerReference]]

The owner references for the job.

None

Returns:

Type Description
V1Job

The Kubernetes job manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_job_manifest(
    job_name: str,
    pod_template: k8s_client.V1PodTemplateSpec,
    backoff_limit: Optional[int] = None,
    ttl_seconds_after_finished: Optional[int] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    active_deadline_seconds: Optional[int] = None,
    pod_failure_policy: Optional[Dict[str, Any]] = None,
    owner_references: Optional[List[k8s_client.V1OwnerReference]] = None,
) -> k8s_client.V1Job:
    """Build a Kubernetes job manifest.

    Args:
        job_name: Name of the job.
        pod_template: The pod template to use for the job.
        backoff_limit: The backoff limit for the job.
        ttl_seconds_after_finished: The TTL seconds after finished for the job.
        labels: The labels to use for the job.
        annotations: The annotations to use for the job.
        active_deadline_seconds: The active deadline seconds for the job.
        pod_failure_policy: The pod failure policy for the job.
        owner_references: The owner references for the job.

    Returns:
        The Kubernetes job manifest.
    """
    job_spec = k8s_client.V1JobSpec(
        template=pod_template,
        backoff_limit=backoff_limit,
        parallelism=1,
        ttl_seconds_after_finished=ttl_seconds_after_finished,
        active_deadline_seconds=active_deadline_seconds,
        pod_failure_policy=pod_failure_policy,
    )
    job_metadata = k8s_client.V1ObjectMeta(
        name=job_name,
        labels=labels,
        annotations=annotations,
        owner_references=owner_references,
    )

    return k8s_client.V1Job(spec=job_spec, metadata=job_metadata)
build_namespace_manifest(namespace: str) -> Dict[str, Any]

Build the manifest for a new namespace.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace.

required

Returns:

Type Description
Dict[str, Any]

Manifest of the new namespace.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_namespace_manifest(namespace: str) -> Dict[str, Any]:
    """Build the manifest for a new namespace.

    Args:
        namespace: Kubernetes namespace.

    Returns:
        Manifest of the new namespace.
    """
    return {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": namespace,
        },
    }
build_pod_manifest(pod_name: Optional[str], image_name: str, command: List[str], args: List[str], privileged: bool, pod_settings: Optional[KubernetesPodSettings] = None, service_account_name: Optional[str] = None, env: Optional[Dict[str, str]] = None, labels: Optional[Dict[str, str]] = None, mount_local_stores: bool = False, owner_references: Optional[List[k8s_client.V1OwnerReference]] = None, termination_grace_period_seconds: Optional[int] = 30) -> k8s_client.V1Pod

Build a Kubernetes pod manifest for a ZenML run or step.

Parameters:

Name Type Description Default
pod_name Optional[str]

Name of the pod.

required
image_name str

Name of the Docker image.

required
command List[str]

Command to execute the entrypoint in the pod.

required
args List[str]

Arguments provided to the entrypoint command.

required
privileged bool

Whether to run the container in privileged mode.

required
pod_settings Optional[KubernetesPodSettings]

Optional settings for the pod.

None
service_account_name Optional[str]

Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster.

None
env Optional[Dict[str, str]]

Environment variables to set.

None
labels Optional[Dict[str, str]]

Labels to add to the pod.

None
mount_local_stores bool

Whether to mount the local stores path inside the pod.

False
owner_references Optional[List[V1OwnerReference]]

List of owner references for the pod.

None
termination_grace_period_seconds Optional[int]

The number of seconds to wait for a pod to shut down gracefully.

30

Returns:

Type Description
V1Pod

Pod manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_pod_manifest(
    pod_name: Optional[str],
    image_name: str,
    command: List[str],
    args: List[str],
    privileged: bool,
    pod_settings: Optional[KubernetesPodSettings] = None,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    labels: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
    owner_references: Optional[List[k8s_client.V1OwnerReference]] = None,
    termination_grace_period_seconds: Optional[int] = 30,
) -> k8s_client.V1Pod:
    """Build a Kubernetes pod manifest for a ZenML run or step.

    Args:
        pod_name: Name of the pod.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        privileged: Whether to run the container in privileged mode.
        pod_settings: Optional settings for the pod.
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        labels: Labels to add to the pod.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.
        owner_references: List of owner references for the pod.
        termination_grace_period_seconds: The amount of seconds to wait for a
            pod to shutdown gracefully.

    Returns:
        Pod manifest.
    """
    env = env.copy() if env else {}
    env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")

    security_context = k8s_client.V1SecurityContext(privileged=privileged)
    container_spec = k8s_client.V1Container(
        name="main",
        image=image_name,
        command=command,
        args=args,
        env=[
            k8s_client.V1EnvVar(name=name, value=value)
            for name, value in env.items()
        ],
        security_context=security_context,
    )
    image_pull_secrets = []
    if pod_settings:
        image_pull_secrets = [
            k8s_client.V1LocalObjectReference(name=name)
            for name in pod_settings.image_pull_secrets
        ]

    pod_spec = k8s_client.V1PodSpec(
        containers=[container_spec],
        restart_policy="Never",
        image_pull_secrets=image_pull_secrets,
        termination_grace_period_seconds=termination_grace_period_seconds,
    )

    if service_account_name is not None:
        pod_spec.service_account_name = service_account_name

    # Apply pod settings if provided
    labels = labels or {}

    if pod_settings:
        add_pod_settings(pod_spec, pod_settings)

    if pod_settings and pod_settings.labels:
        labels.update(pod_settings.labels)

    pod_metadata = k8s_client.V1ObjectMeta(
        name=pod_name,
        labels=labels,
        owner_references=owner_references,
    )

    if pod_settings and pod_settings.annotations:
        pod_metadata.annotations = pod_settings.annotations

    pod_manifest = k8s_client.V1Pod(
        kind="Pod",
        api_version="v1",
        metadata=pod_metadata,
        spec=pod_spec,
    )

    if mount_local_stores:
        add_local_stores_mount(pod_spec)

    return pod_manifest
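A small sketch of the env-var preparation at the top of `build_pod_manifest`: the caller's dict is copied (so the input is never mutated), a repo-init warning flag is defaulted, and the result becomes name/value pairs for the container spec. The literal value of `ENV_ZENML_ENABLE_REPO_INIT_WARNINGS` is an assumption here.

```python
# Assumed value -- the real constant is imported from zenml.constants.
ENV_ZENML_ENABLE_REPO_INIT_WARNINGS = "ZENML_ENABLE_REPO_INIT_WARNINGS"


def prepare_env(env=None):
    """Copy the user env, default the warning flag, emit name/value pairs."""
    env = env.copy() if env else {}
    env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")
    return [{"name": name, "value": value} for name, value in env.items()]


user_env = {"FOO": "bar"}
pairs = prepare_env(user_env)
```

Because `setdefault` is used, a caller-supplied value for the flag wins over the `"False"` default.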
build_role_binding_manifest_for_service_account(name: str, role_name: str, service_account_name: str, namespace: str = 'default') -> Dict[str, Any]

Build a manifest for a role binding of a service account.

Parameters:

Name Type Description Default
name str

Name of the cluster role binding.

required
role_name str

Name of the role.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a cluster role binding of a service account.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_role_binding_manifest_for_service_account(
    name: str,
    role_name: str,
    service_account_name: str,
    namespace: str = "default",
) -> Dict[str, Any]:
    """Build a manifest for a role binding of a service account.

    Args:
        name: Name of the cluster role binding.
        role_name: Name of the role.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a cluster role binding of a service account.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": name},
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account_name,
                "namespace": namespace,
            }
        ],
        "roleRef": {
            "kind": "ClusterRole",
            "name": role_name,
            "apiGroup": "rbac.authorization.k8s.io",
        },
    }
build_secret_manifest(name: str, data: Mapping[str, Optional[str]], secret_type: str = 'Opaque') -> Dict[str, Any]

Builds a Kubernetes secret manifest.

Parameters:

Name Type Description Default
name str

Name of the secret.

required
data Mapping[str, Optional[str]]

The secret data.

required
secret_type str

The secret type.

'Opaque'

Returns:

Type Description
Dict[str, Any]

The secret manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_secret_manifest(
    name: str,
    data: Mapping[str, Optional[str]],
    secret_type: str = "Opaque",
) -> Dict[str, Any]:
    """Builds a Kubernetes secret manifest.

    Args:
        name: Name of the secret.
        data: The secret data.
        secret_type: The secret type.

    Returns:
        The secret manifest.
    """
    encoded_data = {
        key: base64.b64encode(value.encode()).decode() if value else None
        for key, value in data.items()
    }

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {
            "name": name,
        },
        "type": secret_type,
        "data": encoded_data,
    }
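The base64 step above is the only transformation applied to the secret data; isolated, it looks like this. Kubernetes requires `data` values in Secret manifests to be base64-encoded strings, while falsy values pass through unchanged.

```python
import base64


def encode_secret_data(data):
    """Base64-encode each non-empty secret value, as the manifest requires."""
    return {
        key: base64.b64encode(value.encode()).decode() if value else None
        for key, value in data.items()
    }


encoded = encode_secret_data({"password": "value", "empty": None})
```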
build_service_account_manifest(name: str, namespace: str = 'default') -> Dict[str, Any]

Build the manifest for a service account.

Parameters:

Name Type Description Default
name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a service account.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_service_account_manifest(
    name: str, namespace: str = "default"
) -> Dict[str, Any]:
    """Build the manifest for a service account.

    Args:
        name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a service account.
    """
    return {
        "apiVersion": "v1",
        "kind": "ServiceAccount",
        "metadata": {
            "name": name,
            "namespace": namespace,
        },
    }
job_template_manifest_from_job(job: k8s_client.V1Job) -> k8s_client.V1JobTemplateSpec

Build a Kubernetes job template manifest from a job.

Parameters:

Name Type Description Default
job V1Job

The job manifest to build the template from.

required

Returns:

Type Description
V1JobTemplateSpec

The job template manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def job_template_manifest_from_job(
    job: k8s_client.V1Job,
) -> k8s_client.V1JobTemplateSpec:
    """Build a Kubernetes job template manifest from a job.

    Args:
        job: The job manifest to build the template from.

    Returns:
        The job template manifest.
    """
    return k8s_client.V1JobTemplateSpec(
        metadata=job.metadata,
        spec=job.spec,
    )
pod_template_manifest_from_pod(pod: k8s_client.V1Pod) -> k8s_client.V1PodTemplateSpec

Build a Kubernetes pod template manifest from a pod.

Parameters:

Name Type Description Default
pod V1Pod

The pod manifest to build the template from.

required

Returns:

Type Description
V1PodTemplateSpec

The pod template manifest.

Source code in src/zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def pod_template_manifest_from_pod(
    pod: k8s_client.V1Pod,
) -> k8s_client.V1PodTemplateSpec:
    """Build a Kubernetes pod template manifest from a pod.

    Args:
        pod: The pod manifest to build the template from.

    Returns:
        The pod template manifest.
    """
    return k8s_client.V1PodTemplateSpec(
        metadata=pod.metadata,
        spec=pod.spec,
    )

pod_settings

Kubernetes pod settings.

Classes
KubernetesPodSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Kubernetes Pod settings.

Attributes:

Name Type Description
node_selectors Dict[str, str]

Node selectors to apply to the pod.

affinity Dict[str, Any]

Affinity to apply to the pod.

tolerations List[Dict[str, Any]]

Tolerations to apply to the pod.

resources Dict[str, Dict[str, str]]

Resource requests and limits for the pod.

annotations Dict[str, str]

Annotations to apply to the pod metadata.

volumes List[Dict[str, Any]]

Volumes to mount in the pod.

volume_mounts List[Dict[str, Any]]

Volume mounts to apply to the pod containers.

host_ipc bool

Whether to enable host IPC for the pod.

scheduler_name Optional[str]

The name of the scheduler to use for the pod.

image_pull_secrets List[str]

Image pull secrets to use for the pod.

labels Dict[str, str]

Labels to apply to the pod.

env List[Dict[str, Any]]

Environment variables to apply to the container.

env_from List[Dict[str, Any]]

Environment variable sources (e.g., ConfigMaps or Secrets) to apply to the container.

additional_pod_spec_args Dict[str, Any]

Additional arguments applied directly to the pod spec. List values extend existing lists, dict values update existing dicts, and other values overwrite the existing attribute.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/deploying-zenml/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
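A hedged sketch of the detection step that the constructor above relies on. The `{{ name.key }}` reference syntax and the regex are assumptions for illustration; the real detection lives in ZenML's `secret_utils` module.

```python
import re

# Assumed reference format: "{{ <secret_name>.<key> }}", optionally without
# inner whitespace. This is an illustrative pattern, not ZenML's actual one.
SECRET_REF = re.compile(r"\{\{\s*\S+?\.\S+?\s*\}\}")


def is_secret_reference(value):
    """Return True if the value looks like a secret reference string."""
    return isinstance(value, str) and bool(SECRET_REF.fullmatch(value.strip()))
```

With a predicate like this, the constructor can route plain-text values past the secret checks while rejecting references on fields that disallow them.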
Functions
warn_if_invalid_model_data(data: Any, class_name: str) -> None

Validates the data of a Kubernetes model.

Parameters:

Name Type Description Default
data Any

The data to validate.

required
class_name str

Name of the class of the model.

required
Source code in src/zenml/integrations/kubernetes/pod_settings.py
def warn_if_invalid_model_data(data: Any, class_name: str) -> None:
    """Validates the data of a Kubernetes model.

    Args:
        data: The data to validate.
        class_name: Name of the class of the model.
    """
    if not isinstance(data, dict):
        return

    try:
        serialization_utils.deserialize_kubernetes_model(data, class_name)
    except KeyError as e:
        if str(e) not in _pod_settings_logged_warnings:
            _pod_settings_logged_warnings.append(str(e))
            logger.warning(
                "Invalid data for Kubernetes model class `%s`: %s. "
                "Hint: Kubernetes expects attribute names in CamelCase, not "
                "snake_case.",
                class_name,
                e,
            )
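The CamelCase hint in the warning reflects the naming mismatch between Kubernetes manifest keys and the Python client's snake_case model attributes. A minimal converter illustrates the mapping (`snake_to_camel` is an illustrative helper, not part of ZenML, and some real keys such as `hostIPC` deviate from simple capitalization):

```python
def snake_to_camel(name):
    """Convert a snake_case attribute name to a camelCase manifest key."""
    head, *tail = name.split("_")
    return head + "".join(part.capitalize() for part in tail)
```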
Modules

serialization_utils

Kubernetes serialization utils.

Functions
deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any

Deserializes a Kubernetes model.

Parameters:

Name Type Description Default
data Dict[str, Any]

The model data.

required
class_name str

Name of the Kubernetes model class.

required

Raises:

Type Description
KeyError

If the data contains values for an invalid attribute.

Returns:

Type Description
Any

The deserialized model.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
def deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any:
    """Deserializes a Kubernetes model.

    Args:
        data: The model data.
        class_name: Name of the Kubernetes model class.

    Raises:
        KeyError: If the data contains values for an invalid attribute.

    Returns:
        The deserialized model.
    """
    model_class = get_model_class(class_name=class_name)
    assert hasattr(model_class, "openapi_types")
    assert hasattr(model_class, "attribute_map")
    # Mapping of the attribute name of the model class to the attribute type
    type_mapping = cast(Dict[str, str], model_class.openapi_types)
    reverse_attribute_mapping = cast(Dict[str, str], model_class.attribute_map)
    # Mapping of the serialized key to the attribute name of the model class
    attribute_mapping = {
        value: key for key, value in reverse_attribute_mapping.items()
    }

    deserialized_attributes: Dict[str, Any] = {}

    for key, value in data.items():
        if key not in attribute_mapping:
            raise KeyError(
                f"Got value for attribute {key} which is not one of the "
                f"available attributes for class {class_name}: "
                f"{set(attribute_mapping)}."
            )

        attribute_name = attribute_mapping[key]
        attribute_class = type_mapping[attribute_name]

        if not value:
            deserialized_attributes[attribute_name] = value
        elif attribute_class.startswith("list["):
            match = re.fullmatch(r"list\[(.*)\]", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_list(
                value, class_name=inner_class
            )
        elif attribute_class.startswith("dict("):
            match = re.fullmatch(r"dict\(([^,]*), (.*)\)", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_dict(
                value, class_name=inner_class
            )
        elif is_model_class(attribute_class):
            deserialized_attributes[attribute_name] = (
                deserialize_kubernetes_model(value, attribute_class)
            )
        else:
            deserialized_attributes[attribute_name] = value

    return model_class(**deserialized_attributes)
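The `list[...]` and `dict(...)` branches above parse the `openapi_types` strings with regular expressions to recover the element type. Isolated, the parsing looks like this (`parse_openapi_type` is a helper invented for illustration):

```python
import re


def parse_openapi_type(type_str):
    """Classify an openapi type string and extract its element type."""
    m = re.fullmatch(r"list\[(.*)\]", type_str)
    if m:
        return ("list", m.group(1))       # e.g. "list[V1Container]"
    m = re.fullmatch(r"dict\(([^,]*), (.*)\)", type_str)
    if m:
        return ("dict", m.group(2))       # e.g. "dict(str, V1EnvVar)"
    return ("scalar", type_str)           # plain types like "str" or "bool"
```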
get_model_class(class_name: str) -> Type[Any]

Gets a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to get.

required

Raises:

Type Description
TypeError

If no Kubernetes model class exists for this name.

Returns:

Type Description
Type[Any]

The model class.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
def get_model_class(class_name: str) -> Type[Any]:
    """Gets a Kubernetes model class.

    Args:
        class_name: Name of the class to get.

    Raises:
        TypeError: If no Kubernetes model class exists for this name.

    Returns:
        The model class.
    """
    import kubernetes.client.models

    class_ = getattr(kubernetes.client.models, class_name, None)

    if not class_:
        raise TypeError(
            f"Unable to find kubernetes model class with name {class_name}."
        )

    assert isinstance(class_, type)
    return class_
is_model_class(class_name: str) -> bool

Checks whether the given class name is a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to check.

required

Returns:

Type Description
bool

If the given class name is a Kubernetes model class.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
def is_model_class(class_name: str) -> bool:
    """Checks whether the given class name is a Kubernetes model class.

    Args:
        class_name: Name of the class to check.

    Returns:
        If the given class name is a Kubernetes model class.
    """
    import kubernetes.client.models

    return hasattr(kubernetes.client.models, class_name)
serialize_kubernetes_model(model: Any) -> Dict[str, Any]

Serializes a Kubernetes model.

Parameters:

    model (Any): The model to serialize. [required]

Raises:

    TypeError: If the model is not a Kubernetes model.

Returns:

    Dict[str, Any]: The serialized model.

Source code in src/zenml/integrations/kubernetes/serialization_utils.py
def serialize_kubernetes_model(model: Any) -> Dict[str, Any]:
    """Serializes a Kubernetes model.

    Args:
        model: The model to serialize.

    Raises:
        TypeError: If the model is not a Kubernetes model.

    Returns:
        The serialized model.
    """
    if not is_model_class(model.__class__.__name__):
        raise TypeError(f"Unable to serialize non-kubernetes model {model}.")

    assert hasattr(model, "attribute_map")
    attribute_mapping = cast(Dict[str, str], model.attribute_map)

    model_attributes = {
        serialized_attribute_name: getattr(model, attribute_name)
        for attribute_name, serialized_attribute_name in attribute_mapping.items()
    }
    return _serialize_dict(model_attributes)
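Kubernetes client models store attributes in snake_case and expose an `attribute_map` to their camelCase JSON keys; the comprehension above renames each attribute accordingly. A minimal sketch with a hypothetical `FakeModel` standing in for a real client model:

```python
from typing import Any, Dict, cast


class FakeModel:
    """Hypothetical stand-in for a Kubernetes client model: snake_case
    attributes plus an `attribute_map` to the serialized camelCase keys."""

    attribute_map = {"api_version": "apiVersion", "kind": "kind"}

    def __init__(self, api_version: str, kind: str) -> None:
        self.api_version = api_version
        self.kind = kind


def serialize(model: Any) -> Dict[str, Any]:
    # Rename each attribute to its serialized key, as in the snippet above.
    mapping = cast(Dict[str, str], model.attribute_map)
    return {
        serialized_name: getattr(model, attr_name)
        for attr_name, serialized_name in mapping.items()
    }


print(serialize(FakeModel("v1", "Pod")))  # -> {'apiVersion': 'v1', 'kind': 'Pod'}
```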

service_connectors

Kubernetes Service Connector.

Classes
KubernetesServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Kubernetes service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
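For resource types that allow only a single instance, the constructor either falls back to a default resource ID or verifies the provided one against it. A minimal sketch of that validate-or-default logic, with a hypothetical default table and a plain `RuntimeError` standing in for `AuthorizationException`:

```python
from typing import Optional

# Hypothetical defaults; the real connector derives these per resource type.
DEFAULT_RESOURCE_IDS = {"kubernetes-cluster": "my-cluster"}


def validate_resource_id(
    resource_type: str, resource_id: Optional[str]
) -> str:
    """Canonicalize a resource ID for a single-instance resource type."""
    default = DEFAULT_RESOURCE_IDS[resource_type]
    if resource_id is None:
        # No ID provided: use the default for the resource type.
        return default
    if resource_id != default:
        # A provided ID must match the default, otherwise reject it.
        raise RuntimeError(
            f"Resource ID {resource_id} does not match default {default}."
        )
    return resource_id
```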
Modules
kubernetes_service_connector

Kubernetes Service Connector.

The Kubernetes Service Connector implements various authentication methods for Kubernetes clusters.

Classes
KubernetesAuthenticationMethods

Bases: StrEnum

Kubernetes Authentication methods.

KubernetesBaseConfig

Bases: KubernetesServerConfig

Kubernetes basic config.

KubernetesServerConfig

Bases: KubernetesServerCredentials

Kubernetes server config.

KubernetesServerCredentials

Bases: AuthenticationConfig

Kubernetes server authentication config.

KubernetesServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Kubernetes service connector.

KubernetesTokenConfig

Bases: KubernetesBaseConfig, KubernetesTokenCredentials

Kubernetes token config.

KubernetesTokenCredentials

Bases: AuthenticationConfig

Kubernetes token authentication config.

KubernetesUserPasswordConfig

Bases: KubernetesBaseConfig, KubernetesUserPasswordCredentials

Kubernetes user/pass config.

KubernetesUserPasswordCredentials

Bases: AuthenticationConfig

Kubernetes user/pass authentication config.


step_operators

Kubernetes step operator.

Classes
KubernetesStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run on Kubernetes.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubernetesStepOperatorConfig property

Returns the KubernetesStepOperatorConfig config.

Returns:

    KubernetesStepOperatorConfig: The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Kubernetes step operator.

Returns:

    Optional[Type[BaseSettings]]: The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

    Optional[StackValidator]: A validator that checks that the stack contains
    a remote container registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

    deployment (PipelineDeploymentBase): The pipeline deployment for which
    to get the builds. [required]

Returns:

    List[BuildConfiguration]: The required Docker builds.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.uses_step_operator(self.name):
            build = BuildConfiguration(
                key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
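Each step that targets this operator contributes one build configuration keyed by the operator's Docker image key. The filtering loop can be sketched with simplified stand-in types (the field and class names here are illustrative, not ZenML's exact models):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class StepConfig:
    # Name of the step operator this step targets, if any (illustrative).
    step_operator: Optional[str] = None


@dataclass
class BuildConfiguration:
    key: str
    step_name: str


def get_docker_builds(
    steps: Dict[str, StepConfig], operator_name: str
) -> List[BuildConfiguration]:
    """Collect one build per step that runs on the given operator."""
    return [
        BuildConfiguration(key="kubernetes_step_operator", step_name=name)
        for name, config in steps.items()
        if config.step_operator == operator_name
    ]
```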
get_kube_client() -> k8s_client.ApiClient

Get the Kubernetes API client.

Returns:

    ApiClient: The Kubernetes API client.

Raises:

    RuntimeError: If the service connector returns an unexpected client.

Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def get_kube_client(self) -> k8s_client.ApiClient:
    """Get the Kubernetes API client.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: If the service connector returns an unexpected client.
    """
    if self.config.incluster:
        kube_utils.load_kube_config(incluster=True)
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Refresh the client also if the connector has expired
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(client)}."
            )
        self._k8s_client = client
    else:
        kube_utils.load_kube_config(
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()

    return self._k8s_client
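`get_kube_client` caches the API client and only rebuilds it when there is no cached instance or the backing connector has expired. That caching shape, reduced to a time-based sketch (the real check delegates to `connector_has_expired`, not a TTL):

```python
import time
from typing import Optional


class ClientCache:
    """Reuse a cached client until it expires, then rebuild it."""

    def __init__(self, ttl_seconds: float) -> None:
        self._client: Optional[object] = None
        self._expires_at = 0.0
        self._ttl = ttl_seconds

    def _expired(self) -> bool:
        return time.monotonic() >= self._expires_at

    def get(self) -> object:
        if self._client is not None and not self._expired():
            return self._client  # fast path: cached and still valid
        # Stand-in for `connector.connect()` / `k8s_client.ApiClient()`.
        self._client = object()
        self._expires_at = time.monotonic() + self._ttl
        return self._client
```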
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on Kubernetes.

Parameters:

    info (StepRunInfo): Information about the step run. [required]
    entrypoint_command (List[str]): Command that executes the step. [required]
    environment (Dict[str, str]): Environment variables to set in the step
    operator environment. [required]
Source code in src/zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on Kubernetes.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(
        KubernetesStepOperatorSettings, self.get_settings(info)
    )
    image_name = info.get_image(
        key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY
    )
    command = entrypoint_command[:3]
    args = entrypoint_command[3:]

    step_labels = {
        "run_id": kube_utils.sanitize_label(str(info.run_id)),
        "run_name": kube_utils.sanitize_label(str(info.run_name)),
        "pipeline": kube_utils.sanitize_label(info.pipeline.name),
        "step_name": kube_utils.sanitize_label(info.pipeline_step_name),
    }
    step_annotations = {
        STEP_NAME_ANNOTATION_KEY: info.pipeline_step_name,
        STEP_OPERATOR_ANNOTATION_KEY: str(self.id),
    }

    # We set some default minimum memory resource requests for the step pod
    # here if the user has not specified any, because the step pod takes up
    # some memory resources itself and, if not specified, the pod will be
    # scheduled on any node regardless of available memory and risk
    # negatively impacting or even crashing the node due to memory pressure.
    pod_settings = kube_utils.apply_default_resource_requests(
        memory="400Mi",
        pod_settings=settings.pod_settings,
    )

    pod_manifest = build_pod_manifest(
        pod_name=None,
        image_name=image_name,
        command=command,
        args=args,
        env=environment,
        privileged=settings.privileged,
        pod_settings=pod_settings,
        service_account_name=settings.service_account_name,
        labels=step_labels,
    )

    job_name = settings.job_name_prefix or ""
    random_prefix = "".join(random.choices("0123456789abcdef", k=8))
    job_name += f"-{random_prefix}-{info.pipeline_step_name}-{info.pipeline.name}-step-operator"
    # The job name will be used as a label on the pods, so we need to make
    # sure it doesn't exceed the label length limit
    job_name = kube_utils.sanitize_label(job_name)

    job_manifest = build_job_manifest(
        job_name=job_name,
        pod_template=pod_template_manifest_from_pod(pod_manifest),
        # The orchestrator already handles retries, so we don't need to
        # retry the step operator job.
        backoff_limit=0,
        ttl_seconds_after_finished=settings.ttl_seconds_after_finished,
        active_deadline_seconds=settings.active_deadline_seconds,
        labels=step_labels,
        annotations=step_annotations,
    )

    kube_utils.create_job(
        batch_api=self._k8s_batch_api,
        namespace=self.config.kubernetes_namespace,
        job_manifest=job_manifest,
    )

    logger.info(
        "Waiting for step operator job `%s` to finish...",
        job_name,
    )
    kube_utils.wait_for_job_to_finish(
        batch_api=self._k8s_batch_api,
        core_api=self._k8s_core_api,
        namespace=self.config.kubernetes_namespace,
        job_name=job_name,
        fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons,
        stream_logs=True,
    )
    logger.info("Step operator job completed.")
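The generated job name is reused as a pod label, so it must satisfy the Kubernetes label-value rules: at most 63 characters, only alphanumerics plus `-`, `_`, and `.`, beginning and ending with an alphanumeric character. A plausible sketch of such a sanitizer (not ZenML's exact implementation):

```python
import re


def sanitize_label(value: str) -> str:
    """Force a string into a valid Kubernetes label value."""
    # Lowercase and replace every disallowed character with a dash.
    value = re.sub(r"[^a-z0-9._-]", "-", value.lower())
    # Label values are capped at 63 characters.
    value = value[:63]
    # Must start and end with an alphanumeric character.
    return value.strip("-_.")


print(sanitize_label("My Pipeline/Step #1"))  # -> my-pipeline-step--1
```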
Modules
kubernetes_step_operator

Kubernetes step operator implementation.
