Kubernetes
zenml.integrations.kubernetes
special
Kubernetes integration for Kubernetes-native orchestration.
The Kubernetes integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubernetes orchestrator with the CLI tool.
KubernetesIntegration (Integration)
Definition of Kubernetes integration for ZenML.
Source code in zenml/integrations/kubernetes/__init__.py
class KubernetesIntegration(Integration):
    """ZenML integration that exposes the Kubernetes stack components."""

    NAME = KUBERNETES
    REQUIREMENTS = ["kubernetes>=21.7,<26"]
    # `kfp` is used by many others, hence it is listed as ignored on
    # uninstall.
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["kfp"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors shipped with this integration.

        Returns:
            The Kubernetes orchestrator flavor followed by the Kubernetes
            step operator flavor.
        """
        # The flavor classes are imported at call time, inside the method.
        from zenml.integrations.kubernetes.flavors import (
            KubernetesOrchestratorFlavor,
        )
        from zenml.integrations.kubernetes.flavors import (
            KubernetesStepOperatorFlavor,
        )

        provided_flavors: List[Type[Flavor]] = [
            KubernetesOrchestratorFlavor,
            KubernetesStepOperatorFlavor,
        ]
        return provided_flavors
flavors()
classmethod
Declare the stack component flavors for the Kubernetes integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new stack component flavors. |
Source code in zenml/integrations/kubernetes/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors shipped with this integration.

    Returns:
        The Kubernetes orchestrator flavor followed by the Kubernetes
        step operator flavor.
    """
    # The flavor classes are imported at call time, inside the method.
    from zenml.integrations.kubernetes.flavors import (
        KubernetesOrchestratorFlavor,
    )
    from zenml.integrations.kubernetes.flavors import (
        KubernetesStepOperatorFlavor,
    )

    provided_flavors: List[Type[Flavor]] = [
        KubernetesOrchestratorFlavor,
        KubernetesStepOperatorFlavor,
    ]
    return provided_flavors
flavors
special
Kubernetes integration flavors.
kubernetes_orchestrator_flavor
Kubernetes orchestrator flavor.
KubernetesOrchestratorConfig (BaseOrchestratorConfig, KubernetesOrchestratorSettings)
Configuration for the Kubernetes orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
incluster |
bool |
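If `True`, the orchestrator will run the pipeline inside the same cluster in which it itself is running. This requires the client to run in a Kubernetes pod itself. If set, the `kubernetes_context` config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored. |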
kubernetes_context |
Optional[str] |
Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory. |
kubernetes_namespace |
str |
Name of the Kubernetes namespace to be used. If not provided, the `zenml` namespace will be used. |
local |
bool |
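If `True`, the orchestrator will assume it is connected to a local Kubernetes cluster and will perform additional validations and operations to allow using the orchestrator in combination with other local stack components that store data in the local filesystem (i.e. it will mount the local stores directory into the pipeline containers). |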
skip_local_validations |
bool |
If `True`, the local validations will be skipped. |
parallel_step_startup_waiting_period |
Optional[float] |
How long to wait in between starting parallel steps. This can be used to distribute server load when running pipelines with a huge amount of parallel steps. |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorConfig(
    BaseOrchestratorConfig, KubernetesOrchestratorSettings
):
    """Stack component configuration of the Kubernetes orchestrator.

    Attributes:
        incluster: When `True`, pipelines run inside the same cluster the
            orchestrator itself is running in, which requires the client
            to run in a Kubernetes pod. When set, `kubernetes_context` is
            ignored. Ignored if the stack component is linked to a
            Kubernetes service connector.
        kubernetes_context: Name of the Kubernetes context pipelines run
            in. Ignored if the stack component is linked to a Kubernetes
            service connector; mandatory otherwise.
        kubernetes_namespace: Name of the Kubernetes namespace to use. The
            `zenml` namespace is used when none is provided.
        local: When `True`, the orchestrator assumes it is connected to a
            local Kubernetes cluster and performs additional validations
            and operations so it can be combined with other local stack
            components that store data in the local filesystem (i.e. the
            local stores directory is mounted into the pipeline
            containers).
        skip_local_validations: When `True`, the local validations are
            skipped.
        parallel_step_startup_waiting_period: How long to wait in between
            starting parallel steps. Useful to distribute server load for
            pipelines with a huge amount of parallel steps.
    """

    incluster: bool = False
    kubernetes_context: Optional[str] = None
    kubernetes_namespace: str = "zenml"
    local: bool = False
    skip_local_validations: bool = False
    parallel_step_startup_waiting_period: Optional[float] = None

    @property
    def is_local(self) -> bool:
        """Tell whether this stack component runs locally.

        Returns:
            The value of the `local` config option.
        """
        runs_locally = self.local
        return runs_locally

    @property
    def is_remote(self) -> bool:
        """Tell whether this stack component runs remotely.

        The designation decides whether the component can be used with a
        local ZenML database or whether it needs a remote ZenML server.

        Returns:
            The negation of the `local` config option.
        """
        runs_remotely = not self.local
        return runs_remotely

    @property
    def is_schedulable(self) -> bool:
        """Tell whether the orchestrator is schedulable.

        Returns:
            Always `True`.
        """
        return True

    @property
    def is_synchronous(self) -> bool:
        """Tell whether the orchestrator runs synchronously.

        Returns:
            The value of the `synchronous` setting.
        """
        runs_synchronously = self.synchronous
        return runs_synchronously
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
is_schedulable: bool
property
readonly
Whether the orchestrator is schedulable or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator is schedulable or not. |
is_synchronous: bool
property
readonly
Whether the orchestrator runs synchronous or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator runs synchronous or not. |
KubernetesOrchestratorFlavor (BaseOrchestratorFlavor)
Kubernetes orchestrator flavor.
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor definition of the Kubernetes orchestrator."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The `KUBERNETES_ORCHESTRATOR_FLAVOR` constant.
        """
        flavor_name = KUBERNETES_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements for compatible service connectors.

        The requirements are used to filter the available service
        connector types down to those compatible with this flavor.

        Returns:
            Requirements selecting the Kubernetes cluster resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The result of `generate_default_docs_url()`.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The result of `generate_default_sdk_docs_url()`.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """URL of the logo representing the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com"
            "/orchestrator/kubernetes.png"
        )

    @property
    def config_class(self) -> Type[KubernetesOrchestratorConfig]:
        """Config class of this flavor.

        Returns:
            The `KubernetesOrchestratorConfig` class.
        """
        return KubernetesOrchestratorConfig

    @property
    def implementation_class(self) -> Type["KubernetesOrchestrator"]:
        """Implementation class of this flavor.

        Returns:
            The `KubernetesOrchestrator` class.
        """
        # The implementation is imported at call time, inside the property.
        from zenml.integrations.kubernetes.orchestrators import (
            KubernetesOrchestrator,
        )

        return KubernetesOrchestrator
config_class: Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig]
property
readonly
Returns the `KubernetesOrchestratorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[KubernetesOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KubernetesOrchestrator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
KubernetesOrchestratorSettings (BaseSettings)
Settings for the Kubernetes orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
synchronous |
bool |
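If `True`, the client running a pipeline using this orchestrator waits until all steps finish running. If `False`, the client returns immediately and the pipeline is executed asynchronously. Defaults to `True`. |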
timeout |
int |
How many seconds to wait for synchronous runs. `0` means to wait for an unlimited duration. |
service_account_name |
Optional[str] |
Name of the service account to use for the orchestrator pod. If not provided, a new service account with "edit" permissions will be created. |
step_pod_service_account_name |
Optional[str] |
Name of the service account to use for the step pods. If not provided, the default service account will be used. |
privileged |
bool |
If the container should be run in privileged mode. |
pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Pod settings to apply to pods executing the steps. |
orchestrator_pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Pod settings to apply to the pod which is launching the actual steps. |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorSettings(BaseSettings):
    """Runtime settings of the Kubernetes orchestrator.

    Attributes:
        synchronous: When `True`, the client running a pipeline with this
            orchestrator waits until all steps finish running. When
            `False`, the client returns immediately and the pipeline is
            executed asynchronously. Defaults to `True`.
        timeout: Number of seconds to wait for synchronous runs. `0` means
            waiting for an unlimited duration.
        service_account_name: Name of the service account used for the
            orchestrator pod. When not provided, a new service account
            with "edit" permissions is created.
        step_pod_service_account_name: Name of the service account used
            for the step pods. When not provided, the default service
            account is used.
        privileged: Whether the container should run in privileged mode.
        pod_settings: Pod settings applied to the pods executing the
            steps.
        orchestrator_pod_settings: Pod settings applied to the pod which
            launches the actual steps.
    """

    # Execution behaviour of the client.
    synchronous: bool = True
    timeout: int = 0

    # Service accounts and privileges of the created pods.
    service_account_name: Optional[str] = None
    step_pod_service_account_name: Optional[str] = None
    privileged: bool = False

    # Pod-level customization.
    pod_settings: Optional[KubernetesPodSettings] = None
    orchestrator_pod_settings: Optional[KubernetesPodSettings] = None
kubernetes_step_operator_flavor
Kubernetes step operator flavor.
KubernetesStepOperatorConfig (BaseStepOperatorConfig, KubernetesStepOperatorSettings)
Configuration for the Kubernetes step operator.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_namespace |
str |
Name of the Kubernetes namespace to be used. |
incluster |
bool |
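If `True`, the step operator will run the pipeline inside the same cluster in which the orchestrator is running. For this to work, the pod running the orchestrator needs permissions to create new pods. If set, the `kubernetes_context` config option is ignored. If the stack component is linked to a Kubernetes service connector, this field is ignored. |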
kubernetes_context |
Optional[str] |
Name of a Kubernetes context to run pipelines in. If the stack component is linked to a Kubernetes service connector, this field is ignored. Otherwise, it is mandatory. |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py
class KubernetesStepOperatorConfig(
    BaseStepOperatorConfig, KubernetesStepOperatorSettings
):
    """Stack component configuration of the Kubernetes step operator.

    Attributes:
        kubernetes_namespace: Name of the Kubernetes namespace to use.
        incluster: When `True`, the step operator runs the pipeline inside
            the same cluster the orchestrator is running in. The pod
            running the orchestrator needs permissions to create new pods
            for this to work. When set, `kubernetes_context` is ignored.
            Ignored if the stack component is linked to a Kubernetes
            service connector.
        kubernetes_context: Name of the Kubernetes context pipelines run
            in. Ignored if the stack component is linked to a Kubernetes
            service connector; mandatory otherwise.
    """

    kubernetes_namespace: str = "zenml"
    incluster: bool = False
    kubernetes_context: Optional[str] = None

    @property
    def is_local(self) -> bool:
        """Tell whether this stack component runs locally.

        Returns:
            Always `False`.
        """
        return False

    @property
    def is_remote(self) -> bool:
        """Tell whether this stack component runs remotely.

        The designation decides whether the component can be used with a
        local ZenML database or whether it needs a remote ZenML server.

        Returns:
            Always `True`.
        """
        return True
is_local: bool
property
readonly
Checks if this stack component is running locally.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
KubernetesStepOperatorFlavor (BaseStepOperatorFlavor)
Kubernetes step operator flavor.
Source code in zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py
class KubernetesStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor definition of the Kubernetes step operator."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The `KUBERNETES_STEP_OPERATOR_FLAVOR` constant.
        """
        flavor_name = KUBERNETES_STEP_OPERATOR_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements for compatible service connectors.

        The requirements are used to filter the available service
        connector types down to those compatible with this flavor.

        Returns:
            Requirements selecting the Kubernetes cluster resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The result of `generate_default_docs_url()`.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The result of `generate_default_sdk_docs_url()`.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """URL of the logo representing the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com"
            "/step_operator/kubernetes.png"
        )

    @property
    def config_class(self) -> Type[KubernetesStepOperatorConfig]:
        """Config class of this flavor.

        Returns:
            The `KubernetesStepOperatorConfig` class.
        """
        return KubernetesStepOperatorConfig

    @property
    def implementation_class(self) -> Type["KubernetesStepOperator"]:
        """Implementation class of this flavor.

        Returns:
            The `KubernetesStepOperator` class.
        """
        # The implementation is imported at call time, inside the property.
        from zenml.integrations.kubernetes.step_operators import (
            KubernetesStepOperator,
        )

        return KubernetesStepOperator
config_class: Type[zenml.integrations.kubernetes.flavors.kubernetes_step_operator_flavor.KubernetesStepOperatorConfig]
property
readonly
Returns the `KubernetesStepOperatorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kubernetes.flavors.kubernetes_step_operator_flavor.KubernetesStepOperatorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[KubernetesStepOperator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KubernetesStepOperator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
KubernetesStepOperatorSettings (BaseSettings)
Settings for the Kubernetes step operator.
Attributes:
Name | Type | Description |
---|---|---|
pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Pod settings to apply to pods executing the steps. |
service_account_name |
Optional[str] |
Name of the service account to use for the pod. |
privileged |
bool |
If the container should be run in privileged mode. |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_step_operator_flavor.py
class KubernetesStepOperatorSettings(BaseSettings):
    """Runtime settings of the Kubernetes step operator.

    Attributes:
        pod_settings: Pod settings applied to the pods executing the
            steps.
        service_account_name: Name of the service account used for the
            pod.
        privileged: Whether the container should run in privileged mode.
    """

    # Pod-level customization.
    pod_settings: Optional[KubernetesPodSettings] = None

    # Service account and privileges of the created pod.
    service_account_name: Optional[str] = None
    privileged: bool = False
orchestrators
special
Kubernetes-native orchestration.
kube_utils
Utilities for Kubernetes related functions.
Internal interface: no backwards compatibility guarantees. Adjusted from https://github.com/tensorflow/tfx/blob/master/tfx/utils/kube_utils.py.
PodPhase (Enum)
Phase of the Kubernetes pod.
Pod phases are defined in https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
class PodPhase(enum.Enum):
    """Lifecycle phase reported for a Kubernetes pod.

    Each member's value is the phase string as spelled in the Kubernetes
    pod lifecycle documentation:
    https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """

    # Member values are compared against `pod.status.phase` by the
    # `pod_*` helper functions of this module.
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"
create_edit_service_account(core_api, rbac_api, service_account_name, namespace, cluster_role_binding_name='zenml-edit')
Create a new Kubernetes service account with "edit" rights.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
kubernetes.client.CoreV1Api |
Client of Core V1 API of Kubernetes API. |
required |
rbac_api |
kubernetes.client.RbacAuthorizationV1Api |
Client of Rbac Authorization V1 API of Kubernetes API. |
required |
service_account_name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace in which the service account is created (required; there is no default value). |
required |
cluster_role_binding_name |
str |
Name of the cluster role binding. Defaults to "zenml-edit". |
'zenml-edit' |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_edit_service_account(
    core_api: k8s_client.CoreV1Api,
    rbac_api: k8s_client.RbacAuthorizationV1Api,
    service_account_name: str,
    namespace: str,
    cluster_role_binding_name: str = "zenml-edit",
) -> None:
    """Set up a Kubernetes service account bound to the "edit" role.

    A cluster role binding for the "edit" role is submitted first, then
    the service account itself is submitted in `namespace`.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        rbac_api: Client of Rbac Authorization V1 API of Kubernetes API.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace of the service account.
        cluster_role_binding_name: Name of the cluster role binding.
            Defaults to "zenml-edit".
    """
    # NOTE(review): `_if_not_exists` presumably tolerates resources that
    # already exist - confirm against its definition.
    binding_manifest = build_cluster_role_binding_manifest_for_service_account(
        name=cluster_role_binding_name,
        role_name="edit",
        service_account_name=service_account_name,
        namespace=namespace,
    )
    create_binding = _if_not_exists(rbac_api.create_cluster_role_binding)
    create_binding(body=binding_manifest)

    account_manifest = build_service_account_manifest(
        name=service_account_name, namespace=namespace
    )
    create_account = _if_not_exists(core_api.create_namespaced_service_account)
    create_account(namespace=namespace, body=account_manifest)
create_namespace(core_api, namespace)
Create a Kubernetes namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
kubernetes.client.CoreV1Api |
Client of Core V1 API of Kubernetes API. |
required |
namespace |
str |
Kubernetes namespace to create (required; there is no default value). |
required |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None:
    """Submit a namespace manifest to the Kubernetes API.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: Namespace name handed to `build_namespace_manifest`.
    """
    # NOTE(review): `_if_not_exists` presumably tolerates a namespace that
    # already exists - confirm against its definition.
    namespace_manifest = build_namespace_manifest(namespace)
    create = _if_not_exists(core_api.create_namespace)
    create(body=namespace_manifest)
get_pod(core_api, pod_name, namespace)
Get a pod from Kubernetes metadata API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
kubernetes.client.CoreV1Api |
Client of `CoreV1Api` of Kubernetes API. |
required |
pod_name |
str |
The name of the pod. |
required |
namespace |
str |
The namespace of the pod. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
When it sees unexpected errors from Kubernetes API. |
Returns:
Type | Description |
---|---|
Optional[kubernetes.client.V1Pod] |
The found pod object. None if it's not found. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_pod(
    core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> Optional[k8s_client.V1Pod]:
    """Get a pod from Kubernetes metadata API.

    Args:
        core_api: Client of `CoreV1Api` of Kubernetes API.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.

    Raises:
        RuntimeError: When the Kubernetes API answers with any error other
            than HTTP 404. The original `ApiException` is chained as the
            cause.

    Returns:
        The found pod object. None if it's not found.
    """
    try:
        return core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
    except k8s_client.rest.ApiException as e:
        # A 404 only means the pod does not exist; report that as `None`.
        if e.status == 404:
            return None
        # Give the error a message identifying the pod and the API status,
        # instead of raising a bare `RuntimeError`.
        raise RuntimeError(
            f"Failed to read pod `{namespace}:{pod_name}` from the "
            f"Kubernetes API (status {e.status})."
        ) from e
is_inside_kubernetes()
Check whether we are inside a Kubernetes cluster or on a remote host.
Returns:
Type | Description |
---|---|
bool |
True if inside a Kubernetes cluster, else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def is_inside_kubernetes() -> bool:
    """Detect whether the current process runs inside a Kubernetes cluster.

    Detection works by attempting to load the in-cluster client config.

    Returns:
        True if the in-cluster config could be loaded, False if loading it
        raised a `ConfigException`.
    """
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        return False
    else:
        return True
load_kube_config(incluster=False, context=None)
Load the Kubernetes client config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incluster |
bool |
Whether to load the in-cluster config. |
False |
context |
Optional[str] |
Name of the Kubernetes context. If not provided, uses the currently active context. Will be ignored if `incluster` is True. |
None |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def load_kube_config(
    incluster: bool = False, context: Optional[str] = None
) -> None:
    """Load the configuration used by the Kubernetes client.

    Args:
        incluster: Whether to load the in-cluster config.
        context: Name of the Kubernetes context. The currently active
            context is used when none is provided. Ignored when
            `incluster` is True.
    """
    if not incluster:
        k8s_config.load_kube_config(context=context)
        return

    k8s_config.load_incluster_config()
pod_failed(pod)
Check if pod status is 'Failed'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
kubernetes.client.V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
True if pod status is 'Failed' else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_failed(pod: k8s_client.V1Pod) -> bool:
    """Tell whether a pod is in the 'Failed' phase.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if `pod.status.phase` equals 'Failed', False otherwise.
    """
    has_failed: bool = pod.status.phase == PodPhase.FAILED.value
    return has_failed
pod_is_done(pod)
Check if pod status is 'Succeeded'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
kubernetes.client.V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
True if pod status is 'Succeeded' else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_done(pod: k8s_client.V1Pod) -> bool:
    """Tell whether a pod is in the 'Succeeded' phase.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if `pod.status.phase` equals 'Succeeded', False otherwise.
    """
    has_succeeded: bool = pod.status.phase == PodPhase.SUCCEEDED.value
    return has_succeeded
pod_is_not_pending(pod)
Check if pod status is not 'Pending'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
kubernetes.client.V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
False if the pod status is 'Pending' else True. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_not_pending(pod: k8s_client.V1Pod) -> bool:
    """Tell whether a pod has left the 'Pending' phase.

    Args:
        pod: Kubernetes pod.

    Returns:
        False if `pod.status.phase` equals 'Pending', True otherwise.
    """
    left_pending: bool = pod.status.phase != PodPhase.PENDING.value
    return left_pending
sanitize_pod_name(pod_name)
Sanitize pod names so they conform to Kubernetes pod naming convention.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_name |
str |
Arbitrary input pod name. |
required |
Returns:
Type | Description |
---|---|
str |
Sanitized pod name. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def sanitize_pod_name(pod_name: str) -> str:
    """Sanitize pod names so they conform to Kubernetes pod naming convention.

    The name is lowercased, every character outside `[a-z0-9-]` is replaced
    by a dash, runs of dashes are collapsed into one, and dashes at both
    the start and the end are removed, since a Kubernetes name must begin
    and end with an alphanumeric character.

    Args:
        pod_name: Arbitrary input pod name.

    Returns:
        Sanitized pod name. May be empty if the input contains no
        alphanumeric characters.
    """
    pod_name = re.sub(r"[^a-z0-9-]", "-", pod_name.lower())
    pod_name = re.sub(r"-+", "-", pod_name)
    # Strip both ends: a trailing dash (e.g. from "step_") is as invalid
    # as a leading one.
    return pod_name.strip("-")
wait_pod(kube_client_fn, pod_name, namespace, exit_condition_lambda, timeout_sec=0, exponential_backoff=False, stream_logs=False)
Wait for a pod to meet an exit condition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kube_client_fn |
Callable[[], kubernetes.client.ApiClient] |
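The kube client fn is a function that is called periodically and is used to get a `CoreV1Api` client for the Kubernetes API. It should cache the client to avoid unnecessary overhead but should also instantiate a new client if the previous one is using credentials that are about to expire. |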
required |
pod_name |
str |
The name of the pod. |
required |
namespace |
str |
The namespace of the pod. |
required |
exit_condition_lambda |
Callable[[kubernetes.client.V1Pod], bool] |
A lambda which will be called periodically to wait for a pod to exit. The function returns True to exit. |
required |
timeout_sec |
int |
Timeout in seconds to wait for pod to reach exit condition, or 0 to wait for an unlimited duration. Defaults to unlimited. |
0 |
exponential_backoff |
bool |
Whether to use exponential back off for polling. Defaults to False. |
False |
stream_logs |
bool |
Whether to stream the pod logs to `zenml.logger.info()`. Defaults to False. |
False |
Exceptions:
Type | Description |
---|---|
RuntimeError |
when the function times out. |
Returns:
Type | Description |
---|---|
kubernetes.client.V1Pod |
The pod object which meets the exit condition. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def wait_pod(
    kube_client_fn: Callable[[], k8s_client.ApiClient],
    pod_name: str,
    namespace: str,
    exit_condition_lambda: Callable[[k8s_client.V1Pod], bool],
    timeout_sec: int = 0,
    exponential_backoff: bool = False,
    stream_logs: bool = False,
) -> k8s_client.V1Pod:
    """Wait for a pod to meet an exit condition.

    The pod is polled in a loop. On every iteration the pod is fetched,
    its new log lines are optionally forwarded to the logger, and the loop
    ends when the pod failed, the exit condition holds or the timeout is
    exceeded.

    Args:
        kube_client_fn: the kube client fn is a function that is called
            periodically and is used to get a `CoreV1Api` client for
            the Kubernetes API. It should cache the client to avoid
            unnecessary overhead but should also instantiate a new client if
            the previous one is using credentials that are about to expire.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.
        exit_condition_lambda: A lambda
            which will be called periodically to wait for a pod to exit. The
            function returns True to exit.
        timeout_sec: Timeout in seconds to wait for pod to reach exit
            condition, or 0 to wait for an unlimited duration.
            Defaults to unlimited.
        exponential_backoff: Whether to use exponential back off for polling.
            Defaults to False.
        stream_logs: Whether to stream the pod logs to
            `zenml.logger.info()`. Defaults to False.

    Raises:
        RuntimeError: when the pod failed or when the function times out.

    Returns:
        The pod object which meets the exit condition.
    """
    # A monotonic clock measures the full elapsed duration and is immune
    # to wall-clock adjustments. (`timedelta.seconds` wraps around after
    # one day, which made timeouts of 24h or more never trigger.)
    start_time = time.monotonic()

    # Link to exponential back-off algorithm used here:
    # https://cloud.google.com/storage/docs/exponential-backoff
    backoff_interval = 1
    maximum_backoff = 32

    # Number of log lines already forwarded to the logger.
    logged_lines = 0

    while True:
        # The client is requested on every iteration so that
        # `kube_client_fn` can hand out a refreshed one.
        kube_client = kube_client_fn()
        core_api = k8s_client.CoreV1Api(kube_client)

        # NOTE(review): `get_pod` returns `None` for a pod that does not
        # exist; the phase checks below would then fail with an
        # `AttributeError` - confirm whether callers can hit this.
        resp = get_pod(core_api, pod_name, namespace)

        # Stream logs to `zenml.logger.info()`.
        # TODO: can we do this without parsing all logs every time?
        if stream_logs and pod_is_not_pending(resp):
            response = core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                _preload_content=False,
            )
            raw_data = response.data
            decoded_log = raw_data.decode("utf-8", errors="replace")
            logs = decoded_log.splitlines()
            # Only forward the lines that were not logged before.
            if len(logs) > logged_lines:
                for line in logs[logged_lines:]:
                    logger.info(line)
                logged_lines = len(logs)

        # Raise an error if the pod failed.
        if pod_failed(resp):
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` failed.")

        # Check if pod is in desired state (e.g. finished / running / ...).
        if exit_condition_lambda(resp):
            return resp

        # Check if wait timed out (a timeout of 0 means "wait forever").
        elapsed_sec = time.monotonic() - start_time
        if timeout_sec != 0 and elapsed_sec >= timeout_sec:
            raise RuntimeError(
                f"Waiting for pod `{namespace}:{pod_name}` timed out after "
                f"{timeout_sec} seconds."
            )

        # Wait (using exponential backoff).
        time.sleep(backoff_interval)
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2
kubernetes_orchestrator
Kubernetes-native orchestrator.
KubernetesOrchestrator (ContainerizedOrchestrator)
Orchestrator for running ZenML pipelines using native Kubernetes.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
class KubernetesOrchestrator(ContainerizedOrchestrator):
"""Orchestrator for running ZenML pipelines using native Kubernetes."""
_k8s_client: Optional[k8s_client.ApiClient] = None
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Return a Kubernetes API client for this orchestrator.

    The client is resolved in this order: in-cluster configuration (when
    requested), the cached client (while the connector has not expired),
    the linked service connector, and finally the local kube config.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        # In-cluster mode always reloads the configuration and replaces
        # the cached client.
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Reuse the cached client unless the connector has expired.
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if not connector:
        # No connector linked: fall back to the kube config context.
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    client = connector.connect()
    if not isinstance(client, k8s_client.ApiClient):
        raise RuntimeError(
            f"Expected a k8s_client.ApiClient while trying to use the "
            f"linked connector, but got {type(client)}."
        )
    self._k8s_client = client
    return self._k8s_client
@property
def _k8s_core_api(self) -> k8s_client.CoreV1Api:
    """Build a Core V1 API wrapper around the current Kubernetes client.

    Returns:
        The Kubernetes Core API client.
    """
    api_client = self.get_kube_client()
    return k8s_client.CoreV1Api(api_client)
@property
def _k8s_batch_api(self) -> k8s_client.BatchV1Api:
    """Build a Batch V1 API wrapper around the current Kubernetes client.

    Returns:
        The Kubernetes Batch API client.
    """
    api_client = self.get_kube_client()
    return k8s_client.BatchV1Api(api_client)
@property
def _k8s_rbac_api(self) -> k8s_client.RbacAuthorizationV1Api:
    """Build an RBAC V1 API wrapper around the current Kubernetes client.

    Returns:
        The Kubernetes RBAC API client.
    """
    api_client = self.get_kube_client()
    return k8s_client.RbacAuthorizationV1Api(api_client)
@property
def config(self) -> KubernetesOrchestratorConfig:
    """Expose the component config typed as `KubernetesOrchestratorConfig`.

    Returns:
        The configuration.
    """
    typed_config = cast(KubernetesOrchestratorConfig, self._config)
    return typed_config
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
    """Report which settings class the Kubernetes orchestrator uses.

    Returns:
        The settings class.
    """
    settings_type: Optional[Type["BaseSettings"]] = (
        KubernetesOrchestratorSettings
    )
    return settings_type
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """List the configured Kubernetes contexts and name the active one.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        A tuple of the configured Kubernetes context names and the name
        of the active Kubernetes context.
    """
    try:
        all_contexts, current = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    names = []
    for ctx in all_contexts:
        names.append(ctx["name"])
    return names, current["name"]
@property
def validator(self) -> Optional[StackValidator]:
"""Defines the validator that checks whether the stack is valid.
The returned validator requires a container registry and an image builder
in the stack and additionally runs `_validate_local_requirements`.
Returns:
Stack validator.
"""
def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
"""Validates the Kubernetes context and that the stack contains no local components.
Args:
stack: The stack.
Returns:
Whether the stack is valid or not.
An explanation why the stack is invalid, if applicable.
"""
container_registry = stack.container_registry
# should not happen, because the stack validation takes care of
# this, but just in case
assert container_registry is not None
kubernetes_context = self.config.kubernetes_context
msg = f"'{self.name}' Kubernetes orchestrator error: "
# Without a linked connector, a `kubernetes_context` must be configured
# and must exist in the local Kubernetes configuration.
if not self.connector:
if not kubernetes_context:
return False, (
f"{msg}you must either link this stack component to a "
"Kubernetes service connector (see the 'zenml "
"orchestrator connect' CLI command) or explicitly set "
"the `kubernetes_context` attribute to the name of the "
"Kubernetes config context pointing to the cluster "
"where you would like to run pipelines."
)
contexts, active_context = self.get_kubernetes_contexts()
if kubernetes_context not in contexts:
return False, (
f"{msg}could not find a Kubernetes context named "
f"'{kubernetes_context}' in the local "
"Kubernetes configuration. Please make sure that the "
"Kubernetes cluster is running and that the kubeconfig "
"file is configured correctly. To list all configured "
"contexts, run:\n\n"
" `kubectl config get-contexts`\n"
)
# A mismatch with the active context is only a warning, not a
# validation failure.
if kubernetes_context != active_context:
logger.warning(
f"{msg}the Kubernetes context '{kubernetes_context}' " # nosec
f"configured for the Kubernetes orchestrator is not "
f"the same as the active context in the local "
f"Kubernetes configuration. If this is not deliberate,"
f" you should update the orchestrator's "
f"`kubernetes_context` field by running:\n\n"
f" `zenml orchestrator update {self.name} "
f"--kubernetes_context={active_context}`\n"
f"To list all configured contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
f"To set the active context to be the same as the one "
f"configured in the Kubernetes orchestrator and "
f"silence this warning, run:\n\n"
f" `kubectl config use-context "
f"{kubernetes_context}`\n"
)
# Shared suffix appended to both local-component error messages below.
silence_local_validations_msg = (
f"To silence this warning, set the "
f"`skip_local_validations` attribute to True in the "
f"orchestrator configuration by running:\n\n"
f" 'zenml orchestrator update {self.name} "
f"--skip_local_validations=True'\n"
)
if (
not self.config.skip_local_validations
and not self.config.is_local
):
# if the orchestrator is not running in a local k3d cluster,
# we cannot have any other local components in our stack,
# because we cannot mount the local path into the container.
# This may result in problems when running the pipeline, because
# the local components will not be available inside the
# kubernetes containers.
# go through all stack components and identify those that
# advertise a local path where they persist information that
# they need to be available when running pipelines.
for stack_comp in stack.components.values():
if stack_comp.local_path is None:
continue
# The first component with a local path fails the validation.
return False, (
f"{msg}the Kubernetes orchestrator is configured to "
f"run pipelines in a remote Kubernetes cluster but the "
f"'{stack_comp.name}' {stack_comp.type.value} "
f"is a local stack component "
f"and will not be available in the Kubernetes pipeline "
f"step.\nPlease ensure that you always use non-local "
f"stack components with a remote Kubernetes orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems. You should use a flavor of "
f"{stack_comp.type.value} other than "
f"'{stack_comp.flavor}'.\n"
+ silence_local_validations_msg
)
# if the orchestrator is remote, the container registry must
# also be remote.
if container_registry.config.is_local:
return False, (
f"{msg}the Kubernetes orchestrator is configured to "
"run pipelines in a remote Kubernetes cluster but the "
f"'{container_registry.name}' container registry URI "
f"'{container_registry.config.uri}' "
f"points to a local container registry. Please ensure "
f"that you always use non-local stack components with "
f"a remote Kubernetes orchestrator, otherwise you will "
f"run into problems. You should use a flavor of "
f"container registry other than "
f"'{container_registry.flavor}'.\n"
+ silence_local_validations_msg
)
return True, ""
return StackValidator(
required_components={
StackComponentType.CONTAINER_REGISTRY,
StackComponentType.IMAGE_BUILDER,
},
custom_validation_function=_validate_local_requirements,
)
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs the pipeline in Kubernetes.
Creates either a Kubernetes CronJob (when the deployment has a schedule)
or a single orchestrator pod, which is optionally waited on.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
RuntimeError: If the deployment has a schedule without a CRON
expression.
"""
# Step resource settings are not applied by this orchestrator; only warn.
for step_name, step in deployment.step_configurations.items():
if self.requires_resources_in_orchestration_environment(step):
logger.warning(
"Specifying step resources is not yet supported for "
"the Kubernetes orchestrator, ignoring resource "
"configuration for step %s.",
step_name,
)
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)
assert stack.container_registry
# Get Docker image for the orchestrator pod
try:
image = self.get_image(deployment=deployment)
except KeyError:
# If no generic pipeline image exists (which means all steps have
# custom builds) we use a random step image as all of them include
# dependencies for the active stack
pipeline_step_name = next(iter(deployment.step_configurations))
image = self.get_image(
deployment=deployment, step_name=pipeline_step_name
)
# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
kubernetes_namespace=self.config.kubernetes_namespace,
)
settings = cast(
KubernetesOrchestratorSettings, self.get_settings(deployment)
)
# Authorize pod to run Kubernetes commands inside the cluster.
service_account_name = self._get_service_account_name(settings)
# Schedule as CRON job if CRON schedule is given.
if deployment.schedule:
if not deployment.schedule.cron_expression:
raise RuntimeError(
"The Kubernetes orchestrator only supports scheduling via "
"CRON jobs, but the run was configured with a manual "
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
service_account_name=service_account_name,
privileged=False,
pod_settings=settings.orchestrator_pod_settings,
env=environment,
mount_local_stores=self.config.is_local,
)
self._k8s_batch_api.create_namespaced_cron_job(
body=cron_job_manifest,
namespace=self.config.kubernetes_namespace,
)
logger.info(
f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
f'`"{cron_expression}"`.'
)
# Scheduled runs only create the CronJob; no pod is created here.
return
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
privileged=False,
pod_settings=settings.orchestrator_pod_settings,
service_account_name=service_account_name,
env=environment,
mount_local_stores=self.config.is_local,
)
self._k8s_core_api.create_namespaced_pod(
namespace=self.config.kubernetes_namespace,
body=pod_manifest,
)
# Wait for the orchestrator pod to finish and stream logs.
if settings.synchronous:
logger.info("Waiting for Kubernetes orchestrator pod...")
kube_utils.wait_pod(
kube_client_fn=self.get_kube_client,
pod_name=pod_name,
namespace=self.config.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
timeout_sec=settings.timeout,
stream_logs=True,
)
else:
logger.info(
f"Orchestration started asynchronously in pod "
f"`{self.config.kubernetes_namespace}:{pod_name}`. "
f"Run the following command to inspect the logs: "
f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
)
def _get_service_account_name(
    self, settings: KubernetesOrchestratorSettings
) -> str:
    """Resolve the service account name for the orchestrator pod.

    A name given in the settings is used as-is. Otherwise the default
    `zenml-service-account` is set up through
    `kube_utils.create_edit_service_account` and its name is returned.

    Args:
        settings: The orchestrator settings.

    Returns:
        The service account name.
    """
    if settings.service_account_name:
        return settings.service_account_name

    default_name = "zenml-service-account"
    kube_utils.create_edit_service_account(
        core_api=self._k8s_core_api,
        rbac_api=self._k8s_rbac_api,
        service_account_name=default_name,
        namespace=self.config.kubernetes_namespace,
    )
    return default_name
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The run id is read from the environment variable named by
    `ENV_ZENML_KUBERNETES_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError as e:
        # Chain explicitly so the traceback marks the missing variable as
        # the direct cause rather than an error raised during handling.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        ) from e
config: KubernetesOrchestratorConfig
property
readonly
Returns the `KubernetesOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
KubernetesOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Kubernetes orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Defines the validator that checks whether the stack is valid.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
Stack validator. |
get_kube_client(self, incluster=None)
Getter for the Kubernetes API client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
incluster |
Optional[bool] |
Whether to use the in-cluster config or not. Overrides the `incluster` setting in the config. |
None |
Returns:
Type | Description |
---|---|
kubernetes.client.ApiClient |
The Kubernetes API client. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the Kubernetes connector behaves unexpectedly. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kube_client(
    self, incluster: Optional[bool] = None
) -> k8s_client.ApiClient:
    """Return a Kubernetes API client for this orchestrator.

    The client is resolved in this order: in-cluster configuration (when
    requested), the cached client (while the connector has not expired),
    the linked service connector, and finally the local kube config.

    Args:
        incluster: Whether to use the in-cluster config or not. Overrides
            the `incluster` setting in the config.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: if the Kubernetes connector behaves unexpectedly.
    """
    if incluster is None:
        incluster = self.config.incluster

    if incluster:
        # In-cluster mode always reloads the configuration and replaces
        # the cached client.
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Reuse the cached client unless the connector has expired.
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if not connector:
        # No connector linked: fall back to the kube config context.
        kube_utils.load_kube_config(
            incluster=incluster,
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    client = connector.connect()
    if not isinstance(client, k8s_client.ApiClient):
        raise RuntimeError(
            f"Expected a k8s_client.ApiClient while trying to use the "
            f"linked connector, but got {type(client)}."
        )
    self._k8s_client = client
    return self._k8s_client
get_kubernetes_contexts(self)
Get list of configured Kubernetes contexts and the active context.
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the Kubernetes configuration cannot be loaded. |
Returns:
Type | Description |
---|---|
Tuple[List[str], str] |
The list of configured Kubernetes context names, followed by the name of the active Kubernetes context. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """List the configured Kubernetes contexts and name the active one.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        A tuple of the configured Kubernetes context names and the name
        of the active Kubernetes context.
    """
    try:
        all_contexts, current = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    names = []
    for ctx in all_contexts:
        names.append(ctx["name"])
    return names, current["name"]
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The run id is read from the environment variable named by
    `ENV_ZENML_KUBERNETES_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError as e:
        # Chain explicitly so the traceback marks the missing variable as
        # the direct cause rather than an error raised during handling.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        ) from e
prepare_or_run_pipeline(self, deployment, stack, environment)
Runs the pipeline in Kubernetes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
environment |
Dict[str, str] |
Environment variables to set in the orchestration environment. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the deployment has a schedule without a CRON expression. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeploymentResponse",
stack: "Stack",
environment: Dict[str, str],
) -> Any:
"""Runs the pipeline in Kubernetes.
Creates either a Kubernetes CronJob (when the deployment has a schedule)
or a single orchestrator pod, which is optionally waited on.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
environment: Environment variables to set in the orchestration
environment.
Raises:
RuntimeError: If the deployment has a schedule without a CRON
expression.
"""
# Step resource settings are not applied by this orchestrator; only warn.
for step_name, step in deployment.step_configurations.items():
if self.requires_resources_in_orchestration_environment(step):
logger.warning(
"Specifying step resources is not yet supported for "
"the Kubernetes orchestrator, ignoring resource "
"configuration for step %s.",
step_name,
)
pipeline_name = deployment.pipeline_configuration.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)
assert stack.container_registry
# Get Docker image for the orchestrator pod
try:
image = self.get_image(deployment=deployment)
except KeyError:
# If no generic pipeline image exists (which means all steps have
# custom builds) we use a random step image as all of them include
# dependencies for the active stack
pipeline_step_name = next(iter(deployment.step_configurations))
image = self.get_image(
deployment=deployment, step_name=pipeline_step_name
)
# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
deployment_id=deployment.id,
kubernetes_namespace=self.config.kubernetes_namespace,
)
settings = cast(
KubernetesOrchestratorSettings, self.get_settings(deployment)
)
# Authorize pod to run Kubernetes commands inside the cluster.
service_account_name = self._get_service_account_name(settings)
# Schedule as CRON job if CRON schedule is given.
if deployment.schedule:
if not deployment.schedule.cron_expression:
raise RuntimeError(
"The Kubernetes orchestrator only supports scheduling via "
"CRON jobs, but the run was configured with a manual "
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
service_account_name=service_account_name,
privileged=False,
pod_settings=settings.orchestrator_pod_settings,
env=environment,
mount_local_stores=self.config.is_local,
)
self._k8s_batch_api.create_namespaced_cron_job(
body=cron_job_manifest,
namespace=self.config.kubernetes_namespace,
)
logger.info(
f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
f'`"{cron_expression}"`.'
)
# Scheduled runs only create the CronJob; no pod is created here.
return
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image,
command=command,
args=args,
privileged=False,
pod_settings=settings.orchestrator_pod_settings,
service_account_name=service_account_name,
env=environment,
mount_local_stores=self.config.is_local,
)
self._k8s_core_api.create_namespaced_pod(
namespace=self.config.kubernetes_namespace,
body=pod_manifest,
)
# Wait for the orchestrator pod to finish and stream logs.
if settings.synchronous:
logger.info("Waiting for Kubernetes orchestrator pod...")
kube_utils.wait_pod(
kube_client_fn=self.get_kube_client,
pod_name=pod_name,
namespace=self.config.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
timeout_sec=settings.timeout,
stream_logs=True,
)
else:
logger.info(
f"Orchestration started asynchronously in pod "
f"`{self.config.kubernetes_namespace}:{pod_name}`. "
f"Run the following command to inspect the logs: "
f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
)
kubernetes_orchestrator_entrypoint
Entrypoint of the Kubernetes master/orchestrator pod.
main()
Entrypoint of the k8s master/orchestrator pod.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def main() -> None:
"""Entrypoint of the k8s master/orchestrator pod.
Loads the deployment named by the parsed arguments, then runs every step
in its own pod via a `ThreadedDagRunner` over the steps' upstream
dependencies.
"""
# Log to the container's stdout so it can be streamed by the client.
logger.info("Kubernetes orchestrator pod started.")
# Parse / extract args.
args = parse_args()
# The hostname of this pod is used as the orchestrator run ID.
orchestrator_run_id = socket.gethostname()
deployment_config = Client().get_deployment(args.deployment_id)
# Map each step name to its upstream steps; this is the DAG that is run.
pipeline_dag = {
step_name: step.spec.upstream_steps
for step_name, step in deployment_config.step_configurations.items()
}
step_command = StepEntrypointConfiguration.get_entrypoint_command()
active_stack = Client().active_stack
mount_local_stores = active_stack.orchestrator.config.is_local
# Get a Kubernetes client from the active Kubernetes orchestrator, but
# override the `incluster` setting to `True` since we are running inside
# the Kubernetes cluster.
orchestrator = active_stack.orchestrator
assert isinstance(orchestrator, KubernetesOrchestrator)
kube_client = orchestrator.get_kube_client(incluster=True)
core_api = k8s_client.CoreV1Api(kube_client)
def run_step_on_kubernetes(step_name: str) -> None:
"""Run a pipeline step in a separate Kubernetes pod.
Creates the step pod and blocks until `kube_utils.wait_pod` returns.
Args:
step_name: Name of the step.
"""
# Define Kubernetes pod name.
pod_name = f"{orchestrator_run_id}-{step_name}"
pod_name = kube_utils.sanitize_pod_name(pod_name)
image = KubernetesOrchestrator.get_image(
deployment=deployment_config, step_name=step_name
)
step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name, deployment_id=deployment_config.id
)
step_config = deployment_config.step_configurations[step_name].config
kubernetes_settings = step_config.settings.get(
"orchestrator.kubernetes", None
)
# Validate the step's `orchestrator.kubernetes` settings, or an empty
# dict when the step defines none.
orchestrator_settings = {}
if kubernetes_settings is not None:
orchestrator_settings = kubernetes_settings.model_dump()
settings = KubernetesOrchestratorSettings.model_validate(
orchestrator_settings
)
env = get_config_environment_vars()
# Pass the run ID on to the step pod through its environment.
env[ENV_ZENML_KUBERNETES_RUN_ID] = orchestrator_run_id
# Define Kubernetes pod manifest.
pod_manifest = build_pod_manifest(
pod_name=pod_name,
run_name=args.run_name,
pipeline_name=deployment_config.pipeline_configuration.name,
image_name=image,
command=step_command,
args=step_args,
env=env,
privileged=settings.privileged,
pod_settings=settings.pod_settings,
service_account_name=settings.step_pod_service_account_name
or settings.service_account_name,
mount_local_stores=mount_local_stores,
)
# Create and run pod.
core_api.create_namespaced_pod(
namespace=args.kubernetes_namespace,
body=pod_manifest,
)
# Wait for pod to finish.
logger.info(f"Waiting for pod of step `{step_name}` to start...")
kube_utils.wait_pod(
kube_client_fn=lambda: orchestrator.get_kube_client(
incluster=True
),
pod_name=pod_name,
namespace=args.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
stream_logs=True,
)
logger.info(f"Pod of step `{step_name}` completed.")
# A missing/falsy waiting period in the config is treated as 0.0.
parallel_node_startup_waiting_period = (
orchestrator.config.parallel_step_startup_waiting_period or 0.0
)
ThreadedDagRunner(
dag=pipeline_dag,
run_fn=run_step_on_kubernetes,
parallel_node_startup_waiting_period=parallel_node_startup_waiting_period,
).run()
logger.info("Orchestration pod completed.")
parse_args()
Parse entrypoint arguments.
Returns:
Type | Description |
---|---|
Namespace |
Parsed args. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the orchestrator entrypoint.

    All three options are required string options.

    Returns:
        Parsed args.
    """
    required_flags = (
        "--run_name",
        "--deployment_id",
        "--kubernetes_namespace",
    )
    arg_parser = argparse.ArgumentParser()
    for flag in required_flags:
        arg_parser.add_argument(flag, type=str, required=True)
    return arg_parser.parse_args()
kubernetes_orchestrator_entrypoint_configuration
Entrypoint configuration for the Kubernetes master/orchestrator pod.
KubernetesOrchestratorEntrypointConfiguration
Entrypoint configuration for the k8s master/orchestrator pod.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
class KubernetesOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the k8s master/orchestrator pod."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Collect the option names this entrypoint must be called with.

        Returns:
            Entrypoint options.
        """
        return {RUN_NAME_OPTION, DEPLOYMENT_ID_OPTION, NAMESPACE_OPTION}

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Build the command that runs the entrypoint module.

        Returns:
            Entrypoint command.
        """
        entrypoint_module = "zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint"
        return ["python", "-m", entrypoint_module]

    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
        kubernetes_namespace: str,
    ) -> List[str]:
        """Build the arguments the entrypoint command should be called with.

        Each option is emitted as a `--<option>` flag followed by its value.

        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.
            kubernetes_namespace: Name of the Kubernetes namespace.

        Returns:
            List of entrypoint arguments.
        """
        option_values = (
            (RUN_NAME_OPTION, run_name),
            (DEPLOYMENT_ID_OPTION, str(deployment_id)),
            (NAMESPACE_OPTION, kubernetes_namespace),
        )
        arguments: List[str] = []
        for option, value in option_values:
            arguments += [f"--{option}", value]
        return arguments
get_entrypoint_arguments(run_name, deployment_id, kubernetes_namespace)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
str |
Name of the ZenML run. |
required |
deployment_id |
UUID |
ID of the deployment. |
required |
kubernetes_namespace |
str |
Name of the Kubernetes namespace. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of entrypoint arguments. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
    kubernetes_namespace: str,
) -> List[str]:
    """Build the arguments the entrypoint command should be called with.

    Each option is emitted as a `--<option>` flag followed by its value.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.
        kubernetes_namespace: Name of the Kubernetes namespace.

    Returns:
        List of entrypoint arguments.
    """
    option_values = (
        (RUN_NAME_OPTION, run_name),
        (DEPLOYMENT_ID_OPTION, str(deployment_id)),
        (NAMESPACE_OPTION, kubernetes_namespace),
    )
    arguments: List[str] = []
    for option, value in option_values:
        arguments += [f"--{option}", value]
    return arguments
get_entrypoint_command()
classmethod
Returns a command that runs the entrypoint module.
Returns:
Type | Description |
---|---|
List[str] |
Entrypoint command. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
]
return command
get_entrypoint_options()
classmethod
Gets all the options required for running this entrypoint.
Returns:
Type | Description |
---|---|
Set[str] |
Entrypoint options. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Collect the option names this entrypoint must be called with.

    Returns:
        Entrypoint options.
    """
    return {RUN_NAME_OPTION, DEPLOYMENT_ID_OPTION, NAMESPACE_OPTION}
manifest_utils
Utility functions for building manifests for k8s pods.
add_local_stores_mount(pod_spec)
Makes changes in place to the configuration of the pod spec.
Configures mounted volumes for stack components that write to a local path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_spec |
kubernetes.client.V1PodSpec |
The pod spec to update. |
required |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_local_stores_mount(
    pod_spec: k8s_client.V1PodSpec,
) -> None:
    """Makes changes in place to the configuration of the pod spec.

    Configures mounted volumes for stack components that write to a local
    path: the local stores directory is added as a host-path volume, mounted
    into the container at the same path, and advertised to the container
    through the `ENV_ZENML_LOCAL_STORES_PATH` environment variable.

    Args:
        pod_spec: The pod spec to update. Must contain exactly one
            container.
    """
    assert len(pod_spec.containers) == 1
    container_spec: k8s_client.V1Container = pod_spec.containers[0]

    stack = Client().active_stack
    stack.check_local_paths()

    local_stores_path = GlobalConfiguration().local_stores_path

    host_path = k8s_client.V1HostPathVolumeSource(
        path=local_stores_path, type="Directory"
    )

    pod_spec.volumes = pod_spec.volumes or []
    pod_spec.volumes.append(
        k8s_client.V1Volume(
            name="local-stores",
            host_path=host_path,
        )
    )
    container_spec.volume_mounts = container_spec.volume_mounts or []
    container_spec.volume_mounts.append(
        k8s_client.V1VolumeMount(
            name="local-stores",
            mount_path=local_stores_path,
        )
    )

    if sys.platform == "win32":
        # File permissions are not checked on Windows. This if clause
        # prevents mypy from complaining about unused 'type: ignore'
        # statements
        pass
    else:
        # Run the pod's containers in the context of the local UID/GID
        # to ensure that the local stores can be shared
        # with the local pipeline runs.
        # `V1PodSpec.security_context` is a pod-level field, so the
        # pod-level model is used rather than the container-level
        # `V1SecurityContext`.
        pod_spec.security_context = k8s_client.V1PodSecurityContext(
            run_as_user=os.getuid(),
            run_as_group=os.getgid(),
        )

    container_spec.env = container_spec.env or []
    container_spec.env.append(
        k8s_client.V1EnvVar(
            name=ENV_ZENML_LOCAL_STORES_PATH,
            value=local_stores_path,
        )
    )
add_pod_settings(pod_spec, settings)
Updates pod `spec` fields in place if passed in orchestrator settings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_spec |
kubernetes.client.V1PodSpec |
Pod spec to update. |
required |
settings |
KubernetesPodSettings |
Pod settings to apply. |
required |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_pod_settings(
    pod_spec: k8s_client.V1PodSpec,
    settings: KubernetesPodSettings,
) -> None:
    """Updates pod `spec` fields in place if passed in orchestrator settings.

    Node selectors, affinity, tolerations, volumes and host IPC are only
    applied when set to a truthy value. The resources of every container are
    always overwritten with `settings.resources`. Volumes and volume mounts
    are appended to any entries already present on the pod spec.

    Args:
        pod_spec: Pod spec to update.
        settings: Pod settings to apply.
    """
    if settings.node_selectors:
        pod_spec.node_selector = settings.node_selectors

    if settings.affinity:
        pod_spec.affinity = settings.affinity

    if settings.tolerations:
        pod_spec.tolerations = settings.tolerations

    for container in pod_spec.containers:
        assert isinstance(container, k8s_client.V1Container)
        container._resources = settings.resources
        if settings.volume_mounts:
            if container.volume_mounts:
                container.volume_mounts.extend(settings.volume_mounts)
            else:
                # Copy so that later in-place edits of the container's mounts
                # neither leak into the settings object nor into the other
                # containers of the pod.
                container.volume_mounts = list(settings.volume_mounts)

    if settings.volumes:
        if pod_spec.volumes:
            pod_spec.volumes.extend(settings.volumes)
        else:
            # Copy for the same reason as the volume mounts above.
            pod_spec.volumes = list(settings.volumes)

    if settings.host_ipc:
        pod_spec.host_ipc = settings.host_ipc
build_cluster_role_binding_manifest_for_service_account(name, role_name, service_account_name, namespace='default')
Build a manifest for a cluster role binding of a service account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the cluster role binding. |
required |
role_name |
str |
Name of the role. |
required |
service_account_name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a cluster role binding of a service account. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cluster_role_binding_manifest_for_service_account(
    name: str,
    role_name: str,
    service_account_name: str,
    namespace: str = "default",
) -> Dict[str, Any]:
    """Build a manifest for a cluster role binding of a service account.

    The binding grants the cluster role `role_name` to a single subject: the
    service account `service_account_name` in `namespace`.

    Args:
        name: Name of the cluster role binding.
        role_name: Name of the role.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a cluster role binding of a service account.
    """
    rbac_api_group = "rbac.authorization.k8s.io"

    # The only subject of the binding.
    service_account_subject = {
        "kind": "ServiceAccount",
        "name": service_account_name,
        "namespace": namespace,
    }
    # The cluster role that is granted to the subject.
    cluster_role_ref = {
        "kind": "ClusterRole",
        "name": role_name,
        "apiGroup": rbac_api_group,
    }

    manifest: Dict[str, Any] = {
        "apiVersion": f"{rbac_api_group}/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": name},
        "subjects": [service_account_subject],
        "roleRef": cluster_role_ref,
    }
    return manifest
build_cron_job_manifest(cron_expression, pod_name, run_name, pipeline_name, image_name, command, args, privileged, pod_settings=None, service_account_name=None, env=None, mount_local_stores=False)
Create a manifest for launching a pod as scheduled CRON job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cron_expression |
str |
CRON job schedule expression, e.g. "* * * * *". |
required |
pod_name |
str |
Name of the pod. |
required |
run_name |
str |
Name of the ZenML run. |
required |
pipeline_name |
str |
Name of the ZenML pipeline. |
required |
image_name |
str |
Name of the Docker image. |
required |
command |
List[str] |
Command to execute the entrypoint in the pod. |
required |
args |
List[str] |
Arguments provided to the entrypoint command. |
required |
privileged |
bool |
Whether to run the container in privileged mode. |
required |
pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Optional settings for the pod. |
None |
service_account_name |
Optional[str] |
Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster. |
None |
env |
Optional[Dict[str, str]] |
Environment variables to set. |
None |
mount_local_stores |
bool |
Whether to mount the local stores path inside the pod. |
False |
Returns:
Type | Description |
---|---|
kubernetes.client.V1CronJob |
CRON job manifest. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cron_job_manifest(
    cron_expression: str,
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    privileged: bool,
    pod_settings: Optional[KubernetesPodSettings] = None,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1CronJob:
    """Create a manifest for launching a pod as scheduled CRON job.

    A regular pod manifest is built first; its spec becomes the pod template
    of the job, and its metadata is reused for the CRON job, the job template
    and the pod template.

    Args:
        cron_expression: CRON job schedule expression, e.g. "* * * * *".
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        privileged: Whether to run the container in privileged mode.
        pod_settings: Optional settings for the pod.
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        CRON job manifest.
    """
    pod = build_pod_manifest(
        pod_name=pod_name,
        run_name=run_name,
        pipeline_name=pipeline_name,
        image_name=image_name,
        command=command,
        args=args,
        privileged=privileged,
        pod_settings=pod_settings,
        service_account_name=service_account_name,
        env=env,
        mount_local_stores=mount_local_stores,
    )

    # The same metadata object is shared by all three nesting levels.
    shared_metadata = pod.metadata

    pod_template = k8s_client.V1PodTemplateSpec(
        metadata=shared_metadata,
        spec=pod.spec,
    )
    job_template = k8s_client.V1JobTemplateSpec(
        metadata=shared_metadata,
        spec=k8s_client.V1JobSpec(template=pod_template),
    )
    cron_job_spec = k8s_client.V1CronJobSpec(
        schedule=cron_expression,
        job_template=job_template,
    )

    return k8s_client.V1CronJob(
        kind="CronJob",
        api_version="batch/v1",
        metadata=shared_metadata,
        spec=cron_job_spec,
    )
build_namespace_manifest(namespace)
Build the manifest for a new namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace |
str |
Kubernetes namespace. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest of the new namespace. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_namespace_manifest(namespace: str) -> Dict[str, Any]:
    """Build the manifest for a new namespace.

    Args:
        namespace: Kubernetes namespace.

    Returns:
        Manifest of the new namespace.
    """
    # A namespace manifest only needs its name as metadata.
    metadata = {"name": namespace}
    manifest: Dict[str, Any] = {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": metadata,
    }
    return manifest
build_pod_manifest(pod_name, run_name, pipeline_name, image_name, command, args, privileged, pod_settings=None, service_account_name=None, env=None, mount_local_stores=False)
Build a Kubernetes pod manifest for a ZenML run or step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_name |
str |
Name of the pod. |
required |
run_name |
str |
Name of the ZenML run. |
required |
pipeline_name |
str |
Name of the ZenML pipeline. |
required |
image_name |
str |
Name of the Docker image. |
required |
command |
List[str] |
Command to execute the entrypoint in the pod. |
required |
args |
List[str] |
Arguments provided to the entrypoint command. |
required |
privileged |
bool |
Whether to run the container in privileged mode. |
required |
pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Optional settings for the pod. |
None |
service_account_name |
Optional[str] |
Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster. |
None |
env |
Optional[Dict[str, str]] |
Environment variables to set. |
None |
mount_local_stores |
bool |
Whether to mount the local stores path inside the pod. |
False |
Returns:
Type | Description |
---|---|
kubernetes.client.V1Pod |
Pod manifest. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_pod_manifest(
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    privileged: bool,
    pod_settings: Optional[KubernetesPodSettings] = None,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1Pod:
    """Build a Kubernetes pod manifest for a ZenML run or step.

    The pod has a single container named "main" and restart policy "Never".
    It is always labelled with the run and pipeline name; these two labels
    take precedence over labels of the same name in the pod settings.

    Args:
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        privileged: Whether to run the container in privileged mode.
        pod_settings: Optional settings for the pod.
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        Pod manifest.
    """
    # Work on a copy so the caller's mapping is never modified.
    container_env = dict(env) if env else {}
    container_env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")

    main_container = k8s_client.V1Container(
        name="main",
        image=image_name,
        command=command,
        args=args,
        env=[
            k8s_client.V1EnvVar(name=key, value=val)
            for key, val in container_env.items()
        ],
        security_context=k8s_client.V1SecurityContext(privileged=privileged),
    )

    pull_secret_refs = (
        [
            k8s_client.V1LocalObjectReference(name=secret_name)
            for secret_name in pod_settings.image_pull_secrets
        ]
        if pod_settings
        else []
    )

    pod_spec = k8s_client.V1PodSpec(
        containers=[main_container],
        restart_policy="Never",
        image_pull_secrets=pull_secret_refs,
    )
    if service_account_name is not None:
        pod_spec.service_account_name = service_account_name

    labels = {}
    if pod_settings:
        add_pod_settings(pod_spec, pod_settings)
        # User-provided labels go in first ...
        if pod_settings.labels:
            labels.update(pod_settings.labels)
    # ... so that the run and pipeline labels always win.
    labels["run"] = run_name
    labels["pipeline"] = pipeline_name

    pod_metadata = k8s_client.V1ObjectMeta(name=pod_name, labels=labels)
    if pod_settings and pod_settings.annotations:
        pod_metadata.annotations = pod_settings.annotations

    pod_manifest = k8s_client.V1Pod(
        kind="Pod",
        api_version="v1",
        metadata=pod_metadata,
        spec=pod_spec,
    )

    # The manifest references `pod_spec`, so this in-place update is visible
    # in the returned manifest.
    if mount_local_stores:
        add_local_stores_mount(pod_spec)

    return pod_manifest
build_service_account_manifest(name, namespace='default')
Build the manifest for a service account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a service account. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_service_account_manifest(
    name: str, namespace: str = "default"
) -> Dict[str, Any]:
    """Build the manifest for a service account.

    Args:
        name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a service account.
    """
    return {
        "apiVersion": "v1",
        # Without `kind` the manifest does not identify which resource type
        # it describes.
        "kind": "ServiceAccount",
        "metadata": {
            "name": name,
            "namespace": namespace,
        },
    }
pod_settings
Kubernetes pod settings.
KubernetesPodSettings (BaseSettings)
Kubernetes Pod settings.
Attributes:
Name | Type | Description |
---|---|---|
node_selectors |
Dict[str, str] |
Node selectors to apply to the pod. |
affinity |
Dict[str, Any] |
Affinity to apply to the pod. |
tolerations |
List[Dict[str, Any]] |
Tolerations to apply to the pod. |
resources |
Dict[str, Dict[str, str]] |
Resource requests and limits for the pod. |
annotations |
Dict[str, str] |
Annotations to apply to the pod metadata. |
volumes |
List[Dict[str, Any]] |
Volumes to mount in the pod. |
volume_mounts |
List[Dict[str, Any]] |
Volume mounts to apply to the pod containers. |
host_ipc |
bool |
Whether to enable host IPC for the pod. |
image_pull_secrets |
List[str] |
Image pull secrets to use for the pod. |
labels |
Dict[str, str] |
Labels to apply to the pod. |
Source code in zenml/integrations/kubernetes/pod_settings.py
class KubernetesPodSettings(BaseSettings):
    """Kubernetes Pod settings.

    All values are stored as plain dicts/lists. Kubernetes client model
    objects passed for `volumes`, `volume_mounts`, `affinity`, `tolerations`
    or `resources` are serialized to dicts before validation.

    Attributes:
        node_selectors: Node selectors to apply to the pod.
        affinity: Affinity to apply to the pod.
        tolerations: Tolerations to apply to the pod.
        resources: Resource requests and limits for the pod.
        annotations: Annotations to apply to the pod metadata.
        volumes: Volumes to mount in the pod.
        volume_mounts: Volume mounts to apply to the pod containers.
        host_ipc: Whether to enable host IPC for the pod.
        image_pull_secrets: Image pull secrets to use for the pod.
        labels: Labels to apply to the pod.
    """

    node_selectors: Dict[str, str] = {}
    affinity: Dict[str, Any] = {}
    tolerations: List[Dict[str, Any]] = []
    resources: Dict[str, Dict[str, str]] = {}
    annotations: Dict[str, str] = {}
    volumes: List[Dict[str, Any]] = []
    volume_mounts: List[Dict[str, Any]] = []
    host_ipc: bool = False
    image_pull_secrets: List[str] = []
    labels: Dict[str, str] = {}

    @field_validator("volumes", mode="before")
    @classmethod
    def _convert_volumes(cls, value: Any) -> Any:
        """Converts Kubernetes volumes to dicts.

        Args:
            value: The volumes list.

        Returns:
            The converted volumes.
        """
        from kubernetes.client.models import V1Volume

        # Only client model instances are serialized; other items pass
        # through unchanged.
        return [
            serialization_utils.serialize_kubernetes_model(item)
            if isinstance(item, V1Volume)
            else item
            for item in value
        ]

    @field_validator("volume_mounts", mode="before")
    @classmethod
    def _convert_volume_mounts(cls, value: Any) -> Any:
        """Converts Kubernetes volume mounts to dicts.

        Args:
            value: The volume mounts list.

        Returns:
            The converted volume mounts.
        """
        from kubernetes.client.models import V1VolumeMount

        return [
            serialization_utils.serialize_kubernetes_model(item)
            if isinstance(item, V1VolumeMount)
            else item
            for item in value
        ]

    @field_validator("affinity", mode="before")
    @classmethod
    def _convert_affinity(cls, value: Any) -> Any:
        """Converts Kubernetes affinity to a dict.

        Args:
            value: The affinity value.

        Returns:
            The converted value.
        """
        from kubernetes.client.models import V1Affinity

        if not isinstance(value, V1Affinity):
            return value
        return serialization_utils.serialize_kubernetes_model(value)

    @field_validator("tolerations", mode="before")
    @classmethod
    def _convert_tolerations(cls, value: Any) -> Any:
        """Converts Kubernetes tolerations to dicts.

        Args:
            value: The tolerations list.

        Returns:
            The converted tolerations.
        """
        from kubernetes.client.models import V1Toleration

        return [
            serialization_utils.serialize_kubernetes_model(item)
            if isinstance(item, V1Toleration)
            else item
            for item in value
        ]

    @field_validator("resources", mode="before")
    @classmethod
    def _convert_resources(cls, value: Any) -> Any:
        """Converts Kubernetes resource requirements to a dict.

        Args:
            value: The resource value.

        Returns:
            The converted value.
        """
        from kubernetes.client.models import V1ResourceRequirements

        if not isinstance(value, V1ResourceRequirements):
            return value
        return serialization_utils.serialize_kubernetes_model(value)
serialization_utils
Kubernetes serialization utils.
deserialize_kubernetes_model(data, class_name)
Deserializes a Kubernetes model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Dict[str, Any] |
The model data. |
required |
class_name |
str |
Name of the Kubernetes model class. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the data contains values for an invalid attribute. |
Returns:
Type | Description |
---|---|
Any |
The deserialized model. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any:
    """Deserializes a Kubernetes model.

    Each serialized key in `data` is translated back to the attribute name of
    the model class, and its value is converted according to the attribute
    type declared by the model (`openapi_types`): lists and dicts are
    delegated to the list/dict helpers, nested models are deserialized
    recursively, and everything else (including any falsy value) is passed
    through unchanged.

    Args:
        data: The model data.
        class_name: Name of the Kubernetes model class.

    Raises:
        KeyError: If the data contains values for an invalid attribute.

    Returns:
        The deserialized model.
    """
    model_class = get_model_class(class_name=class_name)
    assert hasattr(model_class, "openapi_types")
    assert hasattr(model_class, "attribute_map")

    # attribute name -> declared attribute type (as a string)
    attribute_types = cast(Dict[str, str], model_class.openapi_types)
    # serialized key -> attribute name (inverse of the model's attribute_map)
    attribute_mapping = {
        serialized_key: attribute_name
        for attribute_name, serialized_key in cast(
            Dict[str, str], model_class.attribute_map
        ).items()
    }

    init_kwargs: Dict[str, Any] = {}
    for key, value in data.items():
        if key not in attribute_mapping:
            raise KeyError(
                f"Got value for attribute {key} which is not one of the "
                f"available attributes {set(attribute_mapping)}."
            )

        attribute_name = attribute_mapping[key]
        declared_type = attribute_types[attribute_name]

        converted: Any
        if not value:
            # None and empty values need no conversion.
            converted = value
        elif declared_type.startswith("list["):
            list_match = re.fullmatch(r"list\[(.*)\]", declared_type)
            assert list_match
            converted = _deserialize_list(
                value, class_name=list_match.group(1)
            )
        elif declared_type.startswith("dict("):
            dict_match = re.fullmatch(r"dict\(([^,]*), (.*)\)", declared_type)
            assert dict_match
            # NOTE(review): group(1) is the first (key) type of `dict(K, V)`;
            # confirm that `_deserialize_dict` expects this rather than the
            # value type in group(2).
            converted = _deserialize_dict(
                value, class_name=dict_match.group(1)
            )
        elif is_model_class(declared_type):
            converted = deserialize_kubernetes_model(value, declared_type)
        else:
            converted = value

        init_kwargs[attribute_name] = converted

    return model_class(**init_kwargs)
get_model_class(class_name)
Gets a Kubernetes model class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_name |
str |
Name of the class to get. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If no Kubernetes model class exists for this name. |
Returns:
Type | Description |
---|---|
Type[Any] |
The model class. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def get_model_class(class_name: str) -> Type[Any]:
    """Gets a Kubernetes model class.

    The class is looked up by name in `kubernetes.client.models`.

    Args:
        class_name: Name of the class to get.

    Raises:
        TypeError: If no Kubernetes model class exists for this name.

    Returns:
        The model class.
    """
    import kubernetes.client.models

    model_class = getattr(kubernetes.client.models, class_name, None)
    if model_class:
        assert isinstance(model_class, type)
        return model_class

    raise TypeError(
        f"Unable to find kubernetes model class with name {class_name}."
    )
is_model_class(class_name)
Checks whether the given class name is a Kubernetes model class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_name |
str |
Name of the class to check. |
required |
Returns:
Type | Description |
---|---|
bool |
If the given class name is a Kubernetes model class. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def is_model_class(class_name: str) -> bool:
    """Checks whether the given class name is a Kubernetes model class.

    Args:
        class_name: Name of the class to check.

    Returns:
        Whether `kubernetes.client.models` has an attribute with this name.
    """
    import kubernetes.client.models

    models_module = kubernetes.client.models
    return hasattr(models_module, class_name)
serialize_kubernetes_model(model)
Serializes a Kubernetes model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Any |
The model to serialize. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the model is not a Kubernetes model. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The serialized model. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def serialize_kubernetes_model(model: Any) -> Dict[str, Any]:
    """Serializes a Kubernetes model.

    The model's attributes are collected under their serialized names (as
    given by the model's `attribute_map`) and then handed to the dict
    serializer.

    Args:
        model: The model to serialize.

    Raises:
        TypeError: If the model is not a Kubernetes model.

    Returns:
        The serialized model.
    """
    if not is_model_class(model.__class__.__name__):
        raise TypeError(f"Unable to serialize non-kubernetes model {model}.")

    assert hasattr(model, "attribute_map")
    name_to_serialized = cast(Dict[str, str], model.attribute_map)

    raw_attributes = {}
    for attribute_name, serialized_name in name_to_serialized.items():
        raw_attributes[serialized_name] = getattr(model, attribute_name)

    return _serialize_dict(raw_attributes)
service_connectors
special
Kubernetes Service Connector.
kubernetes_service_connector
Kubernetes Service Connector.
The Kubernetes Service Connector implements various authentication methods for Kubernetes clusters.
KubernetesAuthenticationMethods (StrEnum)
Kubernetes Authentication methods.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesAuthenticationMethods(StrEnum):
    """Kubernetes Authentication methods."""

    # Authenticate with a username and a password.
    PASSWORD = "password"
    # Authenticate with a token and/or a client certificate and key.
    TOKEN = "token"
KubernetesBaseConfig (KubernetesServerConfig)
Kubernetes basic config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesBaseConfig(KubernetesServerConfig):
    """Kubernetes basic config.

    Extends the server config with the name under which the cluster is
    registered in the kubeconfig file.
    """

    # No default is given for this field.
    cluster_name: str = Field(
        title="Kubernetes cluster name to be used in the kubeconfig file",
    )
KubernetesServerConfig (KubernetesServerCredentials)
Kubernetes server config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesServerConfig(KubernetesServerCredentials):
    """Kubernetes server config.

    Extends the server credentials with the server URL and the TLS
    verification flag.
    """

    # URL of the Kubernetes API server; no default is given.
    server: str = Field(
        title="Kubernetes Server URL",
    )
    # Defaults to False, i.e. the server certificate is verified.
    insecure: bool = Field(
        default=False,
        title="Skip TLS verification for the server certificate",
    )
KubernetesServerCredentials (AuthenticationConfig)
Kubernetes server authentication config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesServerCredentials(AuthenticationConfig):
    """Kubernetes server authentication config."""

    # Optional, base64 encoded CA certificate; stored as a secret string.
    certificate_authority: Optional[PlainSerializedSecretStr] = Field(
        default=None,
        title="Kubernetes CA Certificate (base64 encoded)",
    )
KubernetesServiceConnector (ServiceConnector)
Kubernetes service connector.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesServiceConnector(ServiceConnector):
    """Kubernetes service connector."""

    config: KubernetesBaseConfig

    @classmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the service connector type specification.

        Returns:
            The service connector type specification.
        """
        return KUBERNETES_SERVICE_CONNECTOR_TYPE_SPEC

    def _get_default_resource_id(self, resource_type: str) -> str:
        """Get the default resource ID for a resource type.

        Service connector implementations must override this method and provide
        a default resource ID for resources that do not support multiple
        instances.

        Args:
            resource_type: The type of the resource to get a default resource ID
                for. Only called with resource types that do not support
                multiple instances.

        Returns:
            The default resource ID for the resource type.
        """
        # We use the cluster name as the default resource ID.
        return self.config.cluster_name

    @classmethod
    def _write_to_temp_file(cls, content: bytes) -> str:
        """Write content to a secured temporary file.

        Write content to a temporary file that is readable and writable only by
        the creating user ID and return the path to the temporary file.

        The file is not deleted automatically; the caller owns it.

        Args:
            content: The content to write to the temporary file.

        Returns:
            The path to the temporary file.
        """
        fd, temp_path = tempfile.mkstemp()
        # Closing the file object flushes it and closes the descriptor.
        with os.fdopen(fd, "wb") as fp:
            fp.write(content)
        return temp_path

    @staticmethod
    def _remove_prefix(value: str, prefix: str) -> str:
        """Remove a leading prefix from a string, if present.

        Unlike `str.strip(chars)`, which removes any of the given characters
        from both ends, this only removes the exact prefix.

        Args:
            value: The string to process.
            prefix: The prefix to remove.

        Returns:
            The string without the prefix.
        """
        if value.startswith(prefix):
            return value[len(prefix) :]
        return value

    @classmethod
    def _cluster_name_from_host(cls, host: str) -> str:
        """Derive a cluster name from a Kubernetes API server URL.

        Args:
            host: The API server URL, e.g. "https://example.com:6443".

        Returns:
            The URL without scheme and without anything after the first
            colon that follows the scheme.
        """
        for scheme in ("https://", "http://"):
            if host.startswith(scheme):
                host = cls._remove_prefix(host, scheme)
                break
        return host.split(":")[0]

    @staticmethod
    def _read_file_b64(path: Optional[str]) -> Optional[str]:
        """Read a file and return its content URL-safe base64 encoded.

        Args:
            path: Path of the file to read. May be unset.

        Returns:
            The encoded file content, or None if no path was given.
        """
        if not path:
            return None
        with open(path, "rb") as f:
            return base64.urlsafe_b64encode(f.read()).decode("utf-8")

    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to a Kubernetes cluster.

        Certificates and keys from the connector configuration are decoded and
        written to temporary files, because the client configuration refers
        to them by file path. These files are not removed by this method.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Returns:
            A python-kubernetes client object.
        """
        cfg = self.config
        k8s_conf = k8s_client.Configuration()

        if self.auth_method == KubernetesAuthenticationMethods.PASSWORD:
            assert isinstance(cfg, KubernetesUserPasswordConfig)

            k8s_conf.username = cfg.username.get_secret_value()
            k8s_conf.password = cfg.password.get_secret_value()
        else:
            assert isinstance(cfg, KubernetesTokenConfig)

            if cfg.token:
                k8s_conf.api_key["authorization"] = (
                    cfg.token.get_secret_value()
                )
                k8s_conf.api_key_prefix["authorization"] = "Bearer"

            if cfg.client_certificate is not None:
                client_cert = cfg.client_certificate.get_secret_value()
                client_cert_bs = base64.urlsafe_b64decode(
                    client_cert.encode("utf-8")
                )

                k8s_conf.cert_file = self._write_to_temp_file(client_cert_bs)

            if cfg.client_key is not None:
                client_key = cfg.client_key.get_secret_value()
                client_key_bs = base64.urlsafe_b64decode(
                    client_key.encode("utf-8")
                )

                k8s_conf.key_file = self._write_to_temp_file(client_key_bs)

        # NOTE(review): `cfg.insecure` is not applied to the client
        # configuration here - confirm whether that is intended.
        k8s_conf.host = cfg.server

        if cfg.certificate_authority is not None:
            ssl_ca_cert = cfg.certificate_authority.get_secret_value()
            cert_bs = base64.urlsafe_b64decode(ssl_ca_cert.encode("utf-8"))

            k8s_conf.ssl_ca_cert = self._write_to_temp_file(cert_bs)

        return k8s_client.ApiClient(k8s_conf)

    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Configure a local Kubernetes client to authenticate and connect to a cluster.

        This method uses the connector's configuration to configure the local
        Kubernetes client (kubectl).

        A cluster, a user and a context entry, all named after the cluster,
        are written with `kubectl config`, and the context is activated.
        Certificates and keys are handed to `kubectl` through temporary files
        that are removed again whether or not the commands succeed.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            AuthorizationException: If authentication failed.
        """
        cfg = self.config
        cluster_name = cfg.cluster_name
        delete_files: List[str] = []

        try:
            if self.auth_method == KubernetesAuthenticationMethods.PASSWORD:
                assert isinstance(cfg, KubernetesUserPasswordConfig)
                username = cfg.username.get_secret_value()
                password = cfg.password.get_secret_value()

                add_user_cmd = [
                    "kubectl",
                    "config",
                    "set-credentials",
                    cluster_name,
                    "--username",
                    username,
                    "--password",
                    password,
                ]
            else:
                assert isinstance(cfg, KubernetesTokenConfig)

                add_user_cmd = [
                    "kubectl",
                    "config",
                    "set-credentials",
                    cluster_name,
                ]

                if cfg.token:
                    token = cfg.token.get_secret_value()
                    add_user_cmd += [
                        "--token",
                        token,
                    ]

                if cfg.client_certificate and cfg.client_key:
                    add_user_cmd += [
                        "--embed-certs",
                    ]

                    client_cert = cfg.client_certificate.get_secret_value()
                    client_cert_bs = base64.urlsafe_b64decode(
                        client_cert.encode("utf-8")
                    )

                    temp_path = self._write_to_temp_file(client_cert_bs)
                    delete_files.append(temp_path)
                    add_user_cmd += [
                        "--client-certificate",
                        temp_path,
                    ]

                    client_key = cfg.client_key.get_secret_value()
                    client_key_bs = base64.urlsafe_b64decode(
                        client_key.encode("utf-8")
                    )

                    temp_path = self._write_to_temp_file(client_key_bs)
                    delete_files.append(temp_path)
                    add_user_cmd += [
                        "--client-key",
                        temp_path,
                    ]

            # add the cluster config to the default kubeconfig
            add_cluster_cmd = [
                "kubectl",
                "config",
                "set-cluster",
                cluster_name,
                "--server",
                cfg.server,
            ]

            if cfg.certificate_authority:
                ssl_ca_cert = cfg.certificate_authority.get_secret_value()
                cert_bs = base64.urlsafe_b64decode(ssl_ca_cert.encode("utf-8"))

                temp_path = self._write_to_temp_file(cert_bs)
                delete_files.append(temp_path)
                add_cluster_cmd += [
                    "--embed-certs",
                    "--certificate-authority",
                    temp_path,
                ]

            add_context_cmd = [
                "kubectl",
                "config",
                "set-context",
                cluster_name,
                "--cluster",
                cluster_name,
                "--user",
                cluster_name,
            ]

            set_context_cmd = [
                "kubectl",
                "config",
                "use-context",
                cluster_name,
            ]

            try:
                for cmd in [
                    add_cluster_cmd,
                    add_user_cmd,
                    add_context_cmd,
                    set_context_cmd,
                ]:
                    subprocess.run(
                        cmd,
                        check=True,
                    )
            except subprocess.CalledProcessError as e:
                raise AuthorizationException(
                    f"Failed to update local kubeconfig with the "
                    f"cluster configuration: {e}"
                ) from e
        finally:
            # delete the temporary files, also on failure, so that decoded
            # certificates and keys do not stay behind on disk
            for f in delete_files:
                os.unlink(f)

        logger.info(
            f"Updated local kubeconfig with the cluster details. "
            f"The current kubectl context was set to '{cluster_name}'."
        )

    @classmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        kubernetes_context: Optional[str] = None,
        **kwargs: Any,
    ) -> "KubernetesServiceConnector":
        """Auto-configure the connector.

        The configuration is read from the local kubeconfig. If it contains a
        username and password, the password authentication method is used,
        otherwise the token method. The `auth_method` argument is ignored.

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure.
            resource_id: The ID of the resource to configure. The
                implementation may choose to either require or ignore this
                parameter if it does not support or detect an resource type that
                supports multiple instances.
            kubernetes_context: The name of the Kubernetes context to use. If
                not specified, the active context will be used.
            kwargs: Additional implementation specific keyword arguments to use.

        Returns:
            A configured Kubernetes connector instance.

        Raises:
            AuthorizationException: If the connector could not be configured.
        """
        kube_config = k8s_client.Configuration()
        try:
            k8s_config.load_kube_config(
                context=kubernetes_context,
                client_configuration=kube_config,
            )
        except k8s_config.ConfigException as e:
            raise AuthorizationException(
                f"Failed to load the Kubernetes configuration: {e}"
            ) from e

        # Values shared by both authentication methods.
        cluster_name = cls._cluster_name_from_host(kube_config.host)
        certificate_authority = cls._read_file_b64(kube_config.ssl_ca_cert)
        insecure = kube_config.verify_ssl is False

        auth_config: KubernetesBaseConfig
        if kube_config.username and kube_config.password:
            auth_method = KubernetesAuthenticationMethods.PASSWORD
            auth_config = KubernetesUserPasswordConfig(
                username=kube_config.username,
                password=kube_config.password,
                server=kube_config.host,
                certificate_authority=certificate_authority,
                cluster_name=cluster_name,
                insecure=insecure,
            )
        else:
            token: Optional[str] = None
            if kube_config.api_key:
                raw_token = kube_config.api_key.get("authorization")
                if raw_token:
                    # Only drop the literal "Bearer " prefix; the token
                    # itself must stay untouched.
                    token = (
                        cls._remove_prefix(raw_token, "Bearer ").strip()
                        or None
                    )

            auth_method = KubernetesAuthenticationMethods.TOKEN
            auth_config = KubernetesTokenConfig(
                token=token,
                server=kube_config.host,
                certificate_authority=certificate_authority,
                client_certificate=cls._read_file_b64(kube_config.cert_file),
                client_key=cls._read_file_b64(kube_config.key_file),
                cluster_name=cluster_name,
                insecure=insecure,
            )

        return cls(
            auth_method=auth_method,
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
            config=auth_config,
        )

    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify and list all the resources that the connector can access.

        Args:
            resource_type: The type of resource to verify. Always set to the
                Kubernetes resource type.
            resource_id: The ID of the resource to connect to. Always set to
                the Kubernetes cluster name.

        Returns:
            The list of canonical resource IDs that the connector can access,
            meaning only the Kubernetes cluster name.

        Raises:
            AuthorizationException: If the connector cannot authenticate or
                access the Kubernetes cluster API.
        """
        assert resource_id is not None

        client = self._connect_to_resource()
        assert isinstance(client, k8s_client.ApiClient)

        # Verify that the Kubernetes cluster exists and is accessible
        try:
            client.call_api(
                "/version",
                "GET",
                auth_settings=["BearerToken"],
                response_type="VersionInfo",
            )
        except k8s_client.ApiException as err:
            raise AuthorizationException(
                f"failed to verify Kubernetes cluster access: {err}"
            ) from err

        return [resource_id]
KubernetesTokenConfig (KubernetesBaseConfig, KubernetesTokenCredentials)
Kubernetes token config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesTokenConfig(KubernetesBaseConfig, KubernetesTokenCredentials):
    """Kubernetes token config.

    Combines the basic cluster config with the token credentials; it defines
    no fields of its own.
    """
KubernetesTokenCredentials (AuthenticationConfig)
Kubernetes token authentication config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesTokenCredentials(AuthenticationConfig):
    """Credentials used by the Kubernetes token authentication method.

    Every field is optional and defaults to `None`.
    """

    # Client certificate, expected in base64-encoded form (see the title).
    client_certificate: Optional[PlainSerializedSecretStr] = Field(
        title="Kubernetes Client Certificate (base64 encoded)", default=None
    )
    # Client key, expected in base64-encoded form (see the title).
    client_key: Optional[PlainSerializedSecretStr] = Field(
        title="Kubernetes Client Key (base64 encoded)", default=None
    )
    # Token value for the Kubernetes API.
    token: Optional[PlainSerializedSecretStr] = Field(
        title="Kubernetes Token", default=None
    )
KubernetesUserPasswordConfig (KubernetesBaseConfig, KubernetesUserPasswordCredentials)
Kubernetes user/pass config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesUserPasswordConfig(KubernetesBaseConfig, KubernetesUserPasswordCredentials):
    """Kubernetes username/password config.

    Combines the fields of `KubernetesBaseConfig` with those of
    `KubernetesUserPasswordCredentials`; it declares no fields of its own.
    """
KubernetesUserPasswordCredentials (AuthenticationConfig)
Kubernetes user/pass authentication config.
Source code in zenml/integrations/kubernetes/service_connectors/kubernetes_service_connector.py
class KubernetesUserPasswordCredentials(AuthenticationConfig):
    """Credentials used by the Kubernetes username/password authentication.

    Both fields are required: neither declares a default value.
    """

    # Username presented to the Kubernetes cluster.
    username: PlainSerializedSecretStr = Field(title="Kubernetes Username")
    # Password that belongs to the username above.
    password: PlainSerializedSecretStr = Field(title="Kubernetes Password")
step_operators
special
Kubernetes step operator.
kubernetes_step_operator
Kubernetes step operator implementation.
KubernetesStepOperator (BaseStepOperator)
Step operator to run on Kubernetes.
Source code in zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
class KubernetesStepOperator(BaseStepOperator):
    """Step operator to run on Kubernetes.

    Each step that is assigned to this operator is executed in its own pod,
    created in the namespace given by the operator configuration.
    """

    # Most recently created API client; reused by `get_kube_client` as long
    # as the linked connector has not expired.
    _k8s_client: Optional[k8s_client.ApiClient] = None

    @property
    def config(self) -> KubernetesStepOperatorConfig:
        """Returns the `KubernetesStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(KubernetesStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Kubernetes step operator.

        Returns:
            The settings class.
        """
        return KubernetesStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that requires a container registry and an image
            builder in the stack, and that checks that both the artifact
            store and the container registry are remote.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            if stack.artifact_store.config.is_local:
                return False, (
                    "The Kubernetes step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the "
                    "Kubernetes step operator."
                )

            container_registry = stack.container_registry
            # The container registry is listed in `required_components`
            # below; the assert narrows the optional type.
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The Kubernetes step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Kubernetes step operator."
                )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_remote_components,
        )

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.

        One build is requested for every step of the deployment that is
        configured to use this step operator.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        builds = []
        for step_name, step in deployment.step_configurations.items():
            if step.config.step_operator == self.name:
                build = BuildConfiguration(
                    key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                    settings=step.config.docker_settings,
                    step_name=step_name,
                )
                builds.append(build)

        return builds

    def get_kube_client(self) -> k8s_client.ApiClient:
        """Get the Kubernetes API client.

        The client is built from the in-cluster configuration, from the
        linked service connector, or from the configured Kubernetes context,
        in that order of precedence.

        Returns:
            The Kubernetes API client.

        Raises:
            RuntimeError: If the service connector returns an unexpected client.
        """
        if self.config.incluster:
            kube_utils.load_kube_config(incluster=True)
            self._k8s_client = k8s_client.ApiClient()
            return self._k8s_client

        # Refresh the client also if the connector has expired
        if self._k8s_client and not self.connector_has_expired():
            return self._k8s_client

        connector = self.get_connector()
        if connector:
            client = connector.connect()
            if not isinstance(client, k8s_client.ApiClient):
                raise RuntimeError(
                    f"Expected a k8s_client.ApiClient while trying to use the "
                    f"linked connector, but got {type(client)}."
                )
            self._k8s_client = client
        else:
            kube_utils.load_kube_config(
                context=self.config.kubernetes_context,
            )
            self._k8s_client = k8s_client.ApiClient()

        return self._k8s_client

    @property
    def _k8s_core_api(self) -> k8s_client.CoreV1Api:
        """Getter for the Kubernetes Core API client.

        Returns:
            The Kubernetes Core API client.
        """
        return k8s_client.CoreV1Api(self.get_kube_client())

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Launches a step on Kubernetes.

        Creates a pod for the step and then waits on it, streaming its logs.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        settings = cast(
            KubernetesStepOperatorSettings, self.get_settings(info)
        )
        image_name = info.get_image(
            key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY
        )

        pod_name = f"{info.run_name}_{info.pipeline_step_name}"
        pod_name = kube_utils.sanitize_pod_name(pod_name)

        # The first three items become the container command and the rest its
        # arguments.
        # NOTE(review): presumably the first three items are the fixed
        # entrypoint invocation - confirm against the entrypoint builder.
        command = entrypoint_command[:3]
        args = entrypoint_command[3:]

        # Create and run the pod that executes the step.
        pod_manifest = build_pod_manifest(
            run_name=info.run_name,
            pod_name=pod_name,
            pipeline_name=info.pipeline.name,
            image_name=image_name,
            command=command,
            args=args,
            privileged=settings.privileged,
            service_account_name=settings.service_account_name,
            pod_settings=settings.pod_settings,
            env=environment,
            mount_local_stores=False,
        )

        self._k8s_core_api.create_namespaced_pod(
            namespace=self.config.kubernetes_namespace,
            body=pod_manifest,
        )

        logger.info(
            "Waiting for pod of step `%s` to start...", info.pipeline_step_name
        )
        # The client getter (not a client instance) is handed over, so the
        # helper can obtain a fresh client while it waits.
        kube_utils.wait_pod(
            kube_client_fn=self.get_kube_client,
            pod_name=pod_name,
            namespace=self.config.kubernetes_namespace,
            exit_condition_lambda=kube_utils.pod_is_done,
            stream_logs=True,
        )
        logger.info("Pod of step `%s` completed.", info.pipeline_step_name)
config: KubernetesStepOperatorConfig
property
readonly
Returns the `KubernetesStepOperatorConfig` config.
Returns:
Type | Description |
---|---|
KubernetesStepOperatorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Kubernetes step operator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A validator that checks that the stack contains a remote container registry and a remote artifact store. |
get_docker_builds(self, deployment)
Gets the Docker builds required for the component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment for which to get the builds. |
required |
Returns:
Type | Description |
---|---|
List[BuildConfiguration] |
The required Docker builds. |
Source code in zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Collect the Docker builds this step operator needs.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        One build configuration per step of the deployment that is
        configured to run on this step operator.
    """
    return [
        BuildConfiguration(
            key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY,
            settings=step.config.docker_settings,
            step_name=step_name,
        )
        for step_name, step in deployment.step_configurations.items()
        # Only steps assigned to this operator need an image.
        if step.config.step_operator == self.name
    ]
get_kube_client(self)
Get the Kubernetes API client.
Returns:
Type | Description |
---|---|
kubernetes.client.ApiClient |
The Kubernetes API client. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the service connector returns an unexpected client. |
Source code in zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def get_kube_client(self) -> k8s_client.ApiClient:
    """Return a Kubernetes API client for the configured cluster.

    Returns:
        The Kubernetes API client.

    Raises:
        RuntimeError: If the service connector returns an unexpected client.
    """
    # In-cluster mode: load the in-cluster configuration and build a new
    # client on every call.
    if self.config.incluster:
        kube_utils.load_kube_config(incluster=True)
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    # Reuse the stored client unless the connector has expired.
    if self._k8s_client and not self.connector_has_expired():
        return self._k8s_client

    connector = self.get_connector()
    if not connector:
        # No linked connector: fall back to the configured kube context.
        kube_utils.load_kube_config(
            context=self.config.kubernetes_context,
        )
        self._k8s_client = k8s_client.ApiClient()
        return self._k8s_client

    client = connector.connect()
    if not isinstance(client, k8s_client.ApiClient):
        raise RuntimeError(
            f"Expected a k8s_client.ApiClient while trying to use the "
            f"linked connector, but got {type(client)}."
        )
    self._k8s_client = client
    return self._k8s_client
launch(self, info, entrypoint_command, environment)
Launches a step on Kubernetes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Information about the step run. |
required |
entrypoint_command |
List[str] |
Command that executes the step. |
required |
environment |
Dict[str, str] |
Environment variables to set in the step operator environment. |
required |
Source code in zenml/integrations/kubernetes/step_operators/kubernetes_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Run a single step in a pod on Kubernetes.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    step_name = info.pipeline_step_name
    settings = cast(KubernetesStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=KUBERNETES_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    pod_name = kube_utils.sanitize_pod_name(
        f"{info.run_name}_{info.pipeline_step_name}"
    )

    # The leading three items form the container command, the rest its args.
    command, args = entrypoint_command[:3], entrypoint_command[3:]

    # Build the manifest of the pod that executes the step.
    pod_manifest = build_pod_manifest(
        run_name=info.run_name,
        pod_name=pod_name,
        pipeline_name=info.pipeline.name,
        image_name=image_name,
        command=command,
        args=args,
        privileged=settings.privileged,
        service_account_name=settings.service_account_name,
        pod_settings=settings.pod_settings,
        env=environment,
        mount_local_stores=False,
    )

    core_api = self._k8s_core_api
    core_api.create_namespaced_pod(
        namespace=self.config.kubernetes_namespace,
        body=pod_manifest,
    )

    logger.info("Waiting for pod of step `%s` to start...", step_name)
    # Wait on the pod with `pod_is_done` as the exit condition while its
    # logs are streamed.
    kube_utils.wait_pod(
        kube_client_fn=self.get_kube_client,
        pod_name=pod_name,
        namespace=self.config.kubernetes_namespace,
        exit_condition_lambda=kube_utils.pod_is_done,
        stream_logs=True,
    )
    logger.info("Pod of step `%s` completed.", step_name)