Kserve
zenml.integrations.kserve
special
Initialization of the KServe integration for ZenML.
The KServe integration allows you to use the KServe model serving platform to implement continuous model deployment.
KServeIntegration (Integration)
Definition of KServe integration for ZenML.
Source code in zenml/integrations/kserve/__init__.py
class KServeIntegration(Integration):
    """Definition of KServe integration for ZenML.

    Declares the integration name, the pip requirements it depends on and
    the stack component flavors it contributes.
    """

    NAME = KSERVE
    REQUIREMENTS = [
        "kserve==0.9.0",
        "torch-model-archiver",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the KServe integration.

        Imports the integration's submodules. The imported names are not
        used here (hence the `noqa` markers); presumably the modules are
        imported for their import-time side effects.
        """
        from zenml.integrations.kserve import model_deployers  # noqa
        from zenml.integrations.kserve import secret_schemas  # noqa
        from zenml.integrations.kserve import services  # noqa
        from zenml.integrations.kserve import steps  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for KServe.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported lazily, inside the method, exactly as in the original.
        from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

        return [KServeModelDeployerFlavor]
activate()
classmethod
Activate the KServe integration.
Source code in zenml/integrations/kserve/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the KServe integration.

    Imports the integration's submodules. The imported names are not used
    here (hence the `noqa` markers); presumably the modules are imported
    for their import-time side effects.
    """
    from zenml.integrations.kserve import model_deployers  # noqa
    from zenml.integrations.kserve import secret_schemas  # noqa
    from zenml.integrations.kserve import services  # noqa
    from zenml.integrations.kserve import steps  # noqa
flavors()
classmethod
Declare the stack component flavors for KServe.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/kserve/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors contributed by the KServe integration.

    Returns:
        A single-element list holding the KServe model deployer flavor class.
    """
    # The flavor class is imported lazily, inside the method.
    from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

    declared_flavors: List[Type[Flavor]] = [KServeModelDeployerFlavor]
    return declared_flavors
constants
KServe constants.
custom_deployer
special
Initialization of ZenML custom deployer.
zenml_custom_model
Implements a custom model for the Kserve integration.
ZenMLCustomModel (Model)
Custom model class for ZenML and Kserve.
This class is used to implement a custom model for the Kserve integration, which is used as the main entry point for custom code execution.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
The name of the model. |
|
model_uri |
The URI of the model. |
|
predict_func |
The predict function of the model. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel(kserve.Model):  # type: ignore[misc]
    """Custom model class for ZenML and KServe.

    This class is used to implement a custom model for the KServe
    integration, which is used as the main entry point for custom code
    execution.

    Attributes:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: Import path of the predict function; it is
                resolved with `import_class_by_path`.
        """
        super().__init__(model_name)
        self.name = model_name
        self.model_uri = model_uri
        self.predict_func = import_class_by_path(predict_func)
        # Nothing is loaded until `load()` is called.
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        This function loads the model into memory and sets the ready flag
        to True. The model is loaded with `load_model_from_metadata` from
        the location given by `model_uri`.

        Returns:
            True if the model was loaded successfully, False otherwise.
        """
        try:
            from zenml.utils.materializer_utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            # Best effort: any failure is logged and reported as "not loaded".
            logger.error("Failed to load model: {}".format(e))
            return False
        self.ready = True
        return self.ready

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Predict the given request.

        Calls the user-defined predict function with the loaded model and
        the value stored under the request's "instances" key.

        Args:
            request: The request to predict in a dictionary.
                e.g. {"instances": []}

        Returns:
            A dictionary of the form {"predictions": <predict function result>}.

        Raises:
            RuntimeError: If the predict function raises a RuntimeError; the
                original error is chained as the cause.
            NotImplementedError: If no predict function is set.
            KeyError: If the request has no "instances" key.
        """
        if self.predict_func is None:
            raise NotImplementedError("Predict function is not implemented")
        try:
            predictions = self.predict_func(self.model, request["instances"])
        except RuntimeError as err:
            raise RuntimeError("Failed to predict: {}".format(err)) from err
        # The result is always wrapped in a dict here, so no type check on
        # the wrapper is needed.
        return {"predictions": predictions}
__init__(self, model_name, model_uri, predict_func)
special
Initializes a ZenMLCustomModel object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name |
str |
The name of the model. |
required |
model_uri |
str |
The URI of the model. |
required |
predict_func |
str |
The predict function of the model. |
required |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Set up a custom model wrapper around a user-supplied predict function.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: Import path of the predict function; it is resolved
            with `import_class_by_path` and stored on the instance.
    """
    super().__init__(model_name)
    self.name = model_name
    self.model_uri = model_uri
    resolved_predict_func = import_class_by_path(predict_func)
    self.predict_func = resolved_predict_func
    # No model is held yet and the instance is flagged as not ready.
    self.model, self.ready = None, False
load(self)
Load the model.
This function loads the model into memory and sets the ready flag to True.
The model is loaded using the materializer, by saving the information of the artifact to a YAML file in the same path as the model artifacts at the preparing time and loading it again at the prediction time by the materializer.
Returns:
Type | Description |
---|---|
bool |
True if the model was loaded successfully, False otherwise. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model into memory and mark the instance as ready.

    The model is obtained by calling `load_model_from_metadata` with
    `self.model_uri`. Any exception raised while importing the helper or
    loading the model is logged and turned into a `False` result.

    Returns:
        True if the model was loaded successfully, False otherwise.
    """
    try:
        from zenml.utils.materializer_utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as load_error:
        logger.error("Failed to load model: {}".format(load_error))
        return False
    else:
        # Only reached when loading succeeded.
        self.ready = True
        return self.ready
predict(self, request)
Predict the given request.
The main predict function of the model. This function is called by the KServe server when a request is received. Then inside this function, the user-defined predict function is called.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
Dict[str, Any] |
The request to predict in a dictionary. e.g. {"instances": []} |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The prediction dictionary. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If function could not be called. |
NotImplementedError |
If the predict function is not implemented. |
TypeError |
If the prediction is not a dictionary. |
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
    """Predict the given request.

    Calls the user-defined predict function with the loaded model and the
    value stored under the request's "instances" key.

    Args:
        request: The request to predict in a dictionary.
            e.g. {"instances": []}

    Returns:
        A dictionary of the form {"predictions": <predict function result>}.

    Raises:
        RuntimeError: If the predict function raises a RuntimeError; the
            original error is chained as the cause.
        NotImplementedError: If no predict function is set.
        KeyError: If the request has no "instances" key.
    """
    if self.predict_func is None:
        raise NotImplementedError("Predict function is not implemented")
    try:
        predictions = self.predict_func(self.model, request["instances"])
    except RuntimeError as err:
        raise RuntimeError("Failed to predict: {}".format(err)) from err
    # The result is always wrapped in a dict here, so no type check on the
    # wrapper is needed.
    return {"predictions": predictions}
flavors
special
KServe integration flavors.
kserve_model_deployer_flavor
KServe model deployer flavor.
KServeModelDeployerConfig (BaseModelDeployerConfig)
pydantic-model
Configuration for the KServeModelDeployer.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context |
Optional[str] |
the Kubernetes context to use to contact the remote KServe installation. If not specified, the current configuration is used. Depending on where the KServe model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). |
kubernetes_namespace |
Optional[str] |
the Kubernetes namespace where the KServe inference service CRDs are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the KServe model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). |
base_url |
str |
the base URL of the Kubernetes ingress used to expose the KServe inference services. |
secret |
Optional[str] |
the name of the secret containing the credentials for the KServe inference services. |
Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the KServeModelDeployer.

    Attributes:
        kubernetes_context: the Kubernetes context to use to contact the remote
            KServe installation. If not specified, the current
            configuration is used. Depending on where the KServe model deployer
            is being used, this can be either a locally active context or an
            in-cluster Kubernetes configuration (if running inside a pod).
        kubernetes_namespace: the Kubernetes namespace where the KServe
            inference service CRDs are provisioned and managed by ZenML. If not
            specified, the namespace set in the current configuration is used.
            Depending on where the KServe model deployer is being used, this can
            be either the current namespace configured in the locally active
            context or the namespace in the context of which the pod is running
            (if running inside a pod).
        base_url: the base URL of the Kubernetes ingress used to expose the
            KServe inference services.
        secret: the name of the secret containing the credentials for the
            KServe inference services.
        custom_domain: optional custom domain setting. NOTE(review): its
            purpose is not shown in this file and the field is flagged as
            possibly unused -- confirm before relying on it.
    """

    kubernetes_context: Optional[str]  # TODO: Potential setting
    kubernetes_namespace: Optional[str]
    base_url: str  # TODO: unused?
    secret: Optional[str]
    custom_domain: Optional[str]  # TODO: unused?
KServeModelDeployerFlavor (BaseModelDeployerFlavor)
Flavor for the KServe model deployer.
Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Stack component flavor describing the KServe model deployer."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The KServe model deployer flavor name constant.
        """
        flavor_name = KSERVE_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `KServeModelDeployerConfig` class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Class implementing the model deployer for this flavor.

        Returns:
            The `KServeModelDeployer` class.
        """
        # The implementation is imported lazily, inside the property.
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer as deployer_class,
        )

        return deployer_class
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]
property
readonly
Returns the `KServeModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] |
The config class. |
implementation_class: Type[KServeModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KServeModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
model_deployers
special
Initialization of the KServe Model Deployer.
kserve_model_deployer
Implementation of the KServe Model Deployer.
KServeModelDeployer (BaseModelDeployer)
KServe model deployer stack component implementation.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployer(BaseModelDeployer):
    """KServe model deployer stack component implementation.

    Creates, updates, finds and deletes KServe inference services through a
    lazily created `KServeClient`.
    """

    NAME: ClassVar[str] = "KServe"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = KServeModelDeployerFlavor

    # Client instance cached by the `kserve_client` property.
    _client: Optional[KServeClient] = None

    @property
    def config(self) -> KServeModelDeployerConfig:
        """Returns the `KServeModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(KServeModelDeployerConfig, self._config)

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "KServeDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: KServe deployment service object

        Returns:
            A dictionary containing the model server information.
        """
        return {
            "PREDICTION_URL": service_instance.prediction_url,
            "PREDICTION_HOSTNAME": service_instance.prediction_hostname,
            "MODEL_URI": service_instance.config.model_uri,
            "MODEL_NAME": service_instance.config.model_name,
            "KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
        }

    @property
    def kserve_client(self) -> KServeClient:
        """Get the KServe client associated with this model deployer.

        The client is created on first access, using the configured
        Kubernetes context, and reused afterwards.

        Returns:
            The KServe client.
        """
        if not self._client:
            self._client = KServeClient(
                context=self.config.kubernetes_context,
            )
        return self._client

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        An image is only built when at least one step of the deployment has
        the `KSERVE_CUSTOM_DEPLOYMENT` extra set to True. The resulting
        repo digest is stored under `KSERVE_DOCKER_IMAGE_KEY` in the
        deployment extras.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        needs_docker_image = any(
            step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True
            for step in deployment.steps.values()
        )
        if not needs_docker_image:
            return

        docker_image_builder = PipelineDockerImageBuilder()
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(KSERVE_DOCKER_IMAGE_KEY, repo_digest)

    def _set_credentials(self) -> None:
        """Pass the credentials from the configured secret to the KServe client.

        Does nothing when no secret is configured. A `credentials` entry in
        the secret is written to a temporary JSON file whose path is passed
        as `credentials_file`; every other entry is passed through as a
        keyword argument. The temporary file is always removed afterwards.

        Raises:
            RuntimeError: if the KServe client fails to set the credentials.
        """
        secret = self._get_kserve_secret()
        if not secret:
            return

        secret_folder = Path(
            GlobalConfiguration().config_directory,
            "kserve-storage",
            str(self.id),
        )
        kserve_credentials = {}
        # Stays None when the secret has no `credentials` entry, so the
        # cleanup below never touches an unbound name.
        credentials_file: Optional[Path] = None

        try:
            # Handle the secrets attributes
            for key in secret.content.keys():
                content = getattr(secret, key)
                if key == "credentials" and content:
                    fileio.makedirs(str(secret_folder))
                    credentials_file = Path(secret_folder, f"{key}.json")
                    kserve_credentials["credentials_file"] = str(
                        credentials_file
                    )
                    # Restrict permissions before any content is written so
                    # the credentials are never readable by other users.
                    credentials_file.touch(mode=0o600)
                    credentials_file.chmod(0o600)
                    with open(credentials_file, "w") as f:
                        f.write(content)
                # Handle additional params
                else:
                    kserve_credentials[key] = content

            # We need to add the namespace to the kserve_credentials
            kserve_credentials["namespace"] = (
                self.config.kubernetes_namespace
                or utils.get_default_target_namespace()
            )

            try:
                self.kserve_client.set_credentials(**kserve_credentials)
            except Exception as e:
                raise RuntimeError(
                    f"Failed to set credentials for KServe model deployer: {e}"
                ) from e
        finally:
            if credentials_file is not None and credentials_file.exists():
                credentials_file.unlink()

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new KServe deployment or update an existing one.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new KServe
            deployment server to reflect the model and other configuration
            parameters specified in the supplied KServe deployment `config`.

          * if `replace` is True, this method will first attempt to find an
            existing KServe deployment that is *equivalent* to the supplied
            configuration parameters. Two or more KServe deployments are
            considered equivalent if they have the same `pipeline_name`,
            `pipeline_step_name` and `model_name` configuration parameters. To
            put it differently, two KServe deployments are equivalent if
            they serve versions of the same model deployed by the same pipeline
            step. If an equivalent KServe deployment is found, it will be
            updated in place to reflect the new configuration parameters. This
            allows an existing KServe deployment to retain its prediction
            URL while performing a rolling update to serve a new model version.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new KServe deployment
        server for each new model version. If multiple equivalent KServe
        deployments are found, the first one returned by the lookup is kept
        and updated and the others are stopped.

        Args:
            config: the configuration of the model to be deployed with KServe.
            replace: set this flag to True to find and update an equivalent
                KServeDeployment server with the new model instead of
                starting a new deployment server.
            timeout: the timeout in seconds to wait for the KServe server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the KServe
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML KServe deployment service object that can be used to
            interact with the remote KServe server.

        Raises:
            RuntimeError: if the KServe deployment server could not be stopped.
        """
        with event_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
            config = cast(KServeDeploymentConfig, config)
            service = None

            # A secret named in the service config takes precedence over the
            # one configured on the model deployer.
            if config.secret_name:
                self.config.secret = config.secret_name
            self._set_credentials()

            # if replace is True, find equivalent KServe deployments
            if replace is True:
                equivalent_services = self.find_model_server(
                    running=False,
                    pipeline_name=config.pipeline_name,
                    pipeline_step_name=config.pipeline_step_name,
                    model_name=config.model_name,
                )

                for equivalent_service in equivalent_services:
                    if service is None:
                        # keep the first equivalent service found
                        service = equivalent_service
                        continue
                    try:
                        # Stop the remaining equivalent services (not the
                        # one being kept) and don't wait for them to be
                        # deprovisioned.
                        equivalent_service.stop()
                    except RuntimeError as e:
                        raise RuntimeError(
                            "Failed to stop the KServe deployment "
                            "server:\n"
                            f"{e}\n"
                            "Please stop it manually and try again."
                        ) from e

            if service:
                # update an equivalent service in place
                service.update(config)
                logger.info(
                    f"Updating an existing KServe deployment service: {service}"
                )
            else:
                # create a new service
                service = KServeDeploymentService(config=config)
                logger.info(
                    f"Creating a new KServe deployment service: {service}"
                )

            # start the service which in turn provisions the KServe
            # deployment server and waits for it to reach a ready state
            service.start(timeout=timeout)

            # Add telemetry with metadata that gets the stack metadata and
            # differentiates between pure model and custom code deployments
            stack = Client().active_stack
            stack_metadata = {
                component_type.value: component.flavor
                for component_type, component in stack.components.items()
            }
            analytics_handler.metadata = {
                "store_type": Client().zen_store.type.value,
                **stack_metadata,
                "is_custom_code_deployment": config.container is not None,
            }
        return service

    def get_kserve_deployments(
        self, labels: Dict[str, str]
    ) -> List[V1beta1InferenceService]:
        """Get a list of KServe deployments that match the supplied labels.

        Args:
            labels: a dictionary of labels to match against KServe deployments.

        Returns:
            A list of KServe deployments that match the supplied labels.

        Raises:
            RuntimeError: if an operational failure is encountered while
                retrieving the KServe inference services.
        """
        label_selector = (
            ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
        )

        namespace = (
            self.config.kubernetes_namespace
            or utils.get_default_target_namespace()
        )

        try:
            response = (
                self.kserve_client.api_instance.list_namespaced_custom_object(
                    constants.KSERVE_GROUP,
                    constants.KSERVE_V1BETA1_VERSION,
                    namespace,
                    constants.KSERVE_PLURAL,
                    label_selector=label_selector,
                )
            )
        except client.rest.ApiException as e:
            raise RuntimeError(
                f"Exception when retrieving KServe inference services: {e}"
            ) from e

        # TODO[CRITICAL]: de-serialize each item into a complete
        #   V1beta1InferenceService object recursively using the OpenApi
        #   schema (this doesn't work right now)
        inference_services: List[V1beta1InferenceService] = []
        for item in response.get("items", []):
            snake_case_item = self._camel_to_snake(item)
            inference_service = V1beta1InferenceService(**snake_case_item)
            inference_services.append(inference_service)
        return inference_services

    def _camel_to_snake(self, obj: Any) -> Any:
        """Recursively convert camelCase dictionary keys to snake_case.

        Dictionaries get their keys converted and their values processed
        recursively; lists, sets and tuples are rebuilt with each element
        processed; every other value is returned unchanged.

        Args:
            obj: a dictionary with camelCase keys, or any nested value
                found inside one.

        Returns:
            The same structure with snake_case dictionary keys.
        """
        if isinstance(obj, dict):
            converted = obj.__class__()
            for k, v in obj.items():
                converted[self._convert_to_snake(k)] = self._camel_to_snake(v)
            return converted
        if isinstance(obj, (list, set, tuple)):
            return obj.__class__(self._camel_to_snake(v) for v in obj)
        # Scalars (str, int, float, None, ...) are returned as they are.
        return obj

    def _convert_to_snake(self, k: str) -> str:
        """Convert a single camelCase name to snake_case.

        Args:
            k: the camelCase name.

        Returns:
            The name lowercased, with an underscore inserted before every
            uppercase letter that is not at the start of the string.
        """
        return re.sub(r"(?<!^)(?=[A-Z])", "_", k).lower()

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        predictor: Optional[str] = None,
    ) -> List[BaseService]:
        """Find one or more KServe model services that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            predictor: the name of the predictor that was used to deploy the model.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """
        # The search criteria are turned into Kubernetes labels by building
        # a deployment config from them.
        config = KServeDeploymentConfig(
            pipeline_name=pipeline_name or "",
            pipeline_run_id=pipeline_run_id or "",
            pipeline_step_name=pipeline_step_name or "",
            model_uri=model_uri or "",
            model_name=model_name or "",
            predictor=predictor or "",
            resources={},
        )
        labels = config.get_kubernetes_labels()

        if service_uuid:
            labels["zenml.service_uuid"] = str(service_uuid)

        deployments = self.get_kserve_deployments(labels=labels)

        services: List[BaseService] = []
        for deployment in deployments:
            # recreate the KServe deployment service object from the KServe
            # deployment resource
            service = KServeDeploymentService.create_from_deployment(
                deployment=deployment
            )
            if running and not service.is_running:
                # skip non-running services
                continue
            services.append(service)

        return services

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Stop a KServe model server.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop.
            force: if True, force the service to stop.

        Raises:
            NotImplementedError: stopping on KServe model servers is not
                supported.
        """
        raise NotImplementedError(
            "Stopping KServe model servers is not implemented. Try "
            "deleting the KServe model server instead."
        )

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Start a KServe model deployment server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to become
                active. If set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            NotImplementedError: since we don't support starting KServe
                model servers
        """
        raise NotImplementedError(
            "Starting KServe model servers is not implemented"
        )

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete a KServe model deployment server.

        Does nothing when no service with the given UUID is found.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """
        services = self.find_model_server(service_uuid=uuid)
        if not services:
            return
        services[0].stop(timeout=timeout, force=force)

    def _get_kserve_secret(self) -> Any:
        """Get the secret object for the KServe deployment.

        Returns:
            The secret object for the KServe deployment, or None when no
            secret is configured.

        Raises:
            RuntimeError: if the secret object is not found or secrets_manager is not set.
        """
        if not self.config.secret:
            return None

        secret_manager = Client().active_stack.secrets_manager

        if not secret_manager or not isinstance(
            secret_manager, BaseSecretsManager
        ):
            raise RuntimeError(
                f"The active stack doesn't have a secret manager component. "
                f"The ZenML secret specified in the KServe Model "
                f"Deployer configuration cannot be fetched: {self.config.secret}."
            )

        try:
            return secret_manager.get_secret(self.config.secret)
        except KeyError as e:
            raise RuntimeError(
                f"The secret `{self.config.secret}` used for your KServe Model "
                f"Deployer configuration does not exist in your secrets "
                f"manager `{secret_manager.name}`."
            ) from e
config: KServeModelDeployerConfig
property
readonly
Returns the `KServeModelDeployerConfig` config.
Returns:
Type | Description |
---|---|
KServeModelDeployerConfig |
The configuration. |
kserve_client: KServeClient
property
readonly
Get the KServe client associated with this model deployer.
Returns:
Type | Description |
---|---|
KServeClient |
The KServe client. |
FLAVOR (BaseModelDeployerFlavor)
Flavor for the KServe model deployer.
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Stack component flavor describing the KServe model deployer."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The KServe model deployer flavor name constant.
        """
        flavor_name = KSERVE_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `KServeModelDeployerConfig` class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Class implementing the model deployer for this flavor.

        Returns:
            The `KServeModelDeployer` class.
        """
        # The implementation is imported lazily, inside the property.
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer as deployer_class,
        )

        return deployer_class
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]
property
readonly
Returns the `KServeModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] |
The config class. |
implementation_class: Type[KServeModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KServeModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
delete_model_server(self, uuid, timeout=300, force=False)
Delete a KServe model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to delete. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete the KServe model deployment server with the given UUID.

    Looks the service up by UUID and stops the first match; does nothing
    when no matching service is found.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if matching_services:
        matching_services[0].stop(timeout=timeout, force=force)
deploy_model(self, config, replace=False, timeout=300)
Create a new KServe deployment or update an existing one.
This method has two modes of operation, depending on the `replace` argument value:
- if `replace` is False, calling this method will create a new KServe deployment server to reflect the model and other configuration parameters specified in the supplied KServe deployment `config`.
- if `replace` is True, this method will first attempt to find an existing KServe deployment that is equivalent to the supplied configuration parameters. Two or more KServe deployments are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two KServe deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent KServe deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing KServe deployment to retain its prediction URL while performing a rolling update to serve a new model version.
Callers should set `replace` to True if they want a continuous model deployment workflow that doesn't spin up a new KServe deployment server for each new model version. If multiple equivalent KServe deployments are found, the most recently created deployment is selected to be updated and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServiceConfig |
the configuration of the model to be deployed with KServe. |
required |
replace |
bool |
set this flag to True to find and update an equivalent KServeDeployment server with the new model instead of starting a new deployment server. |
False |
timeout |
int |
the timeout in seconds to wait for the KServe server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the KServe server is provisioned, without waiting for it to fully start. |
300 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML KServe deployment service object that can be used to interact with the remote KServe server. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the KServe deployment server could not be stopped. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new KServe deployment or update an existing one.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new KServe
        deployment server to reflect the model and other configuration
        parameters specified in the supplied KServe deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing KServe deployment that is *equivalent* to the supplied
        configuration parameters. Two or more KServe deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. If
        an equivalent KServe deployment is found, it will be updated in
        place to reflect the new configuration parameters. This allows an
        existing KServe deployment to retain its prediction URL while
        performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new KServe deployment
    server for each new model version. If multiple equivalent KServe
    deployments are found, the first one returned by the lookup is kept and
    updated, and all the others are stopped.

    Args:
        config: the configuration of the model to be deployed with KServe.
        replace: set this flag to True to find and update an equivalent
            KServeDeployment server with the new model instead of
            starting a new deployment server.
        timeout: the timeout in seconds to wait for the KServe server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the KServe
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML KServe deployment service object that can be used to
        interact with the remote KServe server.

    Raises:
        RuntimeError: if one of the superseded equivalent KServe deployment
            servers could not be stopped.
    """
    with event_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(KServeDeploymentConfig, config)
        service = None

        # A secret named in the deployment config overrides the secret
        # configured on the model deployer.
        if config.secret_name:
            self.config.secret = config.secret_name
        # NOTE(review): indentation was lost in this listing; the credentials
        # are assumed to be applied whether or not the config names a
        # secret -- confirm against the upstream source.
        self._set_credentials()

        # if replace is True, find equivalent KServe deployments
        if replace is True:
            equivalent_services = self.find_model_server(
                running=False,
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            for equivalent_service in equivalent_services:
                if service is None:
                    # keep the first match (treated as the most recently
                    # created service) as the one to update in place
                    service = equivalent_service
                    continue
                try:
                    # stop the superseded service itself (not the one being
                    # kept) and don't wait for it to be deprovisioned
                    equivalent_service.stop()
                except RuntimeError as e:
                    # Build one message string; passing the parts as
                    # separate arguments would render as a tuple.
                    raise RuntimeError(
                        "Failed to stop the KServe deployment server:\n"
                        f"{e}\n"
                        "Please stop it manually and try again."
                    ) from e

        if service:
            # update an equivalent service in place
            service.update(config)
            logger.info(
                f"Updating an existing KServe deployment service: {service}"
            )
        else:
            # create a new service
            service = KServeDeploymentService(config=config)
            logger.info(
                f"Creating a new KServe deployment service: {service}"
            )

        # start the service which in turn provisions the KServe
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.container is not None,
        }
    return service
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, predictor=None)
Find one or more KServe model services that match the given criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
running |
bool |
If true, only running services will be returned. |
False |
service_uuid |
Optional[uuid.UUID] |
The UUID of the service that was originally used to deploy the model. |
None |
pipeline_name |
Optional[str] |
name of the pipeline that the deployed model was part of. |
None |
pipeline_run_id |
Optional[str] |
ID of the pipeline run which the deployed model was part of. |
None |
pipeline_step_name |
Optional[str] |
the name of the pipeline model deployment step that deployed the model. |
None |
model_name |
Optional[str] |
the name of the deployed model. |
None |
model_uri |
Optional[str] |
URI of the deployed model. |
None |
predictor |
Optional[str] |
the name of the predictor that was used to deploy the model. |
None |
Returns:
Type | Description |
---|---|
List[zenml.services.service.BaseService] |
One or more Service objects representing model servers that match the input search criteria. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    predictor: Optional[str] = None,
) -> List[BaseService]:
    """Look up KServe model services matching the supplied criteria.

    The criteria are turned into Kubernetes labels through a throw-away
    `KServeDeploymentConfig`; criteria left unset (or empty) do not
    contribute a label and therefore do not restrict the search.

    Args:
        running: when True, services that are not running are left out.
        service_uuid: UUID of the service originally used to deploy the
            model.
        pipeline_name: name of the pipeline the deployed model was part of.
        pipeline_run_id: ID of the pipeline run the deployed model was
            part of.
        pipeline_step_name: name of the pipeline step that deployed the
            model.
        model_name: name of the deployed model.
        model_uri: URI of the deployed model.
        predictor: name of the predictor used to deploy the model.

    Returns:
        The service objects rebuilt from every matching KServe deployment.
    """
    lookup_config = KServeDeploymentConfig(
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
        model_uri=model_uri or "",
        model_name=model_name or "",
        predictor=predictor or "",
        resources={},
    )
    selector = lookup_config.get_kubernetes_labels()
    if service_uuid:
        selector["zenml.service_uuid"] = str(service_uuid)

    matches: List[BaseService] = []
    for inference_service in self.get_kserve_deployments(labels=selector):
        # rebuild the ZenML service object from the KServe resource
        candidate = KServeDeploymentService.create_from_deployment(
            deployment=inference_service
        )
        # the running state is only consulted when filtering was requested
        if not running or candidate.is_running:
            matches.append(candidate)
    return matches
get_kserve_deployments(self, labels)
Get a list of KServe deployments that match the supplied labels.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
labels |
Dict[str, str] |
a dictionary of labels to match against KServe deployments. |
required |
Returns:
Type | Description |
---|---|
List[kserve.models.v1beta1_inference_service.V1beta1InferenceService] |
A list of KServe deployments that match the supplied labels. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if an operational failure is encountered while retrieving the KServe inference services. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_kserve_deployments(
    self, labels: Dict[str, str]
) -> List[V1beta1InferenceService]:
    """Get a list of KServe deployments that match the supplied labels.

    Args:
        labels: a dictionary of labels to match against KServe deployments.

    Returns:
        A list of KServe deployments that match the supplied labels.

    Raises:
        RuntimeError: if an operational failure is encountered while
            retrieving the KServe inference services.
    """
    # An empty label dict means "no selector", i.e. list everything.
    label_selector = (
        ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
    )

    namespace = (
        self.config.kubernetes_namespace
        or utils.get_default_target_namespace()
    )

    try:
        response = (
            self.kserve_client.api_instance.list_namespaced_custom_object(
                constants.KSERVE_GROUP,
                constants.KSERVE_V1BETA1_VERSION,
                namespace,
                constants.KSERVE_PLURAL,
                label_selector=label_selector,
            )
        )
    except client.rest.ApiException as e:
        # Single clean message (no source whitespace leaking in through a
        # line continuation) with the API error chained as the cause.
        raise RuntimeError(
            f"Exception when retrieving KServe inference services: {e}"
        ) from e

    # TODO[CRITICAL]: de-serialize each item into a complete
    #   V1beta1InferenceService object recursively using the OpenApi
    #   schema (this doesn't work right now)
    inference_services: List[V1beta1InferenceService] = []
    for item in response.get("items", []):
        snake_case_item = self._camel_to_snake(item)
        inference_service = V1beta1InferenceService(**snake_case_item)
        inference_services.append(inference_service)
    return inference_services
get_model_server_info(service_instance)
staticmethod
Return implementation specific information on the model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
KServeDeploymentService |
KServe deployment service object |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
A dictionary containing the model server information. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "KServeDeploymentService",
) -> Dict[str, Optional[str]]:
    """Collect KServe-specific details about a model server.

    Args:
        service_instance: the KServe deployment service to describe.

    Returns:
        A dictionary with the prediction URL and hostname, the model URI
        and name, and the name of the KServe inference service.
    """
    info: Dict[str, Optional[str]] = {}
    info["PREDICTION_URL"] = service_instance.prediction_url
    info["PREDICTION_HOSTNAME"] = service_instance.prediction_hostname
    info["MODEL_URI"] = service_instance.config.model_uri
    info["MODEL_NAME"] = service_instance.config.model_name
    info["KSERVE_INFERENCE_SERVICE"] = service_instance.crd_name
    return info
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build and push a Docker image when a step requests a custom deployment.

    Nothing happens unless at least one step has the
    `KSERVE_CUSTOM_DEPLOYMENT` flag set to True in its extra configuration.
    When an image is built, its repo digest is stored on the deployment
    under `KSERVE_DOCKER_IMAGE_KEY`.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    custom_deployment_requested = any(
        step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True
        for step in deployment.steps.values()
    )
    if not custom_deployment_requested:
        return

    image_digest = PipelineDockerImageBuilder().build_and_push_docker_image(
        deployment=deployment, stack=stack
    )
    deployment.add_extra(KSERVE_DOCKER_IMAGE_KEY, image_digest)
start_model_server(self, uuid, timeout=300)
Start a KServe model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to start. |
required |
timeout |
int |
timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. |
300 |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
since we don't support starting KServe model servers |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Start a KServe model deployment server (not supported).

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method would return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: always, since starting KServe model servers
            is not supported.
    """
    message = "Starting KServe model servers is not implemented"
    raise NotImplementedError(message)
stop_model_server(self, uuid, timeout=300, force=False)
Stop a KServe model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to stop. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
stopping on KServe model servers is not supported. |
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop a KServe model server (not supported).

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: always, since stopping KServe model servers
            is not supported; the message suggests deleting the server
            instead.
    """
    message = (
        "Stopping KServe model servers is not implemented. Try "
        "deleting the KServe model server instead."
    )
    raise NotImplementedError(message)
secret_schemas
special
Initialization of Kserve Secret Schemas.
These are secret schemas that can be used to authenticate Kserve to the Artifact Store used to store served ML models.
secret_schemas
Implementation for KServe secret schemas.
KServeAzureSecretSchema (BaseSecretSchema)
pydantic-model
KServe Azure Blob Storage credentials.
Attributes:
Name | Type | Description |
---|---|---|
storage_type |
Literal['Azure'] |
the storage type. Must be set to "Azure" for this schema. |
credentials |
Optional[str] |
the credentials to use. |
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeAzureSecretSchema(BaseSecretSchema):
    """KServe Azure Blob Storage credentials.

    Attributes:
        storage_type: the storage type. Must be set to "Azure" for this
            schema.
        credentials: the credentials to use.
    """

    TYPE: ClassVar[str] = KSERVE_AZUREBLOB_SECRET_SCHEMA_TYPE

    storage_type: Literal["Azure"] = "Azure"
    credentials: Optional[str]
KServeGSSecretSchema (BaseSecretSchema)
pydantic-model
KServe GCS credentials.
Attributes:
Name | Type | Description |
---|---|---|
storage_type |
Literal['GCS'] |
the storage type. Must be set to "GCS" for this schema. |
credentials |
Optional[str] |
the credentials to use. |
service_account |
Optional[str] |
the service account. |
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeGSSecretSchema(BaseSecretSchema):
    """Secret schema holding Google Cloud Storage credentials for KServe.

    Attributes:
        storage_type: fixed discriminator for this schema; the only
            accepted value is "GCS".
        credentials: the credentials to use.
        service_account: the service account.
    """

    # Schema type identifier for this secret schema.
    TYPE: ClassVar[str] = KSERVE_GS_SECRET_SCHEMA_TYPE

    storage_type: Literal["GCS"] = "GCS"
    credentials: Optional[str]
    service_account: Optional[str]
KServeS3SecretSchema (BaseSecretSchema)
pydantic-model
KServe S3 credentials.
Attributes:
Name | Type | Description |
---|---|---|
storage_type |
Literal['S3'] |
the storage type. Must be set to "S3" for this schema. |
credentials |
Optional[str] |
the credentials to use. |
service_account |
Optional[str] |
the name of the service account. |
s3_endpoint |
Optional[str] |
the S3 endpoint. |
s3_region |
Optional[str] |
the S3 region. |
s3_use_https |
Optional[str] |
whether to use HTTPS. |
s3_verify_ssl |
Optional[str] |
whether to verify SSL. |
Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeS3SecretSchema(BaseSecretSchema):
    """KServe S3 credentials.

    Attributes:
        storage_type: the storage type. Must be set to "S3" for this schema.
        credentials: the credentials to use.
        service_account: the name of the service account.
        s3_endpoint: the S3 endpoint.
        s3_region: the S3 region.
        s3_use_https: whether to use HTTPS.
        s3_verify_ssl: whether to verify SSL.
    """

    TYPE: ClassVar[str] = KSERVE_S3_SECRET_SCHEMA_TYPE

    storage_type: Literal["S3"] = "S3"
    credentials: Optional[str]
    service_account: Optional[str]
    s3_endpoint: Optional[str]
    s3_region: Optional[str]
    s3_use_https: Optional[str]
    s3_verify_ssl: Optional[str]
services
special
Initialization for KServe services.
kserve_deployment
Implementation for the KServe inference service.
KServeDeploymentConfig (ServiceConfig)
pydantic-model
KServe deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_uri |
str |
URI of the model (or models) to serve. |
model_name |
str |
the name of the model. Multiple versions of the same model should use the same model name. |
secret_name |
Optional[str] |
the name of the secret containing the model. |
predictor |
str |
the KServe predictor used to serve the model. The predictor type can be one of the following: `tensorflow`, `pytorch`, `sklearn`, `xgboost`, `custom`. |
replicas |
int |
number of replicas to use for the prediction service. |
resources |
Optional[Dict[str, Any]] |
the Kubernetes resources to allocate for the prediction service. |
container |
Optional[Dict[str, Any]] |
the container to use for the custom prediction services. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentConfig(ServiceConfig):
    """KServe deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same model
            should use the same model name.
        secret_name: the name of the secret containing the model.
        predictor: the KServe predictor used to serve the model. The
            predictor type can be one of the following: `tensorflow`, `pytorch`,
            `sklearn`, `xgboost`, `custom`.
        replicas: number of replicas to use for the prediction service.
        resources: the Kubernetes resources to allocate for the prediction service.
        container: the container to use for the custom prediction services.
    """

    model_uri: str = ""
    model_name: str
    secret_name: Optional[str]
    predictor: str
    replicas: int = 1
    container: Optional[Dict[str, Any]]
    resources: Optional[Dict[str, Any]]

    @staticmethod
    def sanitize_labels(labels: Dict[str, str]) -> None:
        """Update the label values to be valid Kubernetes labels.

        The dictionary is modified in place; only values are rewritten.

        See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            labels: The labels to sanitize.
        """
        # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
        for key, value in labels.items():
            # Kubernetes labels must be alphanumeric, no longer than
            # 63 characters, and must begin and end with an alphanumeric
            # character ([a-z0-9A-Z])
            labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
                "-_."
            )

    def get_kubernetes_labels(self) -> Dict[str, str]:
        """Generate the labels for the KServe inference CRD from the service configuration.

        These labels are attached to the KServe inference service CRD
        and may be used as label selectors in lookup operations. Empty
        configuration values do not produce a label.

        Returns:
            The labels for the KServe inference service CRD.
        """
        labels = {"app": "zenml"}
        if self.pipeline_name:
            labels["zenml.pipeline_name"] = self.pipeline_name
        if self.pipeline_run_id:
            labels["zenml.pipeline_run_id"] = self.pipeline_run_id
        if self.pipeline_step_name:
            labels["zenml.pipeline_step_name"] = self.pipeline_step_name
        if self.model_name:
            labels["zenml.model_name"] = self.model_name
        if self.model_uri:
            labels["zenml.model_uri"] = self.model_uri
        if self.predictor:
            labels["zenml.model_type"] = self.predictor
        self.sanitize_labels(labels)
        return labels

    def get_kubernetes_annotations(self) -> Dict[str, str]:
        """Generate the annotations for the KServe inference CRD from the service configuration.

        The annotations are used to store additional information about the
        KServe ZenML service associated with the deployment that is
        not available on the labels. One particularly important annotation
        is the serialized Service configuration itself, which is used to
        recreate the service configuration from a remote KServe inference
        service CRD.

        Returns:
            The annotations for the KServe inference service CRD.
        """
        annotations = {
            "zenml.service_config": self.json(),
            "zenml.version": __version__,
        }
        return annotations

    @classmethod
    def create_from_deployment(
        cls, deployment: V1beta1InferenceService
    ) -> "KServeDeploymentConfig":
        """Recreate a KServe service from a KServe deployment resource.

        Args:
            deployment: the KServe inference service CRD.

        Returns:
            The KServe ZenML service configuration corresponding to the given
            KServe inference service CRD.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible KServe ZenML service configuration.
        """
        # A resource without any annotations must produce the documented
        # ValueError rather than an AttributeError on None.
        annotations = deployment.metadata.get("annotations") or {}
        config_data = annotations.get("zenml.service_config")
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            service_config = cls.parse_raw(config_data)
        except ValidationError as e:
            raise ValueError(
                f"The loaded KServe Inference Service resource contains an "
                f"invalid or incompatible KServe ZenML service configuration: "
                f"{config_data}"
            ) from e
        return service_config
create_from_deployment(deployment)
classmethod
Recreate a KServe service from a KServe deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
V1beta1InferenceService |
the KServe inference service CRD. |
required |
Returns:
Type | Description |
---|---|
KServeDeploymentConfig |
The KServe ZenML service configuration corresponding to the given KServe inference service CRD. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe ZenML service configuration. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentConfig":
    """Recreate a KServe service from a KServe deployment resource.

    Args:
        deployment: the KServe inference service CRD.

    Returns:
        The KServe ZenML service configuration corresponding to the given
        KServe inference service CRD.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible KServe ZenML service configuration.
    """
    # A resource without any annotations must produce the documented
    # ValueError rather than an AttributeError on None.
    annotations = deployment.metadata.get("annotations") or {}
    config_data = annotations.get("zenml.service_config")
    if not config_data:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )
    try:
        service_config = cls.parse_raw(config_data)
    except ValidationError as e:
        raise ValueError(
            f"The loaded KServe Inference Service resource contains an "
            f"invalid or incompatible KServe ZenML service configuration: "
            f"{config_data}"
        ) from e
    return service_config
get_kubernetes_annotations(self)
Generate the annotations for the KServe inference CRD from the service configuration.
The annotations are used to store additional information about the KServe ZenML service associated with the deployment that is not available on the labels. One particularly important annotation is the serialized Service configuration itself, which is used to recreate the service configuration from a remote KServe inference service CRD.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The annotations for the KServe inference service CRD. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_annotations(self) -> Dict[str, str]:
    """Build the annotations stored on the KServe inference service CRD.

    Annotations carry information about the ZenML service that does not
    fit on the labels: the JSON-serialized service configuration (used to
    recreate the configuration from a remote CRD) and the ZenML version.

    Returns:
        The annotations for the KServe inference service CRD.
    """
    serialized_config = self.json()
    return {
        "zenml.service_config": serialized_config,
        "zenml.version": __version__,
    }
get_kubernetes_labels(self)
Generate the labels for the KServe inference CRD from the service configuration.
These labels are attached to the KServe inference service CRD and may be used as label selectors in lookup operations.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The labels for the KServe inference service CRD. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_labels(self) -> Dict[str, str]:
    """Derive the KServe inference CRD labels from this configuration.

    The labels are attached to the KServe inference service CRD and may be
    used as label selectors in lookup operations. A configuration value
    that is empty does not produce a label; the result is sanitized before
    it is returned.

    Returns:
        The labels for the KServe inference service CRD.
    """
    optional_labels = (
        ("zenml.pipeline_name", self.pipeline_name),
        ("zenml.pipeline_run_id", self.pipeline_run_id),
        ("zenml.pipeline_step_name", self.pipeline_step_name),
        ("zenml.model_name", self.model_name),
        ("zenml.model_uri", self.model_uri),
        ("zenml.model_type", self.predictor),
    )
    labels = {"app": "zenml"}
    labels.update(
        {label: value for label, value in optional_labels if value}
    )
    self.sanitize_labels(labels)
    return labels
sanitize_labels(labels)
staticmethod
Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Parameters:
Name | Type | Description | Default |
---|---|---|---|
labels |
Dict[str, str] |
The labels to sanitize. |
required |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Rewrite label values in place so they are valid Kubernetes labels.

    Each run of characters other than letters, digits, `-`, `_` and `.` is
    replaced by a single underscore; the value is then cut to 63 characters
    and any leading or trailing `-`, `_` or `.` is removed.

    See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: The labels to sanitize.
    """
    # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
    for name in list(labels):
        cleaned = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", labels[name])
        # truncate first, then trim, so the final value starts and ends
        # with an alphanumeric character
        labels[name] = cleaned[:63].strip("-_.")
KServeDeploymentService (BaseService)
pydantic-model
A ZenML service that represents a KServe inference service CRD.
Attributes:
Name | Type | Description |
---|---|---|
config |
KServeDeploymentConfig |
service configuration. |
status |
ServiceStatus |
service status. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentService(BaseService):
"""A ZenML service that represents a KServe inference service CRD.
Attributes:
config: service configuration.
status: service status.
"""
SERVICE_TYPE = ServiceType(
name="kserve-deployment",
type="model-serving",
flavor="kserve",
description="KServe inference service",
)
config: KServeDeploymentConfig = Field(
default_factory=KServeDeploymentConfig
)
status: ServiceStatus = Field(default_factory=ServiceStatus)
def _get_model_deployer(self) -> "KServeModelDeployer":
    """Return the active model deployer, typed as a KServe model deployer.

    Returns:
        The active KServeModelDeployer.
    """
    # imported inside the method rather than at module level
    from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
        KServeModelDeployer,
    )

    active_deployer = KServeModelDeployer.get_active_model_deployer()
    return cast(KServeModelDeployer, active_deployer)
def _get_client(self) -> KServeClient:
"""Get the KServe client from the active KServe model deployer.
Returns:
The KServe client.
"""
return self._get_model_deployer().kserve_client
def _get_namespace(self) -> Optional[str]:
"""Get the Kubernetes namespace from the active KServe model deployer.
Returns:
The Kubernetes namespace, or None, if the default namespace is
used.
"""
return self._get_model_deployer().config.kubernetes_namespace
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the state of the KServe inference service.

    The inference service is fetched through the KServe client and its
    `PredictorReady` condition is translated into a `ServiceState` value
    plus a printable message.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state. A failed lookup
        or a resource without a status is reported as inactive; a predictor
        that is not (yet) ready is reported as pending startup.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    try:
        inference_service = client.get(name=name, namespace=namespace)
    except RuntimeError:
        return (ServiceState.INACTIVE, "")

    # TODO[MEDIUM]: Implement better operational status checking that also
    #   cover errors
    if "status" not in inference_service:
        return (ServiceState.INACTIVE, "No operational status available")

    for condition in inference_service["status"].get("conditions", {}):
        # only the predictor readiness condition is of interest
        if condition.get("type", "") != "PredictorReady":
            continue
        readiness = condition.get("status", "Unknown").lower()
        if readiness == "true":
            return (
                ServiceState.ACTIVE,
                f"Inference service '{name}' is available",
            )
        if readiness == "false":
            return (
                ServiceState.PENDING_STARTUP,
                f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
            )

    # no conclusive readiness condition was found
    return (
        ServiceState.PENDING_STARTUP,
        f"Inference service '{name}' still starting up",
    )
@property
def crd_name(self) -> str:
    """Name of the KServe inference service CRD for this service instance.

    The sanitized model-name label is used when there is one; otherwise the
    name falls back to `zenml-` followed by the first eight characters of
    the service UUID.

    Returns:
        The name of the KServe inference service CRD.
    """
    model_label = self._get_kubernetes_labels().get("zenml.model_name")
    if model_label:
        return model_label
    return f"zenml-{str(self.uuid)[:8]}"
def _get_kubernetes_labels(self) -> Dict[str, str]:
    """Build the labels for the KServe inference service CRD.

    Starts from the labels derived from the service configuration, adds the
    service UUID and sanitizes the result.

    Returns:
        The labels for the KServe inference service.
    """
    crd_labels = self.config.get_kubernetes_labels()
    crd_labels["zenml.service_uuid"] = str(self.uuid)
    # sanitize again so the freshly added UUID label is covered too
    KServeDeploymentConfig.sanitize_labels(crd_labels)
    return crd_labels
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
    """Recreate a KServe service from a deployed KServe resource.

    The configuration is restored from the resource annotations and the
    service UUID from the `zenml.service_uuid` label; the status of the new
    service object is refreshed before it is returned.

    Args:
        deployment: the KServe deployment resource.

    Returns:
        The KServe service corresponding to the given KServe deployment
        resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or labels, or it contains an invalid
            or incompatible KServe service configuration.
    """
    config = KServeDeploymentConfig.create_from_deployment(deployment)
    # A resource without any labels must produce the documented ValueError
    # rather than an AttributeError on None.
    labels = deployment.metadata.get("labels") or {}
    uuid = labels.get("zenml.service_uuid")
    if not uuid:
        raise ValueError(
            f"The given deployment resource does not contain a valid "
            f"'zenml.service_uuid' label: {deployment}"
        )
    service = cls(uuid=UUID(uuid), config=config)
    service.update_status()
    return service
def provision(self) -> None:
    """Create or update the remote KServe inference service.

    An inference service resource is assembled from the current
    configuration. If a resource with the same name can be fetched it is
    replaced, otherwise a new one is created.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
    name = self.crd_name

    # All supported model specs seem to have the same fields
    # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
    container_config = self.config.container
    if container_config is None:
        # built-in predictor: keyed by the configured predictor name
        predictor_kwargs = {
            self.config.predictor: V1beta1PredictorExtensionSpec(
                storage_uri=self.config.model_uri,
                resources=self.config.resources,
            )
        }
    else:
        # custom deployment: a single container with the storage URI
        # passed through the STORAGE_URI environment variable
        storage_env = k8s_client.V1EnvVar(
            name="STORAGE_URI",
            value=container_config.get("storage_uri"),
        )
        custom_container = k8s_client.V1Container(
            name=container_config.get("name"),
            image=container_config.get("image"),
            command=container_config.get("command"),
            args=container_config.get("args"),
            env=[storage_env],
        )
        predictor_kwargs = {"containers": [custom_container]}

    resource_metadata = k8s_client.V1ObjectMeta(
        name=name,
        namespace=namespace,
        labels=self._get_kubernetes_labels(),
        annotations=self.config.get_kubernetes_annotations(),
    )
    resource_spec = V1beta1InferenceServiceSpec(
        predictor=V1beta1PredictorSpec(**predictor_kwargs)
    )
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=resource_metadata,
        spec=resource_spec,
    )

    # TODO[HIGH]: better error handling when provisioning KServe instances
    try:
        client.get(name=name, namespace=namespace)
        # update the existing deployment
        client.replace(name, isvc, namespace=namespace)
    except RuntimeError:
        # a RuntimeError from either call above leads to a create attempt
        client.create(isvc)
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: accepted for interface compatibility; this implementation
            does not read it.

    Raises:
        ValueError: if the KServe instance could not be deleted from its
            namespace.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    # TODO[HIGH]: catch errors if deleting a KServe instance that is no
    # longer available
    try:
        client.delete(name=name, namespace=namespace)
    except RuntimeError as e:
        # Chain the client error so the root cause stays in the traceback.
        raise ValueError(
            f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
        ) from e
def _get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a KServe deployment resource.

    This is a generator function: the Kubernetes API is only queried once
    the returned generator is advanced for the first time.

    Args:
        name: the name of the KServe deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Raises:
        Exception: if no pods are found for the deployment or an unknown
            error occurs while fetching the logs.

    Yields:
        The logs of the given deployment, one line at a time. Sending a
        truthy value into the generator stops the iteration.
    """
    client = self._get_client()
    namespace = self._get_namespace()

    logger.debug(f"Retrieving logs for InferenceService resource: {name}")
    try:
        response = client.core_api.list_namespaced_pod(
            namespace=namespace,
            label_selector=f"zenml.service_uuid={self.uuid}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise Exception(
                f"The KServe deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        containers = [c.name for c in pod.spec.containers]
        # Both lists may be reported as None by the API (no init containers,
        # or statuses not populated yet), so fall back to empty.
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }

        container = "default"
        if container not in containers:
            container = containers[0]
        # If the chosen container never started, read the first init
        # container instead (when there is one) to see why startup stalls.
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{namespace}`"
        )
        response = client.core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for InferenceService resource "
            "%s: %s",
            name,
            str(e),
        )
        raise Exception(
            f"Unexpected exception when fetching logs for InferenceService "
            f"resource: {name}"
        ) from e

    try:
        while True:
            raw_line = response.readline()
            # Only an empty read marks the end of the stream; a blank log
            # line still arrives as b"\n" and must not end the generator.
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        response.release_conn()
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Fetch the logs of the remote KServe inference service instance.

    Args:
        follow: if True, the logs will be streamed as they are written.
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    deployment_name = self.crd_name
    log_stream = self._get_deployment_logs(
        deployment_name, follow=follow, tail=tail
    )
    return log_stream
@property
def prediction_url(self) -> Optional[str]:
    """The prediction URI exposed by the prediction service.

    Returns:
        The prediction URI exposed by the prediction service, or None if
        the service is not yet ready.
    """
    # URLs always use forward slashes; `os.path.join` would insert
    # backslashes on Windows, so join with the POSIX rules explicitly.
    import posixpath

    if not self.is_running:
        return None

    model_deployer = self._get_model_deployer()
    return posixpath.join(
        model_deployer.config.base_url,
        "v1/models",
        f"{self.crd_name}:predict",
    )
@property
def prediction_hostname(self) -> Optional[str]:
    """Hostname under which the prediction service is exposed.

    Returns:
        The hostname to send in the headers of the prediction request,
        built from the CRD name, the namespace and the deployer's custom
        domain (`example.com` when no custom domain is configured), or
        None if the service is not running.
    """
    if self.is_running:
        namespace = self._get_namespace()
        deployer = self._get_model_deployer()
        domain = deployer.config.custom_domain
        if not domain:
            domain = "example.com"
        return f"{self.crd_name}.{namespace}.{domain}"
    return None
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON string holding the instances to send to the
            prediction service.

    Returns:
        The JSON-decoded body of the prediction response.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction URL or hostname is not set, or if
            the request is not a JSON string.
    """
    if not self.is_running:
        raise Exception(
            "KServe prediction service is not running. "
            "Please start the service before making predictions."
        )

    # Read each property exactly once: both are recomputed on every access,
    # so the value that was checked must be the value that is used.
    prediction_url = self.prediction_url
    if prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")
    prediction_hostname = self.prediction_hostname
    if prediction_hostname is None:
        raise ValueError(
            "`self.prediction_hostname` is not set, cannot post."
        )

    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    instances = json.loads(request)

    response = requests.post(
        prediction_url,
        headers={"Host": prediction_hostname},
        json={"instances": instances},
    )
    response.raise_for_status()
    return response.json()
crd_name: str
property
readonly
Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.
Returns:
Type | Description |
---|---|
str |
The name of the KServe inference service CRD. |
prediction_hostname: Optional[str]
property
readonly
The prediction hostname exposed by the prediction service.
Returns:
Type | Description |
---|---|
Optional[str] |
The prediction hostname exposed by the prediction service status that will be used in the headers of the prediction request. |
prediction_url: Optional[str]
property
readonly
The prediction URI exposed by the prediction service.
Returns:
Type | Description |
---|---|
Optional[str] |
The prediction URI exposed by the prediction service, or None if the service is not yet ready. |
check_status(self)
Check the state of the KServe inference service.
This method checks the current operational state of the external KServe
inference service and translates it into a ServiceState
value and a printable message.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] |
The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status). |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the state of the KServe inference service.

    This method checks the current operational state of the external KServe
    inference service and translates it into a `ServiceState` value and a
    printable message, based on the `PredictorReady` condition reported in
    the deployment status.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    try:
        deployment = client.get(name=name, namespace=namespace)
    except RuntimeError:
        # The deployment could not be fetched: report it as inactive.
        return ServiceState.INACTIVE, ""

    # TODO[MEDIUM]: Implement better operational status checking that also
    # cover errors
    if "status" not in deployment:
        return ServiceState.INACTIVE, "No operational status available"

    for condition in deployment["status"].get("conditions", {}):
        if condition.get("type", "") != "PredictorReady":
            continue
        readiness = condition.get("status", "Unknown").lower()
        if readiness == "true":
            return (
                ServiceState.ACTIVE,
                f"Inference service '{name}' is available",
            )
        if readiness == "false":
            return (
                ServiceState.PENDING_STARTUP,
                f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
            )

    # No conclusive PredictorReady condition was reported.
    return (
        ServiceState.PENDING_STARTUP,
        f"Inference service '{name}' still starting up",
    )
create_from_deployment(deployment)
classmethod
Recreate the configuration of a KServe Service from a deployed instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
V1beta1InferenceService |
the KServe deployment resource. |
required |
Returns:
Type | Description |
---|---|
KServeDeploymentService |
The KServe service configuration corresponding to the given KServe deployment resource. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe service configuration. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
    """Recreate a KServe service from a deployed instance.

    Args:
        deployment: the KServe deployment resource.

    Returns:
        The KServe service corresponding to the given KServe deployment
        resource, with its status refreshed via `update_status()`.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or labels, or it contains an invalid
            or incompatible KServe service configuration.
    """
    config = KServeDeploymentConfig.create_from_deployment(deployment)
    # A resource without any labels yields None here; treat that like a
    # missing label so the documented ValueError is raised instead of an
    # AttributeError.
    labels = deployment.metadata.get("labels") or {}
    uuid = labels.get("zenml.service_uuid")
    if not uuid:
        raise ValueError(
            f"The given deployment resource does not contain a valid "
            f"'zenml.service_uuid' label: {deployment}"
        )
    service = cls(uuid=UUID(uuid), config=config)
    service.update_status()
    return service
deprovision(self, force=False)
Deprovisions all resources used by the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
if True, the service will be deprovisioned even if it is still in use. |
False |
Exceptions:
Type | Description |
---|---|
ValueError |
if the KServe instance could not be deleted from its namespace. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: accepted for interface compatibility; this implementation
            does not read it.

    Raises:
        ValueError: if the KServe instance could not be deleted from its
            namespace.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    # TODO[HIGH]: catch errors if deleting a KServe instance that is no
    # longer available
    try:
        client.delete(name=name, namespace=namespace)
    except RuntimeError as e:
        # Chain the client error so the root cause stays in the traceback.
        raise ValueError(
            f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
        ) from e
get_logs(self, follow=False, tail=None)
Retrieve the logs from the remote KServe inference service instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow |
bool |
if True, the logs will be streamed as they are written. |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Fetch the logs of the remote KServe inference service instance.

    Args:
        follow: if True, the logs will be streamed as they are written.
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    deployment_name = self.crd_name
    log_stream = self._get_deployment_logs(
        deployment_name, follow=follow, tail=tail
    )
    return log_stream
predict(self, request)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
str |
a JSON string representing the request |
required |
Returns:
Type | Description |
---|---|
Any |
The JSON-decoded prediction response returned by the service. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not yet ready. |
ValueError |
if the prediction_url is not set. |
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON string holding the instances to send to the
            prediction service.

    Returns:
        The JSON-decoded body of the prediction response.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction URL or hostname is not set, or if
            the request is not a JSON string.
    """
    if not self.is_running:
        raise Exception(
            "KServe prediction service is not running. "
            "Please start the service before making predictions."
        )

    # Read each property exactly once: both are recomputed on every access,
    # so the value that was checked must be the value that is used.
    prediction_url = self.prediction_url
    if prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")
    prediction_hostname = self.prediction_hostname
    if prediction_hostname is None:
        raise ValueError(
            "`self.prediction_hostname` is not set, cannot post."
        )

    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    instances = json.loads(request)

    response = requests.post(
        prediction_url,
        headers={"Host": prediction_hostname},
        json={"instances": instances},
    )
    response.raise_for_status()
    return response.json()
provision(self)
Provision or update remote KServe deployment instance.
This should then match the current configuration.
Source code in zenml/integrations/kserve/services/kserve_deployment.py
def provision(self) -> None:
    """Provision or update remote KServe deployment instance.

    Builds an InferenceService resource from the current configuration and
    either replaces the existing resource of the same name or creates a
    new one, so that the remote instance matches the configuration.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
    name = self.crd_name

    # All supported model specs seem to have the same fields
    # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
    if self.config.container is not None:
        # Custom deployment: serve the model from a user-supplied container
        # that receives the model location via the STORAGE_URI variable.
        predictor_kwargs = {
            "containers": [
                k8s_client.V1Container(
                    name=self.config.container.get("name"),
                    image=self.config.container.get("image"),
                    command=self.config.container.get("command"),
                    args=self.config.container.get("args"),
                    env=[
                        k8s_client.V1EnvVar(
                            name="STORAGE_URI",
                            value=self.config.container.get("storage_uri"),
                        )
                    ],
                )
            ]
        }
    else:
        # Built-in predictor: the configured predictor name selects the
        # keyword argument of the predictor spec.
        predictor_kwargs = {
            self.config.predictor: V1beta1PredictorExtensionSpec(
                storage_uri=self.config.model_uri,
                resources=self.config.resources,
            )
        }

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=k8s_client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels=self._get_kubernetes_labels(),
            annotations=self.config.get_kubernetes_annotations(),
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(**predictor_kwargs)
        ),
    )

    # TODO[HIGH]: better error handling when provisioning KServe instances
    # Only the lookup is guarded: a RuntimeError from `get` is treated as
    # "no such deployment yet". A failure of `replace` must propagate rather
    # than being retried as a `create` of an already existing resource.
    try:
        client.get(name=name, namespace=namespace)
    except RuntimeError:
        client.create(isvc)
    else:
        # update the existing deployment
        client.replace(name, isvc, namespace=namespace)
steps
special
Initialization for KServe steps.
kserve_deployer
Implementation of the KServe Deployer step.
CustomDeployParameters (BaseModel)
pydantic-model
Custom model deployer step extra parameters.
Attributes:
Name | Type | Description |
---|---|---|
predict_function |
str |
Path to Python file containing predict function. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class CustomDeployParameters(BaseModel):
    """Custom model deployer step extra parameters.

    Attributes:
        predict_function: Import path of the custom predict function.
    """

    predict_function: str

    @validator("predict_function")
    def predict_function_validate(cls, predict_func_path: str) -> str:
        """Validate predict function.

        Args:
            predict_func_path: predict function path

        Returns:
            predict function path

        Raises:
            ValueError: if predict function path is not valid
            TypeError: if predict function path is not a callable function
        """
        try:
            predict_function = import_class_by_path(predict_func_path)
        except (AttributeError, ImportError) as e:
            # A missing attribute and an unimportable module both mean the
            # path does not resolve; report them the same way and keep the
            # original error as the cause.
            raise ValueError("Predict function can't be found.") from e
        if not callable(predict_function):
            raise TypeError("Predict function must be callable.")
        return predict_func_path
predict_function_validate(predict_func_path)
classmethod
Validate predict function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predict_func_path |
str |
predict function path |
required |
Returns:
Type | Description |
---|---|
str |
predict function path |
Exceptions:
Type | Description |
---|---|
ValueError |
if predict function path is not valid |
TypeError |
if predict function path is not a callable function |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
    """Validate predict function.

    Args:
        predict_func_path: predict function path

    Returns:
        predict function path

    Raises:
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
    """
    try:
        predict_function = import_class_by_path(predict_func_path)
    except (AttributeError, ImportError) as e:
        # A missing attribute and an unimportable module both mean the
        # path does not resolve; report them the same way and keep the
        # original error as the cause.
        raise ValueError("Predict function can't be found.") from e
    if not callable(predict_function):
        raise TypeError("Predict function must be callable.")
    return predict_func_path
KServeDeployerStepParameters (BaseParameters)
pydantic-model
KServe model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
KServeDeploymentConfig |
KServe deployment service configuration. |
torch_serve_parameters |
Optional[TorchServeParameters] |
TorchServe set of parameters to deploy model. |
timeout |
int |
Timeout for model deployment. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: Optional extra parameters used for custom
            model deployment.
        torch_serve_parameters: Optional TorchServe set of parameters to
            deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
TorchServeParameters (BaseModel)
pydantic-model
KServe PyTorch model deployer step configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_class |
str |
Path to Python file containing model architecture. |
handler |
str |
TorchServe's handler file to handle custom TorchServe inference logic. |
extra_files |
Optional[List[str]] |
List of paths to extra dependency files. |
model_version |
Optional[str] |
Model version. |
requirements_file |
Optional[str] |
Path to requirements file. |
torch_config |
Optional[str] |
TorchServe configuration file path. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class TorchServeParameters(BaseModel):
    """KServe PyTorch model deployer step configuration.

    Attributes:
        model_class: Path to Python file containing model architecture.
        handler: TorchServe's handler file to handle custom TorchServe inference
            logic.
        extra_files: List of paths to extra dependency files.
        requirements_file: Path to requirements file.
        model_version: Model version.
        torch_config: TorchServe configuration file path.
    """

    model_class: str
    handler: str
    extra_files: Optional[List[str]] = None
    requirements_file: Optional[str] = None
    model_version: Optional[str] = "1.0"
    torch_config: Optional[str] = None

    @validator("model_class")
    def model_class_validate(cls, v: str) -> str:
        """Validate model class file path.

        Args:
            v: model class file path

        Returns:
            model class file path

        Raises:
            ValueError: if model class file path is not valid
        """
        if not v:
            raise ValueError("Model class file path is required.")
        if not is_inside_repository(v):
            raise ValueError(
                "Model class file path must be inside the repository."
            )
        return v

    @validator("handler")
    def handler_validate(cls, v: str) -> str:
        """Validate handler.

        Args:
            v: handler file path

        Returns:
            handler file path

        Raises:
            ValueError: if handler file path is not valid
        """
        if not v:
            raise ValueError("Handler is required.")
        if v in TORCH_HANDLERS or is_inside_repository(v):
            return v
        # One message string: separate arguments would make the error
        # render as a tuple.
        raise ValueError(
            "Handler must be one of the TorchServe handlers "
            "or a file that exists inside the repository."
        )

    @validator("extra_files")
    def extra_files_validate(
        cls, v: Optional[List[str]]
    ) -> Optional[List[str]]:
        """Validate extra files.

        Args:
            v: extra files path

        Returns:
            extra files path

        Raises:
            ValueError: if the extra files path is not valid
        """
        if v is None:
            return v
        extra_files = []
        for file_path in v:
            if not is_inside_repository(file_path):
                raise ValueError(
                    "Extra file path must be inside the repository."
                )
            extra_files.append(file_path)
        return extra_files

    @validator("torch_config")
    def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
        """Validate torch config file.

        Args:
            v: torch config file path

        Returns:
            torch config file path

        Raises:
            ValueError: if torch config file path is not valid.
        """
        # An unset or empty value is passed through unchanged.
        if v and not is_inside_repository(v):
            raise ValueError(
                "Torch config file path must be inside the repository."
            )
        return v
extra_files_validate(v)
classmethod
Validate extra files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
Optional[List[str]] |
extra files path |
required |
Returns:
Type | Description |
---|---|
Optional[List[str]] |
extra files path |
Exceptions:
Type | Description |
---|---|
ValueError |
if the extra files path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("extra_files")
def extra_files_validate(
    cls, v: Optional[List[str]]
) -> Optional[List[str]]:
    """Check that every extra file lies inside the repository.

    Args:
        v: extra files path

    Returns:
        extra files path

    Raises:
        ValueError: if the extra files path is not valid
    """
    if v is None:
        return v
    checked_paths = []
    for candidate in v:
        if not is_inside_repository(candidate):
            raise ValueError(
                "Extra file path must be inside the repository."
            )
        checked_paths.append(candidate)
    return checked_paths
handler_validate(v)
classmethod
Validate handler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
handler file path |
required |
Returns:
Type | Description |
---|---|
str |
handler file path |
Exceptions:
Type | Description |
---|---|
ValueError |
if handler file path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("handler")
def handler_validate(cls, v: str) -> str:
    """Validate handler.

    Args:
        v: handler file path

    Returns:
        handler file path

    Raises:
        ValueError: if handler file path is not valid
    """
    if not v:
        raise ValueError("Handler is required.")
    if v in TORCH_HANDLERS or is_inside_repository(v):
        return v
    # One message string: separate arguments would make the error render
    # as a tuple.
    raise ValueError(
        "Handler must be one of the TorchServe handlers "
        "or a file that exists inside the repository."
    )
model_class_validate(v)
classmethod
Validate model class file path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
str |
model class file path |
required |
Returns:
Type | Description |
---|---|
str |
model class file path |
Exceptions:
Type | Description |
---|---|
ValueError |
if model class file path is not valid |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("model_class")
def model_class_validate(cls, v: str) -> str:
    """Check that the model class file path is set and inside the repository.

    Args:
        v: model class file path

    Returns:
        model class file path

    Raises:
        ValueError: if model class file path is not valid
    """
    if v:
        if is_inside_repository(v):
            return v
        raise ValueError(
            "Model class file path must be inside the repository."
        )
    raise ValueError("Model class file path is required.")
torch_config_validate(v)
classmethod
Validate torch config file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
Optional[str] |
torch config file path |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
torch config file path |
Exceptions:
Type | Description |
---|---|
ValueError |
if torch config file path is not valid. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("torch_config")
def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
    """Check that a given torch config file lies inside the repository.

    Args:
        v: torch config file path

    Returns:
        torch config file path

    Raises:
        ValueError: if torch config file path is not valid.
    """
    # An unset or empty value is passed through unchanged.
    if not v:
        return v
    if not is_inside_repository(v):
        raise ValueError(
            "Torch config file path must be inside the repository."
        )
    return v
kserve_custom_model_deployer_step (BaseStep)
KServe custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
whether to deploy the model or not |
required | |
params |
parameters for the deployer step |
required | |
model |
the model artifact to deploy |
required | |
context |
the step context |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the custom deployer parameters is not defined |
DoesNotExistException |
if no active stack is found |
Returns:
Type | Description |
---|---|
KServe deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
KServe model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
KServeDeploymentConfig |
KServe deployment service configuration. |
torch_serve_parameters |
Optional[TorchServeParameters] |
TorchServe set of parameters to deploy model. |
timeout |
int |
Timeout for model deployment. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: Optional extra parameters used for custom
            model deployment.
        torch_serve_parameters: Optional TorchServe set of parameters to
            deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model)
staticmethod
KServe custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
bool |
whether to deploy the model or not |
required |
params |
KServeDeployerStepParameters |
parameters for the deployer step |
required |
model |
UnmaterializedArtifact |
the model artifact to deploy |
required |
context |
StepContext |
the step context |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the custom deployer parameters is not defined |
DoesNotExistException |
if no active stack is found |
Returns:
Type | Description |
---|---|
KServeDeploymentService |
KServe deployment service |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False, extra={KSERVE_CUSTOM_DEPLOYMENT: True})
def kserve_custom_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> KServeDeploymentService:
    """KServe custom model deployer pipeline step.

    This step can be used in a pipeline to implement the
    process required to deploy a custom model with KServe.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Raises:
        ValueError: if the custom deployer parameters is not defined
        DoesNotExistException: if no active stack is found

    Returns:
        KServe deployment service
    """
    # verify that a custom deployer is defined
    if not params.custom_deploy_parameters:
        # One message string: separate arguments would make the error
        # render as a tuple.
        raise ValueError(
            "Custom deploy parameter which contains the path of the "
            "custom predict function is required for custom model deployment."
        )

    # get the active model deployer
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = run_name
    params.service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(KServeDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # entrypoint for starting KServe server deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.kserve.custom_deployer.zenml_custom_model",
        "--model_name",
        params.service_config.model_name,
        "--predict_func",
        params.custom_deploy_parameters.predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not context.stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    # the image to run is read from the pipeline's extra settings
    docker_image = step_env.step_run_info.pipeline.extra[
        KSERVE_DOCKER_IMAGE_KEY
    ]

    # copy the model files to a new specific directory for the deployment
    served_model_uri = os.path.join(context.get_output_artifact_uri(), "kserve")
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(model)
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment
    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri

    # Prepare container config for custom model deployment
    service_config.container = {
        "name": service_config.model_name,
        "image": docker_image,
        "command": entrypoint_command,
        "storage_uri": service_config.model_uri,
    }

    # deploy the service
    service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f" {service.prediction_url}\n"
        f" With the hostname: {service.prediction_hostname}."
    )
    return service
kserve_model_deployer_step (BaseStep)
KServe model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
whether to deploy the model or not |
required | |
params |
parameters for the deployer step |
required | |
model |
the model artifact to deploy |
required | |
context |
the step context |
required |
Returns:
Type | Description |
---|---|
KServe deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
KServe model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
KServeDeploymentConfig |
KServe deployment service configuration. |
torch_serve_parameters |
Optional[TorchServeParameters] |
TorchServe set of parameters to deploy model. |
timeout |
int |
Timeout for model deployment. |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: Optional set of parameters for a custom
            model deployment.
        torch_serve_parameters: Optional TorchServe set of parameters to
            deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model)
staticmethod
KServe model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
bool |
whether to deploy the model or not |
required |
params |
KServeDeployerStepParameters |
parameters for the deployer step |
required |
model |
UnmaterializedArtifact |
the model artifact to deploy |
required |
context |
StepContext |
the step context |
required |
Returns:
Type | Description |
---|---|
KServeDeploymentService |
KServe deployment service |
Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False)
def kserve_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> KServeDeploymentService:
    """Deploy a model with KServe as part of a pipeline run.

    Use this step in a pipeline to continuously deploy an ML model with
    KServe. When the deploy decision is negative and a model server already
    exists for the same pipeline, step and model name, that server is reused
    (and started again if it is not running) instead of deploying the
    current model.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        context: the step context
        model: the model artifact to deploy

    Returns:
        KServe deployment service
    """
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # read the pipeline name, run name and step name of the current run
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    step_name = step_env.step_name

    # record the real runtime information in the step's service config
    config = params.service_config
    config.pipeline_name = pipeline_name
    config.pipeline_run_id = run_name
    config.pipeline_step_name = step_name

    # look up model servers already deployed for the same pipeline name,
    # step name and model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=config.model_name,
    )

    # a negative deploy decision only skips the deployment when there is a
    # previous model server to fall back on; otherwise the current model is
    # served anyway so that a model server is available at all times
    reuse_existing = bool(existing_services) and not deploy_decision
    if reuse_existing:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{config.model_name}'..."
        )
        previous_service = cast(KServeDeploymentService, existing_services[0])
        # restart the previous model server if it is no longer running
        if not previous_service.is_running:
            previous_service.start(timeout=params.timeout)
        return previous_service

    # pick the helper that lays out the model files for the chosen predictor;
    # PyTorch models need the TorchServe specific preparation
    if config.predictor == "pytorch":
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_torch_service_config as build_service_config,
        )
    else:
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_service_config as build_service_config,
        )

    service_config = build_service_config(
        model_uri=model.uri,
        output_artifact_uri=context.get_output_artifact_uri(),
        params=params,
    )

    # create a new service, or update one previously deployed for the same
    # model
    deployed_service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )
    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f" {deployed_service.prediction_url}\n"
        f" With the hostname: {deployed_service.prediction_hostname}."
    )
    return deployed_service
kserve_step_utils
This module contains the utility functions used by the KServe deployer step.
TorchModelArchiver (BaseModel)
pydantic-model
Model Archiver for PyTorch models.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
str |
Model name. |
version |
Optional[str] |
Model version. |
serialized_file |
str |
Serialized model file. |
handler |
str |
TorchServe's handler file to handle custom TorchServe inference logic. |
extra_files |
Optional[List[str]] |
Comma separated path to extra dependency files. |
requirements_file |
Optional[str] |
Path to requirements file. |
export_path |
str |
Path to export model. |
runtime |
Optional[str] |
Runtime of the model. |
force |
Optional[bool] |
Force export of the model. |
archive_format |
Optional[str] |
Archive format. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
class TorchModelArchiver(BaseModel):
    """Model Archiver for PyTorch models.

    Attributes:
        model_name: Model name.
        serialized_file: Serialized model file.
        model_file: Model file handed to the archiver;
            `prepare_torch_service_config` fills it from the TorchServe
            `model_class` parameter.
        handler: TorchServe's handler file to handle custom TorchServe
            inference logic.
        export_path: Path to export model.
        extra_files: Comma separated path to extra dependency files.
        version: Model version.
        requirements_file: Path to requirements file.
        runtime: Runtime of the model.
        force: Force export of the model.
        archive_format: Archive format.
    """

    model_name: str
    serialized_file: str
    model_file: str
    handler: str
    export_path: str
    extra_files: Optional[List[str]] = None
    version: Optional[str] = None
    requirements_file: Optional[str] = None
    runtime: Optional[str] = "python"
    force: Optional[bool] = None
    archive_format: Optional[str] = "default"
generate_model_deployer_config(model_name, directory)
Generate a model deployer config.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name |
str |
the name of the model |
required |
directory |
str |
the directory where the model is stored |
required |
Returns:
Type | Description |
---|---|
str |
The path of the generated `.properties` config file. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def generate_model_deployer_config(
    model_name: str,
    directory: str,
) -> str:
    """Generate a model deployer config file for a single model.

    A fixed set of server properties is written to a new `.properties` file
    inside `directory`, followed by a `model_snapshot` entry that registers
    `<model_name>.mar` as the default version `1.0` of the model. The file is
    created with `delete=False`, so it is left on disk for the caller to use
    and clean up.

    Args:
        model_name: the name of the model
        directory: the directory in which the config file is created

    Returns:
        The path of the generated config file.
    """
    config_lines = [
        "inference_address=http://0.0.0.0:8085",
        "management_address=http://0.0.0.0:8085",
        "metrics_address=http://0.0.0.0:8082",
        "grpc_inference_port=7070",
        "grpc_management_port=7071",
        "enable_metrics_api=true",
        "metrics_format=prometheus",
        "number_of_netty_threads=4",
        "job_queue_size=10",
        "enable_envvars_config=true",
        "install_py_dep_per_model=true",
        "model_store=/mnt/models/model-store",
    ]
    model_snapshot = f'model_snapshot={{"name":"startup.cfg","modelCount":1,"models":{{"{model_name}":{{"1.0":{{"defaultVersion":true,"marName":"{model_name}.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"maxBatchDelay":10,"responseTimeout":120}}}}}}}}'

    with tempfile.NamedTemporaryFile(
        suffix=".properties", mode="w+", dir=directory, delete=False
    ) as f:
        # one property per line; the snapshot entry comes last and is not
        # followed by a newline
        f.write("\n".join(config_lines) + "\n")
        f.write(model_snapshot)
    # the context manager has already closed the file; only its path is needed
    return f.name
prepare_service_config(model_uri, output_artifact_uri, params)
Prepare the model files for model serving.
This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri |
str |
the URI of the model artifact being served |
required |
output_artifact_uri |
str |
the URI of the output artifact |
required |
params |
KServeDeployerStepParameters |
the KServe deployer step parameters |
required |
Returns:
Type | Description |
---|---|
KServeDeploymentConfig |
The service configuration whose `model_uri` points to the prepared model files. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the model files cannot be prepared. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving. The files are placed under
    `<output_artifact_uri>/kserve/<predictor>/<model_name>`.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        A copy of the step's service configuration whose `model_uri` points
        to the prepared model files.

    Raises:
        RuntimeError: if the model files cannot be prepared.
    """
    predictor = params.service_config.predictor
    model_name = params.service_config.model_name

    deployment_root_uri = os.path.join(output_artifact_uri, "kserve")
    fileio.makedirs(deployment_root_uri)

    # TODO [ENG-773]: determine how to formalize how models are organized into
    #   folders and sub-folders depending on the model type/format and the
    #   KServe protocol used to serve the model.

    # TODO [ENG-791]: an auto-detect built-in KServe server implementation
    #   from the model artifact type

    # TODO [ENG-792]: validate the model artifact type against the
    #   supported built-in KServe server implementations

    if predictor == "sklearn":
        # the sklearn model is read from a file called `model` inside the
        # artifact; fail before creating the predictor specific folder if it
        # is missing
        model_uri = os.path.join(model_uri, "model")
        if not fileio.exists(model_uri):
            raise RuntimeError(
                f"Expected sklearn model artifact was not found at "
                f"{model_uri}"
            )

    # every predictor serves its files from the same folder layout
    served_model_uri = os.path.join(deployment_root_uri, predictor, model_name)
    fileio.makedirs(served_model_uri)

    if predictor == "tensorflow":
        # the TensorFlow server expects model artifacts to be
        # stored in numbered subdirectories, each representing a model
        # version
        io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
    elif predictor == "sklearn":
        # the sklearn server expects model artifacts to be
        # stored in a file called model.joblib
        fileio.copy(model_uri, os.path.join(served_model_uri, "model.joblib"))
    else:
        # default treatment for all other server implementations: copy the
        # model artifact to the served model location as it is
        # NOTE(review): `model_uri` looks like a directory here, while the
        # other branches use `io_utils.copy_dir` for directories - confirm
        # that `fileio.copy` is the intended call.
        fileio.copy(model_uri, served_model_uri)

    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri
    return service_config
prepare_torch_service_config(model_uri, output_artifact_uri, params)
Prepare the PyTorch model files for model serving.
This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri |
str |
the URI of the model artifact being served |
required |
output_artifact_uri |
str |
the URI of the output artifact |
required |
params |
KServeDeployerStepParameters |
the KServe deployer step parameters |
required |
Returns:
Type | Description |
---|---|
KServeDeploymentConfig |
The service configuration whose `model_uri` points to the prepared deployment folder. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the model files cannot be prepared. |
Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_torch_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the PyTorch model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving. The model checkpoint is packaged into a
    `<model_name>.mar` archive that is stored under
    `<output_artifact_uri>/kserve/model-store`, and a `config.properties`
    file (user supplied or generated) is stored under
    `<output_artifact_uri>/kserve/config`.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        A copy of the step's service configuration whose `model_uri` points
        to the `kserve` deployment folder.

    Raises:
        RuntimeError: if no TorchServe parameters are provided or if the
            model archive cannot be created.
    """
    # validate the input before anything is written to the artifact store
    torch_params = params.torch_serve_parameters
    if torch_params is None:
        raise RuntimeError("No torch serve parameters provided")

    model_name = params.service_config.model_name
    deployment_folder_uri = os.path.join(output_artifact_uri, "kserve")
    served_model_uri = os.path.join(deployment_folder_uri, "model-store")
    config_properties_uri = os.path.join(deployment_folder_uri, "config")
    fileio.makedirs(served_model_uri)
    fileio.makedirs(config_properties_uri)

    # the archive is built in a local scratch folder that is removed again
    # once everything has been copied to the deployment folder
    with tempfile.TemporaryDirectory(prefix="zenml-pytorch-temp-") as temp_dir:
        # Copy from artifact store to temporary file
        tmp_model_uri = os.path.join(temp_dir, f"{model_name}.pt")
        fileio.copy(f"{model_uri}/checkpoint.pt", tmp_model_uri)

        torch_archiver_args = TorchModelArchiver(
            model_name=model_name,
            serialized_file=tmp_model_uri,
            model_file=torch_params.model_class,
            handler=torch_params.handler,
            export_path=temp_dir,
            version=torch_params.model_version,
        )
        manifest = ModelExportUtils.generate_manifest_json(torch_archiver_args)
        package_model(torch_archiver_args, manifest=manifest)

        # the packaged archive is expected in the export path
        archived_model_uri = os.path.join(temp_dir, f"{model_name}.mar")
        if not fileio.exists(archived_model_uri):
            raise RuntimeError(
                f"Expected torch archived model artifact was not found at "
                f"{archived_model_uri}"
            )

        # Copy the torch model archive artifact to the model store
        fileio.copy(
            archived_model_uri,
            os.path.join(served_model_uri, f"{model_name}.mar"),
        )

        # use the provided config file or generate one
        if torch_params.torch_config:
            config_file_uri = torch_params.torch_config
        else:
            config_file_uri = generate_model_deployer_config(
                model_name=model_name,
                directory=temp_dir,
            )

        # Copy the torch model config to the config folder
        fileio.copy(
            config_file_uri,
            os.path.join(config_properties_uri, "config.properties"),
        )

    service_config = params.service_config.copy()
    service_config.model_uri = deployment_folder_uri
    return service_config