Kubeflow
        zenml.integrations.kubeflow
  
      special
  
    Initialization of the Kubeflow integration for ZenML.
The Kubeflow integration sub-module provides the Kubeflow orchestrator, an alternative to the local orchestrator. You can enable it by registering the Kubeflow orchestrator with the ZenML CLI tool.
        
KubeflowIntegration            (Integration)
        
    Definition of Kubeflow Integration for ZenML.
Source code in zenml/integrations/kubeflow/__init__.py
          class KubeflowIntegration(Integration):
    """Definition of Kubeflow Integration for ZenML."""
    NAME = KUBEFLOW
    REQUIREMENTS = ["kfp>=2.6.0", "kfp-kubernetes>=1.1.0"]  # Only 1.x version that supports pyyaml 6
    REQUIREMENTS_IGNORED_ON_UNINSTALL = [
        "kfp", # it is used by GCP as well
    ]
    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Kubeflow integration.
        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.kubeflow.flavors import (
            KubeflowOrchestratorFlavor,
        )
        return [KubeflowOrchestratorFlavor]
flavors()
  
      classmethod
  
    Declare the stack component flavors for the Kubeflow integration.
Returns:
| Type | Description | 
|---|---|
| List[Type[zenml.stack.flavor.Flavor]] | List of stack component flavors for this integration. | 
Source code in zenml/integrations/kubeflow/__init__.py
          @classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Kubeflow integration.
    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.kubeflow.flavors import (
        KubeflowOrchestratorFlavor,
    )
    return [KubeflowOrchestratorFlavor]
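As a small, hedged usage sketch (assuming the Kubeflow integration's requirements are installed), the integration class can be queried for its flavors directly:

```python
from zenml.integrations.kubeflow import KubeflowIntegration

# The Kubeflow integration currently declares a single stack component flavor.
flavors = KubeflowIntegration.flavors()
print([flavor.__name__ for flavor in flavors])  # expected: ['KubeflowOrchestratorFlavor']
```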
        flavors
  
      special
  
    Kubeflow integration flavors.
        kubeflow_orchestrator_flavor
    Kubeflow orchestrator flavor.
        
KubeflowOrchestratorConfig            (BaseOrchestratorConfig, KubeflowOrchestratorSettings)
        
    Configuration for the Kubeflow orchestrator.
Attributes:
| Name | Type | Description | 
|---|---|---|
| kubeflow_hostname | Optional[str] | The hostname to use to talk to the Kubeflow Pipelines API. If not set, the hostname will be derived from the Kubernetes API proxy. Mandatory when connecting to a multi-tenant Kubeflow Pipelines deployment. | 
| kubeflow_namespace | str | The Kubernetes namespace in which Kubeflow Pipelines is deployed. Defaults to `kubeflow`. | 
| kubernetes_context | Optional[str] | Name of a kubernetes context to run pipelines in. Not applicable when connecting to a multi-tenant Kubeflow Pipelines deployment (i.e. when `kubeflow_hostname` is set) or if the stack component is linked to a Kubernetes service connector. | 
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
          class KubeflowOrchestratorConfig(
    BaseOrchestratorConfig, KubeflowOrchestratorSettings
):
    """Configuration for the Kubeflow orchestrator.
    Attributes:
        kubeflow_hostname: The hostname to use to talk to the Kubeflow Pipelines
            API. If not set, the hostname will be derived from the Kubernetes
            API proxy. Mandatory when connecting to a multi-tenant Kubeflow
            Pipelines deployment.
        kubeflow_namespace: The Kubernetes namespace in which Kubeflow
            Pipelines is deployed. Defaults to `kubeflow`.
        kubernetes_context: Name of a kubernetes context to run
            pipelines in. Not applicable when connecting to a multi-tenant
            Kubeflow Pipelines deployment (i.e. when `kubeflow_hostname` is
            set) or if the stack component is linked to a Kubernetes service
            connector.
    """
    kubeflow_hostname: Optional[str] = None
    kubeflow_namespace: str = "kubeflow"
    kubernetes_context: Optional[str] = None  # TODO: Potential setting
    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def _validate_deprecated_attrs(
        cls, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Pydantic root_validator for deprecated attributes.
        This root validator is used for backwards compatibility purposes. E.g.
        it handles attributes that are no longer available or that have become
        mandatory in the meantime.
        Args:
            data: Values passed to the object constructor
        Returns:
            Values passed to the object constructor
        """
        provisioning_attrs = [
            "skip_cluster_provisioning",
            "skip_ui_daemon_provisioning",
            "kubeflow_pipelines_ui_port",
        ]
        # remove deprecated attributes from values dict
        for attr in provisioning_attrs:
            if attr in data:
                del data[attr]
        return data
    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.
        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.
        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.
        Returns:
            True if this config is for a local component, False otherwise.
        """
        return False
    @property
    def is_synchronous(self) -> bool:
        """Whether the orchestrator runs synchronous or not.
        Returns:
            Whether the orchestrator runs synchronous or not.
        """
        return self.synchronous
    @property
    def is_schedulable(self) -> bool:
        """Whether the orchestrator is schedulable or not.
        Returns:
            Whether the orchestrator is schedulable or not.
        """
        return True
is_local: bool
  
      property
      readonly
  
    Checks if this stack component is running locally.
Returns:
| Type | Description | 
|---|---|
| bool | True if this config is for a local component, False otherwise. | 
is_remote: bool
  
      property
      readonly
  
    Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
| Type | Description | 
|---|---|
| bool | True if this config is for a remote component, False otherwise. | 
is_schedulable: bool
  
      property
      readonly
  
    Whether the orchestrator is schedulable or not.
Returns:
| Type | Description | 
|---|---|
| bool | Whether the orchestrator is schedulable or not. | 
is_synchronous: bool
  
      property
      readonly
  
    Whether the orchestrator runs synchronous or not.
Returns:
| Type | Description | 
|---|---|
| bool | Whether the orchestrator runs synchronous or not. | 
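To illustrate how these options relate to the `is_remote`/`is_local` properties, here is a minimal sketch that instantiates the config directly. The hostname is a placeholder, and in practice these values are supplied when registering the orchestrator stack component rather than constructed by hand:

```python
from zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor import (
    KubeflowOrchestratorConfig,
)

# Placeholder values for illustration only.
config = KubeflowOrchestratorConfig(
    kubeflow_hostname="https://my-kubeflow.example.com/pipeline",  # multi-tenant deployments
    kubeflow_namespace="kubeflow",
)

# The Kubeflow orchestrator always reports itself as a remote, schedulable
# component, so it requires a remote ZenML server rather than a local database.
assert config.is_remote and config.is_schedulable
assert not config.is_local
```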
        
KubeflowOrchestratorFlavor            (BaseOrchestratorFlavor)
        
    Kubeflow orchestrator flavor.
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
          class KubeflowOrchestratorFlavor(BaseOrchestratorFlavor):
    """Kubeflow orchestrator flavor."""
    @property
    def name(self) -> str:
        """Name of the flavor.
        Returns:
            The name of the flavor.
        """
        return KUBEFLOW_ORCHESTRATOR_FLAVOR
    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.
        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.
        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )
    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.
        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()
    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.
        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()
    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.
        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/kubeflow.png"
    @property
    def config_class(self) -> Type[KubeflowOrchestratorConfig]:
        """Returns `KubeflowOrchestratorConfig` config class.
        Returns:
                The config class.
        """
        return KubeflowOrchestratorConfig
    @property
    def implementation_class(self) -> Type["KubeflowOrchestrator"]:
        """Implementation class for this flavor.
        Returns:
            The implementation class.
        """
        from zenml.integrations.kubeflow.orchestrators import (
            KubeflowOrchestrator,
        )
        return KubeflowOrchestrator
config_class: Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorConfig]
  
      property
      readonly
  
    Returns KubeflowOrchestratorConfig config class.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorConfig] | The config class. | 
docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor docs url. | 
implementation_class: Type[KubeflowOrchestrator]
  
      property
      readonly
  
    Implementation class for this flavor.
Returns:
| Type | Description | 
|---|---|
| Type[KubeflowOrchestrator] | The implementation class. | 
logo_url: str
  
      property
      readonly
  
    A url to represent the flavor in the dashboard.
Returns:
| Type | Description | 
|---|---|
| str | The flavor logo. | 
name: str
  
      property
      readonly
  
    Name of the flavor.
Returns:
| Type | Description | 
|---|---|
| str | The name of the flavor. | 
sdk_docs_url: Optional[str]
  
      property
      readonly
  
    A url to point at SDK docs explaining this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[str] | A flavor SDK docs url. | 
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
  
      property
      readonly
  
    Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
| Type | Description | 
|---|---|
| Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] | Requirements for compatible service connectors, if a service connector is required for this flavor. | 
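For orientation, a short sketch of how the flavor ties name, config class and connector requirements together (the printed name assumes the KUBEFLOW_ORCHESTRATOR_FLAVOR constant resolves to "kubeflow"):

```python
from zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor import (
    KubeflowOrchestratorFlavor,
)

flavor = KubeflowOrchestratorFlavor()
print(flavor.name)                            # expected: "kubeflow"
print(flavor.config_class.__name__)           # KubeflowOrchestratorConfig
print(flavor.service_connector_requirements)  # requires a Kubernetes cluster resource
```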
        
KubeflowOrchestratorSettings            (BaseSettings)
        
    Settings for the Kubeflow orchestrator.
Attributes:
| Name | Type | Description | 
|---|---|---|
| synchronous | bool | If `True`, the client running a pipeline using this orchestrator waits until all steps finish running. If `False`, the client returns immediately and the pipeline is executed asynchronously. Defaults to `True`. This setting only has an effect when specified on the pipeline and will be ignored if specified on steps. | 
| timeout | int | How many seconds to wait for synchronous runs. | 
| client_args | Dict[str, Any] | Arguments to pass when initializing the KFP client. | 
| client_username | Optional[str] | Username to generate a session cookie for the kubeflow client. Both `client_username` and `client_password` need to be set together. | 
| client_password | Optional[str] | Password to generate a session cookie for the kubeflow client. Both `client_username` and `client_password` need to be set together. | 
| user_namespace | Optional[str] | The user namespace to use when creating experiments and runs. | 
| pod_settings | Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] | Pod settings to apply. | 
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
          class KubeflowOrchestratorSettings(BaseSettings):
    """Settings for the Kubeflow orchestrator.
    Attributes:
        synchronous: If `True`, the client running a pipeline using this
            orchestrator waits until all steps finish running. If `False`,
            the client returns immediately and the pipeline is executed
            asynchronously. Defaults to `True`. This setting only
            has an effect when specified on the pipeline and will be ignored if
            specified on steps.
        timeout: How many seconds to wait for synchronous runs.
        client_args: Arguments to pass when initializing the KFP client.
        client_username: Username to generate a session cookie for the kubeflow client. Both `client_username`
        and `client_password` need to be set together.
        client_password: Password to generate a session cookie for the kubeflow client. Both `client_username`
        and `client_password` need to be set together.
        user_namespace: The user namespace to use when creating experiments
            and runs.
        pod_settings: Pod settings to apply.
    """
    synchronous: bool = True
    timeout: int = 1200
    client_args: Dict[str, Any] = {}
    client_username: Optional[str] = SecretField(default=None)
    client_password: Optional[str] = SecretField(default=None)
    user_namespace: Optional[str] = None
    pod_settings: Optional[KubernetesPodSettings] = None
    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def _validate_and_migrate_pod_settings(
        cls, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validates settings and migrates pod settings from older version.
        Args:
            data: Dict representing user-specified runtime settings.
        Returns:
            Validated settings.
        Raises:
            ValueError: If username and password are not specified together.
        """
        node_selectors = cast(Dict[str, str], data.get("node_selectors") or {})
        node_affinity = cast(
            Dict[str, List[str]], data.get("node_affinity") or {}
        )
        affinity = {}
        if node_affinity:
            from kubernetes import client as k8s_client
            match_expressions = [
                k8s_client.V1NodeSelectorRequirement(
                    key=key,
                    operator="In",
                    values=values,
                )
                for key, values in node_affinity.items()
            ]
            affinity = k8s_client.V1Affinity(
                node_affinity=k8s_client.V1NodeAffinity(
                    required_during_scheduling_ignored_during_execution=k8s_client.V1NodeSelector(
                        node_selector_terms=[
                            k8s_client.V1NodeSelectorTerm(
                                match_expressions=match_expressions
                            )
                        ]
                    )
                )
            )
        pod_settings = KubernetesPodSettings(
            node_selectors=node_selectors, affinity=affinity
        )
        data["pod_settings"] = pod_settings
        # Validate username and password for auth cookie logic
        username = data.get("client_username")
        password = data.get("client_password")
        client_creds_error = "`client_username` and `client_password` both need to be set together."
        if username and password is None:
            raise ValueError(client_creds_error)
        if password and username is None:
            raise ValueError(client_creds_error)
        return data
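To show how these settings are typically consumed, here is a hedged sketch of attaching them to a pipeline. The "orchestrator.kubeflow" settings key follows the usual ZenML settings pattern, and all values below are illustrative:

```python
from zenml import pipeline
from zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor import (
    KubeflowOrchestratorSettings,
)

# Illustrative values only; adapt to your Kubeflow deployment.
kubeflow_settings = KubeflowOrchestratorSettings(
    synchronous=True,          # wait until all steps finish
    timeout=1800,              # seconds to wait for synchronous runs
    user_namespace="my-team",  # hypothetical multi-tenant user namespace
)

@pipeline(settings={"orchestrator.kubeflow": kubeflow_settings})
def my_pipeline() -> None:
    ...
```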
        orchestrators
  
      special
  
    Initialization of the Kubeflow ZenML orchestrator.
        kubeflow_orchestrator
    Implementation of the Kubeflow orchestrator.
        
KubeClientKFPClient            (Client)
        
    KFP client initialized from a Kubernetes client.
This is a workaround for the fact that the native KFP client does not support initialization from an existing Kubernetes client.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          class KubeClientKFPClient(kfp.Client):  # type: ignore[misc]
    """KFP client initialized from a Kubernetes client.
    This is a workaround for the fact that the native KFP client does not
    support initialization from an existing Kubernetes client.
    """
    def __init__(
        self, client: k8s_client.ApiClient, *args: Any, **kwargs: Any
    ) -> None:
        """Initializes the KFP client from a Kubernetes client.
        Args:
            client: pre-configured Kubernetes client.
            args: standard KFP client positional arguments.
            kwargs: standard KFP client keyword arguments.
        """
        self._k8s_client = client
        super().__init__(*args, **kwargs)
    def _load_config(self, *args: Any, **kwargs: Any) -> Any:
        """Loads the KFP configuration.
        Initializes the KFP configuration from the Kubernetes client.
        Args:
            args: standard KFP client positional arguments.
            kwargs: standard KFP client keyword arguments.
        Returns:
            The KFP configuration.
        """
        from kfp_server_api.configuration import Configuration
        kube_config = self._k8s_client.configuration
        host = (
            kube_config.host
            + "/"
            + self._KUBE_PROXY_PATH.format(kwargs.get("namespace", "kubeflow"))
        )
        config = Configuration(
            host=host,
            api_key=kube_config.api_key,
            api_key_prefix=kube_config.api_key_prefix,
            username=kube_config.username,
            password=kube_config.password,
            discard_unknown_keys=kube_config.discard_unknown_keys,
        )
        # Extra attributes not present in the Configuration constructor
        keys = ["ssl_ca_cert", "cert_file", "key_file", "verify_ssl"]
        for key in keys:
            if key in kube_config.__dict__:
                setattr(config, key, getattr(kube_config, key))
        return config
__init__(self, client, *args, **kwargs)
  
      special
  
    Initializes the KFP client from a Kubernetes client.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| client | kubernetes.client.ApiClient | pre-configured Kubernetes client. | required | 
| args | Any | standard KFP client positional arguments. | () | 
| kwargs | Any | standard KFP client keyword arguments. | {} | 
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          def __init__(
    self, client: k8s_client.ApiClient, *args: Any, **kwargs: Any
) -> None:
    """Initializes the KFP client from a Kubernetes client.
    Args:
        client: pre-configured Kubernetes client.
        args: standard KFP client positional arguments.
        kwargs: standard KFP client keyword arguments.
    """
    self._k8s_client = client
    super().__init__(*args, **kwargs)
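For illustration, this is roughly how the workaround could be exercised in isolation, assuming a locally configured kubeconfig; in normal operation the orchestrator constructs this client from the Kubernetes client returned by the stack's service connector:

```python
from kubernetes import client as k8s_client, config as k8s_config

from zenml.integrations.kubeflow.orchestrators.kubeflow_orchestrator import (
    KubeClientKFPClient,
)

# Build a pre-configured Kubernetes API client from the local kubeconfig
# (the orchestrator normally obtains this from a service connector instead).
k8s_config.load_kube_config()
api_client = k8s_client.ApiClient()

# Initialize the KFP client from the existing Kubernetes client; remaining
# keyword arguments are passed through to the standard kfp.Client.
kfp_client = KubeClientKFPClient(api_client, namespace="kubeflow")
```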
        
KubeflowOrchestrator            (ContainerizedOrchestrator)
        
    Orchestrator responsible for running pipelines using Kubeflow.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          class KubeflowOrchestrator(ContainerizedOrchestrator):
    """Orchestrator responsible for running pipelines using Kubeflow."""
    _k8s_client: Optional[k8s_client.ApiClient] = None
    def _get_kfp_client(
        self,
        settings: KubeflowOrchestratorSettings,
    ) -> kfp.Client:
        """Creates a KFP client instance.
        Args:
            settings: Settings which can be used to
                configure the client instance.
        Returns:
            A KFP client instance.
        Raises:
            RuntimeError: If the linked Kubernetes connector behaves
                unexpectedly.
        """
        connector = self.get_connector()
        client_args = settings.client_args.copy()
        # The kube_context, host and namespace are stack component
        # configurations that refer to the Kubeflow deployment. We don't want
        # these overwritten on a run by run basis by user settings
        client_args["namespace"] = self.config.kubeflow_namespace
        if connector:
            client = connector.connect()
            if not isinstance(client, k8s_client.ApiClient):
                raise RuntimeError(
                    f"Expected a k8s_client.ApiClient while trying to use the "
                    f"linked connector, but got {type(client)}."
                )
            return KubeClientKFPClient(
                client=client,
                **client_args,
            )
        elif self.config.kubernetes_context:
            client_args["kube_context"] = self.config.kubernetes_context
        elif self.config.kubeflow_hostname:
            client_args["host"] = self.config.kubeflow_hostname
            # Handle username and password, ignore the case if one is passed and
            # not the other. Also do not attempt to get cookie if cookie is
            # already passed in client_args
            if settings.client_username and settings.client_password:
                # If cookie is already set, then ignore
                if "cookie" in client_args:
                    logger.warning(
                        "Cookie already set in `client_args`, ignoring "
                        "`client_username` and `client_password`..."
                    )
                else:
                    session_cookie = self._get_session_cookie(
                        username=settings.client_username,
                        password=settings.client_password,
                    )
                    client_args["cookies"] = session_cookie
        return KFPClient(**client_args)
    @property
    def config(self) -> KubeflowOrchestratorConfig:
        """Returns the `KubeflowOrchestratorConfig` config.
        Returns:
            The configuration.
        """
        return cast(KubeflowOrchestratorConfig, self._config)
    def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
        """Get the list of configured Kubernetes contexts and the active context.
        Returns:
            A tuple containing the list of configured Kubernetes contexts and
            the active context.
        """
        try:
            contexts, active_context = k8s_config.list_kube_config_contexts()
        except k8s_config.config_exception.ConfigException:
            return [], None
        context_names = [c["name"] for c in contexts]
        active_context_name = active_context["name"]
        return context_names, active_context_name
    @property
    def settings_class(self) -> Type[KubeflowOrchestratorSettings]:
        """Settings class for the Kubeflow orchestrator.
        Returns:
            The settings class.
        """
        return KubeflowOrchestratorSettings
    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains a container registry.
        Also check that requirements are met for local components.
        Returns:
            A `StackValidator` instance.
        """
        msg = f"'{self.name}' Kubeflow orchestrator error: "
        def _validate_kube_context(
            kubernetes_context: str,
        ) -> Tuple[bool, str]:
            contexts, active_context = self.get_kubernetes_contexts()
            if kubernetes_context and kubernetes_context not in contexts:
                if not self.config.is_local:
                    return False, (
                        f"{msg}could not find a Kubernetes context named "
                        f"'{kubernetes_context}' in the local Kubernetes "
                        f"configuration. Please make sure that the Kubernetes "
                        f"cluster is running and that the kubeconfig file is "
                        f"configured correctly. To list all configured "
                        f"contexts, run:\n\n"
                        f"  `kubectl config get-contexts`\n"
                    )
            elif (
                kubernetes_context
                and active_context
                and kubernetes_context != active_context
            ):
                logger.warning(
                    f"{msg}the Kubernetes context '{kubernetes_context}' "  # nosec
                    f"configured for the Kubeflow orchestrator is not the "
                    f"same as the active context in the local Kubernetes "
                    f"configuration. If this is not deliberate, you should "
                    f"update the orchestrator's `kubernetes_context` field by "
                    f"running:\n\n"
                    f"  `zenml orchestrator update {self.name} "
                    f"--kubernetes_context={active_context}`\n"
                    f"To list all configured contexts, run:\n\n"
                    f"  `kubectl config get-contexts`\n"
                    f"To set the active context to be the same as the one "
                    f"configured in the Kubeflow orchestrator and silence "
                    f"this warning, run:\n\n"
                    f"  `kubectl config use-context "
                    f"{kubernetes_context}`\n"
                )
            return True, ""
        def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
            container_registry = stack.container_registry
            # should not happen, because the stack validation takes care of
            # this, but just in case
            assert container_registry is not None
            kubernetes_context = self.config.kubernetes_context
            if not self.connector:
                if (
                    not kubernetes_context
                    and not self.config.kubeflow_hostname
                ):
                    return False, (
                        f"{msg}the Kubeflow orchestrator is incompletely "
                        "configured. For a multi-tenant Kubeflow deployment, "
                        "you must set the `kubeflow_hostname` attribute in the "
                        "orchestrator configuration. For a single-tenant "
                        "deployment, you must either set the "
                        "`kubernetes_context` attribute in the orchestrator "
                        "configuration to the name of the Kubernetes config "
                        "context pointing to the cluster where you would like "
                        "to run pipelines or link this stack component to a "
                        "Kubernetes cluster via a service connector (see the "
                        "'zenml orchestrator connect' CLI command)."
                    )
                if kubernetes_context:
                    valid, err = _validate_kube_context(kubernetes_context)
                    if not valid:
                        return False, err
            silence_local_validations_msg = (
                f"To silence this warning, set the "
                f"`skip_local_validations` attribute to True in the "
                f"orchestrator configuration by running:\n\n"
                f"  'zenml orchestrator update {self.name} "
                f"--skip_local_validations=True'\n"
            )
            if not self.config.is_local:
                # if the orchestrator is not running in a local k3d cluster,
                # we cannot have any other local components in our stack,
                # because we cannot mount the local path into the container.
                # This may result in problems when running the pipeline,
                # because the local components will not be available inside the
                # Kubeflow containers.
                # go through all stack components and identify those that
                # advertise a local path where they persist information that
                # they need to be available when running pipelines.
                for stack_comp in stack.components.values():
                    local_path = stack_comp.local_path
                    if not local_path:
                        continue
                    return False, (
                        f"{msg}the Kubeflow orchestrator is configured to run "
                        f"pipelines in a remote Kubernetes cluster but the "
                        f"'{stack_comp.name}' {stack_comp.type.value} is a "
                        "local stack component and will not be available in "
                        "the Kubeflow pipeline step.\n"
                        "Please ensure that you always use non-local "
                        f"stack components with a remote Kubeflow "
                        f"orchestrator, otherwise you may run into pipeline "
                        f"execution problems. You should use a flavor of "
                        f"{stack_comp.type.value} other than "
                        f"'{stack_comp.flavor}'.\n"
                        + silence_local_validations_msg
                    )
                # if the orchestrator is remote, the container registry must
                # also be remote.
                if container_registry.config.is_local:
                    return False, (
                        f"{msg}the Kubeflow orchestrator is configured to run "
                        f"pipelines in a remote Kubernetes cluster, but the "
                        f"'{container_registry.name}' container registry URI "
                        f"'{container_registry.config.uri}' "
                        f"points to a local container registry. Please ensure "
                        f"that you always use non-local stack components with "
                        f"a remote Kubeflow orchestrator, otherwise you will "
                        f"run into problems. You should use a flavor of "
                        f"container registry other than "
                        f"'{container_registry.flavor}'.\n"
                        + silence_local_validations_msg
                    )
            return True, ""
        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_local_requirements,
        )
    @property
    def root_directory(self) -> str:
        """Path to the root directory for all files concerning this orchestrator.
        Returns:
            Path to the root directory.
        """
        return os.path.join(
            io_utils.get_global_config_directory(),
            "kubeflow",
            str(self.id),
        )
    @property
    def pipeline_directory(self) -> str:
        """Returns path to a directory in which the kubeflow pipeline files are stored.
        Returns:
            Path to the pipeline directory.
        """
        return os.path.join(self.root_directory, "pipelines")
    def _create_dynamic_component(
        self,
        image: str,
        command: List[str],
        arguments: List[str],
        component_name: str,
    ) -> dsl.PipelineTask:
        """Creates a dynamic container component for a Kubeflow pipeline.
        Args:
            image: The image to use for the component.
            command: The command to use for the component.
            arguments: The arguments to use for the component.
            component_name: The name of the component.
        Returns:
            The dynamic container component.
        """
        def dynamic_container_component() -> dsl.ContainerSpec:
            """Dynamic container component.
            Returns:
                The dynamic container component.
            """
            return dsl.ContainerSpec(
                image=image,
                command=command,
                args=arguments,
            )
        # Change the name of the function
        new_container_spec_func = types.FunctionType(
            dynamic_container_component.__code__,
            dynamic_container_component.__globals__,
            name=component_name,
            argdefs=dynamic_container_component.__defaults__,
            closure=dynamic_container_component.__closure__,
        )
        pipeline_task = dsl.container_component(new_container_spec_func)
        return pipeline_task
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        stack: "Stack",
        environment: Dict[str, str],
    ) -> Any:
        """Creates a kfp yaml file.
        This functions as an intermediary representation of the pipeline which
        is then deployed to the kubeflow pipelines instance.
        How it works:
        -------------
        Before this method is called the `prepare_pipeline_deployment()`
        method builds a docker image that contains the code for the
        pipeline, all steps the context around these files.
        Based on this docker image a callable is created which builds
        container_ops for each step (`_construct_kfp_pipeline`).
        To do this the entrypoint of the docker image is configured to
        run the correct step within the docker image. The dependencies
        between these container_ops are then also configured onto each
        container_op by pointing at the downstream steps.
        This callable is then compiled into a kfp yaml file that is used as
        the intermediary representation of the kubeflow pipeline.
        This file, together with some metadata, runtime configurations is
        then uploaded into the kubeflow pipelines cluster for execution.
        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
            environment: Environment variables to set in the orchestration
                environment.
        Raises:
            RuntimeError: If trying to run a pipeline in a notebook
                environment.
        """
        # First check whether the code running in a notebook
        if Environment.in_notebook():
            raise RuntimeError(
                "The Kubeflow orchestrator cannot run pipelines in a notebook "
                "environment. The reason is that it is non-trivial to create "
                "a Docker image of a notebook. Please consider refactoring "
                "your notebook cells into separate scripts in a Python module "
                "and run the code outside of a notebook when using this "
                "orchestrator."
            )
        assert stack.container_registry
        # Create a callable for future compilation into a dsl.Pipeline.
        orchestrator_run_name = get_orchestrator_run_name(
            pipeline_name=deployment.pipeline_configuration.name
        ).replace("_", "-")
        def _create_dynamic_pipeline() -> Any:
            """Create a dynamic pipeline including each step.
            Returns:
                pipeline_func
            """
            step_name_to_dynamic_component: Dict[str, Any] = {}
            node_selector_constraint: Optional[Tuple[str, str]] = None
            for step_name, step in deployment.step_configurations.items():
                image = self.get_image(
                    deployment=deployment,
                    step_name=step_name,
                )
                command = StepEntrypointConfiguration.get_entrypoint_command()
                arguments = (
                    StepEntrypointConfiguration.get_entrypoint_arguments(
                        step_name=step_name,
                        deployment_id=deployment.id,
                    )
                )
                dynamic_component = self._create_dynamic_component(
                    image, command, arguments, step_name
                )
                step_settings = cast(
                    KubeflowOrchestratorSettings, self.get_settings(step)
                )
                pod_settings = step_settings.pod_settings
                if pod_settings:
                    if pod_settings.host_ipc:
                        logger.warning(
                            "Host IPC is set to `True` but not supported in "
                            "this orchestrator. Ignoring..."
                        )
                    if pod_settings.affinity:
                        logger.warning(
                            "Affinity is set but not supported in Kubeflow with "
                            "Kubeflow Pipelines 2.x. Ignoring..."
                        )
                    if pod_settings.tolerations:
                        logger.warning(
                            "Tolerations are set but not supported in "
                            "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                        )
                    if pod_settings.volumes:
                        logger.warning(
                            "Volumes are set but not supported in Kubeflow with "
                            "Kubeflow Pipelines 2.x. Ignoring..."
                        )
                    if pod_settings.volume_mounts:
                        logger.warning(
                            "Volume mounts are set but not supported in "
                            "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                        )
                    # apply pod settings
                    if (
                        KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        in pod_settings.node_selectors.keys()
                    ):
                        node_selector_constraint = (
                            KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            pod_settings.node_selectors[
                                KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                            ],
                        )
                step_name_to_dynamic_component[step_name] = dynamic_component
            @dsl.pipeline(  # type: ignore[misc]
                display_name=orchestrator_run_name,
            )
            def dynamic_pipeline() -> None:
                """Dynamic pipeline."""
                # iterate through the components one by one
                # (from step_name_to_dynamic_component)
                for (
                    component_name,
                    component,
                ) in step_name_to_dynamic_component.items():
                    # for each component, check to see what other steps are
                    # upstream of it
                    step = deployment.step_configurations[component_name]
                    upstream_step_components = [
                        step_name_to_dynamic_component[upstream_step_name]
                        for upstream_step_name in step.spec.upstream_steps
                    ]
                    task = (
                        component()
                        .set_display_name(
                            name=component_name,
                        )
                        .set_caching_options(enable_caching=False)
                        .set_env_variable(
                            name=ENV_KFP_RUN_ID,
                            value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
                        )
                        .after(*upstream_step_components)
                    )
                    self._configure_container_resources(
                        task,
                        step.config.resource_settings,
                        node_selector_constraint,
                    )
            return dynamic_pipeline
        def _update_yaml_with_environment(
            yaml_file_path: str, environment: Dict[str, str]
        ) -> None:
            """Updates the env section of the steps in the YAML file with the given environment variables.
            Args:
                yaml_file_path: The path to the YAML file to update.
                environment: A dictionary of environment variables to add.
            """
            pipeline_definition = yaml_utils.read_yaml(pipeline_file_path)
            # Iterate through each component and add the environment variables
            for executor in pipeline_definition["deploymentSpec"]["executors"]:
                if (
                    "container"
                    in pipeline_definition["deploymentSpec"]["executors"][
                        executor
                    ]
                ):
                    container = pipeline_definition["deploymentSpec"][
                        "executors"
                    ][executor]["container"]
                    if "env" not in container:
                        container["env"] = []
                    for key, value in environment.items():
                        container["env"].append({"name": key, "value": value})
            yaml_utils.write_yaml(pipeline_file_path, pipeline_definition)
            print(
                f"Updated YAML file with environment variables at {yaml_file_path}"
            )
        # Get a filepath to use to save the finished yaml to
        fileio.makedirs(self.pipeline_directory)
        pipeline_file_path = os.path.join(
            self.pipeline_directory, f"{orchestrator_run_name}.yaml"
        )
        # write the argo pipeline yaml
        Compiler().compile(
            pipeline_func=_create_dynamic_pipeline(),
            package_path=pipeline_file_path,
            pipeline_name=orchestrator_run_name,
        )
        # Let's update the YAML file with the environment variables
        _update_yaml_with_environment(pipeline_file_path, environment)
        logger.info(
            "Writing Kubeflow workflow definition to `%s`.", pipeline_file_path
        )
        # using the kfp client uploads the pipeline to kubeflow pipelines and
        # runs it there
        self._upload_and_run_pipeline(
            deployment=deployment,
            pipeline_file_path=pipeline_file_path,
            run_name=orchestrator_run_name,
        )
    def _upload_and_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponse",
        pipeline_file_path: str,
        run_name: str,
    ) -> None:
        """Tries to upload and run a KFP pipeline.
        Args:
            deployment: The pipeline deployment.
            pipeline_file_path: Path to the pipeline definition file.
            run_name: The Kubeflow run name.
        Raises:
            RuntimeError: If Kubeflow API returns an error.
        """
        pipeline_name = deployment.pipeline_configuration.name
        settings = cast(
            KubeflowOrchestratorSettings, self.get_settings(deployment)
        )
        user_namespace = settings.user_namespace
        kubernetes_context = self.config.kubernetes_context
        try:
            if kubernetes_context:
                logger.info(
                    "Running in kubernetes context '%s'.",
                    kubernetes_context,
                )
            elif self.config.kubeflow_hostname:
                logger.info(
                    "Running on Kubeflow deployment '%s'.",
                    self.config.kubeflow_hostname,
                )
            elif self.connector:
                logger.info(
                    "Running with Kubernetes credentials from connector '%s'.",
                    str(self.connector),
                )
            # upload the pipeline to Kubeflow and start it
            client = self._get_kfp_client(settings=settings)
            if deployment.schedule:
                try:
                    experiment = client.get_experiment(
                        pipeline_name, namespace=user_namespace
                    )
                    logger.info(
                        "A recurring run has already been created with this "
                        "pipeline. Creating new recurring run now.."
                    )
                except (ValueError, ApiException):
                    experiment = client.create_experiment(
                        pipeline_name, namespace=user_namespace
                    )
                    logger.info(
                        "Creating a new recurring run for pipeline '%s'.. ",
                        pipeline_name,
                    )
                logger.info(
                    "You can see all recurring runs under the '%s' experiment.",
                    pipeline_name,
                )
                interval_seconds = (
                    deployment.schedule.interval_second.seconds
                    if deployment.schedule.interval_second
                    else None
                )
                result = client.create_recurring_run(
                    experiment_id=experiment.experiment_id,
                    job_name=run_name,
                    pipeline_package_path=pipeline_file_path,
                    enable_caching=False,
                    cron_expression=deployment.schedule.cron_expression,
                    start_time=deployment.schedule.utc_start_time,
                    end_time=deployment.schedule.utc_end_time,
                    interval_second=interval_seconds,
                    no_catchup=not deployment.schedule.catchup,
                )
                logger.info(
                    "Started recurring run with ID '%s'.",
                    result.recurring_run_id,
                )
            else:
                logger.info(
                    "No schedule detected. Creating a one-off pipeline run.."
                )
                try:
                    result = client.create_run_from_pipeline_package(
                        pipeline_file_path,
                        arguments={},
                        run_name=run_name,
                        enable_caching=False,
                        namespace=user_namespace,
                    )
                except ApiException:
                    raise RuntimeError(
                        f"Failed to create {run_name} on kubeflow! "
                        "Please check stack component settings and "
                        "configuration!"
                    )
                logger.info(
                    "Started one-off pipeline run with ID '%s'.", result.run_id
                )
                if settings.synchronous:
                    client.wait_for_run_completion(
                        run_id=result.run_id, timeout=settings.timeout
                    )
        except urllib3.exceptions.HTTPError as error:
            if kubernetes_context:
                msg = (
                    f"Please make sure your kubernetes config is present and "
                    f"the '{kubernetes_context}' kubernetes context is "
                    "configured correctly."
                )
            elif self.connector:
                msg = (
                    f"Please check that the '{self.connector}' connector "
                    f"linked to this component is configured correctly with "
                    "valid credentials."
                )
            else:
                msg = ""
            logger.warning(
                f"Failed to upload Kubeflow pipeline: {error}. {msg}",
            )
    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.
        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.
        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_KFP_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_KFP_RUN_ID}."
            )
    def _get_session_cookie(self, username: str, password: str) -> str:
        """Gets session cookie from username and password.
        Args:
            username: Username for kubeflow host.
            password: Password for kubeflow host.
        Raises:
            RuntimeError: If the cookie fetching failed.
        Returns:
            Cookie with the prefix `authsession=`.
        """
        if self.config.kubeflow_hostname is None:
            raise RuntimeError(
                "You must configure the Kubeflow orchestrator "
                "with the `kubeflow_hostname` parameter which usually ends "
                "with `/pipeline` (e.g. `https://mykubeflow.com/pipeline`). "
                "Please update the current kubeflow orchestrator with: "
                f"`zenml orchestrator update {self.name} "
                "--kubeflow_hostname=<MY_KUBEFLOW_HOST>`"
            )
        # Get cookie
        logger.info(
            f"Attempting to fetch session cookie from {self.config.kubeflow_hostname} "
            "with supplied username and password..."
        )
        session = requests.Session()
        try:
            response = session.get(self.config.kubeflow_hostname)
            response.raise_for_status()
        except (
            requests.exceptions.HTTPError,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            requests.exceptions.RequestException,
        ) as e:
            raise RuntimeError(
                f"Error while trying to fetch kubeflow cookie: {e}"
            )
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {"login": username, "password": password}
        try:
            response = session.post(response.url, headers=headers, data=data)
            response.raise_for_status()
        except requests.exceptions.HTTPError as errh:
            raise RuntimeError(
                f"Error while trying to fetch kubeflow cookie: {errh}"
            )
        cookie_dict: Dict[str, str] = session.cookies.get_dict()  # type: ignore[no-untyped-call]
        if "authservice_session" not in cookie_dict:
            raise RuntimeError("Invalid username and/or password!")
        logger.info("Session cookie fetched successfully!")
        return "authservice_session=" + str(cookie_dict["authservice_session"])
    def get_pipeline_run_metadata(
        self, run_id: UUID
    ) -> Dict[str, "MetadataType"]:
        """Get general component-specific metadata for a pipeline run.
        Args:
            run_id: The ID of the pipeline run.
        Returns:
            A dictionary of metadata.
        """
        hostname = self.config.kubeflow_hostname
        if not hostname:
            return {}
        hostname = hostname.rstrip("/")
        pipeline_suffix = "/pipeline"
        if hostname.endswith(pipeline_suffix):
            hostname = hostname[: -len(pipeline_suffix)]
        run = Client().get_pipeline_run(run_id)
        settings_key = settings_utils.get_stack_component_setting_key(self)
        run_settings = self.settings_class.model_validate(
            run.config.model_dump().get(settings_key, self.config)
        )
        user_namespace = run_settings.user_namespace
        if user_namespace:
            run_url = (
                f"{hostname}/_/pipeline/?ns={user_namespace}#"
                f"/runs/details/{self.get_orchestrator_run_id()}"
            )
            return {
                METADATA_ORCHESTRATOR_URL: Uri(run_url),
            }
        else:
            return {
                METADATA_ORCHESTRATOR_URL: Uri(f"{hostname}"),
            }
    def _configure_container_resources(
        self,
        dynamic_component: dsl.PipelineTask,
        resource_settings: "ResourceSettings",
        node_selector_constraint: Optional[Tuple[str, str]] = None,
    ) -> dsl.PipelineTask:
        """Adds resource requirements to the container.
        Args:
            dynamic_component: The dynamic component to add the resource
                settings to.
            resource_settings: The resource settings to use for this
                container.
            node_selector_constraint: Node selector constraint to apply to
                the container.
        Returns:
            The dynamic component with the resource settings applied.
        """
        # Set optional CPU, RAM and GPU constraints for the pipeline
        if resource_settings:
            cpu_limit = resource_settings.cpu_count or None
        if cpu_limit is not None:
            dynamic_component = dynamic_component.set_cpu_limit(str(cpu_limit))
        memory_limit = resource_settings.get_memory() or None
        if memory_limit is not None:
            dynamic_component = dynamic_component.set_memory_limit(
                memory_limit
            )
        gpu_limit = (
            resource_settings.gpu_count
            if resource_settings.gpu_count is not None
            else 0
        )
        if node_selector_constraint:
            (constraint_label, value) = node_selector_constraint
            if gpu_limit is not None and gpu_limit > 0:
                dynamic_component = (
                    dynamic_component.set_accelerator_type(value)
                    .set_accelerator_limit(gpu_limit)
                    .set_gpu_limit(gpu_limit)
                )
            elif constraint_label == "accelerator" and gpu_limit == 0:
                logger.warning(
                    "GPU limit is set to 0 but a GPU type is specified. Ignoring GPU settings."
                )
        return dynamic_component
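To connect the resource handling above back to the settings surface, here is a hedged sketch of a step that requests CPU and memory, which `_configure_container_resources` translates into KFP container limits via `set_cpu_limit` and `set_memory_limit`. Per the code above, GPU scheduling additionally requires an accelerator node selector supplied via `pod_settings`; that part is omitted here since the exact label depends on your cluster:

```python
from zenml import step
from zenml.config import ResourceSettings

# Illustrative per-step resource request; values are placeholders.
@step(settings={"resources": ResourceSettings(cpu_count=2, memory="4GB")})
def train() -> None:
    ...
```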
config: KubeflowOrchestratorConfig
  
      property
      readonly
  
    Returns the KubeflowOrchestratorConfig config.
Returns:
| Type | Description | 
|---|---|
| KubeflowOrchestratorConfig | The configuration. | 
pipeline_directory: str
  
      property
      readonly
  
    Returns path to a directory in which the kubeflow pipeline files are stored.
Returns:
| Type | Description | 
|---|---|
| str | Path to the pipeline directory. | 
root_directory: str
  
      property
      readonly
  
    Path to the root directory for all files concerning this orchestrator.
Returns:
| Type | Description | 
|---|---|
| str | Path to the root directory. | 
settings_class: Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorSettings]
  
      property
      readonly
  
    Settings class for the Kubeflow orchestrator.
Returns:
| Type | Description | 
|---|---|
| Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorSettings] | The settings class. | 
validator: Optional[zenml.stack.stack_validator.StackValidator]
  
      property
      readonly
  
    Validates that the stack contains a container registry.
Also checks that requirements are met for local components.
Returns:
| Type | Description | 
|---|---|
| Optional[zenml.stack.stack_validator.StackValidator] | A `StackValidator` instance. | 
get_kubernetes_contexts(self)
    Get the list of configured Kubernetes contexts and the active context.
Returns:
| Type | Description | 
|---|---|
| Tuple[List[str], Optional[str]] | A tuple containing the list of configured Kubernetes contexts and the active context. | 
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
    """Get the list of configured Kubernetes contexts and the active context.
    Returns:
        A tuple containing the list of configured Kubernetes contexts and
        the active context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException:
        return [], None
    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id(self)
    Returns the active orchestrator run id.
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If the environment variable specifying the run id is not set. | 
Returns:
| Type | Description | 
|---|---|
| str | The orchestrator run id. | 
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.
    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.
    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_KFP_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_KFP_RUN_ID}."
        )
get_pipeline_run_metadata(self, run_id)
    Get general component-specific metadata for a pipeline run.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| run_id | UUID | The ID of the pipeline run. | required | 
Returns:
| Type | Description | 
|---|---|
| Dict[str, MetadataType] | A dictionary of metadata. | 
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.
    Args:
        run_id: The ID of the pipeline run.
    Returns:
        A dictionary of metadata.
    """
    hostname = self.config.kubeflow_hostname
    if not hostname:
        return {}
    hostname = hostname.rstrip("/")
    pipeline_suffix = "/pipeline"
    if hostname.endswith(pipeline_suffix):
        hostname = hostname[: -len(pipeline_suffix)]
    run = Client().get_pipeline_run(run_id)
    settings_key = settings_utils.get_stack_component_setting_key(self)
    run_settings = self.settings_class.model_validate(
        run.config.model_dump().get(settings_key, self.config)
    )
    user_namespace = run_settings.user_namespace
    if user_namespace:
        run_url = (
            f"{hostname}/_/pipeline/?ns={user_namespace}#"
            f"/runs/details/{self.get_orchestrator_run_id()}"
        )
        return {
            METADATA_ORCHESTRATOR_URL: Uri(run_url),
        }
    else:
        return {
            METADATA_ORCHESTRATOR_URL: Uri(f"{hostname}"),
        }
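For orientation, a hedged example of the dictionary this returns for a multi-tenant deployment with a user namespace configured. The hostname and namespace are illustrative, the URL shape is taken from the source above, and the dictionary key is the METADATA_ORCHESTRATOR_URL constant used there (assumed to be "orchestrator_url").

    from zenml.metadata.metadata_types import Uri

    # Assumed shape only; the actual URL depends on your deployment.
    run_metadata = {
        "orchestrator_url": Uri(
            "https://kubeflow.example.com/_/pipeline/?ns=my-namespace"
            "#/runs/details/<orchestrator-run-id>"
        ),
    }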
prepare_or_run_pipeline(self, deployment, stack, environment)
    Creates a KFP YAML file.
This file serves as an intermediate representation of the pipeline and is then deployed to the Kubeflow Pipelines instance.
How it works:
Before this method is called, the prepare_pipeline_deployment()
method builds a Docker image that contains the code for the
pipeline, all steps, and the context around these files.
Based on this Docker image, a callable is created that builds a
dynamic container component for each step (_create_dynamic_component).
To do this, the entrypoint of the Docker image is configured to
run the correct step within the image. The dependencies between these
components are then configured on each component by pointing at its
upstream steps.
This callable is then compiled into a KFP YAML file that serves as the intermediate representation of the Kubeflow pipeline.
This file, together with some metadata and runtime configurations, is then uploaded to the Kubeflow Pipelines cluster for execution.
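The underlying KFP 2.x pattern can be illustrated with a small, standalone sketch; the image, entrypoint, and component name below are purely illustrative and are not ZenML's actual ones.

    from kfp import compiler, dsl

    @dsl.container_component
    def train_step() -> dsl.ContainerSpec:
        # Stand-in for the step image and entrypoint built beforehand.
        return dsl.ContainerSpec(
            image="my-registry/zenml-pipeline:latest",
            command=["python", "-m", "my_entrypoint"],
            args=["--step_name", "train"],
        )

    @dsl.pipeline(display_name="example-run")
    def example_pipeline() -> None:
        train_step().set_caching_options(enable_caching=False)

    compiler.Compiler().compile(
        pipeline_func=example_pipeline,
        package_path="example_pipeline.yaml",
    )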
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| deployment | PipelineDeploymentResponse | The pipeline deployment to prepare or run. | required | 
| stack | Stack | The stack the pipeline will run on. | required | 
| environment | Dict[str, str] | Environment variables to set in the orchestration environment. | required | 
Exceptions:
| Type | Description | 
|---|---|
| RuntimeError | If trying to run a pipeline in a notebook environment. | 
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
          def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Any:
    """Creates a kfp yaml file.
    This functions as an intermediary representation of the pipeline which
    is then deployed to the kubeflow pipelines instance.
    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()`
    method builds a docker image that contains the code for the
    pipeline, all steps the context around these files.
    Based on this docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`).
    To do this the entrypoint of the docker image is configured to
    run the correct step within the docker image. The dependencies
    between these container_ops are then also configured onto each
    container_op by pointing at the downstream steps.
    This callable is then compiled into a kfp yaml file that is used as
    the intermediary representation of the kubeflow pipeline.
    This file, together with some metadata, runtime configurations is
    then uploaded into the kubeflow pipelines cluster for execution.
    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
    Raises:
        RuntimeError: If trying to run a pipeline in a notebook
            environment.
    """
    # First check whether the code is running in a notebook
    if Environment.in_notebook():
        raise RuntimeError(
            "The Kubeflow orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )
    assert stack.container_registry
    # Create a callable for future compilation into a dsl.Pipeline.
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    ).replace("_", "-")
    def _create_dynamic_pipeline() -> Any:
        """Create a dynamic pipeline including each step.
        Returns:
            pipeline_func
        """
        step_name_to_dynamic_component: Dict[str, Any] = {}
        node_selector_constraint: Optional[Tuple[str, str]] = None
        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(
                deployment=deployment,
                step_name=step_name,
            )
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                )
            )
            dynamic_component = self._create_dynamic_component(
                image, command, arguments, step_name
            )
            step_settings = cast(
                KubeflowOrchestratorSettings, self.get_settings(step)
            )
            pod_settings = step_settings.pod_settings
            if pod_settings:
                if pod_settings.host_ipc:
                    logger.warning(
                        "Host IPC is set to `True` but not supported in "
                        "this orchestrator. Ignoring..."
                    )
                if pod_settings.affinity:
                    logger.warning(
                        "Affinity is set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.tolerations:
                    logger.warning(
                        "Tolerations are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volumes:
                    logger.warning(
                        "Volumes are set but not supported in Kubeflow with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volume_mounts:
                    logger.warning(
                        "Volume mounts are set but not supported in "
                        "Kubeflow with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                # apply pod settings
                if (
                    KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                    in pod_settings.node_selectors.keys()
                ):
                    node_selector_constraint = (
                        KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                        pod_settings.node_selectors[
                            KFP_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        ],
                    )
            step_name_to_dynamic_component[step_name] = dynamic_component
        @dsl.pipeline(  # type: ignore[misc]
            display_name=orchestrator_run_name,
        )
        def dynamic_pipeline() -> None:
            """Dynamic pipeline."""
            # iterate through the components one by one
            # (from step_name_to_dynamic_component)
            for (
                component_name,
                component,
            ) in step_name_to_dynamic_component.items():
                # for each component, check to see what other steps are
                # upstream of it
                step = deployment.step_configurations[component_name]
                upstream_step_components = [
                    step_name_to_dynamic_component[upstream_step_name]
                    for upstream_step_name in step.spec.upstream_steps
                ]
                task = (
                    component()
                    .set_display_name(
                        name=component_name,
                    )
                    .set_caching_options(enable_caching=False)
                    .set_env_variable(
                        name=ENV_KFP_RUN_ID,
                        value=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
                    )
                    .after(*upstream_step_components)
                )
                self._configure_container_resources(
                    task,
                    step.config.resource_settings,
                    node_selector_constraint,
                )
        return dynamic_pipeline
    def _update_yaml_with_environment(
        yaml_file_path: str, environment: Dict[str, str]
    ) -> None:
        """Updates the env section of the steps in the YAML file with the given environment variables.
        Args:
            yaml_file_path: The path to the YAML file to update.
            environment: A dictionary of environment variables to add.
        """
        pipeline_definition = yaml_utils.read_yaml(pipeline_file_path)
        # Iterate through each component and add the environment variables
        for executor in pipeline_definition["deploymentSpec"]["executors"]:
            if (
                "container"
                in pipeline_definition["deploymentSpec"]["executors"][
                    executor
                ]
            ):
                container = pipeline_definition["deploymentSpec"][
                    "executors"
                ][executor]["container"]
                if "env" not in container:
                    container["env"] = []
                for key, value in environment.items():
                    container["env"].append({"name": key, "value": value})
        yaml_utils.write_yaml(pipeline_file_path, pipeline_definition)
        print(
            f"Updated YAML file with environment variables at {yaml_file_path}"
        )
    # Get a filepath to use to save the finished yaml to
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory, f"{orchestrator_run_name}.yaml"
    )
    # write the argo pipeline yaml
    Compiler().compile(
        pipeline_func=_create_dynamic_pipeline(),
        package_path=pipeline_file_path,
        pipeline_name=orchestrator_run_name,
    )
    # Let's update the YAML file with the environment variables
    _update_yaml_with_environment(pipeline_file_path, environment)
    logger.info(
        "Writing Kubeflow workflow definition to `%s`.", pipeline_file_path
    )
    # using the kfp client uploads the pipeline to kubeflow pipelines and
    # runs it there
    self._upload_and_run_pipeline(
        deployment=deployment,
        pipeline_file_path=pipeline_file_path,
        run_name=orchestrator_run_name,
    )
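The nested _update_yaml_with_environment helper patches the given environment variables into the compiled pipeline definition after compilation. A standalone sketch of the same idea, using plain PyYAML instead of ZenML's yaml_utils:

    from typing import Dict

    import yaml  # PyYAML

    def inject_env(yaml_path: str, environment: Dict[str, str]) -> None:
        """Add env vars to every container executor in a compiled KFP v2 YAML."""
        with open(yaml_path) as f:
            definition = yaml.safe_load(f)
        for executor in definition["deploymentSpec"]["executors"].values():
            container = executor.get("container")
            if container is None:
                continue
            env = container.setdefault("env", [])
            env.extend({"name": k, "value": v} for k, v in environment.items())
        with open(yaml_path, "w") as f:
            yaml.safe_dump(definition, f)

    # Hypothetical usage on the file produced by the compiler above.
    inject_env("example_pipeline.yaml", {"MY_ENV_VAR": "value"})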
        local_deployment_utils
    Utilities for managing a local Kubeflow Pipelines deployment.
add_hostpath_to_kubeflow_pipelines(kubernetes_context, local_path)
    Patches the Kubeflow Pipelines deployment to mount a local folder.
This folder serves as a hostpath for visualization purposes.
This function reconfigures the Kubeflow pipelines deployment to use a shared local folder to support loading the TensorBoard viewer and other pipeline visualization results from a local artifact store, as described here:
https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| kubernetes_context | str | The kubernetes context on which Kubeflow Pipelines should be patched. | required | 
| local_path | str | The path to the local folder to mount as a hostpath. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def add_hostpath_to_kubeflow_pipelines(
    kubernetes_context: str, local_path: str
) -> None:
    """Patches the Kubeflow Pipelines deployment to mount a local folder.
    This folder serves as a hostpath for visualization purposes.
    This function reconfigures the Kubeflow pipelines deployment to use a
    shared local folder to support loading the TensorBoard viewer and other
    pipeline visualization results from a local artifact store, as described
    here:
    https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md
    Args:
        kubernetes_context: The kubernetes context on which Kubeflow Pipelines
            should be patched.
        local_path: The path to the local folder to mount as a hostpath.
    """
    logger.info("Patching Kubeflow Pipelines to mount a local folder.")
    pod_template = {
        "spec": {
            "serviceAccountName": "kubeflow-pipelines-viewer",
            "containers": [
                {
                    "volumeMounts": [
                        {
                            "mountPath": local_path,
                            "name": "local-artifact-store",
                        }
                    ]
                }
            ],
            "volumes": [
                {
                    "hostPath": {
                        "path": local_path,
                        "type": "Directory",
                    },
                    "name": "local-artifact-store",
                }
            ],
        }
    }
    pod_template_json = json.dumps(pod_template, indent=2)
    config_map_data = {"data": {"viewer-pod-template.json": pod_template_json}}
    config_map_data_json = json.dumps(config_map_data, indent=2)
    logger.debug(
        "Adding host path volume for local path `%s` to kubeflow pipeline"
        "viewer pod template configuration.",
        local_path,
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "-n",
            "kubeflow",
            "patch",
            "configmap/ml-pipeline-ui-configmap",
            "--type",
            "merge",
            "-p",
            config_map_data_json,
        ]
    )
    deployment_patch = {
        "spec": {
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "ml-pipeline-ui",
                            "volumeMounts": [
                                {
                                    "mountPath": local_path,
                                    "name": "local-artifact-store",
                                }
                            ],
                        }
                    ],
                    "volumes": [
                        {
                            "hostPath": {
                                "path": local_path,
                                "type": "Directory",
                            },
                            "name": "local-artifact-store",
                        }
                    ],
                }
            }
        }
    }
    deployment_patch_json = json.dumps(deployment_patch, indent=2)
    logger.debug(
        "Adding host path volume for local path `%s` to the kubeflow UI",
        local_path,
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "-n",
            "kubeflow",
            "patch",
            "deployment/ml-pipeline-ui",
            "--type",
            "strategic",
            "-p",
            deployment_patch_json,
        ]
    )
    wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)
    logger.info("Finished patching Kubeflow Pipelines setup.")
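A hypothetical usage example, mounting the local ZenML stores directory so the Kubeflow Pipelines UI can read visualization artifacts from a local artifact store; the context name is illustrative.

    from zenml.config.global_config import GlobalConfiguration
    from zenml.integrations.kubeflow.orchestrators import local_deployment_utils

    local_deployment_utils.add_hostpath_to_kubeflow_pipelines(
        kubernetes_context="k3d-zenml-kubeflow",  # illustrative context name
        local_path=GlobalConfiguration().local_stores_path,
    )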
check_prerequisites(skip_k3d=False, skip_kubectl=False)
    Checks prerequisites for a local Kubeflow Pipelines deployment.
It makes sure that k3d and kubectl are installed.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| skip_k3d | bool | Whether to skip the check for the k3d command. | False | 
| skip_kubectl | bool | Whether to skip the check for the kubectl command. | False | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether all prerequisites are installed. | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def check_prerequisites(
    skip_k3d: bool = False, skip_kubectl: bool = False
) -> bool:
    """Checks prerequisites for a local kubeflow pipelines deployment.
    It makes sure they are installed.
    Args:
        skip_k3d: Whether to skip the check for the k3d command.
        skip_kubectl: Whether to skip the check for the kubectl command.
    Returns:
        Whether all prerequisites are installed.
    """
    k3d_installed = skip_k3d or shutil.which("k3d") is not None
    kubectl_installed = skip_kubectl or shutil.which("kubectl") is not None
    logger.debug(
        "Local kubeflow deployment prerequisites: K3D - %s, Kubectl - %s",
        k3d_installed,
        kubectl_installed,
    )
    return k3d_installed and kubectl_installed
create_k3d_cluster(cluster_name, registry_name, registry_config_path)
    Creates a K3D cluster.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to create. | required | 
| registry_name | str | Name of the registry to create for this cluster. | required | 
| registry_config_path | str | Path to the registry config file. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def create_k3d_cluster(
    cluster_name: str, registry_name: str, registry_config_path: str
) -> None:
    """Creates a K3D cluster.
    Args:
        cluster_name: Name of the cluster to create.
        registry_name: Name of the registry to create for this cluster.
        registry_config_path: Path to the registry config file.
    """
    logger.info("Creating local K3D cluster '%s'.", cluster_name)
    local_stores_path = GlobalConfiguration().local_stores_path
    subprocess.check_call(
        [
            "k3d",
            "cluster",
            "create",
            cluster_name,
            "--image",
            K3S_IMAGE_NAME,
            "--registry-create",
            registry_name,
            "--registry-config",
            registry_config_path,
            "--volume",
            f"{local_stores_path}:{local_stores_path}",
        ]
    )
    logger.info("Finished K3D cluster creation.")
delete_k3d_cluster(cluster_name)
    Deletes a K3D cluster with the given name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to delete. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def delete_k3d_cluster(cluster_name: str) -> None:
    """Deletes a K3D cluster with the given name.
    Args:
        cluster_name: Name of the cluster to delete.
    """
    subprocess.check_call(["k3d", "cluster", "delete", cluster_name])
    logger.info("Deleted local k3d cluster '%s'.", cluster_name)
deploy_kubeflow_pipelines(kubernetes_context)
    Deploys Kubeflow Pipelines.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| kubernetes_context | str | The kubernetes context on which Kubeflow Pipelines should be deployed. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def deploy_kubeflow_pipelines(kubernetes_context: str) -> None:
    """Deploys Kubeflow Pipelines.
    Args:
        kubernetes_context: The kubernetes context on which Kubeflow Pipelines
            should be deployed.
    """
    logger.info("Deploying Kubeflow Pipelines.")
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "apply",
            "-k",
            f"github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref={KFP_VERSION}&timeout=5m",
        ]
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "wait",
            "--timeout=60s",
            "--for",
            "condition=established",
            "crd/applications.app.k8s.io",
        ]
    )
    subprocess.check_call(
        [
            "kubectl",
            "--context",
            kubernetes_context,
            "apply",
            "-k",
            f"github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref={KFP_VERSION}&timeout=5m",
        ]
    )
    wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)
    logger.info("Finished Kubeflow Pipelines setup.")
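A hedged end-to-end sketch of how these helpers can be combined to provision a local Kubeflow Pipelines deployment; all names, ports, and paths below are illustrative assumptions, not a prescribed setup.

    from zenml.integrations.kubeflow.orchestrators import local_deployment_utils as ldu

    cluster_name = "zenml-kubeflow"             # illustrative cluster name
    kubernetes_context = f"k3d-{cluster_name}"  # k3d prefixes contexts with "k3d-"

    if not ldu.check_prerequisites():
        raise RuntimeError("Please install k3d and kubectl first.")

    ldu.write_local_registry_yaml(
        yaml_path="registries.yaml",
        registry_name="zenml-registry:5000",  # illustrative
        registry_uri="localhost:5000",        # illustrative
    )
    if not ldu.k3d_cluster_exists(cluster_name):
        ldu.create_k3d_cluster(
            cluster_name=cluster_name,
            registry_name="zenml-registry:5000",
            registry_config_path="registries.yaml",
        )
    elif not ldu.k3d_cluster_running(cluster_name):
        ldu.start_k3d_cluster(cluster_name)

    ldu.deploy_kubeflow_pipelines(kubernetes_context=kubernetes_context)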
k3d_cluster_exists(cluster_name)
    Checks whether there exists a K3D cluster with the given name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the cluster exists. | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def k3d_cluster_exists(cluster_name: str) -> bool:
    """Checks whether there exists a K3D cluster with the given name.
    Args:
        cluster_name: Name of the cluster to check.
    Returns:
        Whether the cluster exists.
    """
    output = subprocess.check_output(
        ["k3d", "cluster", "list", "--output", "json"]
    )
    clusters = json.loads(output)
    for cluster in clusters:
        if cluster["name"] == cluster_name:
            return True
    return False
k3d_cluster_running(cluster_name)
    Checks whether the K3D cluster with the given name is running.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to check. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether the cluster is running. | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def k3d_cluster_running(cluster_name: str) -> bool:
    """Checks whether the K3D cluster with the given name is running.
    Args:
        cluster_name: Name of the cluster to check.
    Returns:
        Whether the cluster is running.
    """
    output = subprocess.check_output(
        ["k3d", "cluster", "list", "--output", "json"]
    )
    clusters = json.loads(output)
    for cluster in clusters:
        if cluster["name"] == cluster_name:
            server_count: int = cluster["serversCount"]
            servers_running: int = cluster["serversRunning"]
            return servers_running == server_count
    return False
kubeflow_pipelines_ready(kubernetes_context)
    Returns whether all Kubeflow Pipelines pods are ready.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| kubernetes_context | str | The kubernetes context in which the pods should be checked. | required | 
Returns:
| Type | Description | 
|---|---|
| bool | Whether all Kubeflow Pipelines pods are ready. | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def kubeflow_pipelines_ready(kubernetes_context: str) -> bool:
    """Returns whether all Kubeflow Pipelines pods are ready.
    Args:
        kubernetes_context: The kubernetes context in which the pods
            should be checked.
    Returns:
        Whether all Kubeflow Pipelines pods are ready.
    """
    try:
        subprocess.check_call(
            [
                "kubectl",
                "--context",
                kubernetes_context,
                "--namespace",
                "kubeflow",
                "wait",
                "--for",
                "condition=ready",
                "--timeout=0s",
                "pods",
                "-l",
                "application-crd-id=kubeflow-pipelines",
            ],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return True
    except subprocess.CalledProcessError:
        return False
start_k3d_cluster(cluster_name)
    Starts a K3D cluster with the given name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to start. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def start_k3d_cluster(cluster_name: str) -> None:
    """Starts a K3D cluster with the given name.
    Args:
        cluster_name: Name of the cluster to start.
    """
    subprocess.check_call(["k3d", "cluster", "start", cluster_name])
    logger.info("Started local k3d cluster '%s'.", cluster_name)
start_kfp_ui_daemon(pid_file_path, log_file_path, port, kubernetes_context)
    Starts a daemon process that forwards ports.
This makes the Kubeflow Pipelines UI accessible in the browser.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file_path | str | Path where the file with the daemon's process ID should be written. | required | 
| log_file_path | str | Path to a file where the daemon logs should be written. | required | 
| port | int | Port on which the UI should be accessible. | required | 
| kubernetes_context | str | The kubernetes context for the cluster where Kubeflow Pipelines is running. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def start_kfp_ui_daemon(
    pid_file_path: str,
    log_file_path: str,
    port: int,
    kubernetes_context: str,
) -> None:
    """Starts a daemon process that forwards ports.
    This is so the Kubeflow Pipelines UI is accessible in the browser.
    Args:
        pid_file_path: Path where the file with the daemons process ID should
            be written.
        log_file_path: Path to a file where the daemon logs should be written.
        port: Port on which the UI should be accessible.
        kubernetes_context: The kubernetes context for the cluster where
            Kubeflow Pipelines is running.
    """
    command = [
        "kubectl",
        "--context",
        kubernetes_context,
        "--namespace",
        "kubeflow",
        "port-forward",
        "svc/ml-pipeline-ui",
        f"{port}:80",
    ]
    if not networking_utils.port_available(port):
        modified_command = command.copy()
        modified_command[-1] = "PORT:80"
        logger.warning(
            "Unable to port-forward Kubeflow Pipelines UI to local port %d "
            "because the port is occupied. In order to access the Kubeflow "
            "Pipelines UI at http://localhost:PORT/, please run '%s' in a "
            "separate command line shell (replace PORT with a free port of "
            "your choice).",
            port,
            " ".join(modified_command),
        )
    elif sys.platform == "win32":
        logger.warning(
            "Daemon functionality not supported on Windows. "
            "In order to access the Kubeflow Pipelines UI at "
            "http://localhost:%d/, please run '%s' in a separate command "
            "line shell.",
            port,
            " ".join(command),
        )
    else:
        from zenml.utils import daemon
        def _daemon_function() -> None:
            """Port-forwards the Kubeflow Pipelines UI pod."""
            subprocess.check_call(command)
        daemon.run_as_daemon(
            _daemon_function, pid_file=pid_file_path, log_file=log_file_path
        )
        logger.info(
            "Started Kubeflow Pipelines UI daemon (check the daemon logs at %s "
            "in case you're not able to view the UI). The Kubeflow Pipelines "
            "UI should now be accessible at http://localhost:%d/.",
            log_file_path,
            port,
        )
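A hypothetical usage example; paths, port, and context name are illustrative.

    from zenml.integrations.kubeflow.orchestrators import local_deployment_utils

    local_deployment_utils.start_kfp_ui_daemon(
        pid_file_path="/tmp/kubeflow/ui_daemon.pid",
        log_file_path="/tmp/kubeflow/ui_daemon.log",
        port=8080,
        kubernetes_context="k3d-zenml-kubeflow",  # illustrative context name
    )
    # The UI should then be reachable at http://localhost:8080/ (non-Windows hosts).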
stop_k3d_cluster(cluster_name)
    Stops a K3D cluster with the given name.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cluster_name | str | Name of the cluster to stop. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def stop_k3d_cluster(cluster_name: str) -> None:
    """Stops a K3D cluster with the given name.
    Args:
        cluster_name: Name of the cluster to stop.
    """
    subprocess.check_call(["k3d", "cluster", "stop", cluster_name])
    logger.info("Stopped local k3d cluster '%s'.", cluster_name)
stop_kfp_ui_daemon(pid_file_path)
    Stops the KFP UI daemon process if it is running.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| pid_file_path | str | Path to the file with the daemon's process ID. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def stop_kfp_ui_daemon(pid_file_path: str) -> None:
    """Stops the KFP UI daemon process if it is running.
    Args:
        pid_file_path: Path to the file with the daemons process ID.
    """
    if fileio.exists(pid_file_path):
        if sys.platform == "win32":
            # Daemon functionality is not supported on Windows, so the PID
            # file won't exist. This if clause exists just for mypy to not
            # complain about missing functions
            pass
        else:
            from zenml.utils import daemon
            daemon.stop_daemon(pid_file_path)
            fileio.remove(pid_file_path)
            logger.info("Stopped Kubeflow Pipelines UI daemon.")
wait_until_kubeflow_pipelines_ready(kubernetes_context)
    Waits until all Kubeflow Pipelines pods are ready.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| kubernetes_context | str | The kubernetes context in which the pods should be checked. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def wait_until_kubeflow_pipelines_ready(kubernetes_context: str) -> None:
    """Waits until all Kubeflow Pipelines pods are ready.
    Args:
        kubernetes_context: The kubernetes context in which the pods
            should be checked.
    """
    logger.info(
        "Waiting for all Kubeflow Pipelines pods to be ready (this might "
        "take a few minutes)."
    )
    while True:
        logger.info("Current pod status:")
        subprocess.check_call(
            [
                "kubectl",
                "--context",
                kubernetes_context,
                "--namespace",
                "kubeflow",
                "get",
                "pods",
            ]
        )
        if kubeflow_pipelines_ready(kubernetes_context=kubernetes_context):
            break
        logger.info(
            "One or more pods not ready yet, waiting for 30 seconds..."
        )
        time.sleep(30)
write_local_registry_yaml(yaml_path, registry_name, registry_uri)
    Writes a K3D registry config file.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| yaml_path | str | Path where the config file should be written to. | required | 
| registry_name | str | Name of the registry. | required | 
| registry_uri | str | URI of the registry. | required | 
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
          def write_local_registry_yaml(
    yaml_path: str, registry_name: str, registry_uri: str
) -> None:
    """Writes a K3D registry config file.
    Args:
        yaml_path: Path where the config file should be written to.
        registry_name: Name of the registry.
        registry_uri: URI of the registry.
    """
    yaml_content = {
        "mirrors": {registry_uri: {"endpoint": [f"http://{registry_name}"]}}
    }
    yaml_utils.write_yaml(yaml_path, yaml_content)
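For illustration, calling this with hypothetical values produces a mirrors configuration along these lines:

    from zenml.integrations.kubeflow.orchestrators import local_deployment_utils

    local_deployment_utils.write_local_registry_yaml(
        yaml_path="registries.yaml",
        registry_name="zenml-registry:5000",  # illustrative
        registry_uri="localhost:5000",        # illustrative
    )
    # registries.yaml then contains roughly:
    #   mirrors:
    #     localhost:5000:
    #       endpoint:
    #         - http://zenml-registry:5000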