Kubeflow
zenml.integrations.kubeflow
special
Initialization of the Kubeflow integration for ZenML.
The Kubeflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubeflow orchestrator with the CLI tool.
KubeflowIntegration (Integration)
Definition of Kubeflow Integration for ZenML.
Source code in zenml/integrations/kubeflow/__init__.py
class KubeflowIntegration(Integration):
"""Definition of Kubeflow Integration for ZenML."""
NAME = KUBEFLOW
REQUIREMENTS = ["kfp==1.8.13"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kubeflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.kubeflow.flavors import (
KubeflowOrchestratorFlavor,
)
return [KubeflowOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Kubeflow integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/kubeflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kubeflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.kubeflow.flavors import (
KubeflowOrchestratorFlavor,
)
return [KubeflowOrchestratorFlavor]
flavors
special
Kubeflow integration flavors.
kubeflow_orchestrator_flavor
Kubeflow orchestrator flavor.
KubeflowOrchestratorConfig (BaseOrchestratorConfig)
pydantic-model
Configuration for the Kubeflow orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
kubeflow_pipelines_ui_port |
int |
A local port to which the KFP UI will be forwarded. |
kubeflow_hostname |
Optional[str] |
The hostname to use to talk to the Kubeflow Pipelines API. If not set, the hostname will be derived from the Kubernetes API proxy. |
kubeflow_namespace |
str |
The Kubernetes namespace in which Kubeflow
Pipelines is deployed. Defaults to |
kubernetes_context |
Optional[str] |
Optional name of a kubernetes context to run pipelines in. If not set, will try to spin up a local K3d cluster. |
synchronous |
bool |
If |
skip_local_validations |
bool |
If |
skip_cluster_provisioning |
bool |
If |
skip_ui_daemon_provisioning |
bool |
If |
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
class KubeflowOrchestratorConfig(BaseOrchestratorConfig):
"""Configuration for the Kubeflow orchestrator.
Attributes:
kubeflow_pipelines_ui_port: A local port to which the KFP UI will be
forwarded.
kubeflow_hostname: The hostname to use to talk to the Kubeflow Pipelines
API. If not set, the hostname will be derived from the Kubernetes
API proxy.
kubeflow_namespace: The Kubernetes namespace in which Kubeflow
Pipelines is deployed. Defaults to `kubeflow`.
kubernetes_context: Optional name of a kubernetes context to run
pipelines in. If not set, will try to spin up a local K3d cluster.
synchronous: If `True`, running a pipeline using this orchestrator will
block until all steps finished running on KFP.
skip_local_validations: If `True`, the local validations will be
skipped.
skip_cluster_provisioning: If `True`, the k3d cluster provisioning will
be skipped.
skip_ui_daemon_provisioning: If `True`, provisioning the KFP UI daemon
will be skipped.
"""
kubeflow_pipelines_ui_port: int = DEFAULT_KFP_UI_PORT
kubeflow_hostname: Optional[str] = None
kubeflow_namespace: str = "kubeflow"
kubernetes_context: Optional[str] = None
synchronous: bool = False
skip_local_validations: bool = False
skip_cluster_provisioning: bool = False
skip_ui_daemon_provisioning: bool = False
@property
def is_remote(self) -> bool:
"""Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be
used with a local ZenML database or if it requires a remote ZenML
server.
Returns:
True if this config is for a remote component, False otherwise.
"""
if (
self.kubernetes_context is not None
and not self.kubernetes_context.startswith("k3d-zenml-kubeflow-")
):
return True
return False
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
This designation is used to determine if the stack component can be
shared with other users or if it is only usable on the local host.
Returns:
True if this config is for a local component, False otherwise.
"""
if (
self.kubernetes_context is None
or self.kubernetes_context.startswith("k3d-zenml-kubeflow-")
):
return True
return False
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
KubeflowOrchestratorFlavor (BaseOrchestratorFlavor)
Kubeflow orchestrator flavor.
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
class KubeflowOrchestratorFlavor(BaseOrchestratorFlavor):
"""Kubeflow orchestrator flavor."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return KUBEFLOW_ORCHESTRATOR_FLAVOR
@property
def config_class(self) -> Type[KubeflowOrchestratorConfig]:
"""Returns `KubeflowOrchestratorConfig` config class.
Returns:
The config class.
"""
return KubeflowOrchestratorConfig
@property
def implementation_class(self) -> Type["KubeflowOrchestrator"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.kubeflow.orchestrators import (
KubeflowOrchestrator,
)
return KubeflowOrchestrator
config_class: Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorConfig]
property
readonly
Returns KubeflowOrchestratorConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kubeflow.flavors.kubeflow_orchestrator_flavor.KubeflowOrchestratorConfig] |
The config class. |
implementation_class: Type[KubeflowOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KubeflowOrchestrator] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
KubeflowOrchestratorSettings (BaseSettings)
pydantic-model
Settings for the Kubeflow orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
client_args |
Dict[str, Any] |
Arguments to pass when initializing the KFP client. |
user_namespace |
Optional[str] |
The user namespace to use when creating experiments and runs. |
node_selectors |
Dict[str, str] |
Node selectors to apply to KFP pods. |
node_affinity |
Dict[str, List[str]] |
Node affinities to apply to KFP pods. |
Source code in zenml/integrations/kubeflow/flavors/kubeflow_orchestrator_flavor.py
class KubeflowOrchestratorSettings(BaseSettings):
"""Settings for the Kubeflow orchestrator.
Attributes:
client_args: Arguments to pass when initializing the KFP client.
user_namespace: The user namespace to use when creating experiments
and runs.
node_selectors: Node selectors to apply to KFP pods.
node_affinity: Node affinities to apply to KFP pods.
"""
client_args: Dict[str, Any] = {}
user_namespace: Optional[str] = None
node_selectors: Dict[str, str] = {}
node_affinity: Dict[str, List[str]] = {}
orchestrators
special
Initialization of the Kubeflow ZenML orchestrator.
kubeflow_entrypoint_configuration
Implementation of the Kubeflow entrypoint configuration.
KubeflowEntrypointConfiguration (StepEntrypointConfiguration)
Entrypoint configuration for running steps on kubeflow.
This class writes a markdown file that will be displayed in the KFP UI.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_entrypoint_configuration.py
class KubeflowEntrypointConfiguration(StepEntrypointConfiguration):
"""Entrypoint configuration for running steps on kubeflow.
This class writes a markdown file that will be displayed in the KFP UI.
"""
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all options required for running with this configuration.
The metadata ui path option expects a path where the markdown file
that will be displayed in the kubeflow UI should be written. The same
path needs to be added as an output artifact called
`mlpipeline-ui-metadata` for the corresponding `kfp.dsl.ContainerOp`.
Returns:
The superclass options as well as an option for the metadata ui
path.
"""
return super().get_entrypoint_options() | {METADATA_UI_PATH_OPTION}
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
**kwargs: Kwargs, must include the metadata ui path.
Returns:
The superclass arguments as well as arguments for the metadata ui
path.
"""
return super().get_entrypoint_arguments(**kwargs) + [
f"--{METADATA_UI_PATH_OPTION}",
kwargs[METADATA_UI_PATH_OPTION],
]
def get_run_name(self, pipeline_name: str) -> Optional[str]:
"""Returns the Kubeflow pipeline run name.
Args:
pipeline_name: The name of the pipeline.
Returns:
The Kubeflow pipeline run name.
Raises:
RuntimeError: If the run name environment variable is not set.
"""
try:
return os.environ[ENV_ZENML_RUN_NAME]
except KeyError:
raise RuntimeError(
"Unable to read run name from environment variable "
f"{ENV_ZENML_RUN_NAME}."
)
def post_run(
self,
pipeline_name: str,
step_name: str,
execution_info: Optional[data_types.ExecutionInfo] = None,
) -> None:
"""Writes a markdown file that will display information.
This will be about the step execution and input/output artifacts in the
KFP UI.
Args:
pipeline_name: The name of the pipeline.
step_name: The name of the step.
execution_info: The execution info of the step.
"""
if execution_info:
utils.dump_ui_metadata(
execution_info=execution_info,
metadata_ui_path=self.entrypoint_args[METADATA_UI_PATH_OPTION],
)
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Kwargs, must include the metadata ui path. |
{} |
Returns:
Type | Description |
---|---|
List[str] |
The superclass arguments as well as arguments for the metadata ui path. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
**kwargs: Kwargs, must include the metadata ui path.
Returns:
The superclass arguments as well as arguments for the metadata ui
path.
"""
return super().get_entrypoint_arguments(**kwargs) + [
f"--{METADATA_UI_PATH_OPTION}",
kwargs[METADATA_UI_PATH_OPTION],
]
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
The metadata ui path option expects a path where the markdown file
that will be displayed in the kubeflow UI should be written. The same
path needs to be added as an output artifact called
mlpipeline-ui-metadata
for the corresponding kfp.dsl.ContainerOp
.
Returns:
Type | Description |
---|---|
Set[str] |
The superclass options as well as an option for the metadata ui path. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all options required for running with this configuration.
The metadata ui path option expects a path where the markdown file
that will be displayed in the kubeflow UI should be written. The same
path needs to be added as an output artifact called
`mlpipeline-ui-metadata` for the corresponding `kfp.dsl.ContainerOp`.
Returns:
The superclass options as well as an option for the metadata ui
path.
"""
return super().get_entrypoint_options() | {METADATA_UI_PATH_OPTION}
get_run_name(self, pipeline_name)
Returns the Kubeflow pipeline run name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
The name of the pipeline. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
The Kubeflow pipeline run name. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the run name environment variable is not set. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_entrypoint_configuration.py
def get_run_name(self, pipeline_name: str) -> Optional[str]:
"""Returns the Kubeflow pipeline run name.
Args:
pipeline_name: The name of the pipeline.
Returns:
The Kubeflow pipeline run name.
Raises:
RuntimeError: If the run name environment variable is not set.
"""
try:
return os.environ[ENV_ZENML_RUN_NAME]
except KeyError:
raise RuntimeError(
"Unable to read run name from environment variable "
f"{ENV_ZENML_RUN_NAME}."
)
post_run(self, pipeline_name, step_name, execution_info=None)
Writes a markdown file that will display information.
This will be about the step execution and input/output artifacts in the KFP UI.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
The name of the pipeline. |
required |
step_name |
str |
The name of the step. |
required |
execution_info |
Optional[tfx.orchestration.portable.data_types.ExecutionInfo] |
The execution info of the step. |
None |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_entrypoint_configuration.py
def post_run(
self,
pipeline_name: str,
step_name: str,
execution_info: Optional[data_types.ExecutionInfo] = None,
) -> None:
"""Writes a markdown file that will display information.
This will be about the step execution and input/output artifacts in the
KFP UI.
Args:
pipeline_name: The name of the pipeline.
step_name: The name of the step.
execution_info: The execution info of the step.
"""
if execution_info:
utils.dump_ui_metadata(
execution_info=execution_info,
metadata_ui_path=self.entrypoint_args[METADATA_UI_PATH_OPTION],
)
kubeflow_orchestrator
Implementation of the Kubeflow orchestrator.
KubeflowOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines using Kubeflow.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
class KubeflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Kubeflow."""
@property
def config(self) -> KubeflowOrchestratorConfig:
"""Returns the `KubeflowOrchestratorConfig` config.
Returns:
The configuration.
"""
return cast(KubeflowOrchestratorConfig, self._config)
@staticmethod
def _get_k3d_cluster_name(uuid: UUID) -> str:
"""Returns the k3d cluster name corresponding to the orchestrator UUID.
Args:
uuid: The UUID of the orchestrator.
Returns:
The k3d cluster name.
"""
# k3d only allows cluster names with up to 32 characters; use the
# first 8 chars of the orchestrator UUID as identifier
return f"zenml-kubeflow-{str(uuid)[:8]}"
@staticmethod
def _get_k3d_kubernetes_context(uuid: UUID) -> str:
"""Gets the k3d kubernetes context.
Args:
uuid: The UUID of the orchestrator.
Returns:
The name of the kubernetes context associated with the k3d
cluster managed locally by ZenML corresponding to the orchestrator UUID.
"""
return f"k3d-{KubeflowOrchestrator._get_k3d_cluster_name(uuid)}"
@property
def kubernetes_context(self) -> str:
"""Gets the kubernetes context associated with the orchestrator.
This sets the default `kubernetes_context` value to the value that is
used to create the locally managed k3d cluster, if not explicitly set.
Returns:
The kubernetes context associated with the orchestrator.
"""
if self.config.kubernetes_context:
return self.config.kubernetes_context
return self._get_k3d_kubernetes_context(self.id)
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
"""Get the list of configured Kubernetes contexts and the active context.
Returns:
A tuple containing the list of configured Kubernetes contexts and
the active context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException:
return [], None
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Kubeflow orchestrator.
Returns:
The settings class.
"""
return KubeflowOrchestratorSettings
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a container registry.
Also check that requirements are met for local components.
Returns:
A `StackValidator` instance.
"""
def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
container_registry = stack.container_registry
# should not happen, because the stack validation takes care of
# this, but just in case
assert container_registry is not None
contexts, active_context = self.get_kubernetes_contexts()
if self.kubernetes_context not in contexts:
if not self.is_local:
return False, (
f"Could not find a Kubernetes context named "
f"'{self.kubernetes_context}' in the local Kubernetes "
f"configuration. Please make sure that the Kubernetes "
f"cluster is running and that the kubeconfig file is "
f"configured correctly. To list all configured "
f"contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
)
elif active_context and self.kubernetes_context != active_context:
logger.warning(
f"The Kubernetes context '{self.kubernetes_context}' "
f"configured for the Kubeflow orchestrator is not the "
f"same as the active context in the local Kubernetes "
f"configuration. If this is not deliberate, you should "
f"update the orchestrator's `kubernetes_context` field by "
f"running:\n\n"
f" `zenml orchestrator update {self.name} "
f"--kubernetes_context={active_context}`\n"
f"To list all configured contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
f"To set the active context to be the same as the one "
f"configured in the Kubeflow orchestrator and silence "
f"this warning, run:\n\n"
f" `kubectl config use-context "
f"{self.kubernetes_context}`\n"
)
silence_local_validations_msg = (
f"To silence this warning, set the "
f"`skip_local_validations` attribute to True in the "
f"orchestrator configuration by running:\n\n"
f" 'zenml orchestrator update {self.name} "
f"--skip_local_validations=True'\n"
)
if not self.config.skip_local_validations and not self.is_local:
# if the orchestrator is not running in a local k3d cluster,
# we cannot have any other local components in our stack,
# because we cannot mount the local path into the container.
# This may result in problems when running the pipeline, because
# the local components will not be available inside the
# Kubeflow containers.
# go through all stack components and identify those that
# advertise a local path where they persist information that
# they need to be available when running pipelines.
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
return False, (
f"The Kubeflow orchestrator is configured to run "
f"pipelines in a remote Kubernetes cluster designated "
f"by the '{self.kubernetes_context}' configuration "
f"context, but the '{stack_comp.name}' "
f"{stack_comp.type.value} is a local stack component "
f"and will not be available in the Kubeflow pipeline "
f"step.\nPlease ensure that you always use non-local "
f"stack components with a remote Kubeflow orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems. You should use a flavor of "
f"{stack_comp.type.value} other than "
f"'{stack_comp.flavor}'.\n"
+ silence_local_validations_msg
)
# if the orchestrator is remote, the container registry must
# also be remote.
if container_registry.config.is_local:
return False, (
f"The Kubeflow orchestrator is configured to run "
f"pipelines in a remote Kubernetes cluster designated "
f"by the '{self.kubernetes_context}' configuration "
f"context, but the '{container_registry.name}' "
f"container registry URI "
f"'{container_registry.config.uri}' "
f"points to a local container registry. Please ensure "
f"that you always use non-local stack components with "
f"a remote Kubeflow orchestrator, otherwise you will "
f"run into problems. You should use a flavor of "
f"container registry other than "
f"'{container_registry.flavor}'.\n"
+ silence_local_validations_msg
)
if not self.config.skip_local_validations and self.is_local:
# if the orchestrator is local, the container registry must
# also be local.
if not container_registry.config.is_local:
return False, (
f"The Kubeflow orchestrator is configured to run "
f"pipelines in a local k3d Kubernetes cluster "
f"designated by the '{self.kubernetes_context}' "
f"configuration context, but the container registry "
f"URI '{container_registry.config.uri}' doesn't "
f"match the expected format 'localhost:$PORT'. "
f"The local Kubeflow orchestrator only works with a "
f"local container registry because it cannot "
f"currently authenticate to external container "
f"registries. You should use a flavor of container "
f"registry other than '{container_registry.flavor}'.\n"
+ silence_local_validations_msg
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_local_requirements,
)
@property
def is_local(self) -> bool:
"""Checks if the KFP orchestrator is running locally.
Returns:
`True` if the KFP orchestrator is running locally (i.e. in
the local k3d cluster managed by ZenML).
"""
return self.kubernetes_context == self._get_k3d_kubernetes_context(
self.id
)
@property
def root_directory(self) -> str:
"""Path to the root directory for all files concerning this orchestrator.
Returns:
Path to the root directory.
"""
return os.path.join(
io_utils.get_global_config_directory(),
"kubeflow",
str(self.id),
)
@property
def pipeline_directory(self) -> str:
"""Returns path to a directory in which the kubeflow pipeline files are stored.
Returns:
Path to the pipeline directory.
"""
return os.path.join(self.root_directory, "pipelines")
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
def _configure_container_op(
self,
container_op: dsl.ContainerOp,
is_scheduled_run: bool,
settings: Optional[KubeflowOrchestratorSettings] = None,
) -> None:
"""Makes changes in place to the configuration of the container op.
Configures persistent mounted volumes for each stack component that
writes to a local path. Adds some labels to the container_op and applies
some functions to ir.
Args:
container_op: The kubeflow container operation to configure.
is_scheduled_run: Whether the pipeline is scheduled or a single run.
settings: Optional orchestrator settings for this step.
"""
# Path to a metadata file that will be displayed in the KFP UI
# This metadata file needs to be in a mounted emptyDir to avoid
# sporadic failures with the (not mature) PNS executor
# See these links for more information about limitations of PNS +
# security context:
# https://www.kubeflow.org/docs/components/pipelines/installation/localcluster-deployment/#deploying-kubeflow-pipelines
# https://argoproj.github.io/argo-workflows/empty-dir/
# KFP will switch to the Emissary executor (soon), when this emptyDir
# mount will not be necessary anymore, but for now it's still in alpha
# status (https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/#emissary-executor)
volumes: Dict[str, k8s_client.V1Volume] = {
"/outputs": k8s_client.V1Volume(
name="outputs", empty_dir=k8s_client.V1EmptyDirVolumeSource()
),
}
stack = Client().active_stack
if self.is_local:
stack.check_local_paths()
local_stores_path = GlobalConfiguration().local_stores_path
host_path = k8s_client.V1HostPathVolumeSource(
path=local_stores_path, type="Directory"
)
volumes[local_stores_path] = k8s_client.V1Volume(
name="local-stores",
host_path=host_path,
)
logger.debug(
"Adding host path volume for the local ZenML stores (path: %s) "
"in kubeflow pipelines container.",
local_stores_path,
)
if sys.platform == "win32":
# File permissions are not checked on Windows. This if clause
# prevents mypy from complaining about unused 'type: ignore'
# statements
pass
else:
# Run KFP containers in the context of the local UID/GID
# to ensure that the artifact and metadata stores can be shared
# with the local pipeline runs.
container_op.container.security_context = (
k8s_client.V1SecurityContext(
run_as_user=os.getuid(),
run_as_group=os.getgid(),
)
)
logger.debug(
"Setting security context UID and GID to local user/group "
"in kubeflow pipelines container."
)
container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_LOCAL_STORES_PATH,
value=local_stores_path,
)
)
container_op.add_pvolumes(volumes)
# Add some pod labels to the container_op
for k, v in KFP_POD_LABELS.items():
container_op.add_pod_label(k, v)
run_name = (
SCHEDULED_RUN_NAME_PLACEHOLDER
if is_scheduled_run
else SINGLE_RUN_RUN_NAME_PLACEHOLDER
)
container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_RUN_NAME,
value=run_name,
)
)
if settings:
for key, value in settings.node_selectors.items():
container_op.add_node_selector_constraint(
label_name=key, value=value
)
if settings.node_affinity:
match_expressions = []
for key, values in settings.node_affinity.items():
match_expressions.append(
k8s_client.V1NodeSelectorRequirement(
key=key,
operator="In",
values=values,
)
)
affinity = k8s_client.V1Affinity(
node_affinity=k8s_client.V1NodeAffinity(
required_during_scheduling_ignored_during_execution=k8s_client.V1NodeSelector(
node_selector_terms=[
k8s_client.V1NodeSelectorTerm(
match_expressions=match_expressions
)
]
)
)
)
container_op.add_affinity(affinity)
# Mounts configmap containing Metadata gRPC server configuration.
container_op.apply(utils.mount_config_map_op("metadata-grpc-configmap"))
@staticmethod
def _configure_container_resources(
container_op: dsl.ContainerOp,
resource_settings: "ResourceSettings",
) -> None:
"""Adds resource requirements to the container.
Args:
container_op: The kubeflow container operation to configure.
resource_settings: The resource settings to use for this
container.
"""
if resource_settings.cpu_count is not None:
container_op = container_op.set_cpu_limit(
str(resource_settings.cpu_count)
)
if resource_settings.gpu_count is not None:
container_op = container_op.set_gpu_limit(
resource_settings.gpu_count
)
if resource_settings.memory is not None:
memory_limit = resource_settings.memory[:-1]
container_op = container_op.set_memory_limit(memory_limit)
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Creates a kfp yaml file.
This functions as an intermediary representation of the pipeline which
is then deployed to the kubeflow pipelines instance.
How it works:
-------------
Before this method is called the `prepare_pipeline_deployment()`
method builds a docker image that contains the code for the
pipeline, all steps the context around these files.
Based on this docker image a callable is created which builds
container_ops for each step (`_construct_kfp_pipeline`).
To do this the entrypoint of the docker image is configured to
run the correct step within the docker image. The dependencies
between these container_ops are then also configured onto each
container_op by pointing at the downstream steps.
This callable is then compiled into a kfp yaml file that is used as
the intermediary representation of the kubeflow pipeline.
This file, together with some metadata, runtime configurations is
then uploaded into the kubeflow pipelines cluster for execution.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If trying to run a pipeline in a notebook environment.
"""
# First check whether the code running in a notebook
if Environment.in_notebook():
raise RuntimeError(
"The Kubeflow orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
assert stack.container_registry
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
if self.is_local and stack.container_registry.config.is_local:
image_name = f"k3d-zenml-kubeflow-registry.{image_name}"
is_scheduled_run = bool(deployment.schedule)
# Create a callable for future compilation into a dsl.Pipeline.
def _construct_kfp_pipeline() -> None:
"""Create a container_op for each step.
This should contain the name of the docker image and configures the
entrypoint of the docker image to run the step.
Additionally, this gives each container_op information about its
direct downstream steps.
If this callable is passed to the `_create_and_write_workflow()`
method of a KFPCompiler all dsl.ContainerOp instances will be
automatically added to a singular dsl.Pipeline instance.
"""
# Dictionary of container_ops index by the associated step name
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step_name, step in deployment.steps.items():
# The command will be needed to eventually call the python step
# within the docker container
command = (
KubeflowEntrypointConfiguration.get_entrypoint_command()
)
# The arguments are passed to configure the entrypoint of the
# docker container when the step is called.
metadata_ui_path = "/outputs/mlpipeline-ui-metadata.json"
arguments = (
KubeflowEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name,
**{METADATA_UI_PATH_OPTION: metadata_ui_path},
)
)
# Create a container_op - the kubeflow equivalent of a step. It
# contains the name of the step, the name of the docker image,
# the command to use to run the step entrypoint
# (e.g. `python -m zenml.entrypoints.step_entrypoint`)
# and the arguments to be passed along with the command. Find
# out more about how these arguments are parsed and used
# in the base entrypoint `run()` method.
container_op = dsl.ContainerOp(
name=step.config.name,
image=image_name,
command=command,
arguments=arguments,
output_artifact_paths={
"mlpipeline-ui-metadata": metadata_ui_path,
},
)
settings = cast(
Optional[KubeflowOrchestratorSettings],
self.get_settings(step),
)
self._configure_container_op(
container_op=container_op,
is_scheduled_run=is_scheduled_run,
settings=settings,
)
if self.requires_resources_in_orchestration_environment(step):
self._configure_container_resources(
container_op=container_op,
resource_settings=step.config.resource_settings,
)
# Find the upstream container ops of the current step and
# configure the current container op to run after them
for upstream_step_name in step.spec.upstream_steps:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
# Update dictionary of container ops with the current one
step_name_to_container_op[step.config.name] = container_op
# Get a filepath to use to save the finished yaml to
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory, f"{deployment.run_name}.yaml"
)
# write the argo pipeline yaml
KFPCompiler()._create_and_write_workflow(
pipeline_func=_construct_kfp_pipeline,
pipeline_name=deployment.pipeline.name,
package_path=pipeline_file_path,
)
# using the kfp client uploads the pipeline to kubeflow pipelines and
# runs it there
self._upload_and_run_pipeline(
deployment=deployment,
pipeline_file_path=pipeline_file_path,
)
def _upload_and_run_pipeline(
self,
deployment: "PipelineDeployment",
pipeline_file_path: str,
) -> None:
"""Tries to upload and run a KFP pipeline.
Args:
deployment: The pipeline deployment.
pipeline_file_path: Path to the pipeline definition file.
"""
pipeline_name = deployment.pipeline.name
run_name = deployment.run_name
enable_cache = deployment.pipeline.enable_cache
settings = cast(
Optional[KubeflowOrchestratorSettings],
self.get_settings(deployment),
)
user_namespace = settings.user_namespace if settings else None
try:
logger.info(
"Running in kubernetes context '%s'.",
self.kubernetes_context,
)
# upload the pipeline to Kubeflow and start it
client = self._get_kfp_client(settings=settings)
if deployment.schedule:
try:
experiment = client.get_experiment(
pipeline_name, namespace=user_namespace
)
logger.info(
"A recurring run has already been created with this "
"pipeline. Creating new recurring run now.."
)
except (ValueError, ApiException):
experiment = client.create_experiment(
pipeline_name, namespace=user_namespace
)
logger.info(
"Creating a new recurring run for pipeline '%s'.. ",
pipeline_name,
)
logger.info(
"You can see all recurring runs under the '%s' experiment.",
pipeline_name,
)
interval_seconds = (
deployment.schedule.interval_second.seconds
if deployment.schedule.interval_second
else None
)
result = client.create_recurring_run(
experiment_id=experiment.id,
job_name=run_name,
pipeline_package_path=pipeline_file_path,
enable_caching=enable_cache,
cron_expression=deployment.schedule.cron_expression,
start_time=deployment.schedule.utc_start_time,
end_time=deployment.schedule.utc_end_time,
interval_second=interval_seconds,
no_catchup=not deployment.schedule.catchup,
)
logger.info("Started recurring run with ID '%s'.", result.id)
else:
logger.info(
"No schedule detected. Creating a one-off pipeline run.."
)
result = client.create_run_from_pipeline_package(
pipeline_file_path,
arguments={},
run_name=run_name,
enable_caching=enable_cache,
namespace=user_namespace,
)
logger.info(
"Started one-off pipeline run with ID '%s'.", result.run_id
)
if self.config.synchronous:
# TODO [ENG-698]: Allow configuration of the timeout as a
# setting
client.wait_for_run_completion(
run_id=result.run_id, timeout=1200
)
except urllib3.exceptions.HTTPError as error:
logger.warning(
f"Failed to upload Kubeflow pipeline: %s. "
f"Please make sure your kubernetes config is present and the "
f"{self.kubernetes_context} kubernetes context is configured "
f"correctly.",
error,
)
def _get_kfp_client(
self,
settings: Optional[KubeflowOrchestratorSettings] = None,
) -> kfp.Client:
"""Creates a KFP client instance.
Args:
settings: Optional settings which can be used to
configure the client instance.
Returns:
A KFP client instance.
"""
client_args = {
"kube_context": self.config.kubernetes_context,
}
if settings:
client_args.update(settings.client_args)
# The host and namespace are stack component configurations that refer
# to the Kubeflow deployment. We don't want these overwritten on a
# run by run basis by user settings
client_args["host"] = self.config.kubeflow_hostname
client_args["namespace"] = self.config.kubeflow_namespace
return kfp.Client(**client_args)
@property
def _pid_file_path(self) -> str:
"""Returns path to the daemon PID file.
Returns:
Path to the daemon PID file.
"""
return os.path.join(self.root_directory, "kubeflow_daemon.pid")
@property
def log_file(self) -> str:
"""Path of the daemon log file.
Returns:
Path of the daemon log file.
"""
return os.path.join(self.root_directory, "kubeflow_daemon.log")
@property
def _k3d_cluster_name(self) -> str:
"""Returns the K3D cluster name.
Returns:
The K3D cluster name.
"""
return self._get_k3d_cluster_name(self.id)
def _get_k3d_registry_name(self, port: int) -> str:
"""Returns the K3D registry name.
Args:
port: Port of the registry.
Returns:
The registry name.
"""
return f"k3d-zenml-kubeflow-registry.localhost:{port}"
@property
def _k3d_registry_config_path(self) -> str:
"""Returns the path to the K3D registry config yaml.
Returns:
str: Path to the K3D registry config yaml.
"""
return os.path.join(self.root_directory, "k3d_registry.yaml")
def _get_kfp_ui_daemon_port(self) -> int:
"""Port to use for the KFP UI daemon.
Returns:
Port to use for the KFP UI daemon.
"""
port = self.config.kubeflow_pipelines_ui_port
if port == DEFAULT_KFP_UI_PORT and not networking_utils.port_available(
port
):
# if the user didn't specify a specific port and the default
# port is occupied, fallback to a random open port
port = networking_utils.find_available_port()
return port
def list_manual_setup_steps(
self, container_registry_name: str, container_registry_path: str
) -> None:
"""Logs manual steps needed to setup the Kubeflow local orchestrator.
Args:
container_registry_name: Name of the container registry.
container_registry_path: Path to the container registry.
"""
if not self.is_local:
# Make sure we're not telling users to deploy Kubeflow on their
# remote clusters
logger.warning(
"This Kubeflow orchestrator is configured to use a non-local "
f"Kubernetes context {self.kubernetes_context}. Manually "
f"deploying Kubeflow Pipelines is only possible for local "
f"Kubeflow orchestrators."
)
return
global_config_dir_path = io_utils.get_global_config_directory()
kubeflow_commands = [
f"> k3d cluster create {self._k3d_cluster_name} --image {local_deployment_utils.K3S_IMAGE_NAME} --registry-create {container_registry_name} --registry-config {container_registry_path} --volume {global_config_dir_path}:{global_config_dir_path}\n",
f"> kubectl --context {self.kubernetes_context} apply -k github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref={KFP_VERSION}&timeout=5m",
f"> kubectl --context {self.kubernetes_context} wait --timeout=60s --for condition=established crd/applications.app.k8s.io",
f"> kubectl --context {self.kubernetes_context} apply -k github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref={KFP_VERSION}&timeout=5m",
f"> kubectl --context {self.kubernetes_context} --namespace kubeflow port-forward svc/ml-pipeline-ui {self.config.kubeflow_pipelines_ui_port}:80",
]
logger.info(
"If you wish to spin up this Kubeflow local orchestrator manually, "
"please enter the following commands:\n"
)
logger.info("\n".join(kubeflow_commands))
@property
def is_provisioned(self) -> bool:
"""Returns if a local k3d cluster for this orchestrator exists.
Returns:
True if a local k3d cluster exists, False otherwise.
"""
if not local_deployment_utils.check_prerequisites(
skip_k3d=self.config.skip_cluster_provisioning or not self.is_local,
skip_kubectl=self.config.skip_cluster_provisioning
and self.config.skip_ui_daemon_provisioning,
):
# if any prerequisites are missing there is certainly no
# local deployment running
return False
return self.is_cluster_provisioned
@property
def is_running(self) -> bool:
"""Checks if the local k3d cluster and UI daemon are both running.
Returns:
True if the local k3d cluster and UI daemon for this orchestrator are both running.
"""
return (
self.is_provisioned
and self.is_cluster_running
and self.is_daemon_running
)
@property
def is_suspended(self) -> bool:
"""Checks if the local k3d cluster and UI daemon are both stopped.
Returns:
True if the cluster and daemon for this orchestrator are both stopped, False otherwise.
"""
return (
self.is_provisioned
and (
self.config.skip_cluster_provisioning
or not self.is_cluster_running
)
and (
self.config.skip_ui_daemon_provisioning
or not self.is_daemon_running
)
)
@property
def is_cluster_provisioned(self) -> bool:
"""Returns if the local k3d cluster for this orchestrator is provisioned.
For remote (i.e. not managed by ZenML) Kubeflow Pipelines installations,
this always returns True.
Returns:
True if the local k3d cluster is provisioned, False otherwise.
"""
if self.config.skip_cluster_provisioning or not self.is_local:
return True
return local_deployment_utils.k3d_cluster_exists(
cluster_name=self._k3d_cluster_name
)
@property
def is_cluster_running(self) -> bool:
"""Returns if the local k3d cluster for this orchestrator is running.
For remote (i.e. not managed by ZenML) Kubeflow Pipelines installations,
this always returns True.
Returns:
True if the local k3d cluster is running, False otherwise.
"""
if self.config.skip_cluster_provisioning or not self.is_local:
return True
return local_deployment_utils.k3d_cluster_running(
cluster_name=self._k3d_cluster_name
)
@property
def is_daemon_running(self) -> bool:
"""Returns if the local Kubeflow UI daemon for this orchestrator is running.
Returns:
True if the daemon is running, False otherwise.
"""
if self.config.skip_ui_daemon_provisioning:
return True
if sys.platform != "win32":
from zenml.utils.daemon import check_if_daemon_is_running
return check_if_daemon_is_running(self._pid_file_path)
else:
return True
def provision(self) -> None:
"""Provisions a local Kubeflow Pipelines deployment.
Raises:
ProvisioningError: If the provisioning fails.
"""
if self.config.skip_cluster_provisioning:
return
if self.is_running:
logger.info(
"Found already existing local Kubeflow Pipelines deployment. "
"If there are any issues with the existing deployment, please "
"run 'zenml stack down --force' to delete it."
)
return
if not local_deployment_utils.check_prerequisites():
raise ProvisioningError(
"Unable to provision local Kubeflow Pipelines deployment: "
"Please install 'k3d' and 'kubectl' and try again."
)
container_registry = Client().active_stack.container_registry
# should not happen, because the stack validation takes care of this,
# but just in case
assert container_registry is not None
fileio.makedirs(self.root_directory)
if not self.is_local:
# don't provision any resources if using a remote KFP installation
return
logger.info("Provisioning local Kubeflow Pipelines deployment...")
container_registry_port = int(
container_registry.config.uri.split(":")[-1]
)
container_registry_name = self._get_k3d_registry_name(
port=container_registry_port
)
local_deployment_utils.write_local_registry_yaml(
yaml_path=self._k3d_registry_config_path,
registry_name=container_registry_name,
registry_uri=container_registry.config.uri,
)
try:
local_deployment_utils.create_k3d_cluster(
cluster_name=self._k3d_cluster_name,
registry_name=container_registry_name,
registry_config_path=self._k3d_registry_config_path,
)
kubernetes_context = self.kubernetes_context
# will never happen, but mypy doesn't know that
assert kubernetes_context is not None
local_deployment_utils.deploy_kubeflow_pipelines(
kubernetes_context=kubernetes_context
)
artifact_store = Client().active_stack.artifact_store
if isinstance(artifact_store, LocalArtifactStore):
local_deployment_utils.add_hostpath_to_kubeflow_pipelines(
kubernetes_context=kubernetes_context,
local_path=artifact_store.path,
)
except Exception as e:
logger.error(e)
logger.error(
"Unable to spin up local Kubeflow Pipelines deployment."
)
self.list_manual_setup_steps(
container_registry_name, self._k3d_registry_config_path
)
self.deprovision()
def deprovision(self) -> None:
"""Deprovisions a local Kubeflow Pipelines deployment."""
if self.config.skip_cluster_provisioning:
return
if (
not self.config.skip_ui_daemon_provisioning
and self.is_daemon_running
):
local_deployment_utils.stop_kfp_ui_daemon(
pid_file_path=self._pid_file_path
)
if self.is_local:
# don't deprovision any resources if using a remote KFP installation
local_deployment_utils.delete_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
logger.info("Local kubeflow pipelines deployment deprovisioned.")
if fileio.exists(self.log_file):
fileio.remove(self.log_file)
def resume(self) -> None:
"""Resumes the local k3d cluster.
Raises:
ProvisioningError: If the k3d cluster is not provisioned.
"""
if self.is_running:
logger.info("Local kubeflow pipelines deployment already running.")
return
if not self.is_provisioned:
raise ProvisioningError(
"Unable to resume local kubeflow pipelines deployment: No "
"resources provisioned for local deployment."
)
kubernetes_context = self.kubernetes_context
# will never happen, but mypy doesn't know that
assert kubernetes_context is not None
if (
not self.config.skip_cluster_provisioning
and self.is_local
and not self.is_cluster_running
):
# don't resume any resources if using a remote KFP installation
local_deployment_utils.start_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
local_deployment_utils.wait_until_kubeflow_pipelines_ready(
kubernetes_context=kubernetes_context
)
if not self.is_daemon_running:
local_deployment_utils.start_kfp_ui_daemon(
pid_file_path=self._pid_file_path,
log_file_path=self.log_file,
port=self._get_kfp_ui_daemon_port(),
kubernetes_context=kubernetes_context,
)
def suspend(self) -> None:
"""Suspends the local k3d cluster."""
if not self.is_provisioned:
logger.info("Local kubeflow pipelines deployment not provisioned.")
return
if (
not self.config.skip_ui_daemon_provisioning
and self.is_daemon_running
):
local_deployment_utils.stop_kfp_ui_daemon(
pid_file_path=self._pid_file_path
)
if (
not self.config.skip_cluster_provisioning
and self.is_local
and self.is_cluster_running
):
# don't suspend any resources if using a remote KFP installation
local_deployment_utils.stop_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
config: KubeflowOrchestratorConfig
property
readonly
Returns the KubeflowOrchestratorConfig
config.
Returns:
Type | Description |
---|---|
KubeflowOrchestratorConfig |
The configuration. |
is_cluster_provisioned: bool
property
readonly
Returns if the local k3d cluster for this orchestrator is provisioned.
For remote (i.e. not managed by ZenML) Kubeflow Pipelines installations, this always returns True.
Returns:
Type | Description |
---|---|
bool |
True if the local k3d cluster is provisioned, False otherwise. |
is_cluster_running: bool
property
readonly
Returns if the local k3d cluster for this orchestrator is running.
For remote (i.e. not managed by ZenML) Kubeflow Pipelines installations, this always returns True.
Returns:
Type | Description |
---|---|
bool |
True if the local k3d cluster is running, False otherwise. |
is_daemon_running: bool
property
readonly
Returns if the local Kubeflow UI daemon for this orchestrator is running.
Returns:
Type | Description |
---|---|
bool |
True if the daemon is running, False otherwise. |
is_local: bool
property
readonly
Checks if the KFP orchestrator is running locally.
Returns:
Type | Description |
---|---|
bool |
|
is_provisioned: bool
property
readonly
Returns if a local k3d cluster for this orchestrator exists.
Returns:
Type | Description |
---|---|
bool |
True if a local k3d cluster exists, False otherwise. |
is_running: bool
property
readonly
Checks if the local k3d cluster and UI daemon are both running.
Returns:
Type | Description |
---|---|
bool |
True if the local k3d cluster and UI daemon for this orchestrator are both running. |
is_suspended: bool
property
readonly
Checks if the local k3d cluster and UI daemon are both stopped.
Returns:
Type | Description |
---|---|
bool |
True if the cluster and daemon for this orchestrator are both stopped, False otherwise. |
kubernetes_context: str
property
readonly
Gets the kubernetes context associated with the orchestrator.
This sets the default kubernetes_context
value to the value that is
used to create the locally managed k3d cluster, if not explicitly set.
Returns:
Type | Description |
---|---|
str |
The kubernetes context associated with the orchestrator. |
log_file: str
property
readonly
Path of the daemon log file.
Returns:
Type | Description |
---|---|
str |
Path of the daemon log file. |
pipeline_directory: str
property
readonly
Returns path to a directory in which the kubeflow pipeline files are stored.
Returns:
Type | Description |
---|---|
str |
Path to the pipeline directory. |
root_directory: str
property
readonly
Path to the root directory for all files concerning this orchestrator.
Returns:
Type | Description |
---|---|
str |
Path to the root directory. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Kubeflow orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates that the stack contains a container registry.
Also check that requirements are met for local components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A |
deprovision(self)
Deprovisions a local Kubeflow Pipelines deployment.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def deprovision(self) -> None:
"""Deprovisions a local Kubeflow Pipelines deployment."""
if self.config.skip_cluster_provisioning:
return
if (
not self.config.skip_ui_daemon_provisioning
and self.is_daemon_running
):
local_deployment_utils.stop_kfp_ui_daemon(
pid_file_path=self._pid_file_path
)
if self.is_local:
# don't deprovision any resources if using a remote KFP installation
local_deployment_utils.delete_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
logger.info("Local kubeflow pipelines deployment deprovisioned.")
if fileio.exists(self.log_file):
fileio.remove(self.log_file)
get_kubernetes_contexts(self)
Get the list of configured Kubernetes contexts and the active context.
Returns:
Type | Description |
---|---|
Tuple[List[str], Optional[str]] |
A tuple containing the list of configured Kubernetes contexts and the active context. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
"""Get the list of configured Kubernetes contexts and the active context.
Returns:
A tuple containing the list of configured Kubernetes contexts and
the active context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException:
return [], None
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
list_manual_setup_steps(self, container_registry_name, container_registry_path)
Logs manual steps needed to setup the Kubeflow local orchestrator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
container_registry_name |
str |
Name of the container registry. |
required |
container_registry_path |
str |
Path to the container registry. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def list_manual_setup_steps(
self, container_registry_name: str, container_registry_path: str
) -> None:
"""Logs manual steps needed to setup the Kubeflow local orchestrator.
Args:
container_registry_name: Name of the container registry.
container_registry_path: Path to the container registry.
"""
if not self.is_local:
# Make sure we're not telling users to deploy Kubeflow on their
# remote clusters
logger.warning(
"This Kubeflow orchestrator is configured to use a non-local "
f"Kubernetes context {self.kubernetes_context}. Manually "
f"deploying Kubeflow Pipelines is only possible for local "
f"Kubeflow orchestrators."
)
return
global_config_dir_path = io_utils.get_global_config_directory()
kubeflow_commands = [
f"> k3d cluster create {self._k3d_cluster_name} --image {local_deployment_utils.K3S_IMAGE_NAME} --registry-create {container_registry_name} --registry-config {container_registry_path} --volume {global_config_dir_path}:{global_config_dir_path}\n",
f"> kubectl --context {self.kubernetes_context} apply -k github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref={KFP_VERSION}&timeout=5m",
f"> kubectl --context {self.kubernetes_context} wait --timeout=60s --for condition=established crd/applications.app.k8s.io",
f"> kubectl --context {self.kubernetes_context} apply -k github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref={KFP_VERSION}&timeout=5m",
f"> kubectl --context {self.kubernetes_context} --namespace kubeflow port-forward svc/ml-pipeline-ui {self.config.kubeflow_pipelines_ui_port}:80",
]
logger.info(
"If you wish to spin up this Kubeflow local orchestrator manually, "
"please enter the following commands:\n"
)
logger.info("\n".join(kubeflow_commands))
prepare_or_run_pipeline(self, deployment, stack)
Creates a kfp yaml file.
This functions as an intermediary representation of the pipeline which is then deployed to the kubeflow pipelines instance.
How it works:
Before this method is called the prepare_pipeline_deployment()
method builds a docker image that contains the code for the
pipeline, all steps the context around these files.
Based on this docker image a callable is created which builds
container_ops for each step (_construct_kfp_pipeline
).
To do this the entrypoint of the docker image is configured to
run the correct step within the docker image. The dependencies
between these container_ops are then also configured onto each
container_op by pointing at the downstream steps.
This callable is then compiled into a kfp yaml file that is used as the intermediary representation of the kubeflow pipeline.
This file, together with some metadata, runtime configurations is then uploaded into the kubeflow pipelines cluster for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If trying to run a pipeline in a notebook environment. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Creates a kfp yaml file.
This functions as an intermediary representation of the pipeline which
is then deployed to the kubeflow pipelines instance.
How it works:
-------------
Before this method is called the `prepare_pipeline_deployment()`
method builds a docker image that contains the code for the
pipeline, all steps the context around these files.
Based on this docker image a callable is created which builds
container_ops for each step (`_construct_kfp_pipeline`).
To do this the entrypoint of the docker image is configured to
run the correct step within the docker image. The dependencies
between these container_ops are then also configured onto each
container_op by pointing at the downstream steps.
This callable is then compiled into a kfp yaml file that is used as
the intermediary representation of the kubeflow pipeline.
This file, together with some metadata, runtime configurations is
then uploaded into the kubeflow pipelines cluster for execution.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If trying to run a pipeline in a notebook environment.
"""
# First check whether the code running in a notebook
if Environment.in_notebook():
raise RuntimeError(
"The Kubeflow orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
assert stack.container_registry
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
if self.is_local and stack.container_registry.config.is_local:
image_name = f"k3d-zenml-kubeflow-registry.{image_name}"
is_scheduled_run = bool(deployment.schedule)
# Create a callable for future compilation into a dsl.Pipeline.
def _construct_kfp_pipeline() -> None:
"""Create a container_op for each step.
This should contain the name of the docker image and configures the
entrypoint of the docker image to run the step.
Additionally, this gives each container_op information about its
direct downstream steps.
If this callable is passed to the `_create_and_write_workflow()`
method of a KFPCompiler all dsl.ContainerOp instances will be
automatically added to a singular dsl.Pipeline instance.
"""
# Dictionary of container_ops index by the associated step name
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step_name, step in deployment.steps.items():
# The command will be needed to eventually call the python step
# within the docker container
command = (
KubeflowEntrypointConfiguration.get_entrypoint_command()
)
# The arguments are passed to configure the entrypoint of the
# docker container when the step is called.
metadata_ui_path = "/outputs/mlpipeline-ui-metadata.json"
arguments = (
KubeflowEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name,
**{METADATA_UI_PATH_OPTION: metadata_ui_path},
)
)
# Create a container_op - the kubeflow equivalent of a step. It
# contains the name of the step, the name of the docker image,
# the command to use to run the step entrypoint
# (e.g. `python -m zenml.entrypoints.step_entrypoint`)
# and the arguments to be passed along with the command. Find
# out more about how these arguments are parsed and used
# in the base entrypoint `run()` method.
container_op = dsl.ContainerOp(
name=step.config.name,
image=image_name,
command=command,
arguments=arguments,
output_artifact_paths={
"mlpipeline-ui-metadata": metadata_ui_path,
},
)
settings = cast(
Optional[KubeflowOrchestratorSettings],
self.get_settings(step),
)
self._configure_container_op(
container_op=container_op,
is_scheduled_run=is_scheduled_run,
settings=settings,
)
if self.requires_resources_in_orchestration_environment(step):
self._configure_container_resources(
container_op=container_op,
resource_settings=step.config.resource_settings,
)
# Find the upstream container ops of the current step and
# configure the current container op to run after them
for upstream_step_name in step.spec.upstream_steps:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
# Update dictionary of container ops with the current one
step_name_to_container_op[step.config.name] = container_op
# Get a filepath to use to save the finished yaml to
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory, f"{deployment.run_name}.yaml"
)
# write the argo pipeline yaml
KFPCompiler()._create_and_write_workflow(
pipeline_func=_construct_kfp_pipeline,
pipeline_name=deployment.pipeline.name,
package_path=pipeline_file_path,
)
# using the kfp client uploads the pipeline to kubeflow pipelines and
# runs it there
self._upload_and_run_pipeline(
deployment=deployment,
pipeline_file_path=pipeline_file_path,
)
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
provision(self)
Provisions a local Kubeflow Pipelines deployment.
Exceptions:
Type | Description |
---|---|
ProvisioningError |
If the provisioning fails. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def provision(self) -> None:
"""Provisions a local Kubeflow Pipelines deployment.
Raises:
ProvisioningError: If the provisioning fails.
"""
if self.config.skip_cluster_provisioning:
return
if self.is_running:
logger.info(
"Found already existing local Kubeflow Pipelines deployment. "
"If there are any issues with the existing deployment, please "
"run 'zenml stack down --force' to delete it."
)
return
if not local_deployment_utils.check_prerequisites():
raise ProvisioningError(
"Unable to provision local Kubeflow Pipelines deployment: "
"Please install 'k3d' and 'kubectl' and try again."
)
container_registry = Client().active_stack.container_registry
# should not happen, because the stack validation takes care of this,
# but just in case
assert container_registry is not None
fileio.makedirs(self.root_directory)
if not self.is_local:
# don't provision any resources if using a remote KFP installation
return
logger.info("Provisioning local Kubeflow Pipelines deployment...")
container_registry_port = int(
container_registry.config.uri.split(":")[-1]
)
container_registry_name = self._get_k3d_registry_name(
port=container_registry_port
)
local_deployment_utils.write_local_registry_yaml(
yaml_path=self._k3d_registry_config_path,
registry_name=container_registry_name,
registry_uri=container_registry.config.uri,
)
try:
local_deployment_utils.create_k3d_cluster(
cluster_name=self._k3d_cluster_name,
registry_name=container_registry_name,
registry_config_path=self._k3d_registry_config_path,
)
kubernetes_context = self.kubernetes_context
# will never happen, but mypy doesn't know that
assert kubernetes_context is not None
local_deployment_utils.deploy_kubeflow_pipelines(
kubernetes_context=kubernetes_context
)
artifact_store = Client().active_stack.artifact_store
if isinstance(artifact_store, LocalArtifactStore):
local_deployment_utils.add_hostpath_to_kubeflow_pipelines(
kubernetes_context=kubernetes_context,
local_path=artifact_store.path,
)
except Exception as e:
logger.error(e)
logger.error(
"Unable to spin up local Kubeflow Pipelines deployment."
)
self.list_manual_setup_steps(
container_registry_name, self._k3d_registry_config_path
)
self.deprovision()
resume(self)
Resumes the local k3d cluster.
Exceptions:
Type | Description |
---|---|
ProvisioningError |
If the k3d cluster is not provisioned. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def resume(self) -> None:
"""Resumes the local k3d cluster.
Raises:
ProvisioningError: If the k3d cluster is not provisioned.
"""
if self.is_running:
logger.info("Local kubeflow pipelines deployment already running.")
return
if not self.is_provisioned:
raise ProvisioningError(
"Unable to resume local kubeflow pipelines deployment: No "
"resources provisioned for local deployment."
)
kubernetes_context = self.kubernetes_context
# will never happen, but mypy doesn't know that
assert kubernetes_context is not None
if (
not self.config.skip_cluster_provisioning
and self.is_local
and not self.is_cluster_running
):
# don't resume any resources if using a remote KFP installation
local_deployment_utils.start_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
local_deployment_utils.wait_until_kubeflow_pipelines_ready(
kubernetes_context=kubernetes_context
)
if not self.is_daemon_running:
local_deployment_utils.start_kfp_ui_daemon(
pid_file_path=self._pid_file_path,
log_file_path=self.log_file,
port=self._get_kfp_ui_daemon_port(),
kubernetes_context=kubernetes_context,
)
suspend(self)
Suspends the local k3d cluster.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
def suspend(self) -> None:
"""Suspends the local k3d cluster."""
if not self.is_provisioned:
logger.info("Local kubeflow pipelines deployment not provisioned.")
return
if (
not self.config.skip_ui_daemon_provisioning
and self.is_daemon_running
):
local_deployment_utils.stop_kfp_ui_daemon(
pid_file_path=self._pid_file_path
)
if (
not self.config.skip_cluster_provisioning
and self.is_local
and self.is_cluster_running
):
# don't suspend any resources if using a remote KFP installation
local_deployment_utils.stop_k3d_cluster(
cluster_name=self._k3d_cluster_name
)
local_deployment_utils
Utils for the local Kubeflow deployment behaviors.
add_hostpath_to_kubeflow_pipelines(kubernetes_context, local_path)
Patches the Kubeflow Pipelines deployment to mount a local folder.
This folder serves as a hostpath for visualization purposes.
This function reconfigures the Kubeflow pipelines deployment to use a shared local folder to support loading the TensorBoard viewer and other pipeline visualization results from a local artifact store, as described here:
https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kubernetes_context |
str |
The kubernetes context on which Kubeflow Pipelines should be patched. |
required |
local_path |
str |
The path to the local folder to mount as a hostpath. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def add_hostpath_to_kubeflow_pipelines(
kubernetes_context: str, local_path: str
) -> None:
"""Patches the Kubeflow Pipelines deployment to mount a local folder.
This folder serves as a hostpath for visualization purposes.
This function reconfigures the Kubeflow pipelines deployment to use a
shared local folder to support loading the TensorBoard viewer and other
pipeline visualization results from a local artifact store, as described
here:
https://github.com/kubeflow/pipelines/blob/master/docs/config/volume-support.md
Args:
kubernetes_context: The kubernetes context on which Kubeflow Pipelines
should be patched.
local_path: The path to the local folder to mount as a hostpath.
"""
logger.info("Patching Kubeflow Pipelines to mount a local folder.")
pod_template = {
"spec": {
"serviceAccountName": "kubeflow-pipelines-viewer",
"containers": [
{
"volumeMounts": [
{
"mountPath": local_path,
"name": "local-artifact-store",
}
]
}
],
"volumes": [
{
"hostPath": {
"path": local_path,
"type": "Directory",
},
"name": "local-artifact-store",
}
],
}
}
pod_template_json = json.dumps(pod_template, indent=2)
config_map_data = {"data": {"viewer-pod-template.json": pod_template_json}}
config_map_data_json = json.dumps(config_map_data, indent=2)
logger.debug(
"Adding host path volume for local path `%s` to kubeflow pipeline"
"viewer pod template configuration.",
local_path,
)
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"-n",
"kubeflow",
"patch",
"configmap/ml-pipeline-ui-configmap",
"--type",
"merge",
"-p",
config_map_data_json,
]
)
deployment_patch = {
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "ml-pipeline-ui",
"volumeMounts": [
{
"mountPath": local_path,
"name": "local-artifact-store",
}
],
}
],
"volumes": [
{
"hostPath": {
"path": local_path,
"type": "Directory",
},
"name": "local-artifact-store",
}
],
}
}
}
}
deployment_patch_json = json.dumps(deployment_patch, indent=2)
logger.debug(
"Adding host path volume for local path `%s` to the kubeflow UI",
local_path,
)
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"-n",
"kubeflow",
"patch",
"deployment/ml-pipeline-ui",
"--type",
"strategic",
"-p",
deployment_patch_json,
]
)
wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)
logger.info("Finished patching Kubeflow Pipelines setup.")
check_prerequisites(skip_k3d=False, skip_kubectl=False)
Checks prerequisites for a local kubeflow pipelines deployment.
It makes sure they are installed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
skip_k3d |
bool |
Whether to skip the check for the k3d command. |
False |
skip_kubectl |
bool |
Whether to skip the check for the kubectl command. |
False |
Returns:
Type | Description |
---|---|
bool |
Whether all prerequisites are installed. |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def check_prerequisites(
skip_k3d: bool = False, skip_kubectl: bool = False
) -> bool:
"""Checks prerequisites for a local kubeflow pipelines deployment.
It makes sure they are installed.
Args:
skip_k3d: Whether to skip the check for the k3d command.
skip_kubectl: Whether to skip the check for the kubectl command.
Returns:
Whether all prerequisites are installed.
"""
k3d_installed = skip_k3d or shutil.which("k3d") is not None
kubectl_installed = skip_kubectl or shutil.which("kubectl") is not None
logger.debug(
"Local kubeflow deployment prerequisites: K3D - %s, Kubectl - %s",
k3d_installed,
kubectl_installed,
)
return k3d_installed and kubectl_installed
create_k3d_cluster(cluster_name, registry_name, registry_config_path)
Creates a K3D cluster.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to create. |
required |
registry_name |
str |
Name of the registry to create for this cluster. |
required |
registry_config_path |
str |
Path to the registry config file. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def create_k3d_cluster(
cluster_name: str, registry_name: str, registry_config_path: str
) -> None:
"""Creates a K3D cluster.
Args:
cluster_name: Name of the cluster to create.
registry_name: Name of the registry to create for this cluster.
registry_config_path: Path to the registry config file.
"""
logger.info("Creating local K3D cluster '%s'.", cluster_name)
local_stores_path = GlobalConfiguration().local_stores_path
subprocess.check_call(
[
"k3d",
"cluster",
"create",
cluster_name,
"--image",
K3S_IMAGE_NAME,
"--registry-create",
registry_name,
"--registry-config",
registry_config_path,
"--volume",
f"{local_stores_path}:{local_stores_path}",
]
)
logger.info("Finished K3D cluster creation.")
delete_k3d_cluster(cluster_name)
Deletes a K3D cluster with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to delete. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def delete_k3d_cluster(cluster_name: str) -> None:
"""Deletes a K3D cluster with the given name.
Args:
cluster_name: Name of the cluster to delete.
"""
subprocess.check_call(["k3d", "cluster", "delete", cluster_name])
logger.info("Deleted local k3d cluster '%s'.", cluster_name)
deploy_kubeflow_pipelines(kubernetes_context)
Deploys Kubeflow Pipelines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kubernetes_context |
str |
The kubernetes context on which Kubeflow Pipelines should be deployed. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def deploy_kubeflow_pipelines(kubernetes_context: str) -> None:
"""Deploys Kubeflow Pipelines.
Args:
kubernetes_context: The kubernetes context on which Kubeflow Pipelines
should be deployed.
"""
logger.info("Deploying Kubeflow Pipelines.")
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"apply",
"-k",
f"github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref={KFP_VERSION}&timeout=5m",
]
)
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"wait",
"--timeout=60s",
"--for",
"condition=established",
"crd/applications.app.k8s.io",
]
)
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"apply",
"-k",
f"github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref={KFP_VERSION}&timeout=5m",
]
)
wait_until_kubeflow_pipelines_ready(kubernetes_context=kubernetes_context)
logger.info("Finished Kubeflow Pipelines setup.")
k3d_cluster_exists(cluster_name)
Checks whether there exists a K3D cluster with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to check. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the cluster exists. |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def k3d_cluster_exists(cluster_name: str) -> bool:
"""Checks whether there exists a K3D cluster with the given name.
Args:
cluster_name: Name of the cluster to check.
Returns:
Whether the cluster exists.
"""
output = subprocess.check_output(
["k3d", "cluster", "list", "--output", "json"]
)
clusters = json.loads(output)
for cluster in clusters:
if cluster["name"] == cluster_name:
return True
return False
k3d_cluster_running(cluster_name)
Checks whether the K3D cluster with the given name is running.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to check. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether the cluster is running. |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def k3d_cluster_running(cluster_name: str) -> bool:
"""Checks whether the K3D cluster with the given name is running.
Args:
cluster_name: Name of the cluster to check.
Returns:
Whether the cluster is running.
"""
output = subprocess.check_output(
["k3d", "cluster", "list", "--output", "json"]
)
clusters = json.loads(output)
for cluster in clusters:
if cluster["name"] == cluster_name:
server_count: int = cluster["serversCount"]
servers_running: int = cluster["serversRunning"]
return servers_running == server_count
return False
kubeflow_pipelines_ready(kubernetes_context)
Returns whether all Kubeflow Pipelines pods are ready.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kubernetes_context |
str |
The kubernetes context in which the pods should be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
Whether all Kubeflow Pipelines pods are ready. |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def kubeflow_pipelines_ready(kubernetes_context: str) -> bool:
"""Returns whether all Kubeflow Pipelines pods are ready.
Args:
kubernetes_context: The kubernetes context in which the pods
should be checked.
Returns:
Whether all Kubeflow Pipelines pods are ready.
"""
try:
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"--namespace",
"kubeflow",
"wait",
"--for",
"condition=ready",
"--timeout=0s",
"pods",
"-l",
"application-crd-id=kubeflow-pipelines",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return True
except subprocess.CalledProcessError:
return False
start_k3d_cluster(cluster_name)
Starts a K3D cluster with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to start. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def start_k3d_cluster(cluster_name: str) -> None:
"""Starts a K3D cluster with the given name.
Args:
cluster_name: Name of the cluster to start.
"""
subprocess.check_call(["k3d", "cluster", "start", cluster_name])
logger.info("Started local k3d cluster '%s'.", cluster_name)
start_kfp_ui_daemon(pid_file_path, log_file_path, port, kubernetes_context)
Starts a daemon process that forwards ports.
This is so the Kubeflow Pipelines UI is accessible in the browser.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pid_file_path |
str |
Path where the file with the daemons process ID should be written. |
required |
log_file_path |
str |
Path to a file where the daemon logs should be written. |
required |
port |
int |
Port on which the UI should be accessible. |
required |
kubernetes_context |
str |
The kubernetes context for the cluster where Kubeflow Pipelines is running. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def start_kfp_ui_daemon(
pid_file_path: str,
log_file_path: str,
port: int,
kubernetes_context: str,
) -> None:
"""Starts a daemon process that forwards ports.
This is so the Kubeflow Pipelines UI is accessible in the browser.
Args:
pid_file_path: Path where the file with the daemons process ID should
be written.
log_file_path: Path to a file where the daemon logs should be written.
port: Port on which the UI should be accessible.
kubernetes_context: The kubernetes context for the cluster where
Kubeflow Pipelines is running.
"""
command = [
"kubectl",
"--context",
kubernetes_context,
"--namespace",
"kubeflow",
"port-forward",
"svc/ml-pipeline-ui",
f"{port}:80",
]
if not networking_utils.port_available(port):
modified_command = command.copy()
modified_command[-1] = "PORT:80"
logger.warning(
"Unable to port-forward Kubeflow Pipelines UI to local port %d "
"because the port is occupied. In order to access the Kubeflow "
"Pipelines UI at http://localhost:PORT/, please run '%s' in a "
"separate command line shell (replace PORT with a free port of "
"your choice).",
port,
" ".join(modified_command),
)
elif sys.platform == "win32":
logger.warning(
"Daemon functionality not supported on Windows. "
"In order to access the Kubeflow Pipelines UI at "
"http://localhost:%d/, please run '%s' in a separate command "
"line shell.",
port,
" ".join(command),
)
else:
from zenml.utils import daemon
def _daemon_function() -> None:
"""Port-forwards the Kubeflow Pipelines UI pod."""
subprocess.check_call(command)
daemon.run_as_daemon(
_daemon_function, pid_file=pid_file_path, log_file=log_file_path
)
logger.info(
"Started Kubeflow Pipelines UI daemon (check the daemon logs at %s "
"in case you're not able to view the UI). The Kubeflow Pipelines "
"UI should now be accessible at http://localhost:%d/.",
log_file_path,
port,
)
stop_k3d_cluster(cluster_name)
Stops a K3D cluster with the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cluster_name |
str |
Name of the cluster to stop. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def stop_k3d_cluster(cluster_name: str) -> None:
"""Stops a K3D cluster with the given name.
Args:
cluster_name: Name of the cluster to stop.
"""
subprocess.check_call(["k3d", "cluster", "stop", cluster_name])
logger.info("Stopped local k3d cluster '%s'.", cluster_name)
stop_kfp_ui_daemon(pid_file_path)
Stops the KFP UI daemon process if it is running.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pid_file_path |
str |
Path to the file with the daemons process ID. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def stop_kfp_ui_daemon(pid_file_path: str) -> None:
"""Stops the KFP UI daemon process if it is running.
Args:
pid_file_path: Path to the file with the daemons process ID.
"""
if fileio.exists(pid_file_path):
if sys.platform == "win32":
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
else:
from zenml.utils import daemon
daemon.stop_daemon(pid_file_path)
fileio.remove(pid_file_path)
logger.info("Stopped Kubeflow Pipelines UI daemon.")
wait_until_kubeflow_pipelines_ready(kubernetes_context)
Waits until all Kubeflow Pipelines pods are ready.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kubernetes_context |
str |
The kubernetes context in which the pods should be checked. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def wait_until_kubeflow_pipelines_ready(kubernetes_context: str) -> None:
"""Waits until all Kubeflow Pipelines pods are ready.
Args:
kubernetes_context: The kubernetes context in which the pods
should be checked.
"""
logger.info(
"Waiting for all Kubeflow Pipelines pods to be ready (this might "
"take a few minutes)."
)
while True:
logger.info("Current pod status:")
subprocess.check_call(
[
"kubectl",
"--context",
kubernetes_context,
"--namespace",
"kubeflow",
"get",
"pods",
]
)
if kubeflow_pipelines_ready(kubernetes_context=kubernetes_context):
break
logger.info("One or more pods not ready yet, waiting for 30 seconds...")
time.sleep(30)
write_local_registry_yaml(yaml_path, registry_name, registry_uri)
Writes a K3D registry config file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
yaml_path |
str |
Path where the config file should be written to. |
required |
registry_name |
str |
Name of the registry. |
required |
registry_uri |
str |
URI of the registry. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/local_deployment_utils.py
def write_local_registry_yaml(
yaml_path: str, registry_name: str, registry_uri: str
) -> None:
"""Writes a K3D registry config file.
Args:
yaml_path: Path where the config file should be written to.
registry_name: Name of the registry.
registry_uri: URI of the registry.
"""
yaml_content = {
"mirrors": {registry_uri: {"endpoint": [f"http://{registry_name}"]}}
}
yaml_utils.write_yaml(yaml_path, yaml_content)
utils
Utils for ZenML Kubeflow orchestrators implementation.
dump_ui_metadata(execution_info, metadata_ui_path)
Dump KFP UI metadata json file for visualization purpose.
For general components we just render a simple Markdown file for exec_properties/inputs/outputs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
execution_info |
ExecutionInfo |
runtime execution info for this component, including materialized inputs/outputs/execution properties and id. |
required |
metadata_ui_path |
str |
path to dump ui metadata. |
required |
Source code in zenml/integrations/kubeflow/orchestrators/utils.py
def dump_ui_metadata(
execution_info: data_types.ExecutionInfo,
metadata_ui_path: str,
) -> None:
"""Dump KFP UI metadata json file for visualization purpose.
For general components we just render a simple Markdown file for
exec_properties/inputs/outputs.
Args:
execution_info: runtime execution info for this component, including
materialized inputs/outputs/execution properties and id.
metadata_ui_path: path to dump ui metadata.
"""
node = execution_info.pipeline_node
if not node:
return
exec_properties_list = [
"**{}**: {}".format(
_sanitize_underscore(name), _sanitize_underscore(exec_property)
)
for name, exec_property in execution_info.exec_properties.items()
]
src_str_exec_properties = "# Execution properties:\n{}".format(
"\n\n".join(exec_properties_list) or "No execution property."
)
def _dump_input_populated_artifacts(
node_inputs: MutableMapping[str, InputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]],
) -> List[str]:
"""Dump artifacts markdown string for inputs.
Args:
node_inputs: maps from input name to input sepc proto.
name_to_artifacts: maps from input key to list of populated artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_inputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = "".join(
[
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
]
)
# There must be at least a channel in a input, and all channels in
# a input share the same artifact type.
artifact_type = spec.channels[0].artifact_query.type.name
rendered_list.append(
"## {name}\n\n**Type**: {channel_type}\n\n{artifacts}".format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts,
)
)
return rendered_list
def _dump_output_populated_artifacts(
node_outputs: MutableMapping[str, OutputSpec],
name_to_artifacts: Dict[str, List[artifact.Artifact]],
) -> List[str]:
"""Dump artifacts markdown string for outputs.
Args:
node_outputs: maps from output name to output sepc proto.
name_to_artifacts: maps from output key to list of populated
artifacts.
Returns:
A list of dumped markdown string, each of which represents a channel.
"""
rendered_list = []
for name, spec in node_outputs.items():
# Need to look for materialized artifacts in the execution decision.
rendered_artifacts = "".join(
[
_render_artifact_as_mdstr(single_artifact)
for single_artifact in name_to_artifacts.get(name, [])
]
)
# There must be at least a channel in a input, and all channels
# in a input share the same artifact type.
artifact_type = spec.artifact_spec.type.name
rendered_list.append(
"## {name}\n\n**Type**: {channel_type}\n\n{artifacts}".format(
name=_sanitize_underscore(name),
channel_type=_sanitize_underscore(artifact_type),
artifacts=rendered_artifacts,
)
)
return rendered_list
src_str_inputs = "# Inputs:\n{}".format(
"".join(
_dump_input_populated_artifacts(
node_inputs=node.inputs.inputs,
name_to_artifacts=execution_info.input_dict or {},
)
)
or "No input."
)
src_str_outputs = "# Outputs:\n{}".format(
"".join(
_dump_output_populated_artifacts(
node_outputs=node.outputs.outputs,
name_to_artifacts=execution_info.output_dict or {},
)
)
or "No output."
)
outputs = [
{
"storage": "inline",
"source": "{exec_properties}\n\n{inputs}\n\n{outputs}".format(
exec_properties=src_str_exec_properties,
inputs=src_str_inputs,
outputs=src_str_outputs,
),
"type": "markdown",
}
]
# Add TensorBoard view for ModelRun outputs.
for name, spec in node.outputs.outputs.items():
if (
spec.artifact_spec.type.name
== standard_artifacts.ModelRun.TYPE_NAME
or spec.artifact_spec.type.name == ModelArtifact.TYPE_NAME
):
output_model = execution_info.output_dict[name][0]
source = output_model.uri
# For local artifact repository, use a path that is relative to
# the point where the local artifact folder is mounted as a volume
artifact_store = Client().active_stack.artifact_store
if isinstance(artifact_store, LocalArtifactStore):
source = os.path.relpath(source, artifact_store.path)
source = f"volume://local-artifact-store/{source}"
# Add TensorBoard view.
tensorboard_output = {
"type": "tensorboard",
"source": source,
}
outputs.append(tensorboard_output)
metadata_dict = {"outputs": outputs}
with open(metadata_ui_path, "w") as f:
json.dump(metadata_dict, f)
mount_config_map_op(config_map_name)
Mounts all key-value pairs found in the named Kubernetes ConfigMap.
All key-value pairs in the ConfigMap are mounted as environment variables.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_map_name |
str |
The name of the ConfigMap resource. |
required |
Returns:
Type | Description |
---|---|
Callable[[kfp.dsl._container_op.ContainerOp], NoneType] |
An OpFunc for mounting the ConfigMap. |
Source code in zenml/integrations/kubeflow/orchestrators/utils.py
def mount_config_map_op(
config_map_name: str,
) -> Callable[[dsl.ContainerOp], None]:
"""Mounts all key-value pairs found in the named Kubernetes ConfigMap.
All key-value pairs in the ConfigMap are mounted as environment variables.
Args:
config_map_name: The name of the ConfigMap resource.
Returns:
An OpFunc for mounting the ConfigMap.
"""
def mount_config_map(container_op: dsl.ContainerOp) -> None:
"""Mounts all key-value pairs found in the Kubernetes ConfigMap.
Args:
container_op: The container op to mount the ConfigMap.
"""
config_map_ref = k8s_client.V1ConfigMapEnvSource(
name=config_map_name, optional=True
)
container_op.container.add_env_from(
k8s_client.V1EnvFromSource(config_map_ref=config_map_ref)
)
return mount_config_map