Kubeflow

zenml.integrations.kubeflow

Initialization of the Kubeflow integration for ZenML.

The Kubeflow integration sub-module provides an alternative to the local orchestrator. You can enable it by registering the Kubeflow orchestrator with the ZenML CLI.
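As a quick, non-authoritative sketch, the snippet below checks that the integration's requirements are available and lists the flavor it registers; the orchestrator itself is typically installed and registered via the CLI (e.g. `zenml integration install kubeflow` followed by `zenml orchestrator register <name> --flavor=kubeflow`).

from zenml.integrations.kubeflow import KUBEFLOW, KubeflowIntegration

# Only proceed if the integration requirements (the kfp SDK etc.) are installed.
if KubeflowIntegration.check_installation():
    for flavor_class in KubeflowIntegration.flavors():
        print(f"Integration '{KUBEFLOW}' provides flavor '{flavor_class().name}'")
else:
    print("Kubeflow requirements missing; run `zenml integration install kubeflow`.")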

Attributes

KUBEFLOW = 'kubeflow' module-attribute

KUBEFLOW_ORCHESTRATOR_FLAVOR = 'kubeflow' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
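For illustration, a minimal sketch of what these two helpers produce for the Kubeflow orchestrator flavor documented below (the exact URLs depend on the installed ZenML version):

from zenml.integrations.kubeflow.flavors import KubeflowOrchestratorFlavor

flavor = KubeflowOrchestratorFlavor()
# Something like https://docs.zenml.io/stack-components/orchestrators/kubeflow
print(flavor.generate_default_docs_url())
# Something like https://sdkdocs.zenml.io/<version>/integration_code_docs/integrations-kubeflow/#...
print(flavor.generate_default_sdk_docs_url())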
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
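A minimal sketch of the conversion, using the Kubeflow orchestrator flavor documented below (the integration name is passed purely for illustration):

from zenml.integrations.kubeflow.flavors import KubeflowOrchestratorFlavor

flavor = KubeflowOrchestratorFlavor()
request = flavor.to_model(integration="kubeflow", is_custom=False)
# The request model carries the flavor name, component type, and importable source path.
print(request.name, request.type, request.source)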

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
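As a sketch, the two methods can be compared directly; entries matching REQUIREMENTS_IGNORED_ON_UNINSTALL (typically packages shared with other integrations) only appear in the first list:

from zenml.integrations.kubeflow import KubeflowIntegration

install_requirements = KubeflowIntegration.get_requirements()
uninstall_requirements = KubeflowIntegration.get_uninstall_requirements()
# Requirements kept on uninstall because other integrations may still need them.
print(sorted(set(install_requirements) - set(uninstall_requirements)))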
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

KubeflowIntegration

Bases: Integration

Definition of Kubeflow Integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Kubeflow integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/kubeflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Kubeflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.kubeflow.flavors import (
        KubeflowOrchestratorFlavor,
    )

    return [KubeflowOrchestratorFlavor]
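A hedged usage sketch: the flavor class returned here wires the configuration class to the orchestrator implementation, and accessing `implementation_class` requires the kfp SDK to be installed.

from zenml.integrations.kubeflow import KubeflowIntegration

flavor = KubeflowIntegration.flavors()[0]()
print(flavor.name)                  # "kubeflow"
print(flavor.config_class)          # KubeflowOrchestratorConfig
print(flavor.implementation_class)  # KubeflowOrchestrator, imported on access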

Modules

flavors

Kubeflow integration flavors.

Classes
KubeflowOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubeflowOrchestratorSettings

Configuration for the Kubeflow orchestrator.

Attributes:

Name Type Description
kubeflow_hostname Optional[str]

The hostname to use to talk to the Kubeflow Pipelines API. If not set, the hostname will be derived from the Kubernetes API proxy. Mandatory when connecting to a multi-tenant Kubeflow Pipelines deployment.

kubeflow_namespace str

The Kubernetes namespace in which Kubeflow Pipelines is deployed. Defaults to kubeflow.

kubernetes_context Optional[str]

Name of a kubernetes context to run pipelines in. Not applicable when connecting to a multi-tenant Kubeflow Pipelines deployment (i.e. when kubeflow_hostname is set) or if the stack component is linked to a Kubernetes service connector.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
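A minimal sketch of the behavior described above, assuming a secret named `kubeflow_secret` exists and that these string fields accept plain secret references (no field-level validators):

from zenml.integrations.kubeflow.flavors import KubeflowOrchestratorConfig

config = KubeflowOrchestratorConfig(
    kubeflow_hostname="https://kubeflow.example.com/pipeline",  # example hostname
    client_username="{{kubeflow_secret.username}}",  # resolved from the secret at runtime
    client_password="{{kubeflow_secret.password}}",
)
print(config.kubeflow_namespace)  # defaults to "kubeflow"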
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

KubeflowOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubeflow orchestrator flavor.

Attributes
config_class: Type[KubeflowOrchestratorConfig] property

Returns KubeflowOrchestratorConfig config class.

Returns:

Type Description
Type[KubeflowOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubeflowOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubeflowOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

Modules
kubeflow_orchestrator_flavor

Kubeflow orchestrator flavor.

Classes
KubeflowOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, KubeflowOrchestratorSettings

Configuration for the Kubeflow orchestrator.

Attributes:

Name Type Description
kubeflow_hostname Optional[str]

The hostname to use to talk to the Kubeflow Pipelines API. If not set, the hostname will be derived from the Kubernetes API proxy. Mandatory when connecting to a multi-tenant Kubeflow Pipelines deployment.

kubeflow_namespace str

The Kubernetes namespace in which Kubeflow Pipelines is deployed. Defaults to kubeflow.

kubernetes_context Optional[str]

Name of a kubernetes context to run pipelines in. Not applicable when connecting to a multi-tenant Kubeflow Pipelines deployment (i.e. when kubeflow_hostname is set) or if the stack component is linked to a Kubernetes service connector.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

KubeflowOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Kubeflow orchestrator flavor.

Attributes
config_class: Type[KubeflowOrchestratorConfig] property

Returns KubeflowOrchestratorConfig config class.

Returns:

Type Description
Type[KubeflowOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubeflowOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[KubeflowOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

KubeflowOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Kubeflow orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, the client running a pipeline using this orchestrator waits until all steps finish running. If False, the client returns immediately and the pipeline is executed asynchronously. Defaults to True. This setting only has an effect when specified on the pipeline and will be ignored if specified on steps.

timeout int

How many seconds to wait for synchronous runs.

client_args Dict[str, Any]

Arguments to pass when initializing the KFP client.

client_username Optional[str]

Username to generate a session cookie for the kubeflow client. Both client_username and client_password need to be set together.

client_password Optional[str]

Password to generate a session cookie for the kubeflow client. Both client_username and client_password need to be set together.

user_namespace Optional[str]

The user namespace to use when creating experiments and runs.

pod_settings Optional[KubernetesPodSettings]

Pod settings to apply.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
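A hedged sketch of applying these settings at the pipeline level; the settings key "orchestrator.kubeflow" targets this flavor, and the values shown are illustrative:

from zenml import pipeline
from zenml.integrations.kubeflow.flavors import KubeflowOrchestratorSettings

kubeflow_settings = KubeflowOrchestratorSettings(
    synchronous=True,          # wait for all steps to finish
    timeout=1200,              # seconds to wait for synchronous runs
    user_namespace="my-team",  # assumed multi-tenant namespace
)

@pipeline(settings={"orchestrator.kubeflow": kubeflow_settings})
def my_pipeline() -> None:
    ...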
Functions

orchestrators

Initialization of the Kubeflow ZenML orchestrator.

Classes
KubeflowOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines using Kubeflow.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubeflowOrchestratorConfig property

Returns the KubeflowOrchestratorConfig config.

Returns:

Type Description
KubeflowOrchestratorConfig

The configuration.

pipeline_directory: str property

Returns path to a directory in which the kubeflow pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property

Path to the root directory for all files concerning this orchestrator.

Returns:

Type Description
str

Path to the root directory.

settings_class: Type[KubeflowOrchestratorSettings] property

Settings class for the Kubeflow orchestrator.

Returns:

Type Description
Type[KubeflowOrchestratorSettings]

The settings class.

validator: Optional[StackValidator] property

Validates that the stack contains a container registry.

Also checks that requirements are met for local components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_kubernetes_contexts() -> Tuple[List[str], Optional[str]]

Get the list of configured Kubernetes contexts and the active context.

Returns:

Type Description
Tuple[List[str], Optional[str]]

A tuple containing the list of configured Kubernetes contexts and the active context.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
    """Get the list of configured Kubernetes contexts and the active context.

    Returns:
        A tuple containing the list of configured Kubernetes contexts and
        the active context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException:
        return [], None

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_KFP_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_KFP_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    hostname = self.config.kubeflow_hostname
    if not hostname:
        return {}

    hostname = hostname.rstrip("/")
    pipeline_suffix = "/pipeline"
    if hostname.endswith(pipeline_suffix):
        hostname = hostname[: -len(pipeline_suffix)]

    run = Client().get_pipeline_run(run_id)

    settings_key = settings_utils.get_stack_component_setting_key(self)
    run_settings = self.settings_class.model_validate(
        run.config.model_dump().get(settings_key, self.config)
    )
    user_namespace = run_settings.user_namespace

    if user_namespace:
        run_url = (
            f"{hostname}/_/pipeline/?ns={user_namespace}#"
            f"/runs/details/{self.get_orchestrator_run_id()}"
        )
        return {
            METADATA_ORCHESTRATOR_URL: Uri(run_url),
        }
    else:
        return {
            METADATA_ORCHESTRATOR_URL: Uri(f"{hostname}"),
        }
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str]) -> Any

Creates a kfp yaml file.

This functions as an intermediary representation of the pipeline which is then deployed to the kubeflow pipelines instance.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this docker image a callable is created which builds container_ops for each step (_construct_kfp_pipeline). To do this the entrypoint of the docker image is configured to run the correct step within the docker image. The dependencies between these container_ops are then also configured onto each container_op by pointing at the downstream steps.

This callable is then compiled into a kfp yaml file that is used as the intermediary representation of the kubeflow pipeline.

This file, together with some metadata and runtime configurations, is then uploaded to the kubeflow pipelines cluster for execution.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Raises:

Type Description
RuntimeError

If trying to run a pipeline in a notebook environment.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Creates a kfp yaml file.

    This functions as an intermediary representation of the pipeline which
    is then deployed to the kubeflow pipelines instance.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()`
    method builds a docker image that contains the code for the
    pipeline, all steps the context around these files.

    Based on this docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`).
    To do this the entrypoint of the docker image is configured to
    run the correct step within the docker image. The dependencies
    between these container_ops are then also configured onto each
    container_op by pointing at the downstream steps.

    This callable is then compiled into a kfp yaml file that is used as
    the intermediary representation of the kubeflow pipeline.

    This file, together with some metadata, runtime configurations is
    then uploaded into the kubeflow pipelines cluster for execution.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If trying to run a pipeline in a notebook
            environment.
    """
    # First check whether the code running in a notebook
    if Environment.in_notebook():
        raise RuntimeError(
            "The Kubeflow orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )

    assert stack.container_registry

    # Create a callable for future compilation into a dsl.Pipeline.
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    ).replace("_", "-")

    def _create_dynamic_pipeline() -> Any:
        """Create a dynamic pipeline including each step.

        Returns:
            pipeline_func
        """
        step_name_to_dynamic_component: Dict[str, Any] = {}
        node_selector_constraint: Optional[Tuple[str, str]] = None

        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(
                deployment=deployment,
                step_name=step_name,
            )
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                )
            )
            dynamic_component = self._create_dynamic_component(
                image, command, arguments, step_name
            )
            step_settings = cast(
                KubeflowOrchestratorSettings, self.get_settings(step)
            )
            pod_settings = step_settings.pod_settings
            if pod_settings:
                if pod_settings.host_ipc:
                    logger.warning(
                        "Host IPC is set to `True` but not supported in "
                        "this orchestrator. Ignoring..."
                    )
                if pod_settings.affinity:
                    logger.warning(
                        "Affinity is set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.tolerations:
                    logger.warning(
                        "Tolerations are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volumes:
                    logger.warning(
                        "Volumes are set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volume_mounts:
                    logger.warning(
                        "Volume mounts are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.env or pod_settings.env_from:
                    logger.warning(
                        "Environment variables are set but not supported "
                        "in Kubeflow with Kubeflow Pipelines 2.x. "
                        "Ignoring..."
                    )

                # apply pod settings
                if (
                    KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                    in pod_settings.node_selectors.keys()
                ):
                    node_selector_constraint = (
                        KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                        pod_settings.node_selectors[
                            KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        ],
                    )

            step_name_to_dynamic_component[step_name] = dynamic_component

        @dsl.pipeline(  # type: ignore[misc]
            display_name=orchestrator_run_name,
        )
        def dynamic_pipeline() -> None:
            """Dynamic pipeline."""
            # iterate through the components one by one
            # (from step_name_to_dynamic_component)
            for (
                component_name,
                component,
            ) in step_name_to_dynamic_component.items():
                # for each component, check to see what other steps are
                # upstream of it
                step = deployment.step_configurations[component_name]
                upstream_step_components = [
                    step_name_to_dynamic_component[upstream_step_name]
                    for upstream_step_name in step.spec.upstream_steps
                ]
                task = (
                    component()
                    .set_display_name(
                        name=component_name,
                    )
                    .set_caching_options(enable_caching=False)
                    .set_env_variable(
                        name=ENV_KFP_RUN_ID,
                        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
                    )
                    .after(*upstream_step_components)
                )
                self._configure_container_resources(
                    task,
                    step.config.resource_settings,
                    node_selector_constraint,
                )

        return dynamic_pipeline

    def _update_yaml_with_environment(
        yaml_file_path: str, environment: Dict[str, str]
    ) -> None:
        """Updates the env section of the steps in the YAML file with the given environment variables.

        Args:
            yaml_file_path: The path to the YAML file to update.
            environment: A dictionary of environment variables to add.
        """
        pipeline_definition = yaml_utils.read_yaml(pipeline_file_path)

        # Iterate through each component and add the environment variables
        for executor in pipeline_definition["deploymentSpec"]["executors"]:
            if (
                "container"
                in pipeline_definition["deploymentSpec"]["executors"][
                    executor
                ]
            ):
                container = pipeline_definition["deploymentSpec"][
                    "executors"
                ][executor]["container"]
                if "env" not in container:
                    container["env"] = []
                for key, value in environment.items():
                    container["env"].append({"name": key, "value": value})

        yaml_utils.write_yaml(pipeline_file_path, pipeline_definition)

        print(
            f"Updated YAML file with environment variables at {yaml_file_path}"
        )

    # Get a filepath to use to save the finished yaml to
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory, f"{orchestrator_run_name}.yaml"
    )

    # write the argo pipeline yaml
    Compiler().compile(
        pipeline_func=_create_dynamic_pipeline(),
        package_path=pipeline_file_path,
        pipeline_name=orchestrator_run_name,
    )

    # Let's update the YAML file with the environment variables
    _update_yaml_with_environment(pipeline_file_path, environment)

    logger.info(
        "Writing Kubeflow workflow definition to `%s`.", pipeline_file_path
    )

    # using the kfp client uploads the pipeline to kubeflow pipelines and
    # runs it there
    self._upload_and_run_pipeline(
        deployment=deployment,
        pipeline_file_path=pipeline_file_path,
        run_name=orchestrator_run_name,
    )
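As the warnings in the code above indicate, with Kubeflow Pipelines 2.x only the accelerator node-selector entry of the pod settings is honoured; a hedged sketch follows (the GKE label and GPU type are assumptions, adjust for your cluster):

from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings
from zenml.integrations.kubeflow.flavors import KubeflowOrchestratorSettings

pod_settings = KubernetesPodSettings(
    # Assumed accelerator node-selector label and value.
    node_selectors={"cloud.google.com/gke-accelerator": "nvidia-tesla-t4"},
)
settings = KubeflowOrchestratorSettings(pod_settings=pod_settings)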
Modules
kubeflow_orchestrator

Implementation of the Kubeflow orchestrator.

Classes
KubeClientKFPClient(client: k8s_client.ApiClient, *args: Any, **kwargs: Any)

Bases: Client

KFP client initialized from a Kubernetes client.

This is a workaround for the fact that the native KFP client does not support initialization from an existing Kubernetes client.

Initializes the KFP client from a Kubernetes client.

Parameters:

Name Type Description Default
client ApiClient

pre-configured Kubernetes client.

required
args Any

standard KFP client positional arguments.

()
kwargs Any

standard KFP client keyword arguments.

{}
Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def __init__(
    self, client: k8s_client.ApiClient, *args: Any, **kwargs: Any
) -> None:
    """Initializes the KFP client from a Kubernetes client.

    Args:
        client: pre-configured Kubernetes client.
        args: standard KFP client positional arguments.
        kwargs: standard KFP client keyword arguments.
    """
    self._k8s_client = client
    super().__init__(*args, **kwargs)
Functions
KubeflowOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines using Kubeflow.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: KubeflowOrchestratorConfig property

Returns the KubeflowOrchestratorConfig config.

Returns:

Type Description
KubeflowOrchestratorConfig

The configuration.

pipeline_directory: str property

Returns path to a directory in which the kubeflow pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property

Path to the root directory for all files concerning this orchestrator.

Returns:

Type Description
str

Path to the root directory.

settings_class: Type[KubeflowOrchestratorSettings] property

Settings class for the Kubeflow orchestrator.

Returns:

Type Description
Type[KubeflowOrchestratorSettings]

The settings class.

validator: Optional[StackValidator] property

Validates that the stack contains a container registry.

Also checks that requirements are met for local components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_kubernetes_contexts() -> Tuple[List[str], Optional[str]]

Get the list of configured Kubernetes contexts and the active context.

Returns:

Type Description
Tuple[List[str], Optional[str]]

A tuple containing the list of configured Kubernetes contexts and the active context.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
    """Get the list of configured Kubernetes contexts and the active context.

    Returns:
        A tuple containing the list of configured Kubernetes contexts and
        the active context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException:
        return [], None

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_KFP_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_KFP_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    hostname = self.config.kubeflow_hostname
    if not hostname:
        return {}

    hostname = hostname.rstrip("/")
    pipeline_suffix = "/pipeline"
    if hostname.endswith(pipeline_suffix):
        hostname = hostname[: -len(pipeline_suffix)]

    run = Client().get_pipeline_run(run_id)

    settings_key = settings_utils.get_stack_component_setting_key(self)
    run_settings = self.settings_class.model_validate(
        run.config.model_dump().get(settings_key, self.config)
    )
    user_namespace = run_settings.user_namespace

    if user_namespace:
        run_url = (
            f"{hostname}/_/pipeline/?ns={user_namespace}#"
            f"/runs/details/{self.get_orchestrator_run_id()}"
        )
        return {
            METADATA_ORCHESTRATOR_URL: Uri(run_url),
        }
    else:
        return {
            METADATA_ORCHESTRATOR_URL: Uri(f"{hostname}"),
        }
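As a small worked example of the URL this method reports, the snippet below mirrors the construction above; the hostname, user namespace, and run id are assumed values for illustration only:

# Worked example with assumed values, mirroring the URL construction above.
hostname = "https://kubeflow.example.com/pipeline".rstrip("/")
pipeline_suffix = "/pipeline"
if hostname.endswith(pipeline_suffix):
    hostname = hostname[: -len(pipeline_suffix)]  # -> "https://kubeflow.example.com"

user_namespace = "team-a"       # assumed user namespace from the run settings
orchestrator_run_id = "my-run"  # assumed value of get_orchestrator_run_id()

run_url = (
    f"{hostname}/_/pipeline/?ns={user_namespace}#"
    f"/runs/details/{orchestrator_run_id}"
)
print(run_url)
# https://kubeflow.example.com/_/pipeline/?ns=team-a#/runs/details/my-run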
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str]) -> Any

Creates a kfp yaml file.

This file serves as an intermediary representation of the pipeline, which is then deployed to the Kubeflow Pipelines instance.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this Docker image, a callable is created which builds container_ops for each step (_construct_kfp_pipeline). To do this, the entrypoint of the Docker image is configured to run the correct step within the image. The dependencies between these container_ops are then also configured onto each container_op by pointing at the downstream steps.

This callable is then compiled into a kfp yaml file that is used as the intermediary representation of the Kubeflow pipeline.

This file, together with some metadata and runtime configuration, is then uploaded to the Kubeflow Pipelines cluster for execution. A short sketch of this compile-and-upload flow follows the source listing below.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required

Raises:

Type Description
RuntimeError

If trying to run a pipeline in a notebook environment.

Source code in src/zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Creates a kfp yaml file.

    This functions as an intermediary representation of the pipeline which
    is then deployed to the kubeflow pipelines instance.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()`
    method builds a docker image that contains the code for the
    pipeline, all steps, and the context around these files.

    Based on this docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`).
    To do this the entrypoint of the docker image is configured to
    run the correct step within the docker image. The dependencies
    between these container_ops are then also configured onto each
    container_op by pointing at the downstream steps.

    This callable is then compiled into a kfp yaml file that is used as
    the intermediary representation of the kubeflow pipeline.

    This file, together with some metadata and runtime configuration, is
    then uploaded into the kubeflow pipelines cluster for execution.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If trying to run a pipeline in a notebook
            environment.
    """
    # First check whether the code is running in a notebook
    if Environment.in_notebook():
        raise RuntimeError(
            "The Kubeflow orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )

    assert stack.container_registry

    # Create a callable for future compilation into a dsl.Pipeline.
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    ).replace("_", "-")

    def _create_dynamic_pipeline() -> Any:
        """Create a dynamic pipeline including each step.

        Returns:
            pipeline_func
        """
        step_name_to_dynamic_component: Dict[str, Any] = {}
        node_selector_constraint: Optional[Tuple[str, str]] = None

        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(
                deployment=deployment,
                step_name=step_name,
            )
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                )
            )
            dynamic_component = self._create_dynamic_component(
                image, command, arguments, step_name
            )
            step_settings = cast(
                KubeflowOrchestratorSettings, self.get_settings(step)
            )
            pod_settings = step_settings.pod_settings
            if pod_settings:
                if pod_settings.host_ipc:
                    logger.warning(
                        "Host IPC is set to `True` but not supported in "
                        "this orchestrator. Ignoring..."
                    )
                if pod_settings.affinity:
                    logger.warning(
                        "Affinity is set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.tolerations:
                    logger.warning(
                        "Tolerations are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volumes:
                    logger.warning(
                        "Volumes are set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volume_mounts:
                    logger.warning(
                        "Volume mounts are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.env or pod_settings.env_from:
                    logger.warning(
                        "Environment variables are set but not supported "
                        "in Kubeflow with Kubeflow Pipelines 2.x. "
                        "Ignoring..."
                    )

                # apply pod settings
                if (
                    KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                    in pod_settings.node_selectors.keys()
                ):
                    node_selector_constraint = (
                        KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                        pod_settings.node_selectors[
                            KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        ],
                    )

            step_name_to_dynamic_component[step_name] = dynamic_component

        @dsl.pipeline(  # type: ignore[misc]
            display_name=orchestrator_run_name,
        )
        def dynamic_pipeline() -> None:
            """Dynamic pipeline."""
            # iterate through the components one by one
            # (from step_name_to_dynamic_component)
            for (
                component_name,
                component,
            ) in step_name_to_dynamic_component.items():
                # for each component, check to see what other steps are
                # upstream of it
                step = deployment.step_configurations[component_name]
                upstream_step_components = [
                    step_name_to_dynamic_component[upstream_step_name]
                    for upstream_step_name in step.spec.upstream_steps
                ]
                task = (
                    component()
                    .set_display_name(
                        name=component_name,
                    )
                    .set_caching_options(enable_caching=False)
                    .set_env_variable(
                        name=ENV_KFP_RUN_ID,
                        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
                    )
                    .after(*upstream_step_components)
                )
                self._configure_container_resources(
                    task,
                    step.config.resource_settings,
                    node_selector_constraint,
                )

        return dynamic_pipeline

    def _update_yaml_with_environment(
        yaml_file_path: str, environment: Dict[str, str]
    ) -> None:
        """Updates the env section of the steps in the YAML file with the given environment variables.

        Args:
            yaml_file_path: The path to the YAML file to update.
            environment: A dictionary of environment variables to add.
        """
        pipeline_definition = yaml_utils.read_yaml(yaml_file_path)

        # Iterate through each component and add the environment variables
        for executor in pipeline_definition["deploymentSpec"]["executors"]:
            if (
                "container"
                in pipeline_definition["deploymentSpec"]["executors"][
                    executor
                ]
            ):
                container = pipeline_definition["deploymentSpec"][
                    "executors"
                ][executor]["container"]
                if "env" not in container:
                    container["env"] = []
                for key, value in environment.items():
                    container["env"].append({"name": key, "value": value})

        yaml_utils.write_yaml(yaml_file_path, pipeline_definition)

        print(
            f"Updated YAML file with environment variables at {yaml_file_path}"
        )

    # Get a filepath to use to save the finished yaml to
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory, f"{orchestrator_run_name}.yaml"
    )

    # write the argo pipeline yaml
    Compiler().compile(
        pipeline_func=_create_dynamic_pipeline(),
        package_path=pipeline_file_path,
        pipeline_name=orchestrator_run_name,
    )

    # Let's update the YAML file with the environment variables
    _update_yaml_with_environment(pipeline_file_path, environment)

    logger.info(
        "Writing Kubeflow workflow definition to `%s`.", pipeline_file_path
    )

    # using the kfp client uploads the pipeline to kubeflow pipelines and
    # runs it there
    self._upload_and_run_pipeline(
        deployment=deployment,
        pipeline_file_path=pipeline_file_path,
        run_name=orchestrator_run_name,
    )
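To make the flow described above concrete, here is a minimal sketch of the same compile-then-upload pattern using the KFP 2.x SDK directly. The image name, entrypoint command, and host below are illustrative assumptions only; the orchestrator derives the real values from the pipeline deployment and its configuration.

# Minimal sketch of the compile-then-upload flow with the KFP 2.x SDK.
# All concrete names below (image, command, host) are illustrative assumptions.
from kfp import dsl
from kfp.client import Client
from kfp.compiler import Compiler


@dsl.container_component
def train_step():
    """One step wrapping a pre-built Docker image (assumed name and entrypoint)."""
    return dsl.ContainerSpec(
        image="my-registry.io/zenml-pipeline:latest",
        command=["python", "-m", "my_pipeline.steps.train"],
    )


@dsl.pipeline(display_name="example-run")
def example_pipeline() -> None:
    train_step().set_caching_options(enable_caching=False)


# Compile the callable into the intermediary YAML representation ...
Compiler().compile(pipeline_func=example_pipeline, package_path="pipeline.yaml")

# ... then upload and run it on the Kubeflow Pipelines instance.
Client(host="https://kubeflow.example.com/pipeline").create_run_from_pipeline_package(
    "pipeline.yaml", arguments={}, run_name="example-run"
)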
Functions

Modules
local_deployment_utils

Utils for the local Kubeflow deployment behaviors.

Classes

Functions
add_hostpath_to_kubeflow_pipelines(kubernetes_context: str, local_path: str) -> None

Patches the Kubeflow Pipelines deployment to mount a local folder.

This folder serves as a hostpath for visualization purposes.

This function reconfigures the Kubeflow pipelines deployment to use a shared local folder to support loading the TensorBoard viewer and other pipeline visualization results from a local artifact store, as described here:

https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md

Parameters:

Name Type Description Default
kubernetes_context str

The kubernetes context on which Kubeflow Pipelines should be patched.

required
local_path str

The path to the local folder to mount as a hostpath.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def add_hostpath_to_kubeflow_pipelines(
    kubernetes_context: str, local_path: str
) -> None:
    """Patches the Kubeflow Pipelines deployment to mount a local folder.

    This folder serves as a hostpath for visualization purposes.

    This function reconfigures the Kubeflow pipelines deployment to use a
    shared local folder to support loading the TensorBoard viewer and other
    pipeline visualization results from a local artifact store, as described
    here:

    https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md

    Args:
        kubernetes_context: The kubernetes context on which Kubeflow Pipelines
            should be patched.
        local_path: The path to the local folder to mount as a hostpath.
    """
    logger.info("Patching Kubeflow Pipelines to mount a local folder.")

    pod_template = {
        "spec": {
            "serviceAccountName": "kubeflow-pipelines-viewer",
            "containers": [
                {
                    "volumeMounts": [
                        {
                            "mountPath": local_path,
                            "name": "local-artifact-store",
                        }
                    ]
                }
            ],
            "volumes": [
                {
                    "hostPath": {
                        "path": local_path,
                        "type": "Directory",
                    },
                    "name": "local-artifact-store",
                }
            ],
        }
    }
    pod_template_json = json.dumps(pod_template, indent=2)
    config_map_data = {"data": {"viewer-pod-template.json": pod_template_json}}
    config_map_data_json = json.dumps(config_map_data, indent=2)

    logger.debug(
        "Adding host path volume for local path `%s` to kubeflow pipeline"
        "viewer pod template configuration.",
        local_path,
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "-n",
            "kubeflow",
            "patch",
            "configmap/ml-pipeline-ui-configmap",
            "--type",
            "merge",
            "-p",
            config_map_data_json,
        ]
    )

    deployment_patch = {
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "ml-pipeline-ui",
                            "volumeMounts": [
                                {
                                    "mountPath": local_path,
                                    "name": "local-artifact-store",
                                }
                            ],
                        }
                    ],
                    "volumes": [
                        {
                            "hostPath": {
                                "path": local_path,
                                "type": "Directory",
                            },
                            "name": "local-artifact-store",
                        }
                    ],
                }
            }
        }
    }
    deployment_patch_json = json.dumps(deployment_patch, indent=2)

    logger.debug(
        "Adding host path volume for local path `%s` to the kubeflow UI",
        local_path,
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "-n",
            "kubeflow",
            "patch",
            "deployment/ml-pipeline-ui",
            "--type",
            "strategic",
            "-p",
            deployment_patch_json,
        ]
    )
    wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)

    logger.info("Finished patching Kubeflow Pipelines setup.")
check_prerequisites(skip_k3d: bool = False, skip_kubectl: bool = False) -> bool

Checks whether the prerequisites for a local Kubeflow Pipelines deployment are installed.

Parameters:

Name Type Description Default
skip_k3d bool

Whether to skip the check for the k3d command.

False
skip_kubectl bool

Whether to skip the check for the kubectl command.

False

Returns:

Type Description
bool

Whether all prerequisites are installed.

Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def check_prerequisites(
    skip_k3d: bool = False, skip_kubectl: bool = False
) -> bool:
    """Checks prerequisites for a local kubeflow pipelines deployment.

    It makes sure they are installed.

    Args:
        skip_k3d: Whether to skip the check for the k3d command.
        skip_kubectl: Whether to skip the check for the kubectl command.

    Returns:
        Whether all prerequisites are installed.
    """
    k3d_installed = skip_k3d or shutil.which("k3d") is not None
    kubectl_installed = skip_kubectl or shutil.which("kubectl") is not None
    logger.debug(
        "Local kubeflow deployment prerequisites: K3D - %s, Kubectl - %s",
        k3d_installed,
        kubectl_installed,
    )
    return k3d_installed and kubectl_installed
create_k3d_cluster(cluster_name: str, registry_name: str, registry_config_path: str) -> None

Creates a K3D cluster.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to create.

required
registry_name str

Name of the registry to create for this cluster.

required
registry_config_path str

Path to the registry config file.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def create_k3d_cluster(
    cluster_name: str, registry_name: str, registry_config_path: str
) -> None:
    """Creates a K3D cluster.

    Args:
        cluster_name: Name of the cluster to create.
        registry_name: Name of the registry to create for this cluster.
        registry_config_path: Path to the registry config file.
    """
    logger.info("Creating local K3D cluster '%s'.", cluster_name)
    local_stores_path = GlobalConfiguration().local_stores_path
    subprocess.check_call(
        [
            "k3d",
            "cluster",
            "create",
            cluster_name,
            "--image",
            K3S_IMAGE_NAME,
            "--registry-create",
            registry_name,
            "--registry-config",
            registry_config_path,
            "--volume",
            f"{local_stores_path}:{local_stores_path}",
        ]
    )
    logger.info("Finished K3D cluster creation.")
delete_k3d_cluster(cluster_name: str) -> None

Deletes a K3D cluster with the given name.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to delete.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def delete_k3d_cluster(cluster_name: str) -> None:
    """Deletes a K3D cluster with the given name.

    Args:
        cluster_name: Name of the cluster to delete.
    """
    subprocess.check_call(["k3d", "cluster", "delete", cluster_name])
    logger.info("Deleted local k3d cluster '%s'.", cluster_name)
deploy_kubeflow_pipelines(kubernetes_context: str) -> None

Deploys Kubeflow Pipelines.

Parameters:

Name Type Description Default
kubernetes_context str

The kubernetes context on which Kubeflow Pipelines should be deployed.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def deploy_kubeflow_pipelines(kubernetes_context: str) -> None:
    """Deploys Kubeflow Pipelines.

    Args:
        kubernetes_context: The kubernetes context on which Kubeflow Pipelines
            should be deployed.
    """
    logger.info("Deploying Kubeflow Pipelines.")
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "apply",
            "-k",
            f"github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref={KFP_VERSION}&timeout=5m",
        ]
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "wait",
            "--timeout=60s",
            "--for",
            "condition=established",
            "crd/applications.app.k8s.io",
        ]
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "apply",
            "-k",
            f"github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref={KFP_VERSION}&timeout=5m",
        ]
    )

    wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)
    logger.info("Finished Kubeflow Pipelines setup.")
k3d_cluster_exists(cluster_name: str) -> bool

Checks whether there exists a K3D cluster with the given name.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to check.

required

Returns:

Type Description
bool

Whether the cluster exists.

Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def k3d_cluster_exists(cluster_name: str) -> bool:
    """Checks whether there exists a K3D cluster with the given name.

    Args:
        cluster_name: Name of the cluster to check.

    Returns:
        Whether the cluster exists.
    """
    output = subprocess.check_output(
        ["k3d", "cluster", "list", "--output", "json"]
    )
    clusters = json.loads(output)
    for cluster in clusters:
        if cluster["name"] == cluster_name:
            return True
    return False
k3d_cluster_running(cluster_name: str) -> bool

Checks whether the K3D cluster with the given name is running.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to check.

required

Returns:

Type Description
bool

Whether the cluster is running.

Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def k3d_cluster_running(cluster_name: str) -> bool:
    """Checks whether the K3D cluster with the given name is running.

    Args:
        cluster_name: Name of the cluster to check.

    Returns:
        Whether the cluster is running.
    """
    output = subprocess.check_output(
        ["k3d", "cluster", "list", "--output", "json"]
    )
    clusters = json.loads(output)
    for cluster in clusters:
        if cluster["name"] == cluster_name:
            server_count: int = cluster["serversCount"]
            servers_running: int = cluster["serversRunning"]
            return servers_running == server_count
    return False
kubeflow_pipelines_ready(kubernetes_context: str) -> bool

Returns whether all Kubeflow Pipelines pods are ready.

Parameters:

Name Type Description Default
kubernetes_context str

The kubernetes context in which the pods should be checked.

required

Returns:

Type Description
bool

Whether all Kubeflow Pipelines pods are ready.

Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def kubeflow_pipelines_ready(kubernetes_context: str) -> bool:
    """Returns whether all Kubeflow Pipelines pods are ready.

    Args:
        kubernetes_context: The kubernetes context in which the pods
            should be checked.

    Returns:
        Whether all Kubeflow Pipelines pods are ready.
    """
    try:
        subprocess.check_call(
            [
                "kubectl",
                "--context",
                kubernetes_context,
                "--namespace",
                "kubeflow",
                "wait",
                "--for",
                "condition=ready",
                "--timeout=0s",
                "pods",
                "-l",
                "application-crd-id=kubeflow-pipelines",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return True
    except subprocess.CalledProcessError:
        return False
start_k3d_cluster(cluster_name: str) -> None

Starts a K3D cluster with the given name.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to start.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def start_k3d_cluster(cluster_name: str) -> None:
    """Starts a K3D cluster with the given name.

    Args:
        cluster_name: Name of the cluster to start.
    """
    subprocess.check_call(["k3d", "cluster", "start", cluster_name])
    logger.info("Started local k3d cluster '%s'.", cluster_name)
start_kfp_ui_daemon(pid_file_path: str, log_file_path: str, port: int, kubernetes_context: str) -> None

Starts a daemon process that forwards ports.

This is so the Kubeflow Pipelines UI is accessible in the browser.

Parameters:

Name Type Description Default
pid_file_path str

Path where the file with the daemon's process ID should be written.

required
log_file_path str

Path to a file where the daemon logs should be written.

required
port int

Port on which the UI should be accessible.

required
kubernetes_context str

The kubernetes context for the cluster where Kubeflow Pipelines is running.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def start_kfp_ui_daemon(
    pid_file_path: str,
    log_file_path: str,
    port: int,
    kubernetes_context: str,
) -> None:
    """Starts a daemon process that forwards ports.

    This is so the Kubeflow Pipelines UI is accessible in the browser.

    Args:
        pid_file_path: Path where the file with the daemon's process ID should
            be written.
        log_file_path: Path to a file where the daemon logs should be written.
        port: Port on which the UI should be accessible.
        kubernetes_context: The kubernetes context for the cluster where
            Kubeflow Pipelines is running.
    """
    command = [
        "kubectl",
        "--context",
        kubernetes_context,
        "--namespace",
        "kubeflow",
        "port-forward",
        "svc/ml-pipeline-ui",
        f"{port}:80",
    ]

    if not networking_utils.port_available(port):
        modified_command = command.copy()
        modified_command[-1] = "PORT:80"
        logger.warning(
            "Unable to port-forward Kubeflow Pipelines UI to local port %d "
            "because the port is occupied. In order to access the Kubeflow "
            "Pipelines UI at http://localhost:PORT/, please run '%s' in a "
            "separate command line shell (replace PORT with a free port of "
            "your choice).",
            port,
            " ".join(modified_command),
        )
    elif sys.platform == "win32":
        logger.warning(
            "Daemon functionality not supported on Windows. "
            "In order to access the Kubeflow Pipelines UI at "
            "http://localhost:%d/, please run '%s' in a separate command "
            "line shell.",
            port,
            " ".join(command),
        )
    else:
        from zenml.utils import daemon

        def _daemon_function() -> None:
            """Port-forwards the Kubeflow Pipelines UI pod."""
            subprocess.check_call(command)

        daemon.run_as_daemon(
            _daemon_function, pid_file=pid_file_path, log_file=log_file_path
        )
        logger.info(
            "Started Kubeflow Pipelines UI daemon (check the daemon logs at %s "
            "in case you're not able to view the UI). The Kubeflow Pipelines "
            "UI should now be accessible at http://localhost:%d/.",
            log_file_path,
            port,
        )
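A hypothetical call with placeholder paths is shown below; on supported platforms this would expose the UI at http://localhost:8080/:

# Hypothetical usage with placeholder paths: forward the Kubeflow Pipelines UI
# to http://localhost:8080/ in the background.
from zenml.integrations.kubeflow.orchestrators import local_deployment_utils

local_deployment_utils.start_kfp_ui_daemon(
    pid_file_path="/tmp/zenml-kfp-ui.pid",
    log_file_path="/tmp/zenml-kfp-ui.log",
    port=8080,
    kubernetes_context="k3d-zenml-kubeflow",
)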
stop_k3d_cluster(cluster_name: str) -> None

Stops a K3D cluster with the given name.

Parameters:

Name Type Description Default
cluster_name str

Name of the cluster to stop.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def stop_k3d_cluster(cluster_name: str) -> None:
    """Stops a K3D cluster with the given name.

    Args:
        cluster_name: Name of the cluster to stop.
    """
    subprocess.check_call(["k3d", "cluster", "stop", cluster_name])
    logger.info("Stopped local k3d cluster '%s'.", cluster_name)
stop_kfp_ui_daemon(pid_file_path: str) -> None

Stops the KFP UI daemon process if it is running.

Parameters:

Name Type Description Default
pid_file_path str

Path to the file with the daemon's process ID.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def stop_kfp_ui_daemon(pid_file_path: str) -> None:
    """Stops the KFP UI daemon process if it is running.

    Args:
        pid_file_path: Path to the file with the daemon's process ID.
    """
    if fileio.exists(pid_file_path):
        if sys.platform == "win32":
            # Daemon functionality is not supported on Windows, so the PID
            # file won't exist. This if clause exists just for mypy to not
            # complain about missing functions
            pass
        else:
            from zenml.utils import daemon

            daemon.stop_daemon(pid_file_path)
            fileio.remove(pid_file_path)
            logger.info("Stopped Kubeflow Pipelines UI daemon.")
wait_until_kubeflow_pipelines_ready(kubernetes_context: str) -> None

Waits until all Kubeflow Pipelines pods are ready.

Parameters:

Name Type Description Default
kubernetes_context str

The kubernetes context in which the pods should be checked.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def wait_until_kubeflow_pipelines_ready(kubernetes_context: str) -> None:
    """Waits until all Kubeflow Pipelines pods are ready.

    Args:
        kubernetes_context: The kubernetes context in which the pods
            should be checked.
    """
    logger.info(
        "Waiting for all Kubeflow Pipelines pods to be ready (this might "
        "take a few minutes)."
    )
    while True:
        logger.info("Current pod status:")
        subprocess.check_call(
            [
                "kubectl",
                "--context",
                kubernetes_context,
                "--namespace",
                "kubeflow",
                "get",
                "pods",
            ]
        )
        if kubeflow_pipelines_ready(kubernetes_context=kubernetes_context):
            break

        logger.info(
            "One or more pods not ready yet, waiting for 30 seconds..."
        )
        time.sleep(30)
write_local_registry_yaml(yaml_path: str, registry_name: str, registry_uri: str) -> None

Writes a K3D registry config file.

Parameters:

Name Type Description Default
yaml_path str

Path where the config file should be written to.

required
registry_name str

Name of the registry.

required
registry_uri str

URI of the registry.

required
Source code in src/zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def write_local_registry_yaml(
    yaml_path: str, registry_name: str, registry_uri: str
) -> None:
    """Writes a K3D registry config file.

    Args:
        yaml_path: Path where the config file should be written to.
        registry_name: Name of the registry.
        registry_uri: URI of the registry.
    """
    yaml_content = {
        "mirrors": {registry_uri: {"endpoint": [f"http://{registry_name}"]}}
    }
    yaml_utils.write_yaml(yaml_path, yaml_content)
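As a small worked example with assumed inputs, the call below writes a registry config equivalent to the YAML shown in the trailing comment:

from zenml.integrations.kubeflow.orchestrators.local_deployment_utils import (
    write_local_registry_yaml,
)

# Worked example with assumed inputs.
write_local_registry_yaml(
    yaml_path="/tmp/zenml-registry.yaml",
    registry_name="zenml-registry",
    registry_uri="zenml-registry:5000",
)
# The resulting file is equivalent to:
#   mirrors:
#     zenml-registry:5000:
#       endpoint:
#         - http://zenml-registry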
Modules