KServe
zenml.integrations.kserve
special
Initialization of the KServe integration for ZenML.
The KServe integration allows you to use the KServe model serving platform to implement continuous model deployment.
KServeIntegration (Integration)
Definition of KServe integration for ZenML.
Source code in zenml/integrations/kserve/__init__.py
class KServeIntegration(Integration):
"""Definition of KServe integration for ZenML."""
NAME = KSERVE
REQUIREMENTS = [
"kserve>=0.9.0,<=10",
"torch-model-archiver",
]
@classmethod
def activate(cls) -> None:
"""Activate the KServe integration."""
from zenml.integrations.kserve import model_deployers # noqa
from zenml.integrations.kserve import secret_schemas # noqa
from zenml.integrations.kserve import services # noqa
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for KServe.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor
return [KServeModelDeployerFlavor]
activate()
classmethod
Activate the KServe integration.
Source code in zenml/integrations/kserve/__init__.py
@classmethod
def activate(cls) -> None:
"""Activate the KServe integration."""
from zenml.integrations.kserve import model_deployers # noqa
from zenml.integrations.kserve import secret_schemas # noqa
from zenml.integrations.kserve import services # noqa
flavors()
classmethod
Declare the stack component flavors for KServe.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] | List of stack component flavors for this integration. |
Source code in zenml/integrations/kserve/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for KServe.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor
return [KServeModelDeployerFlavor]
constants
KServe constants.
custom_deployer
special
Initialization of ZenML custom deployer.
zenml_custom_model
Implements a custom model for the KServe integration.
ZenMLCustomModel (Model)
Custom model class for ZenML and KServe.
This class implements a custom model for the KServe integration and serves as the main entry point for custom code execution.
Attributes:
Name | Type | Description |
---|---|---|
model_name | str | The name of the model. |
model_uri | str | The URI of the model. |
predict_func | str | The predict function of the model. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel(kserve.Model): # type: ignore[misc]
"""Custom model class for ZenML and Kserve.
This class is used to implement a custom model for the Kserve integration,
which is used as the main entry point for custom code execution.
Attributes:
model_name: The name of the model.
model_uri: The URI of the model.
predict_func: The predict function of the model.
"""
def __init__(
self,
model_name: str,
model_uri: str,
predict_func: str,
):
"""Initializes a ZenMLCustomModel object.
Args:
model_name: The name of the model.
model_uri: The URI of the model.
predict_func: The predict function of the model.
"""
super().__init__(model_name)
self.name = model_name
self.model_uri = model_uri
self.predict_func = source_utils.load(predict_func)
self.model = None
self.ready = False
def load(self) -> bool:
"""Load the model.
This function loads the model into memory and sets the ready flag to True.
The model is loaded using the materializer, by saving the information of
the artifact to a YAML file in the same path as the model artifacts at
the preparing time and loading it again at the prediction time by
the materializer.
Returns:
True if the model was loaded successfully, False otherwise.
"""
try:
from zenml.utils.artifact_utils import load_model_from_metadata
self.model = load_model_from_metadata(self.model_uri)
except Exception as e:
logger.error("Failed to load model: {}".format(e))
return False
self.ready = True
return self.ready
def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Predict the given request.
The main predict function of the model. This function is called by the
KServe server when a request is received. Then inside this function,
the user-defined predict function is called.
Args:
request: The request to predict in a dictionary. e.g. {"instances": []}
Returns:
The prediction dictionary.
Raises:
RuntimeError: If function could not be called.
NotImplementedError: If the model is not ready.
TypeError: If the request is not a dictionary.
"""
if self.predict_func is not None:
try:
prediction = {
"predictions": self.predict_func(
self.model, request["instances"]
)
}
except RuntimeError as err:
raise RuntimeError("Failed to predict: {}".format(err))
if isinstance(prediction, dict):
return prediction
else:
raise TypeError(
f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
)
else:
raise NotImplementedError("Predict function is not implemented")
__init__(self, model_name, model_uri, predict_func)
special
Initializes a ZenMLCustomModel object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | str | The name of the model. | required |
model_uri | str | The URI of the model. | required |
predict_func | str | The predict function of the model. | required |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def __init__(
self,
model_name: str,
model_uri: str,
predict_func: str,
):
"""Initializes a ZenMLCustomModel object.
Args:
model_name: The name of the model.
model_uri: The URI of the model.
predict_func: The predict function of the model.
"""
super().__init__(model_name)
self.name = model_name
self.model_uri = model_uri
self.predict_func = source_utils.load(predict_func)
self.model = None
self.ready = False
load(self)
Load the model.
This function loads the model into memory and sets the ready flag to True.
The model is loaded using the materializer: the artifact information is saved to a YAML file next to the model artifacts at preparation time and loaded again by the materializer at prediction time.
Returns:
Type | Description |
---|---|
bool | True if the model was loaded successfully, False otherwise. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
"""Load the model.
This function loads the model into memory and sets the ready flag to True.
The model is loaded using the materializer, by saving the information of
the artifact to a YAML file in the same path as the model artifacts at
the preparing time and loading it again at the prediction time by
the materializer.
Returns:
True if the model was loaded successfully, False otherwise.
"""
try:
from zenml.utils.artifact_utils import load_model_from_metadata
self.model = load_model_from_metadata(self.model_uri)
except Exception as e:
logger.error("Failed to load model: {}".format(e))
return False
self.ready = True
return self.ready
predict(self, request)
Predict the given request.
The main predict function of the model. This function is called by the KServe server when a request is received; it then calls the user-defined predict function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | Dict[str, Any] | The request to predict in a dictionary, e.g. {"instances": []}. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The prediction dictionary. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the predict function could not be called. |
NotImplementedError | If no predict function is configured. |
TypeError | If the prediction returned by the predict function is not a dictionary. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
"""Predict the given request.
The main predict function of the model. This function is called by the
KServe server when a request is received. Then inside this function,
the user-defined predict function is called.
Args:
request: The request to predict in a dictionary. e.g. {"instances": []}
Returns:
The prediction dictionary.
Raises:
RuntimeError: If function could not be called.
NotImplementedError: If the model is not ready.
TypeError: If the request is not a dictionary.
"""
if self.predict_func is not None:
try:
prediction = {
"predictions": self.predict_func(
self.model, request["instances"]
)
}
except RuntimeError as err:
raise RuntimeError("Failed to predict: {}".format(err))
if isinstance(prediction, dict):
return prediction
else:
raise TypeError(
f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
)
else:
raise NotImplementedError("Predict function is not implemented")
flavors
special
KServe integration flavors.
kserve_model_deployer_flavor
KServe model deployer flavor.
KServeModelDeployerConfig (BaseModelDeployerConfig)
pydantic-model
Configuration for the KServeModelDeployer.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context | Optional[str] | the Kubernetes context to use to contact the remote KServe installation. If not specified, the current configuration is used. Depending on where the KServe model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is ignored. |
kubernetes_namespace | Optional[str] | the Kubernetes namespace where the KServe inference service CRDs are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the KServe model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). |
base_url | str | the base URL of the Kubernetes ingress used to expose the KServe inference services. |
secret | Optional[str] | the name of the secret containing the credentials for the KServe inference services. |
Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerConfig(BaseModelDeployerConfig):
"""Configuration for the KServeModelDeployer.
Attributes:
kubernetes_context: the Kubernetes context to use to contact the remote
KServe installation. If not specified, the current
configuration is used. Depending on where the KServe model deployer
is being used, this can be either a locally active context or an
in-cluster Kubernetes configuration (if running inside a pod).
If the model deployer stack component is linked to a Kubernetes
service connector, this field is ignored.
kubernetes_namespace: the Kubernetes namespace where the KServe
inference service CRDs are provisioned and managed by ZenML. If not
specified, the namespace set in the current configuration is used.
Depending on where the KServe model deployer is being used, this can
be either the current namespace configured in the locally active
context or the namespace in the context of which the pod is running
(if running inside a pod).
base_url: the base URL of the Kubernetes ingress used to expose the
KServe inference services.
secret: the name of the secret containing the credentials for the
KServe inference services.
"""
kubernetes_context: Optional[str] = None
kubernetes_namespace: Optional[str] = None
base_url: str # TODO: unused?
secret: Optional[str]
custom_domain: Optional[str] # TODO: unused?
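To illustrate how these fields fit together, here is a minimal sketch that instantiates the config directly in Python; the context, namespace and URL values are placeholders, and in a real setup the component is typically registered via the ZenML CLI or Client rather than constructed by hand.
from zenml.integrations.kserve.flavors.kserve_model_deployer_flavor import (
    KServeModelDeployerConfig,
)

# Placeholder values for illustration only.
config = KServeModelDeployerConfig(
    kubernetes_context="my-k8s-context",          # locally active kubeconfig context
    kubernetes_namespace="kserve-models",         # namespace managed by ZenML
    base_url="http://istio-ingress.example.com",  # ingress exposing KServe services
    secret=None,                                  # fall back to artifact store credentials
)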
KServeModelDeployerFlavor (BaseModelDeployerFlavor)
Flavor for the KServe model deployer.
Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
"""Flavor for the KServe model deployer."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
Name of the flavor.
"""
return KSERVE_MODEL_DEPLOYER_FLAVOR
@property
def service_connector_requirements(
self,
) -> Optional[ServiceConnectorRequirements]:
"""Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available
service connector types that are compatible with this flavor.
Returns:
Requirements for compatible service connectors, if a service
connector is required for this flavor.
"""
return ServiceConnectorRequirements(
resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
)
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/kserve.png"
@property
def config_class(self) -> Type[KServeModelDeployerConfig]:
"""Returns `KServeModelDeployerConfig` config class.
Returns:
The config class.
"""
return KServeModelDeployerConfig
@property
def implementation_class(self) -> Type["KServeModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.kserve.model_deployers import (
KServeModelDeployer,
)
return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]
property
readonly
Returns `KServeModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] | The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs url. |
implementation_class: Type[KServeModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KServeModelDeployer] | The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str | Name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] | Requirements for compatible service connectors, if a service connector is required for this flavor. |
model_deployers
special
Initialization of the KServe Model Deployer.
kserve_model_deployer
Implementation of the KServe Model Deployer.
KServeModelDeployer (BaseModelDeployer)
KServe model deployer stack component implementation.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployer(BaseModelDeployer):
"""KServe model deployer stack component implementation."""
NAME: ClassVar[str] = "KServe"
FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = KServeModelDeployerFlavor
_client: Optional[KServeClient] = None
@property
def config(self) -> KServeModelDeployerConfig:
"""Returns the `KServeModelDeployerConfig` config.
Returns:
The configuration.
"""
return cast(KServeModelDeployerConfig, self._config)
@property
def validator(self) -> Optional[StackValidator]:
"""Ensures there is a container registry and image builder in the stack.
Returns:
A `StackValidator` instance.
"""
# Log deprecation warning
logger.warning(
"The KServe model deployer is deprecated and is no longer "
"being maintained by the ZenML core team. If you are looking for a "
"scalable Kubernetes-based model deployment solution, consider "
"using Seldon instead: "
"https://docs.zenml.io/stacks-and-components/component-guide/model-deployers/seldon",
)
return StackValidator(
required_components={
StackComponentType.IMAGE_BUILDER,
}
)
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "KServeDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information on the model server.
Args:
service_instance: KServe deployment service object
Returns:
A dictionary containing the model server information.
"""
return {
"PREDICTION_URL": service_instance.prediction_url,
"PREDICTION_HOSTNAME": service_instance.prediction_hostname,
"MODEL_URI": service_instance.config.model_uri,
"MODEL_NAME": service_instance.config.model_name,
"KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
}
@property
def kserve_client(self) -> KServeClient:
"""Get the KServe client associated with this model deployer.
Returns:
The KServeclient.
Raises:
RuntimeError: If the Kubernetes namespace is not configured in the
stack component when using a service connector to deploy models
with KServe.
"""
# Refresh the client also if the connector has expired
if self._client and not self.connector_has_expired():
return self._client
connector = self.get_connector()
if connector:
if not self.config.kubernetes_namespace:
raise RuntimeError(
"The Kubernetes namespace must be explicitly configured in "
"the stack component when using a service connector to "
"deploy models with KServe."
)
client = connector.connect()
if not isinstance(client, k8s_client.ApiClient):
raise RuntimeError(
f"Expected a k8s_client.ApiClient while trying to use the "
f"linked connector, but got {type(client)}."
)
self._client = KubeClientKServeClient(
kube_client=client,
)
else:
self._client = KServeClient(
context=self.config.kubernetes_context,
)
return self._client
def get_docker_builds(
self, deployment: "PipelineDeploymentBaseModel"
) -> List["BuildConfiguration"]:
"""Gets the Docker builds required for the component.
Args:
deployment: The pipeline deployment for which to get the builds.
Returns:
The required Docker builds.
"""
builds = []
for step_name, step in deployment.step_configurations.items():
if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
build = BuildConfiguration(
key=KSERVE_DOCKER_IMAGE_KEY,
settings=step.config.docker_settings,
step_name=step_name,
)
builds.append(build)
return builds
def deploy_model(
self,
config: ServiceConfig,
replace: bool = False,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new KServe deployment or update an existing one.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new KServe
deployment server to reflect the model and other configuration
parameters specified in the supplied KServe deployment `config`.
* if `replace` is True, this method will first attempt to find an
existing KServe deployment that is *equivalent* to the supplied
configuration parameters. Two or more KServe deployments are
considered equivalent if they have the same `pipeline_name`,
`pipeline_step_name` and `model_name` configuration parameters. To
put it differently, two KServe deployments are equivalent if
they serve versions of the same model deployed by the same pipeline
step. If an equivalent KServe deployment is found, it will be
updated in place to reflect the new configuration parameters. This
allows an existing KServe deployment to retain its prediction
URL while performing a rolling update to serve a new model version.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new KServe deployment
server for each new model version. If multiple equivalent KServe
deployments are found, the most recently created deployment is selected
to be updated and the others are deleted.
Args:
config: the configuration of the model to be deployed with KServe.
replace: set this flag to True to find and update an equivalent
KServeDeployment server with the new model instead of
starting a new deployment server.
timeout: the timeout in seconds to wait for the KServe server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the KServe
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML KServe deployment service object that can be used to
interact with the remote KServe server.
Raises:
RuntimeError: if the KServe deployment server could not be stopped.
"""
with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
config = cast(KServeDeploymentConfig, config)
service = None
# if replace is True, find equivalent KServe deployments
if replace is True:
equivalent_services = self.find_model_server(
running=False,
pipeline_name=config.pipeline_name,
pipeline_step_name=config.pipeline_step_name,
model_name=config.model_name,
)
for equivalent_service in equivalent_services:
if service is None:
# keep the most recently created service
service = equivalent_service
else:
try:
# delete the older services and don't wait for
# them to be deprovisioned
service.stop()
except RuntimeError as e:
raise RuntimeError(
"Failed to stop the KServe deployment "
"server:\n",
f"{e}\n",
"Please stop it manually and try again.",
)
if service:
# Reuse the service account and secret from the existing
# service.
assert isinstance(service, KServeDeploymentService)
config.k8s_service_account = service.config.k8s_service_account
config.k8s_secret = service.config.k8s_secret
# configure the credentials for the KServe model server
self._create_or_update_kserve_credentials(config)
if service:
# update an equivalent service in place
service.update(config)
logger.info(
f"Updating an existing KServe deployment service: {service}"
)
else:
# create a new service
service = KServeDeploymentService(config=config)
logger.info(
f"Creating a new KServe deployment service: {service}"
)
# start the service which in turn provisions the KServe
# deployment server and waits for it to reach a ready state
service.start(timeout=timeout)
# Add telemetry with metadata that gets the stack metadata and
# differentiates between pure model and custom code deployments
stack = Client().active_stack
stack_metadata = {
component_type.value: component.flavor
for component_type, component in stack.components.items()
}
analytics_handler.metadata = {
"store_type": Client().zen_store.type.value,
**stack_metadata,
"is_custom_code_deployment": config.container is not None,
}
return service
def get_kserve_deployments(
self, labels: Dict[str, str]
) -> List[V1beta1InferenceService]:
"""Get a list of KServe deployments that match the supplied labels.
Args:
labels: a dictionary of labels to match against KServe deployments.
Returns:
A list of KServe deployments that match the supplied labels.
Raises:
RuntimeError: if an operational failure is encountered while
"""
label_selector = (
",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
)
namespace = (
self.config.kubernetes_namespace
or utils.get_default_target_namespace()
)
try:
response = (
self.kserve_client.api_instance.list_namespaced_custom_object(
constants.KSERVE_GROUP,
constants.KSERVE_V1BETA1_VERSION,
namespace,
constants.KSERVE_PLURAL,
label_selector=label_selector,
)
)
except k8s_client.rest.ApiException as e:
raise RuntimeError(
"Exception when retrieving KServe inference services\
%s\n"
% e
)
# TODO[CRITICAL]: de-serialize each item into a complete
# V1beta1InferenceService object recursively using the OpenApi
# schema (this doesn't work right now)
inference_services: List[V1beta1InferenceService] = []
for item in response.get("items", []):
snake_case_item = self._camel_to_snake(item)
inference_service = V1beta1InferenceService(**snake_case_item)
inference_services.append(inference_service)
return inference_services
def _camel_to_snake(self, obj: Dict[str, Any]) -> Dict[str, Any]:
"""Convert a camelCase dictionary to snake_case.
Args:
obj: a dictionary with camelCase keys
Returns:
a dictionary with snake_case keys
"""
if isinstance(obj, (str, int, float)):
return obj
if isinstance(obj, dict):
assert obj is not None
new = obj.__class__()
for k, v in obj.items():
new[self._convert_to_snake(k)] = self._camel_to_snake(v)
elif isinstance(obj, (list, set, tuple)):
assert obj is not None
new = obj.__class__(self._camel_to_snake(v) for v in obj)
else:
return obj
return new
def _convert_to_snake(self, k: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", k).lower()
def find_model_server(
self,
running: bool = False,
service_uuid: Optional[UUID] = None,
pipeline_name: Optional[str] = None,
run_name: Optional[str] = None,
pipeline_step_name: Optional[str] = None,
model_name: Optional[str] = None,
model_uri: Optional[str] = None,
predictor: Optional[str] = None,
) -> List[BaseService]:
"""Find one or more KServe model services that match the given criteria.
Args:
running: If true, only running services will be returned.
service_uuid: The UUID of the service that was originally used
to deploy the model.
pipeline_name: name of the pipeline that the deployed model was part
of.
run_name: name of the pipeline run which the deployed model was
part of.
pipeline_step_name: the name of the pipeline model deployment step
that deployed the model.
model_name: the name of the deployed model.
model_uri: URI of the deployed model.
predictor: the name of the predictor that was used to deploy the model.
Returns:
One or more Service objects representing model servers that match
the input search criteria.
"""
config = KServeDeploymentConfig(
pipeline_name=pipeline_name or "",
run_name=run_name or "",
pipeline_run_id=run_name or "",
pipeline_step_name=pipeline_step_name or "",
model_uri=model_uri or "",
model_name=model_name or "",
predictor=predictor or "",
resources={},
)
labels = config.get_kubernetes_labels()
if service_uuid:
labels["zenml.service_uuid"] = str(service_uuid)
deployments = self.get_kserve_deployments(labels=labels)
services: List[BaseService] = []
for deployment in deployments:
# recreate the KServe deployment service object from the KServe
# deployment resource
service = KServeDeploymentService.create_from_deployment(
deployment=deployment
)
if running and not service.is_running:
# skip non-running services
continue
services.append(service)
return services
def stop_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Stop a KServe model server.
Args:
uuid: UUID of the model server to stop.
timeout: timeout in seconds to wait for the service to stop.
force: if True, force the service to stop.
Raises:
NotImplementedError: stopping on KServe model servers is not
supported.
"""
raise NotImplementedError(
"Stopping KServe model servers is not implemented. Try "
"deleting the KServe model server instead."
)
def start_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
"""Start a KServe model deployment server.
Args:
uuid: UUID of the model server to start.
timeout: timeout in seconds to wait for the service to become
active. . If set to 0, the method will return immediately after
provisioning the service, without waiting for it to become
active.
Raises:
NotImplementedError: since we don't support starting KServe
model servers
"""
raise NotImplementedError(
"Starting KServe model servers is not implemented"
)
def delete_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Delete a KServe model deployment server.
Args:
uuid: UUID of the model server to delete.
timeout: timeout in seconds to wait for the service to stop. If
set to 0, the method will return immediately after
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
"""
services = self.find_model_server(service_uuid=uuid)
if len(services) == 0:
return
service = services[0]
service.stop(timeout=timeout, force=force)
assert isinstance(service, KServeDeploymentService)
if service.config.k8s_service_account:
self.delete_k8s_service_account(service.config.k8s_service_account)
if service.config.k8s_secret:
self.delete_k8s_secret(service.config.k8s_secret)
def _create_or_update_kserve_credentials(
self, config: KServeDeploymentConfig
) -> None:
"""Create or update the KServe credentials used to access the artifact store.
The way KServe allows configured credentials to be passed to the
model servers is a bit convoluted:
* we need to create a Kubernetes secret object with credentials
in the correct format supported by KServe (only AWS, GCP, Azure are
supported)
* we need to create a Kubernetes service account object that
references the secret object
* we need to use the service account object in the KServe
deployment configuration
This method will use a random name for every model server. This ensures
that we can create multiple KServe deployments with different
credentials without running into naming conflicts.
If a ZenML secret is not explicitly configured for the model deployment
or the model deployer, this method attempts to fetch credentials from
the active artifact store and convert them into the appropriate secret
format expected by KServe.
Args:
config: KServe deployment configuration.
Raises:
RuntimeError: if the configured secret object is not found.
"""
secret_name = config.secret_name or self.config.secret
if secret_name:
if config.secret_name:
secret_source = "model deployment"
else:
secret_source = "Model Deployer"
logger.warning(
f"Your KServe {secret_source} is configured to use a "
f"ZenML secret `{secret_name}` that holds credentials needed "
"to access the artifact store. The recommended authentication "
"method is to configure credentials for the artifact store "
"stack component instead. The KServe model deployer will use "
"those credentials to authenticate to the artifact store "
"automatically."
)
try:
zenml_secret = Client().get_secret_by_name_and_scope(
secret_name
)
except KeyError as e:
raise RuntimeError(
f"The ZenML secret '{secret_name}' specified in the "
f"KServe {secret_source} configuration was not found "
f"in the secrets store: {e}."
)
credentials = zenml_secret.secret_values
else:
# if no secret is configured, try to fetch credentials from the
# active artifact store and convert them into the appropriate format
# expected by KServe
converted_secret = self._convert_artifact_store_secret()
if not converted_secret:
# If a secret and service account were previously configured, we
# need to delete them before we can proceed
if config.k8s_service_account:
self.delete_k8s_service_account(config.k8s_service_account)
config.k8s_service_account = None
if config.k8s_secret:
self.delete_k8s_secret(config.k8s_secret)
config.k8s_secret = None
return
credentials = converted_secret.content
# S3 credentials are special because part of them need to be passed
# as annotations
annotations: Dict[str, str] = {}
if "aws_access_key_id" in credentials:
if credentials.get("s3_region"):
annotations[
"serving.kubeflow.org/s3-region"
] = credentials.pop("s3_region")
if credentials.get("s3_endpoint"):
annotations[
"serving.kubeflow.org/s3-endpoint"
] = credentials.pop("s3_endpoint")
if credentials.get("s3_use_https"):
annotations[
"serving.kubeflow.org/s3-usehttps"
] = credentials.pop("s3_use_https")
if credentials.get("s3_verify_ssl"):
annotations[
"serving.kubeflow.org/s3-verifyssl"
] = credentials.pop("s3_verify_ssl")
# Convert all keys to uppercase
credentials = {k.upper(): v for k, v in credentials.items()}
# The GCP credentials need to use a specific key name
if "GOOGLE_APPLICATION_CREDENTIALS" in credentials:
credentials[
"gcloud-application-credentials.json"
] = credentials.pop("GOOGLE_APPLICATION_CREDENTIALS")
# Create or update the Kubernetes secret object
config.k8s_secret = self.create_or_update_k8s_secret(
name=config.k8s_secret,
annotations=annotations,
secret_values=credentials,
)
# Create or update the Kubernetes service account object
config.k8s_service_account = self.create_or_update_k8s_service_account(
name=config.k8s_service_account,
secret_name=config.k8s_secret,
)
def _convert_artifact_store_secret(self) -> Optional[BaseSecretSchema]:
"""Convert the credentials configured for the artifact store into a ZenML secret.
Returns:
The KServe credentials in the format expected by KServe or None if
no credentials are configured for the artifact store or if they
cannot be converted into the KServe format.
"""
artifact_store = Client().active_stack.artifact_store
zenml_secret: BaseSecretSchema
if artifact_store.flavor == "s3":
from zenml.integrations.s3.artifact_stores import S3ArtifactStore
assert isinstance(artifact_store, S3ArtifactStore)
(
aws_access_key_id,
aws_secret_access_key,
_,
) = artifact_store.get_credentials()
if aws_access_key_id and aws_secret_access_key:
# Convert the credentials into the format expected by KServe
zenml_secret = KServeS3SecretSchema(
name="",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
if artifact_store.config.client_kwargs:
if "endpoint_url" in artifact_store.config.client_kwargs:
zenml_secret.s3_endpoint = str(
artifact_store.config.client_kwargs["endpoint_url"]
)
if "region_name" in artifact_store.config.client_kwargs:
zenml_secret.s3_region = str(
artifact_store.config.client_kwargs["region_name"]
)
if "use_ssl" in artifact_store.config.client_kwargs:
zenml_secret.s3_use_https = str(
artifact_store.config.client_kwargs["use_ssl"]
)
return zenml_secret
logger.warning(
"No credentials are configured for the active S3 artifact "
"store. The KServe model deployer will assume an "
"implicit form of authentication is available in the "
"target Kubernetes cluster, but the served model may not "
"be able to access the model artifacts."
)
# Assume implicit in-cluster IAM authentication
return None
elif artifact_store.flavor == "gcp":
from zenml.integrations.gcp.artifact_stores import GCPArtifactStore
assert isinstance(artifact_store, GCPArtifactStore)
gcp_credentials = artifact_store.get_credentials()
if gcp_credentials:
# Convert the credentials into the format expected by KServe
return KServeGSSecretSchema(
name="",
google_application_credentials=json.dumps(gcp_credentials),
)
logger.warning(
"No credentials are configured for the active GCS artifact "
"store. The KServe model deployer will assume an "
"implicit form of authentication is available in the "
"target Kubernetes cluster, but the served model may not "
"be able to access the model artifacts."
)
return None
elif artifact_store.flavor == "azure":
from zenml.integrations.azure.artifact_stores import (
AzureArtifactStore,
)
assert isinstance(artifact_store, AzureArtifactStore)
azure_credentials = artifact_store.get_credentials()
if azure_credentials:
# Convert the credentials into the format expected by KServe
if (
azure_credentials.client_id is not None
and azure_credentials.client_secret is not None
and azure_credentials.tenant_id is not None
):
return KServeAzureSecretSchema(
name="",
azure_client_id=azure_credentials.client_id,
azure_client_secret=azure_credentials.client_secret,
azure_tenant_id=azure_credentials.tenant_id,
)
else:
logger.warning(
"The KServe model deployer could not use the "
"credentials currently configured in the active Azure "
"artifact store because it only supports service "
"principal Azure credentials. "
"Please configure Azure principal credentials for your "
"artifact store or specify a custom ZenML secret in "
"the model deployer configuration that holds the "
"credentials required to access the model artifacts. "
"The KServe model deployer will assume an implicit "
"form of authentication is available in the target "
"Kubernetes cluster, but the served model "
"may not be able to access the model artifacts."
)
return None
logger.warning(
"No credentials are configured for the active Azure "
"artifact store. The Seldon Core model deployer will "
"assume an implicit form of authentication is available "
"in the target Kubernetes cluster, but the served model "
"may not be able to access the model artifacts."
)
return None
logger.warning(
"The KServe model deployer doesn't know how to configure "
f"credentials automatically for the `{artifact_store.flavor}` "
"active artifact store flavor. "
"Please use one of the supported artifact stores (S3 or GCP) "
"or specify a ZenML secret in the model deployer "
"configuration that holds the credentials required to access "
"the model artifacts. The KServe model deployer will "
"assume an implicit form of authentication is available "
"in the target Kubernetes cluster, but the served model "
"may not be able to access the model artifacts."
)
return None
def create_or_update_k8s_secret(
self,
name: Optional[str] = None,
secret_values: Dict[str, Any] = {},
annotations: Dict[str, str] = {},
) -> str:
"""Create or update a Kubernetes Secret resource.
Args:
name: the name of the Secret resource to create. If not
specified, a random name will be generated.
secret_values: secret key-values that should be
stored in the Secret resource.
annotations: optional annotations to add to the Secret resource.
Returns:
The name of the created Secret resource.
Raises:
RuntimeError: if an unknown error occurs during the creation of
the secret.
"""
name = name or f"zenml-kserve-{uuid4().hex}"
try:
logger.debug(f"Creating Secret resource: {name}")
core_api = k8s_client.CoreV1Api()
secret_data = {
k: base64.b64encode(str(v).encode("utf-8")).decode("ascii")
for k, v in secret_values.items()
if v is not None
}
secret = k8s_client.V1Secret(
metadata=k8s_client.V1ObjectMeta(
name=name,
labels={"app": "zenml"},
annotations=annotations,
),
type="Opaque",
data=secret_data,
)
try:
# check if the secret is already present
core_api.read_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
)
# if we got this far, the secret is already present, update it
# in place
response = core_api.replace_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
body=secret,
)
except k8s_client.rest.ApiException as e:
if e.status != 404:
# if an error other than 404 is raised here, treat it
# as an unexpected error
raise RuntimeError(
"Exception when reading Secret resource: %s", str(e)
)
response = core_api.create_namespaced_secret(
namespace=self.config.kubernetes_namespace,
body=secret,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
raise RuntimeError(
"Exception when creating Secret resource %s", str(e)
)
return name
def delete_k8s_secret(
self,
name: str,
) -> None:
"""Delete a Kubernetes Secret resource managed by ZenML.
Args:
name: the name of the Kubernetes Secret resource to delete.
Raises:
RuntimeError: if an unknown error occurs during the removal
of the secret.
"""
try:
logger.debug(f"Deleting Secret resource: {name}")
core_api = k8s_client.CoreV1Api()
response = core_api.delete_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
if e.status == 404:
# the secret is no longer present, nothing to do
return
raise RuntimeError(
f"Exception when deleting Secret resource {name}: {e}"
)
def create_or_update_k8s_service_account(
self, name: Optional[str] = None, secret_name: Optional[str] = None
) -> str:
"""Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.
Args:
name: the name of the ServiceAccount resource to create. If not
specified, a random name will be generated.
secret_name: the name of a secret to attach to the ServiceAccount.
Returns:
The name of the created ServiceAccount resource.
Raises:
RuntimeError: if an unknown error occurs during the creation of
the service account.
"""
name = name or f"zenml-kserve-{uuid4().hex}"
service_account = k8s_client.V1ServiceAccount(
metadata=k8s_client.V1ObjectMeta(
name=name,
),
)
if secret_name:
service_account.secrets = [
k8s_client.V1ObjectReference(kind="Secret", name=secret_name)
]
core_api = k8s_client.CoreV1Api()
try:
# check if the service account is already present
core_api.read_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
)
# if we got this far, the service account is already present, update
# it in place
core_api.replace_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
body=service_account,
)
except k8s_client.rest.ApiException as e:
if e.status != 404:
# if an error other than 404 is raised here, treat it
# as an unexpected error
raise RuntimeError(
"Exception when reading ServiceAccount resource: %s",
str(e),
)
core_api.create_namespaced_service_account(
namespace=self.config.kubernetes_namespace,
body=service_account,
)
return name
def delete_k8s_service_account(
self,
name: str,
) -> None:
"""Delete a Kubernetes ServiceAccount resource managed by ZenML.
Args:
name: the name of the Kubernetes ServiceAccount resource to delete.
Raises:
RuntimeError: if an unknown error occurs during the removal
of the service account.
"""
try:
logger.debug(f"Deleting ServiceAccount resource: {name}")
core_api = k8s_client.CoreV1Api()
response = core_api.delete_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
if e.status == 404:
# the service account is no longer present, nothing to do
return
raise RuntimeError(
f"Exception when deleting ServiceAccount resource {name}: {e}"
)
config: KServeModelDeployerConfig
property
readonly
Returns the `KServeModelDeployerConfig` config.
Returns:
Type | Description |
---|---|
KServeModelDeployerConfig | The configuration. |
kserve_client: KServeClient
property
readonly
Get the KServe client associated with this model deployer.
Returns:
Type | Description |
---|---|
KServeClient | The KServe client. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the Kubernetes namespace is not configured in the stack component when using a service connector to deploy models with KServe. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Ensures there is a container registry and image builder in the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] | A `StackValidator` instance. |
FLAVOR (BaseModelDeployerFlavor)
Flavor for the KServe model deployer.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
"""Flavor for the KServe model deployer."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
Name of the flavor.
"""
return KSERVE_MODEL_DEPLOYER_FLAVOR
@property
def service_connector_requirements(
self,
) -> Optional[ServiceConnectorRequirements]:
"""Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available
service connector types that are compatible with this flavor.
Returns:
Requirements for compatible service connectors, if a service
connector is required for this flavor.
"""
return ServiceConnectorRequirements(
resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
)
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/kserve.png"
@property
def config_class(self) -> Type[KServeModelDeployerConfig]:
"""Returns `KServeModelDeployerConfig` config class.
Returns:
The config class.
"""
return KServeModelDeployerConfig
@property
def implementation_class(self) -> Type["KServeModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.kserve.model_deployers import (
KServeModelDeployer,
)
return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]
property
readonly
Returns `KServeModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] | The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor docs url. |
implementation_class: Type[KServeModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KServeModelDeployer] | The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str | The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str | Name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] | A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.service_connector_models.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.service_connector_models.ServiceConnectorRequirements] | Requirements for compatible service connectors, if a service connector is required for this flavor. |
create_or_update_k8s_secret(self, name=None, secret_values={}, annotations={})
Create or update a Kubernetes Secret resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | Optional[str] | the name of the Secret resource to create. If not specified, a random name will be generated. | None |
secret_values | Dict[str, Any] | secret key-values that should be stored in the Secret resource. | {} |
annotations | Dict[str, str] | optional annotations to add to the Secret resource. | {} |
Returns:
Type | Description |
---|---|
str | The name of the created Secret resource. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if an unknown error occurs during the creation of the secret. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def create_or_update_k8s_secret(
self,
name: Optional[str] = None,
secret_values: Dict[str, Any] = {},
annotations: Dict[str, str] = {},
) -> str:
"""Create or update a Kubernetes Secret resource.
Args:
name: the name of the Secret resource to create. If not
specified, a random name will be generated.
secret_values: secret key-values that should be
stored in the Secret resource.
annotations: optional annotations to add to the Secret resource.
Returns:
The name of the created Secret resource.
Raises:
RuntimeError: if an unknown error occurs during the creation of
the secret.
"""
name = name or f"zenml-kserve-{uuid4().hex}"
try:
logger.debug(f"Creating Secret resource: {name}")
core_api = k8s_client.CoreV1Api()
secret_data = {
k: base64.b64encode(str(v).encode("utf-8")).decode("ascii")
for k, v in secret_values.items()
if v is not None
}
secret = k8s_client.V1Secret(
metadata=k8s_client.V1ObjectMeta(
name=name,
labels={"app": "zenml"},
annotations=annotations,
),
type="Opaque",
data=secret_data,
)
try:
# check if the secret is already present
core_api.read_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
)
# if we got this far, the secret is already present, update it
# in place
response = core_api.replace_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
body=secret,
)
except k8s_client.rest.ApiException as e:
if e.status != 404:
# if an error other than 404 is raised here, treat it
# as an unexpected error
raise RuntimeError(
"Exception when reading Secret resource: %s", str(e)
)
response = core_api.create_namespaced_secret(
namespace=self.config.kubernetes_namespace,
body=secret,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
raise RuntimeError(
"Exception when creating Secret resource %s", str(e)
)
return name
create_or_update_k8s_service_account(self, name=None, secret_name=None)
Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | Optional[str] | the name of the ServiceAccount resource to create. If not specified, a random name will be generated. | None |
secret_name | Optional[str] | the name of a secret to attach to the ServiceAccount. | None |
Returns:
Type | Description |
---|---|
str | The name of the created ServiceAccount resource. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if an unknown error occurs during the creation of the service account. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def create_or_update_k8s_service_account(
self, name: Optional[str] = None, secret_name: Optional[str] = None
) -> str:
"""Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.
Args:
name: the name of the ServiceAccount resource to create. If not
specified, a random name will be generated.
secret_name: the name of a secret to attach to the ServiceAccount.
Returns:
The name of the created ServiceAccount resource.
Raises:
RuntimeError: if an unknown error occurs during the creation of
the service account.
"""
name = name or f"zenml-kserve-{uuid4().hex}"
service_account = k8s_client.V1ServiceAccount(
metadata=k8s_client.V1ObjectMeta(
name=name,
),
)
if secret_name:
service_account.secrets = [
k8s_client.V1ObjectReference(kind="Secret", name=secret_name)
]
core_api = k8s_client.CoreV1Api()
try:
# check if the service account is already present
core_api.read_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
)
# if we got this far, the service account is already present, update
# it in place
core_api.replace_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
body=service_account,
)
except k8s_client.rest.ApiException as e:
if e.status != 404:
# if an error other than 404 is raised here, treat it
# as an unexpected error
raise RuntimeError(
"Exception when reading ServiceAccount resource: %s",
str(e),
)
core_api.create_namespaced_service_account(
namespace=self.config.kubernetes_namespace,
body=service_account,
)
return name
delete_k8s_secret(self, name)
Delete a Kubernetes Secret resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | the name of the Kubernetes Secret resource to delete. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | if an unknown error occurs during the removal of the secret. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_k8s_secret(
self,
name: str,
) -> None:
"""Delete a Kubernetes Secret resource managed by ZenML.
Args:
name: the name of the Kubernetes Secret resource to delete.
Raises:
RuntimeError: if an unknown error occurs during the removal
of the secret.
"""
try:
logger.debug(f"Deleting Secret resource: {name}")
core_api = k8s_client.CoreV1Api()
response = core_api.delete_namespaced_secret(
name=name,
namespace=self.config.kubernetes_namespace,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
if e.status == 404:
# the secret is no longer present, nothing to do
return
raise RuntimeError(
f"Exception when deleting Secret resource {name}: {e}"
)
delete_k8s_service_account(self, name)
Delete a Kubernetes ServiceAccount resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | the name of the Kubernetes ServiceAccount resource to delete. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | if an unknown error occurs during the removal of the service account. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_k8s_service_account(
self,
name: str,
) -> None:
"""Delete a Kubernetes ServiceAccount resource managed by ZenML.
Args:
name: the name of the Kubernetes ServiceAccount resource to delete.
Raises:
RuntimeError: if an unknown error occurs during the removal
of the service account.
"""
try:
logger.debug(f"Deleting ServiceAccount resource: {name}")
core_api = k8s_client.CoreV1Api()
response = core_api.delete_namespaced_service_account(
name=name,
namespace=self.config.kubernetes_namespace,
)
logger.debug("Kubernetes API response: %s", response)
except k8s_client.rest.ApiException as e:
if e.status == 404:
# the service account is no longer present, nothing to do
return
raise RuntimeError(
f"Exception when deleting ServiceAccount resource {name}: {e}"
)
delete_model_server(self, uuid, timeout=300, force=False)
Delete a KServe model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to delete. | required |
timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
force | bool | if True, force the service to stop. | False |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Delete a KServe model deployment server.
Args:
uuid: UUID of the model server to delete.
timeout: timeout in seconds to wait for the service to stop. If
set to 0, the method will return immediately after
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
"""
services = self.find_model_server(service_uuid=uuid)
if len(services) == 0:
return
service = services[0]
service.stop(timeout=timeout, force=force)
assert isinstance(service, KServeDeploymentService)
if service.config.k8s_service_account:
self.delete_k8s_service_account(service.config.k8s_service_account)
if service.config.k8s_secret:
self.delete_k8s_secret(service.config.k8s_secret)
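As a usage sketch (assuming an active stack that contains this model deployer and the UUID of a previously deployed model server), a server could be removed as follows; the UUID value is a placeholder.
from uuid import UUID
from zenml.client import Client

# Assumes the active stack contains the KServe model deployer.
model_deployer = Client().active_stack.model_deployer

# Placeholder UUID of a previously deployed model server.
service_uuid = UUID("00000000-0000-0000-0000-000000000000")

model_deployer.delete_model_server(uuid=service_uuid, timeout=300, force=False)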
deploy_model(self, config, replace=False, timeout=300)
Create a new KServe deployment or update an existing one.
This method has two modes of operation, depending on the `replace` argument value:

* If `replace` is False, calling this method will create a new KServe deployment server to reflect the model and other configuration parameters specified in the supplied KServe deployment `config`.
* If `replace` is True, this method will first attempt to find an existing KServe deployment that is *equivalent* to the supplied configuration parameters. Two or more KServe deployments are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two KServe deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent KServe deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing KServe deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set `replace` to True if they want a continuous model deployment workflow that doesn't spin up a new KServe deployment server for each new model version. If multiple equivalent KServe deployments are found, the most recently created deployment is selected to be updated and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | ServiceConfig | the configuration of the model to be deployed with KServe. | required |
replace | bool | set this flag to True to find and update an equivalent KServeDeployment server with the new model instead of starting a new deployment server. | False |
timeout | int | the timeout in seconds to wait for the KServe server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the KServe server is provisioned, without waiting for it to fully start. | 300 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML KServe deployment service object that can be used to interact with the remote KServe server. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the KServe deployment server could not be stopped. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def deploy_model(
self,
config: ServiceConfig,
replace: bool = False,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new KServe deployment or update an existing one.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new KServe
deployment server to reflect the model and other configuration
parameters specified in the supplied KServe deployment `config`.
* if `replace` is True, this method will first attempt to find an
existing KServe deployment that is *equivalent* to the supplied
configuration parameters. Two or more KServe deployments are
considered equivalent if they have the same `pipeline_name`,
`pipeline_step_name` and `model_name` configuration parameters. To
put it differently, two KServe deployments are equivalent if
they serve versions of the same model deployed by the same pipeline
step. If an equivalent KServe deployment is found, it will be
updated in place to reflect the new configuration parameters. This
allows an existing KServe deployment to retain its prediction
URL while performing a rolling update to serve a new model version.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new KServe deployment
server for each new model version. If multiple equivalent KServe
deployments are found, the most recently created deployment is selected
to be updated and the others are deleted.
Args:
config: the configuration of the model to be deployed with KServe.
replace: set this flag to True to find and update an equivalent
KServeDeployment server with the new model instead of
starting a new deployment server.
timeout: the timeout in seconds to wait for the KServe server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the KServe
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML KServe deployment service object that can be used to
interact with the remote KServe server.
Raises:
RuntimeError: if the KServe deployment server could not be stopped.
"""
with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
config = cast(KServeDeploymentConfig, config)
service = None
# if replace is True, find equivalent KServe deployments
if replace is True:
equivalent_services = self.find_model_server(
running=False,
pipeline_name=config.pipeline_name,
pipeline_step_name=config.pipeline_step_name,
model_name=config.model_name,
)
for equivalent_service in equivalent_services:
if service is None:
# keep the most recently created service
service = equivalent_service
else:
try:
# delete the older services and don't wait for
# them to be deprovisioned
service.stop()
except RuntimeError as e:
raise RuntimeError(
"Failed to stop the KServe deployment "
"server:\n",
f"{e}\n",
"Please stop it manually and try again.",
)
if service:
# Reuse the service account and secret from the existing
# service.
assert isinstance(service, KServeDeploymentService)
config.k8s_service_account = service.config.k8s_service_account
config.k8s_secret = service.config.k8s_secret
# configure the credentials for the KServe model server
self._create_or_update_kserve_credentials(config)
if service:
# update an equivalent service in place
service.update(config)
logger.info(
f"Updating an existing KServe deployment service: {service}"
)
else:
# create a new service
service = KServeDeploymentService(config=config)
logger.info(
f"Creating a new KServe deployment service: {service}"
)
# start the service which in turn provisions the KServe
# deployment server and waits for it to reach a ready state
service.start(timeout=timeout)
# Add telemetry with metadata that gets the stack metadata and
# differentiates between pure model and custom code deployments
stack = Client().active_stack
stack_metadata = {
component_type.value: component.flavor
for component_type, component in stack.components.items()
}
analytics_handler.metadata = {
"store_type": Client().zen_store.type.value,
**stack_metadata,
"is_custom_code_deployment": config.container is not None,
}
return service
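As a concrete illustration of the replace semantics described above, here is a minimal sketch of calling the method directly; it assumes a KServe model deployer in the active stack and an artifact store the KServe cluster can read from, and the model URI and names are placeholders:

```python
from zenml.integrations.kserve.model_deployers import KServeModelDeployer
from zenml.integrations.kserve.services import KServeDeploymentConfig

model_deployer = KServeModelDeployer.get_active_model_deployer()

config = KServeDeploymentConfig(
    model_uri="s3://my-bucket/path/to/model",  # placeholder model location
    model_name="my-model",
    predictor="sklearn",
    resources={"requests": {"cpu": "200m", "memory": "500Mi"}},
    pipeline_name="continuous_deployment_pipeline",
    pipeline_step_name="kserve_model_deployer_step",
)

# With replace=True, an equivalent deployment (same pipeline, step and
# model name) is updated in place instead of spinning up a new server.
service = model_deployer.deploy_model(config, replace=True, timeout=300)
print(service.prediction_url)
```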
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, run_name=None, pipeline_step_name=None, model_name=None, model_uri=None, predictor=None)
Find one or more KServe model services that match the given criteria.
Parameters:

Name | Type | Description | Default
---|---|---|---
running | bool | If true, only running services will be returned. | False
service_uuid | Optional[uuid.UUID] | The UUID of the service that was originally used to deploy the model. | None
pipeline_name | Optional[str] | name of the pipeline that the deployed model was part of. | None
run_name | Optional[str] | name of the pipeline run which the deployed model was part of. | None
pipeline_step_name | Optional[str] | the name of the pipeline model deployment step that deployed the model. | None
model_name | Optional[str] | the name of the deployed model. | None
model_uri | Optional[str] | URI of the deployed model. | None
predictor | Optional[str] | the name of the predictor that was used to deploy the model. | None

Returns:

Type | Description
---|---
List[zenml.services.service.BaseService] | One or more Service objects representing model servers that match the input search criteria.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def find_model_server(
self,
running: bool = False,
service_uuid: Optional[UUID] = None,
pipeline_name: Optional[str] = None,
run_name: Optional[str] = None,
pipeline_step_name: Optional[str] = None,
model_name: Optional[str] = None,
model_uri: Optional[str] = None,
predictor: Optional[str] = None,
) -> List[BaseService]:
"""Find one or more KServe model services that match the given criteria.
Args:
running: If true, only running services will be returned.
service_uuid: The UUID of the service that was originally used
to deploy the model.
pipeline_name: name of the pipeline that the deployed model was part
of.
run_name: name of the pipeline run which the deployed model was
part of.
pipeline_step_name: the name of the pipeline model deployment step
that deployed the model.
model_name: the name of the deployed model.
model_uri: URI of the deployed model.
predictor: the name of the predictor that was used to deploy the model.
Returns:
One or more Service objects representing model servers that match
the input search criteria.
"""
config = KServeDeploymentConfig(
pipeline_name=pipeline_name or "",
run_name=run_name or "",
pipeline_run_id=run_name or "",
pipeline_step_name=pipeline_step_name or "",
model_uri=model_uri or "",
model_name=model_name or "",
predictor=predictor or "",
resources={},
)
labels = config.get_kubernetes_labels()
if service_uuid:
labels["zenml.service_uuid"] = str(service_uuid)
deployments = self.get_kserve_deployments(labels=labels)
services: List[BaseService] = []
for deployment in deployments:
# recreate the KServe deployment service object from the KServe
# deployment resource
service = KServeDeploymentService.create_from_deployment(
deployment=deployment
)
if running and not service.is_running:
# skip non-running services
continue
services.append(service)
return services
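A short example of filtering the registered model servers; only the keyword arguments documented above are used and the pipeline name is a placeholder:

```python
from zenml.integrations.kserve.model_deployers import KServeModelDeployer

model_deployer = KServeModelDeployer.get_active_model_deployer()

# List only the currently running servers deployed by a given pipeline.
for service in model_deployer.find_model_server(
    running=True,
    pipeline_name="continuous_deployment_pipeline",  # placeholder
):
    print(service.uuid, service.is_running)
```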
get_docker_builds(self, deployment)
Gets the Docker builds required for the component.
Parameters:

Name | Type | Description | Default
---|---|---|---
deployment | PipelineDeploymentBaseModel | The pipeline deployment for which to get the builds. | required

Returns:

Type | Description
---|---
List[BuildConfiguration] | The required Docker builds.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_docker_builds(
self, deployment: "PipelineDeploymentBaseModel"
) -> List["BuildConfiguration"]:
"""Gets the Docker builds required for the component.
Args:
deployment: The pipeline deployment for which to get the builds.
Returns:
The required Docker builds.
"""
builds = []
for step_name, step in deployment.step_configurations.items():
if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
build = BuildConfiguration(
key=KSERVE_DOCKER_IMAGE_KEY,
settings=step.config.docker_settings,
step_name=step_name,
)
builds.append(build)
return builds
get_kserve_deployments(self, labels)
Get a list of KServe deployments that match the supplied labels.
Parameters:

Name | Type | Description | Default
---|---|---|---
labels | Dict[str, str] | a dictionary of labels to match against KServe deployments. | required

Returns:

Type | Description
---|---
List[kserve.models.v1beta1_inference_service.V1beta1InferenceService] | A list of KServe deployments that match the supplied labels.

Exceptions:

Type | Description
---|---
RuntimeError | if an operational failure is encountered while retrieving the KServe deployments.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_kserve_deployments(
self, labels: Dict[str, str]
) -> List[V1beta1InferenceService]:
"""Get a list of KServe deployments that match the supplied labels.
Args:
labels: a dictionary of labels to match against KServe deployments.
Returns:
A list of KServe deployments that match the supplied labels.
Raises:
RuntimeError: if an operational failure is encountered while
retrieving the KServe deployments.
"""
label_selector = (
",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
)
namespace = (
self.config.kubernetes_namespace
or utils.get_default_target_namespace()
)
try:
response = (
self.kserve_client.api_instance.list_namespaced_custom_object(
constants.KSERVE_GROUP,
constants.KSERVE_V1BETA1_VERSION,
namespace,
constants.KSERVE_PLURAL,
label_selector=label_selector,
)
)
except k8s_client.rest.ApiException as e:
raise RuntimeError(
"Exception when retrieving KServe inference services\
%s\n"
% e
)
# TODO[CRITICAL]: de-serialize each item into a complete
# V1beta1InferenceService object recursively using the OpenApi
# schema (this doesn't work right now)
inference_services: List[V1beta1InferenceService] = []
for item in response.get("items", []):
snake_case_item = self._camel_to_snake(item)
inference_service = V1beta1InferenceService(**snake_case_item)
inference_services.append(inference_service)
return inference_services
get_model_server_info(service_instance)
staticmethod
Return implementation specific information on the model server.
Parameters:

Name | Type | Description | Default
---|---|---|---
service_instance | KServeDeploymentService | KServe deployment service object | required

Returns:

Type | Description
---|---
Dict[str, Optional[str]] | A dictionary containing the model server information.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "KServeDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information on the model server.
Args:
service_instance: KServe deployment service object
Returns:
A dictionary containing the model server information.
"""
return {
"PREDICTION_URL": service_instance.prediction_url,
"PREDICTION_HOSTNAME": service_instance.prediction_hostname,
"MODEL_URI": service_instance.config.model_uri,
"MODEL_NAME": service_instance.config.model_name,
"KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
}
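Since this is a static method, it is usually combined with find_model_server, as in this small sketch (the model name is a placeholder):

```python
from zenml.integrations.kserve.model_deployers import KServeModelDeployer

model_deployer = KServeModelDeployer.get_active_model_deployer()
services = model_deployer.find_model_server(model_name="my-model")  # placeholder

if services:
    info = KServeModelDeployer.get_model_server_info(services[0])
    print(info["PREDICTION_URL"], info["KSERVE_INFERENCE_SERVICE"])
```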
start_model_server(self, uuid, timeout=300)
Start a KServe model deployment server.
Parameters:

Name | Type | Description | Default
---|---|---|---
uuid | UUID | UUID of the model server to start. | required
timeout | int | timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. | 300

Exceptions:

Type | Description
---|---
NotImplementedError | since we don't support starting KServe model servers
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def start_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
"""Start a KServe model deployment server.
Args:
uuid: UUID of the model server to start.
timeout: timeout in seconds to wait for the service to become
active. If set to 0, the method will return immediately after
provisioning the service, without waiting for it to become
active.
Raises:
NotImplementedError: since we don't support starting KServe
model servers
"""
raise NotImplementedError(
"Starting KServe model servers is not implemented"
)
stop_model_server(self, uuid, timeout=300, force=False)
Stop a KServe model server.
Parameters:

Name | Type | Description | Default
---|---|---|---
uuid | UUID | UUID of the model server to stop. | required
timeout | int | timeout in seconds to wait for the service to stop. | 300
force | bool | if True, force the service to stop. | False

Exceptions:

Type | Description
---|---
NotImplementedError | stopping KServe model servers is not supported.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def stop_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Stop a KServe model server.
Args:
uuid: UUID of the model server to stop.
timeout: timeout in seconds to wait for the service to stop.
force: if True, force the service to stop.
Raises:
NotImplementedError: stopping on KServe model servers is not
supported.
"""
raise NotImplementedError(
"Stopping KServe model servers is not implemented. Try "
"deleting the KServe model server instead."
)
KubeClientKServeClient (KServeClient)
KServe client initialized from a Kubernetes client.
This is a workaround for the fact that the native KServe client does not support initialization from an existing Kubernetes client.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KubeClientKServeClient(KServeClient): # type: ignore[misc]
"""KServe client initialized from a Kubernetes client.
This is a workaround for the fact that the native KServe client does not
support initialization from an existing Kubernetes client.
"""
def __init__(
self, kube_client: k8s_client.ApiClient, *args: Any, **kwargs: Any
) -> None:
"""Initializes the KServe client from a Kubernetes client.
Args:
kube_client: pre-configured Kubernetes client.
*args: standard KServe client positional arguments.
**kwargs: standard KServe client keyword arguments.
"""
from kubernetes import client
self.core_api = client.CoreV1Api(kube_client)
self.app_api = client.AppsV1Api(kube_client)
self.api_instance = client.CustomObjectsApi(kube_client)
__init__(self, kube_client, *args, **kwargs)
special
Initializes the KServe client from a Kubernetes client.
Parameters:

Name | Type | Description | Default
---|---|---|---
kube_client | ApiClient | pre-configured Kubernetes client. | required
*args | Any | standard KServe client positional arguments. | ()
**kwargs | Any | standard KServe client keyword arguments. | {}
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def __init__(
self, kube_client: k8s_client.ApiClient, *args: Any, **kwargs: Any
) -> None:
"""Initializes the KServe client from a Kubernetes client.
Args:
kube_client: pre-configured Kubernetes client.
*args: standard KServe client positional arguments.
**kwargs: standard KServe client keyword arguments.
"""
from kubernetes import client
self.core_api = client.CoreV1Api(kube_client)
self.app_api = client.AppsV1Api(kube_client)
self.api_instance = client.CustomObjectsApi(kube_client)
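A minimal sketch of the workaround in isolation, assuming a local kubeconfig and placeholder resource names; the wrapped client exposes the standard KServeClient methods on top of the supplied Kubernetes API client:

```python
from kubernetes import client, config

from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
    KubeClientKServeClient,
)

# Build a pre-configured Kubernetes API client (here from the local kubeconfig).
config.load_kube_config()
kube_client = client.ApiClient()

# Reuse that client for KServe operations instead of letting KServeClient
# load its own Kubernetes configuration.
kserve_client = KubeClientKServeClient(kube_client=kube_client)
isvc = kserve_client.get(name="my-model", namespace="zenml-workloads")  # placeholders
```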
secret_schemas
special
Initialization of Kserve Secret Schemas.
These are secret schemas that can be used to authenticate Kserve to the Artifact Store used to store served ML models.
secret_schemas
Implementation for KServe secret schemas.
KServeAzureSecretSchema (BaseSecretSchema)
pydantic-model
KServe Azure Blob Storage credentials.
Attributes:

Name | Type | Description
---|---|---
azure_client_id | Optional[str] | the Azure client ID.
azure_client_secret | Optional[str] | the Azure client secret.
azure_tenant_id | Optional[str] | the Azure tenant ID.
azure_subscription_id | Optional[str] | the Azure subscription ID.
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
@register_secret_schema_class
class KServeAzureSecretSchema(BaseSecretSchema):
"""KServe Azure Blob Storage credentials.
Attributes:
azure_client_id: the Azure client ID.
azure_client_secret: the Azure client secret.
azure_tenant_id: the Azure tenant ID.
azure_subscription_id: the Azure subscription ID.
"""
TYPE: ClassVar[str] = KSERVE_AZUREBLOB_SECRET_SCHEMA_TYPE
azure_client_id: Optional[str] = None
azure_client_secret: Optional[str] = None
azure_tenant_id: Optional[str] = None
azure_subscription_id: Optional[str] = None
KServeGSSecretSchema (BaseSecretSchema)
pydantic-model
KServe GCS credentials.
Attributes:

Name | Type | Description
---|---|---
google_application_credentials | Optional[str] | the GCP application credentials to use, in JSON format.
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
@register_secret_schema_class
class KServeGSSecretSchema(BaseSecretSchema):
"""KServe GCS credentials.
Attributes:
google_application_credentials: the GCP application credentials to use,
in JSON format.
"""
TYPE: ClassVar[str] = KSERVE_GS_SECRET_SCHEMA_TYPE
google_application_credentials: Optional[str]
KServeS3SecretSchema (BaseSecretSchema)
pydantic-model
KServe S3 credentials.
Attributes:

Name | Type | Description
---|---|---
aws_access_key_id | Optional[str] | the AWS access key ID.
aws_secret_access_key | Optional[str] | the AWS secret access key.
s3_endpoint | Optional[str] | the S3 endpoint.
s3_region | Optional[str] | the S3 region.
s3_use_https | Optional[str] | whether to use HTTPS.
s3_verify_ssl | Optional[str] | whether to verify SSL.
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
@register_secret_schema_class
class KServeS3SecretSchema(BaseSecretSchema):
"""KServe S3 credentials.
Attributes:
aws_access_key_id: the AWS access key ID.
aws_secret_access_key: the AWS secret access key.
s3_endpoint: the S3 endpoint.
s3_region: the S3 region.
s3_use_https: whether to use HTTPS.
s3_verify_ssl: whether to verify SSL.
"""
TYPE: ClassVar[str] = KSERVE_S3_SECRET_SCHEMA_TYPE
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
s3_endpoint: Optional[str] = None
s3_region: Optional[str] = None
s3_use_https: Optional[str] = None
s3_verify_ssl: Optional[str] = None
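A hedged sketch of filling in one of these schemas for an S3-backed artifact store; all values are placeholders, and the name field is assumed to be required by the base secret schema:

```python
from zenml.integrations.kserve.secret_schemas import KServeS3SecretSchema

# Credentials KServe needs to pull models from an S3 artifact store.
# Every value below is a placeholder.
s3_secret = KServeS3SecretSchema(
    name="kserve_secret",                      # ZenML secret name (assumed required)
    aws_access_key_id="<ACCESS_KEY_ID>",
    aws_secret_access_key="<SECRET_ACCESS_KEY>",
    s3_region="us-east-1",
)
```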
services
special
Initialization for KServe services.
kserve_deployment
Implementation for the KServe inference service.
KServeDeploymentConfig (ServiceConfig)
pydantic-model
KServe deployment service configuration.
Attributes:

Name | Type | Description
---|---|---
model_uri | str | URI of the model (or models) to serve.
model_name | str | the name of the model. Multiple versions of the same model should use the same model name. Model name must use only lowercase alphanumeric characters and dashes.
secret_name | Optional[str] | the name of the ZenML secret containing credentials required to authenticate to the artifact store.
k8s_secret | Optional[str] | the name of the Kubernetes secret to use for the prediction service.
k8s_service_account | Optional[str] | the name of the Kubernetes service account to use for the prediction service.
predictor | str | the KServe predictor used to serve the model. The predictor type can be one of the following: tensorflow, pytorch, sklearn, xgboost, custom.
replicas | int | number of replicas to use for the prediction service.
resources | Optional[Dict[str, Any]] | the Kubernetes resources to allocate for the prediction service.
container | Optional[Dict[str, Any]] | the container to use for the custom prediction services.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentConfig(ServiceConfig):
"""KServe deployment service configuration.
Attributes:
model_uri: URI of the model (or models) to serve.
model_name: the name of the model. Multiple versions of the same model
should use the same model name. Model name must use only lowercase
alphanumeric characters and dashes.
secret_name: the name of the ZenML secret containing credentials
required to authenticate to the artifact store.
k8s_secret: the name of the Kubernetes secret to use for the prediction
service.
k8s_service_account: the name of the Kubernetes service account to use
for the prediction service.
predictor: the KServe predictor used to serve the model. The
predictor type can be one of the following: `tensorflow`, `pytorch`,
`sklearn`, `xgboost`, `custom`.
replicas: number of replicas to use for the prediction service.
resources: the Kubernetes resources to allocate for the prediction service.
container: the container to use for the custom prediction services.
"""
model_uri: str = ""
model_name: str
secret_name: Optional[str] = None
k8s_secret: Optional[str] = None
k8s_service_account: Optional[str] = None
predictor: str
replicas: int = 1
container: Optional[Dict[str, Any]] = None
resources: Optional[Dict[str, Any]] = None
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
"""Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
labels: The labels to sanitize.
"""
# TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
for key, value in labels.items():
# Kubernetes labels must be alphanumeric, no longer than
# 63 characters, and must begin and end with an alphanumeric
# character ([a-z0-9A-Z])
labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
"-_."
)
def get_kubernetes_labels(self) -> Dict[str, str]:
"""Generate the labels for the KServe inference CRD from the service configuration.
These labels are attached to the KServe inference service CRD
and may be used as label selectors in lookup operations.
Returns:
The labels for the KServe inference service CRD.
"""
labels = {"app": "zenml"}
if self.pipeline_name:
labels["zenml.pipeline_name"] = self.pipeline_name
if self.run_name:
labels["zenml.run_name"] = self.run_name
if self.pipeline_step_name:
labels["zenml.pipeline_step_name"] = self.pipeline_step_name
if self.model_name:
labels["zenml.model_name"] = self.model_name
if self.model_uri:
labels["zenml.model_uri"] = self.model_uri
if self.predictor:
labels["zenml.model_type"] = self.predictor
self.sanitize_labels(labels)
return labels
def get_kubernetes_annotations(self) -> Dict[str, str]:
"""Generate the annotations for the KServe inference CRD the service configuration.
The annotations are used to store additional information about the
KServe ZenML service associated with the deployment that is
not available on the labels. One annotation is particularly important
is the serialized Service configuration itself, which is used to
recreate the service configuration from a remote KServe inference
service CRD.
Returns:
The annotations for the KServe inference service CRD.
"""
annotations = {
"zenml.service_config": self.json(),
"zenml.version": __version__,
}
return annotations
@classmethod
def create_from_deployment(
cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentConfig":
"""Recreate a KServe service from a KServe deployment resource.
Args:
deployment: the KServe inference service CRD.
Returns:
The KServe ZenML service configuration corresponding to the given
KServe inference service CRD.
Raises:
ValueError: if the given deployment resource does not contain
the expected annotations or it contains an invalid or
incompatible KServe ZenML service configuration.
"""
config_data = deployment.metadata.get("annotations").get(
"zenml.service_config"
)
if not config_data:
raise ValueError(
f"The given deployment resource does not contain a "
f"'zenml.service_config' annotation: {deployment}"
)
try:
service_config = cls.parse_raw(config_data)
except ValidationError as e:
raise ValueError(
f"The loaded KServe Inference Service resource contains an "
f"invalid or incompatible KServe ZenML service configuration: "
f"{config_data}"
) from e
return service_config
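A hedged example of a custom-code configuration; the keys of the container dict mirror what provision() reads later in this module (name, image, command, args, storage_uri), and all images, URIs and entrypoints are placeholders:

```python
from zenml.integrations.kserve.services import KServeDeploymentConfig

config = KServeDeploymentConfig(
    model_name="my-custom-model",                       # placeholder
    model_uri="s3://my-bucket/path/to/model",           # placeholder
    predictor="custom",
    k8s_service_account="kserve-service-credentials",   # placeholder
    container={
        "name": "custom-model-server",
        "image": "my-registry/zenml-custom-model:latest",  # placeholder image
        "command": ["python", "-m", "my_entrypoint"],       # placeholder entrypoint
        "args": ["--model_name", "my-custom-model"],
        "storage_uri": "s3://my-bucket/path/to/model",
    },
)
print(config.get_kubernetes_labels())
```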
create_from_deployment(deployment)
classmethod
Recreate a KServe service from a KServe deployment resource.
Parameters:

Name | Type | Description | Default
---|---|---|---
deployment | V1beta1InferenceService | the KServe inference service CRD. | required

Returns:

Type | Description
---|---
KServeDeploymentConfig | The KServe ZenML service configuration corresponding to the given KServe inference service CRD.

Exceptions:

Type | Description
---|---
ValueError | if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe ZenML service configuration.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentConfig":
"""Recreate a KServe service from a KServe deployment resource.
Args:
deployment: the KServe inference service CRD.
Returns:
The KServe ZenML service configuration corresponding to the given
KServe inference service CRD.
Raises:
ValueError: if the given deployment resource does not contain
the expected annotations or it contains an invalid or
incompatible KServe ZenML service configuration.
"""
config_data = deployment.metadata.get("annotations").get(
"zenml.service_config"
)
if not config_data:
raise ValueError(
f"The given deployment resource does not contain a "
f"'zenml.service_config' annotation: {deployment}"
)
try:
service_config = cls.parse_raw(config_data)
except ValidationError as e:
raise ValueError(
f"The loaded KServe Inference Service resource contains an "
f"invalid or incompatible KServe ZenML service configuration: "
f"{config_data}"
) from e
return service_config
get_kubernetes_annotations(self)
Generate the annotations for the KServe inference CRD from the service configuration.
The annotations are used to store additional information about the KServe ZenML service associated with the deployment that is not available on the labels. One annotation that is particularly important is the serialized service configuration itself, which is used to recreate the service configuration from a remote KServe inference service CRD.

Returns:

Type | Description
---|---
Dict[str, str] | The annotations for the KServe inference service CRD.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_annotations(self) -> Dict[str, str]:
"""Generate the annotations for the KServe inference CRD the service configuration.
The annotations are used to store additional information about the
KServe ZenML service associated with the deployment that is
not available on the labels. One annotation is particularly important
is the serialized Service configuration itself, which is used to
recreate the service configuration from a remote KServe inference
service CRD.
Returns:
The annotations for the KServe inference service CRD.
"""
annotations = {
"zenml.service_config": self.json(),
"zenml.version": __version__,
}
return annotations
get_kubernetes_labels(self)
Generate the labels for the KServe inference CRD from the service configuration.
These labels are attached to the KServe inference service CRD and may be used as label selectors in lookup operations.
Returns:

Type | Description
---|---
Dict[str, str] | The labels for the KServe inference service CRD.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_labels(self) -> Dict[str, str]:
"""Generate the labels for the KServe inference CRD from the service configuration.
These labels are attached to the KServe inference service CRD
and may be used as label selectors in lookup operations.
Returns:
The labels for the KServe inference service CRD.
"""
labels = {"app": "zenml"}
if self.pipeline_name:
labels["zenml.pipeline_name"] = self.pipeline_name
if self.run_name:
labels["zenml.run_name"] = self.run_name
if self.pipeline_step_name:
labels["zenml.pipeline_step_name"] = self.pipeline_step_name
if self.model_name:
labels["zenml.model_name"] = self.model_name
if self.model_uri:
labels["zenml.model_uri"] = self.model_uri
if self.predictor:
labels["zenml.model_type"] = self.predictor
self.sanitize_labels(labels)
return labels
sanitize_labels(labels)
staticmethod
Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Parameters:

Name | Type | Description | Default
---|---|---|---
labels | Dict[str, str] | The labels to sanitize. | required
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
"""Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
labels: The labels to sanitize.
"""
# TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
for key, value in labels.items():
# Kubernetes labels must be alphanumeric, no longer than
# 63 characters, and must begin and end with an alphanumeric
# character ([a-z0-9A-Z])
labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
"-_."
)
KServeDeploymentService (BaseDeploymentService)
pydantic-model
A ZenML service that represents a KServe inference service CRD.
Attributes:

Name | Type | Description
---|---|---
config | KServeDeploymentConfig | service configuration.
status | ServiceStatus | service status.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentService(BaseDeploymentService):
"""A ZenML service that represents a KServe inference service CRD.
Attributes:
config: service configuration.
status: service status.
"""
SERVICE_TYPE = ServiceType(
name="kserve-deployment",
type="model-serving",
flavor="kserve",
description="KServe inference service",
)
config: KServeDeploymentConfig
status: ServiceStatus = Field(default_factory=lambda: ServiceStatus())
def _get_model_deployer(self) -> "KServeModelDeployer":
"""Get the active KServe model deployer.
Returns:
The active KServeModelDeployer.
"""
from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
KServeModelDeployer,
)
return cast(
KServeModelDeployer,
KServeModelDeployer.get_active_model_deployer(),
)
def _get_client(self) -> KServeClient:
"""Get the KServe client from the active KServe model deployer.
Returns:
The KServe client.
"""
return self._get_model_deployer().kserve_client
def _get_namespace(self) -> Optional[str]:
"""Get the Kubernetes namespace from the active KServe model deployer.
Returns:
The Kubernetes namespace, or None, if the default namespace is
used.
"""
return self._get_model_deployer().config.kubernetes_namespace
def check_status(self) -> Tuple[ServiceState, str]:
"""Check the state of the KServe inference service.
This method checks the current operational state of the external KServe
inference service and translates it into a `ServiceState` value and a printable message.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:
The operational state of the external service and a message
providing additional information about that state (e.g. a
description of the error if one is encountered while checking the
service status).
"""
client = self._get_client()
namespace = self._get_namespace()
name = self.crd_name
try:
deployment = client.get(name=name, namespace=namespace)
except RuntimeError:
return (ServiceState.INACTIVE, "")
# TODO[MEDIUM]: Implement better operational status checking that also
# cover errors
if "status" not in deployment:
return (ServiceState.INACTIVE, "No operational status available")
status = "Unknown"
for condition in deployment["status"].get("conditions", {}):
if condition.get("type", "") == "PredictorReady":
status = condition.get("status", "Unknown")
if status.lower() == "true":
return (
ServiceState.ACTIVE,
f"Inference service '{name}' is available",
)
elif status.lower() == "false":
return (
ServiceState.PENDING_STARTUP,
f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
)
return (
ServiceState.PENDING_STARTUP,
f"Inference service '{name}' still starting up",
)
@property
def crd_name(self) -> str:
"""Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.
Returns:
The name of the KServe inference service CRD.
"""
return (
self._get_kubernetes_labels().get("zenml.model_name")
or f"zenml-{str(self.uuid)[:8]}"
)
def _get_kubernetes_labels(self) -> Dict[str, str]:
"""Generate the labels for the KServe inference service CRD from the service configuration.
Returns:
The labels for the KServe inference service.
"""
labels = self.config.get_kubernetes_labels()
labels["zenml.service_uuid"] = str(self.uuid)
KServeDeploymentConfig.sanitize_labels(labels)
return labels
@classmethod
def create_from_deployment(
cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
"""Recreate the configuration of a KServe Service from a deployed instance.
Args:
deployment: the KServe deployment resource.
Returns:
The KServe service configuration corresponding to the given
KServe deployment resource.
Raises:
ValueError: if the given deployment resource does not contain
the expected annotations or it contains an invalid or
incompatible KServe service configuration.
"""
config = KServeDeploymentConfig.create_from_deployment(deployment)
uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
if not uuid:
raise ValueError(
f"The given deployment resource does not contain a valid "
f"'zenml.service_uuid' label: {deployment}"
)
service = cls(uuid=UUID(uuid), config=config)
service.update_status()
return service
def provision(self) -> None:
"""Provision or update remote KServe deployment instance.
This should then match the current configuration.
"""
client = self._get_client()
namespace = self._get_namespace()
api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
name = self.crd_name
# All supported model specs seem to have the same fields
# so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
if self.config.container is not None:
predictor_kwargs = {
"containers": [
k8s_client.V1Container(
name=self.config.container.get("name"),
image=self.config.container.get("image"),
command=self.config.container.get("command"),
args=self.config.container.get("args"),
env=[
k8s_client.V1EnvVar(
name="STORAGE_URI",
value=self.config.container.get("storage_uri"),
)
],
)
],
"service_account_name": self.config.k8s_service_account,
}
else:
predictor_kwargs = {
self.config.predictor: V1beta1PredictorExtensionSpec(
storage_uri=self.config.model_uri,
resources=self.config.resources,
),
"service_account_name": self.config.k8s_service_account,
}
isvc = V1beta1InferenceService(
api_version=api_version,
kind=constants.KSERVE_KIND,
metadata=k8s_client.V1ObjectMeta(
name=name,
namespace=namespace,
labels=self._get_kubernetes_labels(),
annotations=self.config.get_kubernetes_annotations(),
),
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(**predictor_kwargs)
),
)
# TODO[HIGH]: better error handling when provisioning KServe instances
try:
client.get(name=name, namespace=namespace)
# update the existing deployment
client.replace(name, isvc, namespace=namespace)
except RuntimeError:
client.create(isvc)
def deprovision(self, force: bool = False) -> None:
"""Deprovisions all resources used by the service.
Args:
force: if True, the service will be deprovisioned even if it is
still in use.
Raises:
ValueError: if the service is still in use and force is False.
"""
client = self._get_client()
namespace = self._get_namespace()
name = self.crd_name
# TODO[HIGH]: catch errors if deleting a KServe instance that is no
# longer available
try:
client.delete(name=name, namespace=namespace)
except RuntimeError:
raise ValueError(
f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
)
def _get_deployment_logs(
self,
name: str,
follow: bool = False,
tail: Optional[int] = None,
) -> Generator[str, bool, None]:
"""Get the logs of a KServe deployment resource.
Args:
name: the name of the KServe deployment to get logs for.
follow: if True, the logs will be streamed as they are written
tail: only retrieve the last NUM lines of log output.
Returns:
A generator that can be accessed to get the service logs.
Raises:
Exception: if an unknown error occurs while fetching the logs.
Yields:
The logs of the given deployment.
"""
client = self._get_client()
namespace = self._get_namespace()
logger.debug(f"Retrieving logs for InferenceService resource: {name}")
try:
response = client.core_api.list_namespaced_pod(
namespace=namespace,
label_selector=f"zenml.service_uuid={self.uuid}",
)
logger.debug("Kubernetes API response: %s", response)
pods = response.items
if not pods:
raise Exception(
f"The KServe deployment {name} is not currently "
f"running: no Kubernetes pods associated with it were found"
)
pod = pods[0]
pod_name = pod.metadata.name
containers = [c.name for c in pod.spec.containers]
init_containers = [c.name for c in pod.spec.init_containers]
container_statuses = {
c.name: c.started or c.restart_count
for c in pod.status.container_statuses
}
container = "default"
if container not in containers:
container = containers[0]
if not container_statuses[container]:
container = init_containers[0]
logger.info(
f"Retrieving logs for pod: `{pod_name}` and container "
f"`{container}` in namespace `{namespace}`"
)
response = client.core_api.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
container=container,
follow=follow,
tail_lines=tail,
_preload_content=False,
)
except k8s_client.rest.ApiException as e:
logger.error(
"Exception when fetching logs for InferenceService resource "
"%s: %s",
name,
str(e),
)
raise Exception(
f"Unexpected exception when fetching logs for InferenceService "
f"resource: {name}"
) from e
try:
while True:
line = response.readline().decode("utf-8").rstrip("\n")
if not line:
return
stop = yield line
if stop:
return
finally:
response.release_conn()
def get_logs(
self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
"""Retrieve the logs from the remote KServe inference service instance.
Args:
follow: if True, the logs will be streamed as they are written.
tail: only retrieve the last NUM lines of log output.
Returns:
A generator that can be accessed to get the service logs.
"""
return self._get_deployment_logs(
self.crd_name,
follow=follow,
tail=tail,
)
@property
def prediction_url(self) -> Optional[str]:
"""The prediction URI exposed by the prediction service.
Returns:
The prediction URI exposed by the prediction service, or None if
the service is not yet ready.
"""
if not self.is_running:
return None
model_deployer = self._get_model_deployer()
return os.path.join(
model_deployer.config.base_url,
"v1/models",
f"{self.crd_name}:predict",
)
@property
def prediction_hostname(self) -> Optional[str]:
"""The prediction hostname exposed by the prediction service.
Returns:
The prediction hostname exposed by the prediction service status
that will be used in the headers of the prediction request.
"""
if not self.is_running:
return None
namespace = self._get_namespace()
model_deployer = self._get_model_deployer()
custom_domain = model_deployer.config.custom_domain or "example.com"
return f"{self.crd_name}.{namespace}.{custom_domain}"
def predict(self, request: str) -> Any:
"""Make a prediction using the service.
Args:
request: a JSON string representing the request payload.
Returns:
The prediction returned by the service, parsed from the JSON response.
Raises:
Exception: if the service is not yet ready.
ValueError: if the prediction_url is not set.
"""
if not self.is_running:
raise Exception(
"KServe prediction service is not running. "
"Please start the service before making predictions."
)
if self.prediction_url is None:
raise ValueError("`self.prediction_url` is not set, cannot post.")
if self.prediction_hostname is None:
raise ValueError(
"`self.prediction_hostname` is not set, cannot post."
)
headers = {"Host": self.prediction_hostname}
if isinstance(request, str):
request = json.loads(request)
else:
raise ValueError("Request must be a json string.")
response = requests.post( # nosec
self.prediction_url,
headers=headers,
json={"instances": request},
)
response.raise_for_status()
return response.json()
crd_name: str
property
readonly
Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.
Returns:

Type | Description
---|---
str | The name of the KServe inference service CRD.
prediction_hostname: Optional[str]
property
readonly
The prediction hostname exposed by the prediction service.
Returns:

Type | Description
---|---
Optional[str] | The prediction hostname exposed by the prediction service status that will be used in the headers of the prediction request.
prediction_url: Optional[str]
property
readonly
The prediction URI exposed by the prediction service.
Returns:

Type | Description
---|---
Optional[str] | The prediction URI exposed by the prediction service, or None if the service is not yet ready.
check_status(self)
Check the state of the KServe inference service.
This method checks the current operational state of the external KServe inference service and translates it into a ServiceState value and a printable message.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:

Type | Description
---|---
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
"""Check the state of the KServe inference service.
This method checks the current operational state of the external KServe
inference service and translates it into a `ServiceState` value and a printable message.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:
The operational state of the external service and a message
providing additional information about that state (e.g. a
description of the error if one is encountered while checking the
service status).
"""
client = self._get_client()
namespace = self._get_namespace()
name = self.crd_name
try:
deployment = client.get(name=name, namespace=namespace)
except RuntimeError:
return (ServiceState.INACTIVE, "")
# TODO[MEDIUM]: Implement better operational status checking that also
# cover errors
if "status" not in deployment:
return (ServiceState.INACTIVE, "No operational status available")
status = "Unknown"
for condition in deployment["status"].get("conditions", {}):
if condition.get("type", "") == "PredictorReady":
status = condition.get("status", "Unknown")
if status.lower() == "true":
return (
ServiceState.ACTIVE,
f"Inference service '{name}' is available",
)
elif status.lower() == "false":
return (
ServiceState.PENDING_STARTUP,
f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
)
return (
ServiceState.PENDING_STARTUP,
f"Inference service '{name}' still starting up",
)
create_from_deployment(deployment)
classmethod
Recreate the configuration of a KServe Service from a deployed instance.
Parameters:

Name | Type | Description | Default
---|---|---|---
deployment | V1beta1InferenceService | the KServe deployment resource. | required

Returns:

Type | Description
---|---
KServeDeploymentService | The KServe service configuration corresponding to the given KServe deployment resource.

Exceptions:

Type | Description
---|---
ValueError | if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe service configuration.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
"""Recreate the configuration of a KServe Service from a deployed instance.
Args:
deployment: the KServe deployment resource.
Returns:
The KServe service configuration corresponding to the given
KServe deployment resource.
Raises:
ValueError: if the given deployment resource does not contain
the expected annotations or it contains an invalid or
incompatible KServe service configuration.
"""
config = KServeDeploymentConfig.create_from_deployment(deployment)
uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
if not uuid:
raise ValueError(
f"The given deployment resource does not contain a valid "
f"'zenml.service_uuid' label: {deployment}"
)
service = cls(uuid=UUID(uuid), config=config)
service.update_status()
return service
deprovision(self, force=False)
Deprovisions all resources used by the service.
Parameters:

Name | Type | Description | Default
---|---|---|---
force | bool | if True, the service will be deprovisioned even if it is still in use. | False

Exceptions:

Type | Description
---|---
ValueError | if the service is still in use and force is False.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def deprovision(self, force: bool = False) -> None:
"""Deprovisions all resources used by the service.
Args:
force: if True, the service will be deprovisioned even if it is
still in use.
Raises:
ValueError: if the service is still in use and force is False.
"""
client = self._get_client()
namespace = self._get_namespace()
name = self.crd_name
# TODO[HIGH]: catch errors if deleting a KServe instance that is no
# longer available
try:
client.delete(name=name, namespace=namespace)
except RuntimeError:
raise ValueError(
f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
)
get_logs(self, follow=False, tail=None)
Retrieve the logs from the remote KServe inference service instance.
Parameters:

Name | Type | Description | Default
---|---|---|---
follow | bool | if True, the logs will be streamed as they are written. | False
tail | Optional[int] | only retrieve the last NUM lines of log output. | None

Returns:

Type | Description
---|---
Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_logs(
self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
"""Retrieve the logs from the remote KServe inference service instance.
Args:
follow: if True, the logs will be streamed as they are written.
tail: only retrieve the last NUM lines of log output.
Returns:
A generator that can be accessed to get the service logs.
"""
return self._get_deployment_logs(
self.crd_name,
follow=follow,
tail=tail,
)
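A small sketch of consuming the generator; the model name is a placeholder and a deployed KServe service is assumed:

```python
from zenml.integrations.kserve.model_deployers import KServeModelDeployer

services = KServeModelDeployer.get_active_model_deployer().find_model_server(
    model_name="my-model",  # placeholder
)
if services:
    # Print the last 50 log lines without following the stream.
    for line in services[0].get_logs(follow=False, tail=50):
        print(line)
```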
predict(self, request)
Make a prediction using the service.
Parameters:

Name | Type | Description | Default
---|---|---|---
request | str | a JSON string representing the request payload. | required

Returns:

Type | Description
---|---
Any | The prediction returned by the service, parsed from the JSON response.

Exceptions:

Type | Description
---|---
Exception | if the service is not yet ready.
ValueError | if the prediction_url is not set.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def predict(self, request: str) -> Any:
"""Make a prediction using the service.
Args:
request: a JSON string representing the request payload.
Returns:
The prediction returned by the service, parsed from the JSON response.
Raises:
Exception: if the service is not yet ready.
ValueError: if the prediction_url is not set.
"""
if not self.is_running:
raise Exception(
"KServe prediction service is not running. "
"Please start the service before making predictions."
)
if self.prediction_url is None:
raise ValueError("`self.prediction_url` is not set, cannot post.")
if self.prediction_hostname is None:
raise ValueError(
"`self.prediction_hostname` is not set, cannot post."
)
headers = {"Host": self.prediction_hostname}
if isinstance(request, str):
request = json.loads(request)
else:
raise ValueError("Request must be a json string.")
response = requests.post( # nosec
self.prediction_url,
headers=headers,
json={"instances": request},
)
response.raise_for_status()
return response.json()
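A short end-to-end sketch: the request must be a JSON string, which the method parses and posts as the instances payload of a KServe v1 prediction request. The model name and feature values are placeholders:

```python
import json

from zenml.integrations.kserve.model_deployers import KServeModelDeployer

services = KServeModelDeployer.get_active_model_deployer().find_model_server(
    running=True,
    model_name="my-model",  # placeholder
)
if services:
    # `predict` expects a JSON string; it is parsed and sent as the
    # "instances" field of the prediction request body.
    request = json.dumps([[5.1, 3.5, 1.4, 0.2]])  # placeholder instance
    print(services[0].predict(request))
```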
provision(self)
Provision or update remote KServe deployment instance.
This should then match the current configuration.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def provision(self) -> None:
"""Provision or update remote KServe deployment instance.
This should then match the current configuration.
"""
client = self._get_client()
namespace = self._get_namespace()
api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
name = self.crd_name
# All supported model specs seem to have the same fields
# so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
if self.config.container is not None:
predictor_kwargs = {
"containers": [
k8s_client.V1Container(
name=self.config.container.get("name"),
image=self.config.container.get("image"),
command=self.config.container.get("command"),
args=self.config.container.get("args"),
env=[
k8s_client.V1EnvVar(
name="STORAGE_URI",
value=self.config.container.get("storage_uri"),
)
],
)
],
"service_account_name": self.config.k8s_service_account,
}
else:
predictor_kwargs = {
self.config.predictor: V1beta1PredictorExtensionSpec(
storage_uri=self.config.model_uri,
resources=self.config.resources,
),
"service_account_name": self.config.k8s_service_account,
}
isvc = V1beta1InferenceService(
api_version=api_version,
kind=constants.KSERVE_KIND,
metadata=k8s_client.V1ObjectMeta(
name=name,
namespace=namespace,
labels=self._get_kubernetes_labels(),
annotations=self.config.get_kubernetes_annotations(),
),
spec=V1beta1InferenceServiceSpec(
predictor=V1beta1PredictorSpec(**predictor_kwargs)
),
)
# TODO[HIGH]: better error handling when provisioning KServe instances
try:
client.get(name=name, namespace=namespace)
# update the existing deployment
client.replace(name, isvc, namespace=namespace)
except RuntimeError:
client.create(isvc)
steps
special
Initialization for KServe steps.
kserve_deployer
Implementation of the KServe Deployer step.
CustomDeployParameters (BaseModel)
pydantic-model
Custom model deployer step extra parameters.
Attributes:

Name | Type | Description
---|---|---
predict_function | str | Path to Python file containing predict function.
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class CustomDeployParameters(BaseModel):
"""Custom model deployer step extra parameters.
Attributes:
predict_function: Path to Python file containing predict function.
"""
predict_function: str
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
"""Validate predict function.
Args:
predict_func_path: predict function path
Returns:
predict function path
Raises:
ValueError: if predict function path is not valid
TypeError: if predict function path is not a callable function
"""
try:
predict_function = source_utils.load(predict_func_path)
except AttributeError:
raise ValueError("Predict function can't be found.")
if not callable(predict_function):
raise TypeError("Predict function must be callable.")
return predict_func_path
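The predict_function value is an importable source path that is loaded with source_utils.load. A hypothetical example of such a function is sketched below; the signature (a loaded model plus the decoded request payload) is an assumption based on the ZenMLCustomModel entry point, and the module path used when referencing it would be something like steps.custom_predict.custom_predict:

```python
# steps/custom_predict.py -- hypothetical module inside the user's repository
from typing import Any, Dict, List

import numpy as np


def custom_predict(model: Any, request: Dict[str, List[Any]]) -> List[Any]:
    """Run inference with the user-loaded model on a KServe request payload.

    The request shape ({"instances": [...]}) is an assumption matching the
    KServe v1 prediction protocol used elsewhere in this integration.
    """
    instances = np.asarray(request["instances"])
    predictions = model.predict(instances)
    return predictions.tolist()
```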
predict_function_validate(predict_func_path)
classmethod
Validate predict function.
Parameters:

Name | Type | Description | Default
---|---|---|---
predict_func_path | str | predict function path | required

Returns:

Type | Description
---|---
str | predict function path

Exceptions:

Type | Description
---|---
ValueError | if predict function path is not valid
TypeError | if predict function path is not a callable function
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
"""Validate predict function.
Args:
predict_func_path: predict function path
Returns:
predict function path
Raises:
ValueError: if predict function path is not valid
TypeError: if predict function path is not a callable function
"""
try:
predict_function = source_utils.load(predict_func_path)
except AttributeError:
raise ValueError("Predict function can't be found.")
if not callable(predict_function):
raise TypeError("Predict function must be callable.")
return predict_func_path
KServeDeployerStepParameters (BaseParameters)
pydantic-model
KServe model deployer step parameters.
Attributes:

Name | Type | Description
---|---|---
service_config | KServeDeploymentConfig | KServe deployment service configuration.
custom_deploy_parameters | Optional[CustomDeployParameters] | custom code deployment parameters (see CustomDeployParameters).
torch_serve_parameters | Optional[TorchServeParameters] | TorchServe set of parameters to deploy model.
timeout | int | Timeout for model deployment.
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
"""KServe model deployer step parameters.
Attributes:
service_config: KServe deployment service configuration.
custom_deploy_parameters: custom code deployment parameters.
torch_serve_parameters: TorchServe set of parameters to deploy model.
timeout: Timeout for model deployment.
"""
service_config: KServeDeploymentConfig
custom_deploy_parameters: Optional[CustomDeployParameters] = None
torch_serve_parameters: Optional[TorchServeParameters] = None
timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
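A hedged sketch of assembling the step parameters for a custom code deployment; the import paths follow the source locations shown in this reference and all names, paths and resource values are placeholders:

```python
from zenml.integrations.kserve.services import KServeDeploymentConfig
from zenml.integrations.kserve.steps.kserve_deployer import (
    CustomDeployParameters,
    KServeDeployerStepParameters,
)

params = KServeDeployerStepParameters(
    service_config=KServeDeploymentConfig(
        model_name="my-model",  # placeholder
        predictor="custom",
        resources={"requests": {"cpu": "200m", "memory": "500Mi"}},
    ),
    custom_deploy_parameters=CustomDeployParameters(
        predict_function="steps.custom_predict.custom_predict"  # placeholder path
    ),
    timeout=300,
)
```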
TorchServeParameters (BaseModel)
pydantic-model
KServe PyTorch model deployer step configuration.
Attributes:

Name | Type | Description
---|---|---
model_class | str | Path to Python file containing model architecture.
handler | str | TorchServe's handler file to handle custom TorchServe inference logic.
extra_files | Optional[List[str]] | Comma separated path to extra dependency files.
model_version | Optional[str] | Model version.
requirements_file | Optional[str] | Path to requirements file.
torch_config | Optional[str] | TorchServe configuration file path.
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class TorchServeParameters(BaseModel):
"""KServe PyTorch model deployer step configuration.
Attributes:
model_class: Path to Python file containing model architecture.
handler: TorchServe's handler file to handle custom TorchServe inference
logic.
extra_files: Comma separated path to extra dependency files.
model_version: Model version.
requirements_file: Path to requirements file.
torch_config: TorchServe configuration file path.
"""
model_class: str
handler: str
extra_files: Optional[List[str]] = None
requirements_file: Optional[str] = None
model_version: Optional[str] = "1.0"
torch_config: Optional[str] = None
@validator("model_class")
def model_class_validate(cls, v: str) -> str:
"""Validate model class file path.
Args:
v: model class file path
Returns:
model class file path
Raises:
ValueError: if model class file path is not valid
"""
if not v:
raise ValueError("Model class file path is required.")
if not Client.is_inside_repository(v):
raise ValueError(
"Model class file path must be inside the repository."
)
return v
@validator("handler")
def handler_validate(cls, v: str) -> str:
"""Validate handler.
Args:
v: handler file path
Returns:
handler file path
Raises:
ValueError: if handler file path is not valid
"""
if v:
if v in TORCH_HANDLERS:
return v
elif Client.is_inside_repository(v):
return v
else:
raise ValueError(
"Handler must be one of the TorchServe handlers",
"or a file that exists inside the repository.",
)
else:
raise ValueError("Handler is required.")
@validator("extra_files")
def extra_files_validate(
cls, v: Optional[List[str]]
) -> Optional[List[str]]:
"""Validate extra files.
Args:
v: extra files path
Returns:
extra files path
Raises:
ValueError: if the extra files path is not valid
"""
extra_files = []
if v is not None:
for file_path in v:
if Client.is_inside_repository(file_path):
extra_files.append(file_path)
else:
raise ValueError(
"Extra file path must be inside the repository."
)
return extra_files
return v
@validator("torch_config")
def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
"""Validate torch config file.
Args:
v: torch config file path
Returns:
torch config file path
Raises:
ValueError: if torch config file path is not valid.
"""
if v:
if Client.is_inside_repository(v):
return v
else:
raise ValueError(
"Torch config file path must be inside the repository."
)
return v
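As a rough example of the validation rules above, the snippet below builds a TorchServeParameters object. All file paths are hypothetical and, as the validators require, must resolve to files inside the ZenML repository (or, for the handler, to one of the built-in TorchServe handler names).
from zenml.integrations.kserve.steps.kserve_deployer import TorchServeParameters

# Hypothetical repository-relative paths.
torch_params = TorchServeParameters(
    model_class="steps/pytorch_model.py",  # file defining the model architecture
    handler="image_classifier",            # built-in TorchServe handler name
    model_version="1.0",
)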
extra_files_validate(v)
classmethod
Validate extra files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | Optional[List[str]] | extra files path | required |
Returns:
Type | Description |
---|---|
Optional[List[str]] | extra files path |
Exceptions:
Type | Description |
---|---|
ValueError | if the extra files path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("extra_files")
def extra_files_validate(
cls, v: Optional[List[str]]
) -> Optional[List[str]]:
"""Validate extra files.
Args:
v: extra files path
Returns:
extra files path
Raises:
ValueError: if the extra files path is not valid
"""
extra_files = []
if v is not None:
for file_path in v:
if Client.is_inside_repository(file_path):
extra_files.append(file_path)
else:
raise ValueError(
"Extra file path must be inside the repository."
)
return extra_files
return v
handler_validate(v)
classmethod
Validate handler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | str | handler file path | required |
Returns:
Type | Description |
---|---|
str | handler file path |
Exceptions:
Type | Description |
---|---|
ValueError | if handler file path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("handler")
def handler_validate(cls, v: str) -> str:
"""Validate handler.
Args:
v: handler file path
Returns:
handler file path
Raises:
ValueError: if handler file path is not valid
"""
if v:
if v in TORCH_HANDLERS:
return v
elif Client.is_inside_repository(v):
return v
else:
raise ValueError(
"Handler must be one of the TorchServe handlers",
"or a file that exists inside the repository.",
)
else:
raise ValueError("Handler is required.")
model_class_validate(v)
classmethod
Validate model class file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | str | model class file path | required |
Returns:
Type | Description |
---|---|
str | model class file path |
Exceptions:
Type | Description |
---|---|
ValueError | if model class file path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("model_class")
def model_class_validate(cls, v: str) -> str:
"""Validate model class file path.
Args:
v: model class file path
Returns:
model class file path
Raises:
ValueError: if model class file path is not valid
"""
if not v:
raise ValueError("Model class file path is required.")
if not Client.is_inside_repository(v):
raise ValueError(
"Model class file path must be inside the repository."
)
return v
torch_config_validate(v)
classmethod
Validate torch config file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | Optional[str] | torch config file path | required |
Returns:
Type | Description |
---|---|
Optional[str] | torch config file path |
Exceptions:
Type | Description |
---|---|
ValueError | if torch config file path is not valid. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("torch_config")
def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
"""Validate torch config file.
Args:
v: torch config file path
Returns:
torch config file path
Raises:
ValueError: if torch config file path is not valid.
"""
if v:
if Client.is_inside_repository(v):
return v
else:
raise ValueError(
"Torch config file path must be inside the repository."
)
return v
kserve_custom_model_deployer_step (_DecoratedStep)
KServe custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
params | KServeDeployerStepParameters | parameters for the deployer step | required |
model | UnmaterializedArtifact | the model artifact to deploy | required |
context | StepContext | the step context | required |
Exceptions:
Type | Description |
---|---|
ValueError | if the custom deployer parameters are not defined |
DoesNotExistException | if no active stack is found |
Returns:
Type | Description |
---|---|
KServeDeploymentService | KServe deployment service |
entrypoint(deploy_decision, params, context, model)
staticmethod
KServe custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
params | KServeDeployerStepParameters | parameters for the deployer step | required |
model | UnmaterializedArtifact | the model artifact to deploy | required |
context | StepContext | the step context | required |
Exceptions:
Type | Description |
---|---|
ValueError | if the custom deployer parameters are not defined |
DoesNotExistException | if no active stack is found |
Returns:
Type | Description |
---|---|
KServeDeploymentService | KServe deployment service |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False, extra={KSERVE_CUSTOM_DEPLOYMENT: True})
def kserve_custom_model_deployer_step(
deploy_decision: bool,
params: KServeDeployerStepParameters,
context: StepContext,
model: UnmaterializedArtifact,
) -> KServeDeploymentService:
"""KServe custom model deployer pipeline step.
This step can be used in a pipeline to implement the
process required to deploy a custom model with KServe.
Args:
deploy_decision: whether to deploy the model or not
params: parameters for the deployer step
model: the model artifact to deploy
context: the step context
Raises:
ValueError: if the custom deployer parameters is not defined
DoesNotExistException: if no active stack is found
Returns:
KServe deployment service
"""
# verify that a custom deployer is defined
if not params.custom_deploy_parameters:
raise ValueError(
"Custom deploy parameter which contains the path of the",
"custom predict function is required for custom model deployment.",
)
# get the active model deployer
model_deployer = cast(
KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
)
# get pipeline name, step name, run id
step_context = get_step_context()
pipeline_name = step_context.pipeline.name
run_name = step_context.pipeline_run.name
step_name = step_context.step_run.name
# update the step configuration with the real pipeline runtime information
params.service_config.pipeline_name = pipeline_name
params.service_config.run_name = run_name
params.service_config.pipeline_step_name = step_name
# fetch existing services with same pipeline name, step name and
# model name
existing_services = model_deployer.find_model_server(
pipeline_name=pipeline_name,
pipeline_step_name=step_name,
model_name=params.service_config.model_name,
)
# even when the deploy decision is negative if an existing model server
# is not running for this pipeline/step, we still have to serve the
# current model, to ensure that a model server is available at all times
if not deploy_decision and existing_services:
logger.info(
f"Skipping model deployment because the model quality does not "
f"meet the criteria. Reusing the last model server deployed by step "
f"'{step_name}' and pipeline '{pipeline_name}' for model "
f"'{params.service_config.model_name}'..."
)
service = cast(KServeDeploymentService, existing_services[0])
# even when the deploy decision is negative, we still need to start
# the previous model server if it is no longer running, to ensure that
# a model server is available at all times
if not service.is_running:
service.start(timeout=params.timeout)
return service
# entrypoint for starting KServe server deployment for custom model
entrypoint_command = [
"python",
"-m",
"zenml.integrations.kserve.custom_deployer.zenml_custom_model",
"--model_name",
params.service_config.model_name,
"--predict_func",
params.custom_deploy_parameters.predict_function,
]
# verify if there is an active stack before starting the service
if not context.stack:
raise DoesNotExistException(
"No active stack is available. "
"Please make sure that you have registered and set a stack."
)
image_name = step_context.step_run_info.get_image(
key=KSERVE_DOCKER_IMAGE_KEY
)
# copy the model files to a new specific directory for the deployment
served_model_uri = os.path.join(
context.get_output_artifact_uri(), "kserve"
)
fileio.makedirs(served_model_uri)
io_utils.copy_dir(model.uri, served_model_uri)
# save the model artifact metadata to the YAML file and copy it to the
# deployment directory
model_metadata_file = save_model_metadata(model)
fileio.copy(
model_metadata_file,
os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
)
# prepare the service configuration for the deployment
service_config = params.service_config.copy()
service_config.model_uri = served_model_uri
# Prepare container config for custom model deployment
service_config.container = {
"name": service_config.model_name,
"image": image_name,
"command": entrypoint_command,
"storage_uri": service_config.model_uri,
}
# deploy the service
service = cast(
KServeDeploymentService,
model_deployer.deploy_model(
service_config, replace=True, timeout=params.timeout
),
)
logger.info(
f"KServe deployment service started and reachable at:\n"
f" {service.prediction_url}\n"
f" With the hostname: {service.prediction_hostname}."
)
return service
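A sketch of how this step might be configured in a user pipeline. The dotted path to the predict function, the model name and the timeout are hypothetical; only the step and parameter class names come from this module, and the import paths are inferred from its layout.
from zenml.integrations.kserve.services import KServeDeploymentConfig
from zenml.integrations.kserve.steps.kserve_deployer import (
    CustomDeployParameters,
    KServeDeployerStepParameters,
    kserve_custom_model_deployer_step,
)

# Instantiate the deployer step; it is then wired into a pipeline as its model
# deployer step, together with importer/trainer/deployment-trigger steps.
custom_model_deployer = kserve_custom_model_deployer_step(
    params=KServeDeployerStepParameters(
        service_config=KServeDeploymentConfig(
            model_name="my-custom-model",
        ),
        custom_deploy_parameters=CustomDeployParameters(
            predict_function="steps.custom_predict.custom_predict",  # hypothetical dotted path
        ),
        timeout=300,
    ),
)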
kserve_model_deployer_step (_DecoratedStep)
KServe model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
params | KServeDeployerStepParameters | parameters for the deployer step | required |
model | UnmaterializedArtifact | the model artifact to deploy | required |
context | StepContext | the step context | required |
Returns:
Type | Description |
---|---|
KServeDeploymentService | KServe deployment service |
entrypoint(deploy_decision, params, context, model)
staticmethod
KServe model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
params | KServeDeployerStepParameters | parameters for the deployer step | required |
model | UnmaterializedArtifact | the model artifact to deploy | required |
context | StepContext | the step context | required |
Returns:
Type | Description |
---|---|
KServeDeploymentService | KServe deployment service |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False)
def kserve_model_deployer_step(
deploy_decision: bool,
params: KServeDeployerStepParameters,
context: StepContext,
model: UnmaterializedArtifact,
) -> KServeDeploymentService:
"""KServe model deployer pipeline step.
This step can be used in a pipeline to implement continuous
deployment for an ML model with KServe.
Args:
deploy_decision: whether to deploy the model or not
params: parameters for the deployer step
model: the model artifact to deploy
context: the step context
Returns:
KServe deployment service
"""
model_deployer = cast(
KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
)
# get pipeline name, step name and run id
step_context = get_step_context()
pipeline_name = step_context.pipeline.name
run_name = step_context.pipeline_run.name
step_name = step_context.step_run.name
# update the step configuration with the real pipeline runtime information
params.service_config.pipeline_name = pipeline_name
params.service_config.run_name = run_name
params.service_config.pipeline_step_name = step_name
# fetch existing services with same pipeline name, step name and
# model name
existing_services = model_deployer.find_model_server(
pipeline_name=pipeline_name,
pipeline_step_name=step_name,
model_name=params.service_config.model_name,
)
# even when the deploy decision is negative if an existing model server
# is not running for this pipeline/step, we still have to serve the
# current model, to ensure that a model server is available at all times
if not deploy_decision and existing_services:
logger.info(
f"Skipping model deployment because the model quality does not "
f"meet the criteria. Reusing the last model server deployed by step "
f"'{step_name}' and pipeline '{pipeline_name}' for model "
f"'{params.service_config.model_name}'..."
)
service = cast(KServeDeploymentService, existing_services[0])
# even when the deploy decision is negative, we still need to start
# the previous model server if it is no longer running, to ensure that
# a model server is available at all times
if not service.is_running:
service.start(timeout=params.timeout)
return service
# invoke the KServe model deployer to create a new service
# or update an existing one that was previously deployed for the same
# model
if params.service_config.predictor == "pytorch":
# import the prepare function from the step utils
from zenml.integrations.kserve.steps.kserve_step_utils import (
prepare_torch_service_config,
)
# prepare the service config
service_config = prepare_torch_service_config(
model_uri=model.uri,
output_artifact_uri=context.get_output_artifact_uri(),
params=params,
)
else:
# import the prepare function from the step utils
from zenml.integrations.kserve.steps.kserve_step_utils import (
prepare_service_config,
)
# prepare the service config
service_config = prepare_service_config(
model_uri=model.uri,
output_artifact_uri=context.get_output_artifact_uri(),
params=params,
)
service = cast(
KServeDeploymentService,
model_deployer.deploy_model(
service_config, replace=True, timeout=params.timeout
),
)
logger.info(
f"KServe deployment service started and reachable at:\n"
f" {service.prediction_url}\n"
f" With the hostname: {service.prediction_hostname}."
)
return service
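A continuous-deployment pipeline sketch using this step. The upstream importer, trainer and deployment_trigger steps are hypothetical placeholders assumed to be defined elsewhere; only the deployer step and its parameter classes come from this module, and the import paths are inferred from its layout.
from zenml.pipelines import pipeline
from zenml.integrations.kserve.services import KServeDeploymentConfig
from zenml.integrations.kserve.steps.kserve_deployer import (
    KServeDeployerStepParameters,
    kserve_model_deployer_step,
)

@pipeline
def continuous_deployment_pipeline(importer, trainer, deployment_trigger, model_deployer):
    # Each argument is a ZenML step; artifacts flow between them at run time.
    data = importer()
    model = trainer(data)
    decision = deployment_trigger(model=model)
    model_deployer(deploy_decision=decision, model=model)

deployer = kserve_model_deployer_step(
    params=KServeDeployerStepParameters(
        service_config=KServeDeploymentConfig(
            model_name="my-sklearn-model",
            predictor="sklearn",
        ),
        timeout=120,
    ),
)
# The pipeline is then assembled with the hypothetical steps, for example:
# continuous_deployment_pipeline(
#     importer=importer(), trainer=trainer(),
#     deployment_trigger=deployment_trigger(), model_deployer=deployer,
# ).run()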
kserve_step_utils
Utility functions used by the KServe deployer step.
TorchModelArchiver (BaseModel)
pydantic-model
Model Archiver for PyTorch models.
Attributes:
Name | Type | Description |
---|---|---|
model_name | str | Model name. |
serialized_file | str | Serialized model file. |
model_file | str | Path to the Python file containing the model architecture. |
handler | str | TorchServe's handler file to handle custom TorchServe inference logic. |
export_path | str | Path to export model. |
extra_files | Optional[List[str]] | Comma separated path to extra dependency files. |
version | Optional[str] | Model version. |
requirements_file | Optional[str] | Path to requirements file. |
runtime | Optional[str] | Runtime of the model. |
force | Optional[bool] | Force export of the model. |
archive_format | Optional[str] | Archive format. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
class TorchModelArchiver(BaseModel):
"""Model Archiver for PyTorch models.
Attributes:
model_name: Model name.
model_file: Path to the Python file containing the model architecture.
version: Model version.
serialized_file: Serialized model file.
handler: TorchServe's handler file to handle custom TorchServe inference logic.
extra_files: Comma separated path to extra dependency files.
requirements_file: Path to requirements file.
export_path: Path to export model.
runtime: Runtime of the model.
force: Force export of the model.
archive_format: Archive format.
"""
model_name: str
serialized_file: str
model_file: str
handler: str
export_path: str
extra_files: Optional[List[str]] = None
version: Optional[str] = None
requirements_file: Optional[str] = None
runtime: Optional[str] = "python"
force: Optional[bool] = None
archive_format: Optional[str] = "default"
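For reference, a sketch of the archiver arguments as prepare_torch_service_config (below) assembles them before handing off to TorchServe's model packaging utilities. The model name and file paths are placeholders, not values used by this module.
from zenml.integrations.kserve.steps.kserve_step_utils import TorchModelArchiver

# Hypothetical paths for illustration only.
archiver_args = TorchModelArchiver(
    model_name="mnist",
    serialized_file="/tmp/zenml-pytorch-temp/mnist.pt",   # serialized checkpoint
    model_file="steps/pytorch_model.py",                  # model architecture file
    handler="image_classifier",                           # TorchServe handler
    export_path="/tmp/zenml-pytorch-temp",                # where the .mar archive is written
    version="1.0",
)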
generate_model_deployer_config(model_name, directory)
Generate a model deployer config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | str | the name of the model | required |
directory | str | the directory where the model is stored | required |
Returns:
Type | Description |
---|---|
str | the path to the generated TorchServe config.properties file |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def generate_model_deployer_config(
model_name: str,
directory: str,
) -> str:
"""Generate a model deployer config.
Args:
model_name: the name of the model
directory: the directory where the model is stored
Returns:
The path to the generated TorchServe configuration file.
"""
config_lines = [
"inference_address=http://0.0.0.0:8085",
"management_address=http://0.0.0.0:8085",
"metrics_address=http://0.0.0.0:8082",
"grpc_inference_port=7070",
"grpc_management_port=7071",
"enable_metrics_api=true",
"metrics_format=prometheus",
"number_of_netty_threads=4",
"job_queue_size=10",
"enable_envvars_config=true",
"install_py_dep_per_model=true",
"model_store=/mnt/models/model-store",
]
with tempfile.NamedTemporaryFile(
suffix=".properties", mode="w+", dir=directory, delete=False
) as f:
for line in config_lines:
f.write(line + "\n")
f.write(
f'model_snapshot={{"name":"startup.cfg","modelCount":1,"models":{{"{model_name}":{{"1.0":{{"defaultVersion":true,"marName":"{model_name}.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"maxBatchDelay":10,"responseTimeout":120}}}}}}}}'
)
f.close()
return f.name
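A minimal usage sketch: write the TorchServe properties file for a hypothetical model into a scratch directory and inspect the result.
import tempfile

from zenml.integrations.kserve.steps.kserve_step_utils import (
    generate_model_deployer_config,
)

scratch_dir = tempfile.mkdtemp()
config_path = generate_model_deployer_config(model_name="mnist", directory=scratch_dir)
with open(config_path) as f:
    print(f.read())  # inference/management addresses plus the model_snapshot line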
is_valid_model_name(model_name)
Checks if the model name is valid.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | str | the model name to check | required |
Returns:
Type | Description |
---|---|
bool | True if the model name is valid, False otherwise. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def is_valid_model_name(model_name: str) -> bool:
"""Checks if the model name is valid.
Args:
model_name: the model name to check
Returns:
True if the model name is valid, False otherwise.
"""
pattern = re.compile("^[a-z0-9-]+$")
return pattern.match(model_name) is not None
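A few illustrative checks of the pattern above (only lowercase letters, digits and hyphens are accepted):
from zenml.integrations.kserve.steps.kserve_step_utils import is_valid_model_name

assert is_valid_model_name("my-model-2")       # lowercase letters, digits and hyphens
assert not is_valid_model_name("MyModel")      # uppercase characters are rejected
assert not is_valid_model_name("my_model")     # underscores are rejected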
prepare_service_config(model_uri, output_artifact_uri, params)
Prepare the model files for model serving.
This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri | str | the URI of the model artifact being served | required |
output_artifact_uri | str | the URI of the output artifact | required |
params | KServeDeployerStepParameters | the KServe deployer step parameters | required |
Returns:
Type | Description |
---|---|
KServeDeploymentConfig | the service configuration, updated with the URI of the prepared model, ready for serving |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the model files cannot be prepared. |
ValidationError | if the model name is invalid. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_service_config(
model_uri: str,
output_artifact_uri: str,
params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
"""Prepare the model files for model serving.
This function ensures that the model files are in the correct format
and file structure required by the KServe server implementation
used for model serving.
Args:
model_uri: the URI of the model artifact being served
output_artifact_uri: the URI of the output artifact
params: the KServe deployer step parameters
Returns:
The service configuration, updated with the URI of the prepared model, ready for serving.
Raises:
RuntimeError: if the model files cannot be prepared.
ValidationError: if the model name is invalid.
"""
served_model_uri = os.path.join(output_artifact_uri, "kserve")
fileio.makedirs(served_model_uri)
# TODO [ENG-773]: determine how to formalize how models are organized into
# folders and sub-folders depending on the model type/format and the
# KServe protocol used to serve the model.
# TODO [ENG-791]: an auto-detect built-in KServe server implementation
# from the model artifact type
# TODO [ENG-792]: validate the model artifact type against the
# supported built-in KServe server implementations
if params.service_config.predictor == "tensorflow":
# the TensorFlow server expects model artifacts to be
# stored in numbered subdirectories, each representing a model
# version
served_model_uri = os.path.join(
served_model_uri,
params.service_config.predictor,
params.service_config.model_name,
)
fileio.makedirs(served_model_uri)
io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
elif params.service_config.predictor == "sklearn":
# the sklearn server expects model artifacts to be
# stored in a file called model.joblib
model_uri = os.path.join(model_uri, "model")
if not fileio.exists(model_uri):
raise RuntimeError(
f"Expected sklearn model artifact was not found at "
f"{model_uri}"
)
served_model_uri = os.path.join(
served_model_uri,
params.service_config.predictor,
params.service_config.model_name,
)
fileio.makedirs(served_model_uri)
fileio.copy(model_uri, os.path.join(served_model_uri, "model.joblib"))
elif not is_valid_model_name(params.service_config.model_name):
raise ValidationError(
f"Model name '{params.service_config.model_name}' is invalid. "
f"The model name can only include lowercase alphanumeric "
"characters and hyphens. Please rename your model and try again."
)
else:
# default treatment for all other server implementations is to
# simply reuse the model from the artifact store path where it
# is originally stored
served_model_uri = os.path.join(
served_model_uri,
params.service_config.predictor,
params.service_config.model_name,
)
fileio.makedirs(served_model_uri)
fileio.copy(model_uri, served_model_uri)
service_config = params.service_config.copy()
service_config.model_uri = served_model_uri
return service_config
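To make the resulting file layout concrete, the comments below summarize where the function places the model files for a hypothetical model named my-model; the returned configuration's model_uri points at the prepared directory.
# predictor == "tensorflow":
#   <output_artifact_uri>/kserve/tensorflow/my-model/1/<model files>
# predictor == "sklearn":
#   <output_artifact_uri>/kserve/sklearn/my-model/model.joblib
# any other built-in predictor:
#   <output_artifact_uri>/kserve/<predictor>/my-model/<copied model artifact>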
prepare_torch_service_config(model_uri, output_artifact_uri, params)
Prepare the PyTorch model files for model serving.
This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri | str | the URI of the model artifact being served | required |
output_artifact_uri | str | the URI of the output artifact | required |
params | KServeDeployerStepParameters | the KServe deployer step parameters | required |
Returns:
Type | Description |
---|---|
KServeDeploymentConfig | the service configuration, updated with the URI of the prepared model, ready for serving |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the model files cannot be prepared. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_torch_service_config(
model_uri: str,
output_artifact_uri: str,
params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
"""Prepare the PyTorch model files for model serving.
This function ensures that the model files are in the correct format
and file structure required by the KServe server implementation
used for model serving.
Args:
model_uri: the URI of the model artifact being served
output_artifact_uri: the URI of the output artifact
params: the KServe deployer step parameters
Returns:
The service configuration, updated with the URI of the prepared model, ready for serving.
Raises:
RuntimeError: if the model files cannot be prepared.
"""
deployment_folder_uri = os.path.join(output_artifact_uri, "kserve")
served_model_uri = os.path.join(deployment_folder_uri, "model-store")
config_properties_uri = os.path.join(deployment_folder_uri, "config")
fileio.makedirs(served_model_uri)
fileio.makedirs(config_properties_uri)
if params.torch_serve_parameters is None:
raise RuntimeError("No torch serve parameters provided")
else:
# Create a temporary folder
temp_dir = tempfile.mkdtemp(prefix="zenml-pytorch-temp-")
tmp_model_uri = os.path.join(
str(temp_dir), f"{params.service_config.model_name}.pt"
)
# Copy from artifact store to temporary file
fileio.copy(f"{model_uri}/checkpoint.pt", tmp_model_uri)
torch_archiver_args = TorchModelArchiver(
model_name=params.service_config.model_name,
serialized_file=tmp_model_uri,
model_file=params.torch_serve_parameters.model_class,
handler=params.torch_serve_parameters.handler,
export_path=temp_dir,
version=params.torch_serve_parameters.model_version,
)
manifest = ModelExportUtils.generate_manifest_json(torch_archiver_args)
package_model(torch_archiver_args, manifest=manifest)
# Copy from temporary file to artifact store
archived_model_uri = os.path.join(
temp_dir, f"{params.service_config.model_name}.mar"
)
if not fileio.exists(archived_model_uri):
raise RuntimeError(
f"Expected torch archived model artifact was not found at "
f"{archived_model_uri}"
)
# Copy the torch model archive artifact to the model store
fileio.copy(
archived_model_uri,
os.path.join(
served_model_uri, f"{params.service_config.model_name}.mar"
),
)
# Get or Generate the config file
if params.torch_serve_parameters.torch_config:
# Copy the torch model config to the model store
fileio.copy(
params.torch_serve_parameters.torch_config,
os.path.join(config_properties_uri, "config.properties"),
)
else:
# Generate the config file
config_file_uri = generate_model_deployer_config(
model_name=params.service_config.model_name,
directory=temp_dir,
)
# Copy the torch model config to the model store
fileio.copy(
config_file_uri,
os.path.join(config_properties_uri, "config.properties"),
)
service_config = params.service_config.copy()
service_config.model_uri = deployment_folder_uri
return service_config
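For orientation, the directory layout this function produces under the output artifact URI for a hypothetical model named mnist looks roughly as sketched below; the returned configuration's model_uri points at the kserve/ folder.
# <output_artifact_uri>/kserve/model-store/mnist.mar        <- archived TorchServe model
# <output_artifact_uri>/kserve/config/config.properties     <- provided or generated config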