Skip to content

Kserve

zenml.integrations.kserve special

Initialization of the KServe integration for ZenML.

The KServe integration allows you to use the KServe model serving platform to implement continuous model deployment.

KServeIntegration (Integration)

Definition of KServe integration for ZenML.

Source code in zenml/integrations/kserve/__init__.py
class KServeIntegration(Integration):
    """Definition of KServe integration for ZenML."""

    NAME = KSERVE
    REQUIREMENTS = [
        "kserve>=0.9.0,<=10",
        "torch-model-archiver",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the KServe integration."""
        from zenml.integrations.kserve import model_deployers  # noqa
        from zenml.integrations.kserve import secret_schemas  # noqa
        from zenml.integrations.kserve import services  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for KServe.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

        return [KServeModelDeployerFlavor]

activate() classmethod

Activate the KServe integration.

Source code in zenml/integrations/kserve/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the KServe integration."""
    from zenml.integrations.kserve import model_deployers  # noqa
    from zenml.integrations.kserve import secret_schemas  # noqa
    from zenml.integrations.kserve import services  # noqa

flavors() classmethod

Declare the stack component flavors for KServe.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/kserve/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for KServe.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

    return [KServeModelDeployerFlavor]

constants

KServe constants.

custom_deployer special

Initialization of ZenML custom deployer.

zenml_custom_model

Implements a custom model for the Kserve integration.

ZenMLCustomModel (Model)

Custom model class for ZenML and Kserve.

This class is used to implement a custom model for the Kserve integration, which is used as the main entry point for custom code execution.

Attributes:

Name Type Description
model_name

The name of the model.

model_uri

The URI of the model.

predict_func

The predict function of the model.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel(kserve.Model):  # type: ignore[misc]
    """Custom model class for ZenML and Kserve.

    This class is used to implement a custom model for the Kserve integration,
    which is used as the main entry point for custom code execution.

    Attributes:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: The predict function of the model.
        """
        super().__init__(model_name)
        self.name = model_name
        self.model_uri = model_uri
        self.predict_func = source_utils.load(predict_func)
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        This function loads the model into memory and sets the ready flag to True.

        The model is loaded using the materializer, by saving the information of
        the artifact to a YAML file in the same path as the model artifacts at
        the preparing time and loading it again at the prediction time by
        the materializer.

        Returns:
            True if the model was loaded successfully, False otherwise.

        """
        try:
            from zenml.artifacts.utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            logger.error("Failed to load model: {}".format(e))
            return False
        self.ready = True
        return self.ready

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Predict the given request.

        The main predict function of the model. This function is called by the
        KServe server when a request is received. Then inside this function,
        the user-defined predict function is called.

        Args:
            request: The request to predict in a dictionary. e.g. {"instances": []}

        Returns:
            The prediction dictionary.

        Raises:
            RuntimeError: If function could not be called.
            NotImplementedError: If the model is not ready.
            TypeError: If the request is not a dictionary.
        """
        if self.predict_func is not None:
            try:
                prediction = {
                    "predictions": self.predict_func(
                        self.model, request["instances"]
                    )
                }
            except RuntimeError as err:
                raise RuntimeError("Failed to predict: {}".format(err))
            if isinstance(prediction, dict):
                return prediction
            else:
                raise TypeError(
                    f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
                )
        else:
            raise NotImplementedError("Predict function is not implemented")
__init__(self, model_name, model_uri, predict_func) special

Initializes a ZenMLCustomModel object.

Parameters:

Name Type Description Default
model_name str

The name of the model.

required
model_uri str

The URI of the model.

required
predict_func str

The predict function of the model.

required
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Initializes a ZenMLCustomModel object.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """
    super().__init__(model_name)
    self.name = model_name
    self.model_uri = model_uri
    self.predict_func = source_utils.load(predict_func)
    self.model = None
    self.ready = False
load(self)

Load the model.

This function loads the model into memory and sets the ready flag to True.

The model is loaded using the materializer, by saving the information of the artifact to a YAML file in the same path as the model artifacts at the preparing time and loading it again at the prediction time by the materializer.

Returns:

Type Description
bool

True if the model was loaded successfully, False otherwise.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model.

    This function loads the model into memory and sets the ready flag to True.

    The model is loaded using the materializer, by saving the information of
    the artifact to a YAML file in the same path as the model artifacts at
    the preparing time and loading it again at the prediction time by
    the materializer.

    Returns:
        True if the model was loaded successfully, False otherwise.

    """
    try:
        from zenml.artifacts.utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as e:
        logger.error("Failed to load model: {}".format(e))
        return False
    self.ready = True
    return self.ready
predict(self, request)

Predict the given request.

The main predict function of the model. This function is called by the KServe server when a request is received. Then inside this function, the user-defined predict function is called.

Parameters:

Name Type Description Default
request Dict[str, Any]

The request to predict in a dictionary. e.g. {"instances": []}

required

Returns:

Type Description
Dict[str, Any]

The prediction dictionary.

Exceptions:

Type Description
RuntimeError

If function could not be called.

NotImplementedError

If the model is not ready.

TypeError

If the request is not a dictionary.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    KServe server when a request is received. Then inside this function,
    the user-defined predict function is called.

    Args:
        request: The request to predict in a dictionary. e.g. {"instances": []}

    Returns:
        The prediction dictionary.

    Raises:
        RuntimeError: If function could not be called.
        NotImplementedError: If the model is not ready.
        TypeError: If the request is not a dictionary.
    """
    if self.predict_func is not None:
        try:
            prediction = {
                "predictions": self.predict_func(
                    self.model, request["instances"]
                )
            }
        except RuntimeError as err:
            raise RuntimeError("Failed to predict: {}".format(err))
        if isinstance(prediction, dict):
            return prediction
        else:
            raise TypeError(
                f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
            )
    else:
        raise NotImplementedError("Predict function is not implemented")

flavors special

KServe integration flavors.

kserve_model_deployer_flavor

KServe model deployer flavor.

KServeModelDeployerConfig (BaseModelDeployerConfig) pydantic-model

Configuration for the KServeModelDeployer.

Attributes:

Name Type Description
kubernetes_context Optional[str]

the Kubernetes context to use to contact the remote KServe installation. If not specified, the current configuration is used. Depending on where the KServe model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_namespace Optional[str]

the Kubernetes namespace where the KServe inference service CRDs are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the KServe model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod).

base_url str

the base URL of the Kubernetes ingress used to expose the KServe inference services.

secret Optional[str]

the name of the secret containing the credentials for the KServe inference services.

Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the KServeModelDeployer.

    Attributes:
        kubernetes_context: the Kubernetes context to use to contact the remote
            KServe installation. If not specified, the current
            configuration is used. Depending on where the KServe model deployer
            is being used, this can be either a locally active context or an
            in-cluster Kubernetes configuration (if running inside a pod).
            If the model deployer stack component is linked to a Kubernetes
            service connector, this field is ignored.
        kubernetes_namespace: the Kubernetes namespace where the KServe
            inference service CRDs are provisioned and managed by ZenML. If not
            specified, the namespace set in the current configuration is used.
            Depending on where the KServe model deployer is being used, this can
            be either the current namespace configured in the locally active
            context or the namespace in the context of which the pod is running
            (if running inside a pod).
        base_url: the base URL of the Kubernetes ingress used to expose the
            KServe inference services.
        secret: the name of the secret containing the credentials for the
            KServe inference services.
    """

    kubernetes_context: Optional[str] = None
    kubernetes_namespace: Optional[str] = None
    base_url: str  # TODO: unused?
    secret: Optional[str]
    custom_domain: Optional[str]  # TODO: unused?
KServeModelDeployerFlavor (BaseModelDeployerFlavor)

Flavor for the KServe model deployer.

Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the KServe model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return KSERVE_MODEL_DEPLOYER_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/kserve.png"

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Returns `KServeModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer,
        )

        return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] property readonly

Returns KServeModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KServeModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KServeModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

model_deployers special

Initialization of the KServe Model Deployer.

kserve_model_deployer

Implementation of the KServe Model Deployer.

KServeModelDeployer (BaseModelDeployer)

KServe model deployer stack component implementation.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployer(BaseModelDeployer):
    """KServe model deployer stack component implementation."""

    NAME: ClassVar[str] = "KServe"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = KServeModelDeployerFlavor

    _client: Optional[KServeClient] = None

    @property
    def config(self) -> KServeModelDeployerConfig:
        """Returns the `KServeModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(KServeModelDeployerConfig, self._config)

    @property
    def validator(self) -> Optional[StackValidator]:
        """Ensures there is a container registry and image builder in the stack.

        Returns:
            A `StackValidator` instance.
        """
        # Log deprecation warning
        logger.warning(
            "The KServe model deployer is deprecated and is no longer "
            "being maintained by the ZenML core team. If you are looking for a "
            "scalable Kubernetes-based model deployment solution, consider "
            "using Seldon instead: "
            "https://docs.zenml.io/stacks-and-components/component-guide/model-deployers/seldon",
        )
        return StackValidator(
            required_components={
                StackComponentType.IMAGE_BUILDER,
            }
        )

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "KServeDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: KServe deployment service object

        Returns:
            A dictionary containing the model server information.
        """
        return {
            "PREDICTION_URL": service_instance.prediction_url,
            "PREDICTION_HOSTNAME": service_instance.prediction_hostname,
            "MODEL_URI": service_instance.config.model_uri,
            "MODEL_NAME": service_instance.config.model_name,
            "KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
        }

    @property
    def kserve_client(self) -> KServeClient:
        """Get the KServe client associated with this model deployer.

        Returns:
            The KServeclient.

        Raises:
            RuntimeError: If the Kubernetes namespace is not configured in the
                stack component when using a service connector to deploy models
                with KServe.
        """
        # Refresh the client also if the connector has expired
        if self._client and not self.connector_has_expired():
            return self._client

        connector = self.get_connector()
        if connector:
            if not self.config.kubernetes_namespace:
                raise RuntimeError(
                    "The Kubernetes namespace must be explicitly configured in "
                    "the stack component when using a service connector to "
                    "deploy models with KServe."
                )
            client = connector.connect()
            if not isinstance(client, k8s_client.ApiClient):
                raise RuntimeError(
                    f"Expected a k8s_client.ApiClient while trying to use the "
                    f"linked connector, but got {type(client)}."
                )
            self._client = KubeClientKServeClient(
                kube_client=client,
            )
        else:
            self._client = KServeClient(
                context=self.config.kubernetes_context,
            )
        return self._client

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Gets the Docker builds required for the component.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        builds = []
        for step_name, step in deployment.step_configurations.items():
            if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
                build = BuildConfiguration(
                    key=KSERVE_DOCKER_IMAGE_KEY,
                    settings=step.config.docker_settings,
                    step_name=step_name,
                )
                builds.append(build)

        return builds

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new KServe deployment or update an existing one.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new KServe
            deployment server to reflect the model and other configuration
            parameters specified in the supplied KServe deployment `config`.

          * if `replace` is True, this method will first attempt to find an
            existing KServe deployment that is *equivalent* to the supplied
            configuration parameters. Two or more KServe deployments are
            considered equivalent if they have the same `pipeline_name`,
            `pipeline_step_name` and `model_name` configuration parameters. To
            put it differently, two KServe deployments are equivalent if
            they serve versions of the same model deployed by the same pipeline
            step. If an equivalent KServe deployment is found, it will be
            updated in place to reflect the new configuration parameters. This
            allows an existing KServe deployment to retain its prediction
            URL while performing a rolling update to serve a new model version.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new KServe deployment
        server for each new model version. If multiple equivalent KServe
        deployments are found, the most recently created deployment is selected
        to be updated and the others are deleted.

        Args:
            config: the configuration of the model to be deployed with KServe.
            replace: set this flag to True to find and update an equivalent
                KServeDeployment server with the new model instead of
                starting a new deployment server.
            timeout: the timeout in seconds to wait for the KServe server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the KServe
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML KServe deployment service object that can be used to
            interact with the remote KServe server.

        Raises:
            RuntimeError: if the KServe deployment server could not be stopped.
        """
        with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
            config = cast(KServeDeploymentConfig, config)
            service = None

            # if replace is True, find equivalent KServe deployments
            if replace is True:
                equivalent_services = self.find_model_server(
                    running=False,
                    pipeline_name=config.pipeline_name,
                    pipeline_step_name=config.pipeline_step_name,
                    model_name=config.model_name,
                )

                for equivalent_service in equivalent_services:
                    if service is None:
                        # keep the most recently created service
                        service = equivalent_service
                    else:
                        try:
                            # delete the older services and don't wait for
                            # them to be deprovisioned
                            service.stop()
                        except RuntimeError as e:
                            raise RuntimeError(
                                "Failed to stop the KServe deployment "
                                "server:\n",
                                f"{e}\n",
                                "Please stop it manually and try again.",
                            )

            if service:
                # Reuse the service account and secret from the existing
                # service.
                assert isinstance(service, KServeDeploymentService)
                config.k8s_service_account = service.config.k8s_service_account
                config.k8s_secret = service.config.k8s_secret

            # configure the credentials for the KServe model server
            self._create_or_update_kserve_credentials(config)
            if service:
                # update an equivalent service in place
                service.update(config)
                logger.info(
                    f"Updating an existing KServe deployment service: {service}"
                )
            else:
                # create a new service
                service = KServeDeploymentService(config=config)
                logger.info(
                    f"Creating a new KServe deployment service: {service}"
                )

            # start the service which in turn provisions the KServe
            # deployment server and waits for it to reach a ready state
            service.start(timeout=timeout)

            # Add telemetry with metadata that gets the stack metadata and
            # differentiates between pure model and custom code deployments
            stack = Client().active_stack
            stack_metadata = {
                component_type.value: component.flavor
                for component_type, component in stack.components.items()
            }
            analytics_handler.metadata = {
                "store_type": Client().zen_store.type.value,
                **stack_metadata,
                "is_custom_code_deployment": config.container is not None,
            }
        return service

    def get_kserve_deployments(
        self, labels: Dict[str, str]
    ) -> List[V1beta1InferenceService]:
        """Get a list of KServe deployments that match the supplied labels.

        Args:
            labels: a dictionary of labels to match against KServe deployments.

        Returns:
            A list of KServe deployments that match the supplied labels.

        Raises:
            RuntimeError: if an operational failure is encountered while
        """
        label_selector = (
            ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
        )

        namespace = (
            self.config.kubernetes_namespace
            or utils.get_default_target_namespace()
        )

        try:
            response = (
                self.kserve_client.api_instance.list_namespaced_custom_object(
                    constants.KSERVE_GROUP,
                    constants.KSERVE_V1BETA1_VERSION,
                    namespace,
                    constants.KSERVE_PLURAL,
                    label_selector=label_selector,
                )
            )
        except k8s_client.rest.ApiException as e:
            raise RuntimeError(
                "Exception when retrieving KServe inference services\
                %s\n"
                % e
            )

        # TODO[CRITICAL]: de-serialize each item into a complete
        #   V1beta1InferenceService object recursively using the OpenApi
        #   schema (this doesn't work right now)
        inference_services: List[V1beta1InferenceService] = []
        for item in response.get("items", []):
            snake_case_item = self._camel_to_snake(item)
            inference_service = V1beta1InferenceService(**snake_case_item)
            inference_services.append(inference_service)
        return inference_services

    def _camel_to_snake(self, obj: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a camelCase dictionary to snake_case.

        Args:
            obj: a dictionary with camelCase keys

        Returns:
            a dictionary with snake_case keys
        """
        if isinstance(obj, (str, int, float)):
            return obj
        if isinstance(obj, dict):
            assert obj is not None
            new = obj.__class__()
            for k, v in obj.items():
                new[self._convert_to_snake(k)] = self._camel_to_snake(v)
        elif isinstance(obj, (list, set, tuple)):
            assert obj is not None
            new = obj.__class__(self._camel_to_snake(v) for v in obj)
        else:
            return obj
        return new

    def _convert_to_snake(self, k: str) -> str:
        return re.sub(r"(?<!^)(?=[A-Z])", "_", k).lower()

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        run_name: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        predictor: Optional[str] = None,
    ) -> List[BaseService]:
        """Find one or more KServe model services that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            run_name: name of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            predictor: the name of the predictor that was used to deploy the model.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """
        config = KServeDeploymentConfig(
            pipeline_name=pipeline_name or "",
            run_name=run_name or "",
            pipeline_run_id=run_name or "",
            pipeline_step_name=pipeline_step_name or "",
            model_uri=model_uri or "",
            model_name=model_name or "",
            predictor=predictor or "",
            resources={},
        )
        labels = config.get_kubernetes_labels()

        if service_uuid:
            labels["zenml.service_uuid"] = str(service_uuid)

        deployments = self.get_kserve_deployments(labels=labels)

        services: List[BaseService] = []
        for deployment in deployments:
            # recreate the KServe deployment service object from the KServe
            # deployment resource
            service = KServeDeploymentService.create_from_deployment(
                deployment=deployment
            )
            if running and not service.is_running:
                # skip non-running services
                continue
            services.append(service)

        return services

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Stop a KServe model server.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop.
            force: if True, force the service to stop.

        Raises:
            NotImplementedError: stopping on KServe model servers is not
                supported.
        """
        raise NotImplementedError(
            "Stopping KServe model servers is not implemented. Try "
            "deleting the KServe model server instead."
        )

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Start a KServe model deployment server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to become
                active. . If set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            NotImplementedError: since we don't support starting KServe
                model servers
        """
        raise NotImplementedError(
            "Starting KServe model servers is not implemented"
        )

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete a KServe model deployment server.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            return
        service = services[0]
        service.stop(timeout=timeout, force=force)
        assert isinstance(service, KServeDeploymentService)
        if service.config.k8s_service_account:
            self.delete_k8s_service_account(service.config.k8s_service_account)
        if service.config.k8s_secret:
            self.delete_k8s_secret(service.config.k8s_secret)

    def _create_or_update_kserve_credentials(
        self, config: KServeDeploymentConfig
    ) -> None:
        """Create or update the KServe credentials used to access the artifact store.

        The way KServe allows configured credentials to be passed to the
        model servers is a bit convoluted:

          * we need to create a Kubernetes secret object with credentials
            in the correct format supported by KServe (only AWS, GCP, Azure are
            supported)
          * we need to create a Kubernetes service account object that
            references the secret object
          * we need to use the service account object in the KServe
            deployment configuration

        This method will use a random name for every model server. This ensures
        that we can create multiple KServe deployments with different
        credentials without running into naming conflicts.

        If a ZenML secret is not explicitly configured for the model deployment
        or the model deployer, this method attempts to fetch credentials from
        the active artifact store and convert them into the appropriate secret
        format expected by KServe.

        Args:
            config: KServe deployment configuration.

        Raises:
            RuntimeError: if the configured secret object is not found.
        """
        secret_name = config.secret_name or self.config.secret

        if secret_name:
            if config.secret_name:
                secret_source = "model deployment"
            else:
                secret_source = "Model Deployer"

            logger.warning(
                f"Your KServe {secret_source} is configured to use a "
                f"ZenML secret `{secret_name}` that holds credentials needed "
                "to access the artifact store. The recommended authentication "
                "method is to configure credentials for the artifact store "
                "stack component instead. The KServe model deployer will use "
                "those credentials to authenticate to the artifact store "
                "automatically."
            )

            try:
                zenml_secret = Client().get_secret_by_name_and_scope(
                    secret_name
                )
            except KeyError as e:
                raise RuntimeError(
                    f"The ZenML secret '{secret_name}' specified in the "
                    f"KServe {secret_source} configuration was not found "
                    f"in the secrets store: {e}."
                )

            credentials = zenml_secret.secret_values

        else:
            # if no secret is configured, try to fetch credentials from the
            # active artifact store and convert them into the appropriate format
            # expected by KServe
            converted_secret = self._convert_artifact_store_secret()

            if not converted_secret:
                # If a secret and service account were previously configured, we
                # need to delete them before we can proceed
                if config.k8s_service_account:
                    self.delete_k8s_service_account(config.k8s_service_account)
                    config.k8s_service_account = None
                if config.k8s_secret:
                    self.delete_k8s_secret(config.k8s_secret)
                    config.k8s_secret = None
                return

            credentials = converted_secret.get_values()

        # S3 credentials are special because part of them need to be passed
        # as annotations
        annotations: Dict[str, str] = {}
        if "aws_access_key_id" in credentials:
            if credentials.get("s3_region"):
                annotations["serving.kubeflow.org/s3-region"] = (
                    credentials.pop("s3_region")
                )
            if credentials.get("s3_endpoint"):
                annotations["serving.kubeflow.org/s3-endpoint"] = (
                    credentials.pop("s3_endpoint")
                )
            if credentials.get("s3_use_https"):
                annotations["serving.kubeflow.org/s3-usehttps"] = (
                    credentials.pop("s3_use_https")
                )
            if credentials.get("s3_verify_ssl"):
                annotations["serving.kubeflow.org/s3-verifyssl"] = (
                    credentials.pop("s3_verify_ssl")
                )

        # Convert all keys to uppercase
        credentials = {k.upper(): v for k, v in credentials.items()}

        # The GCP credentials need to use a specific key name
        if "GOOGLE_APPLICATION_CREDENTIALS" in credentials:
            credentials["gcloud-application-credentials.json"] = (
                credentials.pop("GOOGLE_APPLICATION_CREDENTIALS")
            )

        # Create or update the Kubernetes secret object
        config.k8s_secret = self.create_or_update_k8s_secret(
            name=config.k8s_secret,
            annotations=annotations,
            secret_values=credentials,
        )

        # Create or update the Kubernetes service account object
        config.k8s_service_account = self.create_or_update_k8s_service_account(
            name=config.k8s_service_account,
            secret_name=config.k8s_secret,
        )

    def _convert_artifact_store_secret(self) -> Optional[BaseSecretSchema]:
        """Convert the credentials configured for the artifact store into a ZenML secret.

        Returns:
            The KServe credentials in the format expected by KServe or None if
            no credentials are configured for the artifact store or if they
            cannot be converted into the KServe format.
        """
        artifact_store = Client().active_stack.artifact_store

        zenml_secret: BaseSecretSchema

        if artifact_store.flavor == "s3":
            from zenml.integrations.s3.artifact_stores import S3ArtifactStore

            assert isinstance(artifact_store, S3ArtifactStore)

            (
                aws_access_key_id,
                aws_secret_access_key,
                _,
            ) = artifact_store.get_credentials()

            if aws_access_key_id and aws_secret_access_key:
                # Convert the credentials into the format expected by KServe
                zenml_secret = KServeS3SecretSchema(
                    aws_access_key_id=aws_access_key_id,
                    aws_secret_access_key=aws_secret_access_key,
                )
                if artifact_store.config.client_kwargs:
                    if "endpoint_url" in artifact_store.config.client_kwargs:
                        zenml_secret.s3_endpoint = str(
                            artifact_store.config.client_kwargs["endpoint_url"]
                        )
                    if "region_name" in artifact_store.config.client_kwargs:
                        zenml_secret.s3_region = str(
                            artifact_store.config.client_kwargs["region_name"]
                        )
                    if "use_ssl" in artifact_store.config.client_kwargs:
                        zenml_secret.s3_use_https = str(
                            artifact_store.config.client_kwargs["use_ssl"]
                        )

                return zenml_secret

            logger.warning(
                "No credentials are configured for the active S3 artifact "
                "store. The KServe model deployer will assume an "
                "implicit form of authentication is available in the "
                "target Kubernetes cluster, but the served model may not "
                "be able to access the model artifacts."
            )

            # Assume implicit in-cluster IAM authentication
            return None

        elif artifact_store.flavor == "gcp":
            from zenml.integrations.gcp.artifact_stores import GCPArtifactStore

            assert isinstance(artifact_store, GCPArtifactStore)

            gcp_credentials = artifact_store.get_credentials()

            if gcp_credentials:
                # Convert the credentials into the format expected by KServe
                return KServeGSSecretSchema(
                    google_application_credentials=json.dumps(gcp_credentials),
                )

            logger.warning(
                "No credentials are configured for the active GCS artifact "
                "store. The KServe model deployer will assume an "
                "implicit form of authentication is available in the "
                "target Kubernetes cluster, but the served model may not "
                "be able to access the model artifacts."
            )
            return None

        elif artifact_store.flavor == "azure":
            from zenml.integrations.azure.artifact_stores import (
                AzureArtifactStore,
            )

            assert isinstance(artifact_store, AzureArtifactStore)

            azure_credentials = artifact_store.get_credentials()

            if azure_credentials:
                # Convert the credentials into the format expected by KServe
                if (
                    azure_credentials.client_id is not None
                    and azure_credentials.client_secret is not None
                    and azure_credentials.tenant_id is not None
                ):
                    return KServeAzureSecretSchema(
                        azure_client_id=azure_credentials.client_id,
                        azure_client_secret=azure_credentials.client_secret,
                        azure_tenant_id=azure_credentials.tenant_id,
                    )
                else:
                    logger.warning(
                        "The KServe model deployer could not use the "
                        "credentials currently configured in the active Azure "
                        "artifact store because it only supports service "
                        "principal Azure credentials. "
                        "Please configure Azure principal credentials for your "
                        "artifact store or specify a custom ZenML secret in "
                        "the model deployer configuration that holds the "
                        "credentials required to access the model artifacts. "
                        "The KServe model deployer will assume an implicit "
                        "form of authentication is available in the target "
                        "Kubernetes cluster, but the served model "
                        "may not be able to access the model artifacts."
                    )

                    return None

            logger.warning(
                "No credentials are configured for the active Azure "
                "artifact store. The Seldon Core model deployer will "
                "assume an implicit form of authentication is available "
                "in the target Kubernetes cluster, but the served model "
                "may not be able to access the model artifacts."
            )
            return None

        logger.warning(
            "The KServe model deployer doesn't know how to configure "
            f"credentials automatically for the `{artifact_store.flavor}` "
            "active artifact store flavor. "
            "Please use one of the supported artifact stores (S3 or GCP) "
            "or specify a ZenML secret in the model deployer "
            "configuration that holds the credentials required to access "
            "the model artifacts. The KServe model deployer will "
            "assume an implicit form of authentication is available "
            "in the target Kubernetes cluster, but the served model "
            "may not be able to access the model artifacts."
        )

        return None

    def create_or_update_k8s_secret(
        self,
        name: Optional[str] = None,
        secret_values: Dict[str, Any] = {},
        annotations: Dict[str, str] = {},
    ) -> str:
        """Create or update a Kubernetes Secret resource.

        Args:
            name: the name of the Secret resource to create. If not
                specified, a random name will be generated.
            secret_values: secret key-values that should be
                stored in the Secret resource.
            annotations: optional annotations to add to the Secret resource.

        Returns:
            The name of the created Secret resource.

        Raises:
            RuntimeError: if an unknown error occurs during the creation of
                the secret.
        """
        name = name or f"zenml-kserve-{uuid4().hex}"

        try:
            logger.debug(f"Creating Secret resource: {name}")

            core_api = k8s_client.CoreV1Api()

            secret_data = {
                k: base64.b64encode(str(v).encode("utf-8")).decode("ascii")
                for k, v in secret_values.items()
                if v is not None
            }

            secret = k8s_client.V1Secret(
                metadata=k8s_client.V1ObjectMeta(
                    name=name,
                    labels={"app": "zenml"},
                    annotations=annotations,
                ),
                type="Opaque",
                data=secret_data,
            )

            try:
                # check if the secret is already present
                core_api.read_namespaced_secret(
                    name=name,
                    namespace=self.config.kubernetes_namespace,
                )
                # if we got this far, the secret is already present, update it
                # in place
                response = core_api.replace_namespaced_secret(
                    name=name,
                    namespace=self.config.kubernetes_namespace,
                    body=secret,
                )
            except k8s_client.rest.ApiException as e:
                if e.status != 404:
                    # if an error other than 404 is raised here, treat it
                    # as an unexpected error
                    raise RuntimeError(
                        "Exception when reading Secret resource: %s", str(e)
                    )
                response = core_api.create_namespaced_secret(
                    namespace=self.config.kubernetes_namespace,
                    body=secret,
                )
            logger.debug("Kubernetes API response: %s", response)
        except k8s_client.rest.ApiException as e:
            raise RuntimeError(
                "Exception when creating Secret resource %s", str(e)
            )

        return name

    def delete_k8s_secret(
        self,
        name: str,
    ) -> None:
        """Delete a Kubernetes Secret resource managed by ZenML.

        Args:
            name: the name of the Kubernetes Secret resource to delete.

        Raises:
            RuntimeError: if an unknown error occurs during the removal
                of the secret.
        """
        try:
            logger.debug(f"Deleting Secret resource: {name}")

            core_api = k8s_client.CoreV1Api()

            response = core_api.delete_namespaced_secret(
                name=name,
                namespace=self.config.kubernetes_namespace,
            )
            logger.debug("Kubernetes API response: %s", response)
        except k8s_client.rest.ApiException as e:
            if e.status == 404:
                # the secret is no longer present, nothing to do
                return
            raise RuntimeError(
                f"Exception when deleting Secret resource {name}: {e}"
            )

    def create_or_update_k8s_service_account(
        self, name: Optional[str] = None, secret_name: Optional[str] = None
    ) -> str:
        """Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.

        Args:
            name: the name of the ServiceAccount resource to create. If not
                specified, a random name will be generated.
            secret_name: the name of a secret to attach to the ServiceAccount.

        Returns:
            The name of the created ServiceAccount resource.

        Raises:
            RuntimeError: if an unknown error occurs during the creation of
                the service account.
        """
        name = name or f"zenml-kserve-{uuid4().hex}"

        service_account = k8s_client.V1ServiceAccount(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
            ),
        )

        if secret_name:
            service_account.secrets = [
                k8s_client.V1ObjectReference(kind="Secret", name=secret_name)
            ]

        core_api = k8s_client.CoreV1Api()

        try:
            # check if the service account is already present
            core_api.read_namespaced_service_account(
                name=name,
                namespace=self.config.kubernetes_namespace,
            )
            # if we got this far, the service account is already present, update
            # it in place
            core_api.replace_namespaced_service_account(
                name=name,
                namespace=self.config.kubernetes_namespace,
                body=service_account,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error
                raise RuntimeError(
                    "Exception when reading ServiceAccount resource: %s",
                    str(e),
                )
            core_api.create_namespaced_service_account(
                namespace=self.config.kubernetes_namespace,
                body=service_account,
            )

        return name

    def delete_k8s_service_account(
        self,
        name: str,
    ) -> None:
        """Delete a Kubernetes ServiceAccount resource managed by ZenML.

        Args:
            name: the name of the Kubernetes ServiceAccount resource to delete.

        Raises:
            RuntimeError: if an unknown error occurs during the removal
                of the service account.
        """
        try:
            logger.debug(f"Deleting ServiceAccount resource: {name}")

            core_api = k8s_client.CoreV1Api()

            response = core_api.delete_namespaced_service_account(
                name=name,
                namespace=self.config.kubernetes_namespace,
            )
            logger.debug("Kubernetes API response: %s", response)
        except k8s_client.rest.ApiException as e:
            if e.status == 404:
                # the service account is no longer present, nothing to do
                return
            raise RuntimeError(
                f"Exception when deleting ServiceAccount resource {name}: {e}"
            )
config: KServeModelDeployerConfig property readonly

Returns the KServeModelDeployerConfig config.

Returns:

Type Description
KServeModelDeployerConfig

The configuration.

kserve_client: kserve.KServeClient property readonly

Get the KServe client associated with this model deployer.

Returns:

Type Description
kserve.KServeClient

The KServeclient.

Exceptions:

Type Description
RuntimeError

If the Kubernetes namespace is not configured in the stack component when using a service connector to deploy models with KServe.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Ensures there is a container registry and image builder in the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

FLAVOR (BaseModelDeployerFlavor)

Flavor for the KServe model deployer.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the KServe model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return KSERVE_MODEL_DEPLOYER_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector resource requirements for service connectors.

        Specifies resource requirements that are used to filter the available
        service connector types that are compatible with this flavor.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        return ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/kserve.png"

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Returns `KServeModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer,
        )

        return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] property readonly

Returns KServeModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KServeModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KServeModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] property readonly

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

create_or_update_k8s_secret(self, name=None, secret_values={}, annotations={})

Create or update a Kubernetes Secret resource.

Parameters:

Name Type Description Default
name Optional[str]

the name of the Secret resource to create. If not specified, a random name will be generated.

None
secret_values Dict[str, Any]

secret key-values that should be stored in the Secret resource.

{}
annotations Dict[str, str]

optional annotations to add to the Secret resource.

{}

Returns:

Type Description
str

The name of the created Secret resource.

Exceptions:

Type Description
RuntimeError

if an unknown error occurs during the creation of the secret.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def create_or_update_k8s_secret(
    self,
    name: Optional[str] = None,
    secret_values: Dict[str, Any] = {},
    annotations: Dict[str, str] = {},
) -> str:
    """Create or update a Kubernetes Secret resource.

    Args:
        name: the name of the Secret resource to create. If not
            specified, a random name will be generated.
        secret_values: secret key-values that should be
            stored in the Secret resource.
        annotations: optional annotations to add to the Secret resource.

    Returns:
        The name of the created Secret resource.

    Raises:
        RuntimeError: if an unknown error occurs during the creation of
            the secret.
    """
    name = name or f"zenml-kserve-{uuid4().hex}"

    try:
        logger.debug(f"Creating Secret resource: {name}")

        core_api = k8s_client.CoreV1Api()

        secret_data = {
            k: base64.b64encode(str(v).encode("utf-8")).decode("ascii")
            for k, v in secret_values.items()
            if v is not None
        }

        secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
                annotations=annotations,
            ),
            type="Opaque",
            data=secret_data,
        )

        try:
            # check if the secret is already present
            core_api.read_namespaced_secret(
                name=name,
                namespace=self.config.kubernetes_namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            response = core_api.replace_namespaced_secret(
                name=name,
                namespace=self.config.kubernetes_namespace,
                body=secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error
                raise RuntimeError(
                    "Exception when reading Secret resource: %s", str(e)
                )
            response = core_api.create_namespaced_secret(
                namespace=self.config.kubernetes_namespace,
                body=secret,
            )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        raise RuntimeError(
            "Exception when creating Secret resource %s", str(e)
        )

    return name
create_or_update_k8s_service_account(self, name=None, secret_name=None)

Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.

Parameters:

Name Type Description Default
name Optional[str]

the name of the ServiceAccount resource to create. If not specified, a random name will be generated.

None
secret_name Optional[str]

the name of a secret to attach to the ServiceAccount.

None

Returns:

Type Description
str

The name of the created ServiceAccount resource.

Exceptions:

Type Description
RuntimeError

if an unknown error occurs during the creation of the service account.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def create_or_update_k8s_service_account(
    self, name: Optional[str] = None, secret_name: Optional[str] = None
) -> str:
    """Create or update a Kubernetes ServiceAccount resource with a secret managed by ZenML.

    Args:
        name: the name of the ServiceAccount resource to create. If not
            specified, a random name will be generated.
        secret_name: the name of a secret to attach to the ServiceAccount.

    Returns:
        The name of the created ServiceAccount resource.

    Raises:
        RuntimeError: if an unknown error occurs during the creation of
            the service account.
    """
    name = name or f"zenml-kserve-{uuid4().hex}"

    service_account = k8s_client.V1ServiceAccount(
        metadata=k8s_client.V1ObjectMeta(
            name=name,
        ),
    )

    if secret_name:
        service_account.secrets = [
            k8s_client.V1ObjectReference(kind="Secret", name=secret_name)
        ]

    core_api = k8s_client.CoreV1Api()

    try:
        # check if the service account is already present
        core_api.read_namespaced_service_account(
            name=name,
            namespace=self.config.kubernetes_namespace,
        )
        # if we got this far, the service account is already present, update
        # it in place
        core_api.replace_namespaced_service_account(
            name=name,
            namespace=self.config.kubernetes_namespace,
            body=service_account,
        )
    except k8s_client.rest.ApiException as e:
        if e.status != 404:
            # if an error other than 404 is raised here, treat it
            # as an unexpected error
            raise RuntimeError(
                "Exception when reading ServiceAccount resource: %s",
                str(e),
            )
        core_api.create_namespaced_service_account(
            namespace=self.config.kubernetes_namespace,
            body=service_account,
        )

    return name
delete_k8s_secret(self, name)

Delete a Kubernetes Secret resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Kubernetes Secret resource to delete.

required

Exceptions:

Type Description
RuntimeError

if an unknown error occurs during the removal of the secret.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_k8s_secret(
    self,
    name: str,
) -> None:
    """Delete a Kubernetes Secret resource managed by ZenML.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        RuntimeError: if an unknown error occurs during the removal
            of the secret.
    """
    try:
        logger.debug(f"Deleting Secret resource: {name}")

        core_api = k8s_client.CoreV1Api()

        response = core_api.delete_namespaced_secret(
            name=name,
            namespace=self.config.kubernetes_namespace,
        )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            # the secret is no longer present, nothing to do
            return
        raise RuntimeError(
            f"Exception when deleting Secret resource {name}: {e}"
        )
delete_k8s_service_account(self, name)

Delete a Kubernetes ServiceAccount resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Kubernetes ServiceAccount resource to delete.

required

Exceptions:

Type Description
RuntimeError

if an unknown error occurs during the removal of the service account.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_k8s_service_account(
    self,
    name: str,
) -> None:
    """Delete a Kubernetes ServiceAccount resource managed by ZenML.

    Args:
        name: the name of the Kubernetes ServiceAccount resource to delete.

    Raises:
        RuntimeError: if an unknown error occurs during the removal
            of the service account.
    """
    try:
        logger.debug(f"Deleting ServiceAccount resource: {name}")

        core_api = k8s_client.CoreV1Api()

        response = core_api.delete_namespaced_service_account(
            name=name,
            namespace=self.config.kubernetes_namespace,
        )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            # the service account is no longer present, nothing to do
            return
        raise RuntimeError(
            f"Exception when deleting ServiceAccount resource {name}: {e}"
        )
delete_model_server(self, uuid, timeout=300, force=False)

Delete a KServe model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a KServe model deployment server.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    services = self.find_model_server(service_uuid=uuid)
    if len(services) == 0:
        return
    service = services[0]
    service.stop(timeout=timeout, force=force)
    assert isinstance(service, KServeDeploymentService)
    if service.config.k8s_service_account:
        self.delete_k8s_service_account(service.config.k8s_service_account)
    if service.config.k8s_secret:
        self.delete_k8s_secret(service.config.k8s_secret)
deploy_model(self, config, replace=False, timeout=300)

Create a new KServe deployment or update an existing one.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new KServe deployment server to reflect the model and other configuration parameters specified in the supplied KServe deployment config.

  • if replace is True, this method will first attempt to find an existing KServe deployment that is equivalent to the supplied configuration parameters. Two or more KServe deployments are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two KServe deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent KServe deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing KServe deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new KServe deployment server for each new model version. If multiple equivalent KServe deployments are found, the most recently created deployment is selected to be updated and the others are deleted.

Parameters:

Name Type Description Default
config ServiceConfig

the configuration of the model to be deployed with KServe.

required
replace bool

set this flag to True to find and update an equivalent KServeDeployment server with the new model instead of starting a new deployment server.

False
timeout int

the timeout in seconds to wait for the KServe server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the KServe server is provisioned, without waiting for it to fully start.

300

Returns:

Type Description
BaseService

The ZenML KServe deployment service object that can be used to interact with the remote KServe server.

Exceptions:

Type Description
RuntimeError

if the KServe deployment server could not be stopped.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new KServe deployment or update an existing one.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new KServe
        deployment server to reflect the model and other configuration
        parameters specified in the supplied KServe deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing KServe deployment that is *equivalent* to the supplied
        configuration parameters. Two or more KServe deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. To
        put it differently, two KServe deployments are equivalent if
        they serve versions of the same model deployed by the same pipeline
        step. If an equivalent KServe deployment is found, it will be
        updated in place to reflect the new configuration parameters. This
        allows an existing KServe deployment to retain its prediction
        URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new KServe deployment
    server for each new model version. If multiple equivalent KServe
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with KServe.
        replace: set this flag to True to find and update an equivalent
            KServeDeployment server with the new model instead of
            starting a new deployment server.
        timeout: the timeout in seconds to wait for the KServe server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the KServe
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML KServe deployment service object that can be used to
        interact with the remote KServe server.

    Raises:
        RuntimeError: if the KServe deployment server could not be stopped.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(KServeDeploymentConfig, config)
        service = None

        # if replace is True, find equivalent KServe deployments
        if replace is True:
            equivalent_services = self.find_model_server(
                running=False,
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            for equivalent_service in equivalent_services:
                if service is None:
                    # keep the most recently created service
                    service = equivalent_service
                else:
                    try:
                        # delete the older services and don't wait for
                        # them to be deprovisioned
                        service.stop()
                    except RuntimeError as e:
                        raise RuntimeError(
                            "Failed to stop the KServe deployment "
                            "server:\n",
                            f"{e}\n",
                            "Please stop it manually and try again.",
                        )

        if service:
            # Reuse the service account and secret from the existing
            # service.
            assert isinstance(service, KServeDeploymentService)
            config.k8s_service_account = service.config.k8s_service_account
            config.k8s_secret = service.config.k8s_secret

        # configure the credentials for the KServe model server
        self._create_or_update_kserve_credentials(config)
        if service:
            # update an equivalent service in place
            service.update(config)
            logger.info(
                f"Updating an existing KServe deployment service: {service}"
            )
        else:
            # create a new service
            service = KServeDeploymentService(config=config)
            logger.info(
                f"Creating a new KServe deployment service: {service}"
            )

        # start the service which in turn provisions the KServe
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.container is not None,
        }
    return service
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, run_name=None, pipeline_step_name=None, model_name=None, model_uri=None, predictor=None)

Find one or more KServe model services that match the given criteria.

Parameters:

Name Type Description Default
running bool

If true, only running services will be returned.

False
service_uuid Optional[uuid.UUID]

The UUID of the service that was originally used to deploy the model.

None
pipeline_name Optional[str]

name of the pipeline that the deployed model was part of.

None
run_name Optional[str]

name of the pipeline run which the deployed model was part of.

None
pipeline_step_name Optional[str]

the name of the pipeline model deployment step that deployed the model.

None
model_name Optional[str]

the name of the deployed model.

None
model_uri Optional[str]

URI of the deployed model.

None
predictor Optional[str]

the name of the predictor that was used to deploy the model.

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Service objects representing model servers that match the input search criteria.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    run_name: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    predictor: Optional[str] = None,
) -> List[BaseService]:
    """Find one or more KServe model services that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_name: name of the pipeline that the deployed model was part
            of.
        run_name: name of the pipeline run which the deployed model was
            part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        predictor: the name of the predictor that was used to deploy the model.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.
    """
    config = KServeDeploymentConfig(
        pipeline_name=pipeline_name or "",
        run_name=run_name or "",
        pipeline_run_id=run_name or "",
        pipeline_step_name=pipeline_step_name or "",
        model_uri=model_uri or "",
        model_name=model_name or "",
        predictor=predictor or "",
        resources={},
    )
    labels = config.get_kubernetes_labels()

    if service_uuid:
        labels["zenml.service_uuid"] = str(service_uuid)

    deployments = self.get_kserve_deployments(labels=labels)

    services: List[BaseService] = []
    for deployment in deployments:
        # recreate the KServe deployment service object from the KServe
        # deployment resource
        service = KServeDeploymentService.create_from_deployment(
            deployment=deployment
        )
        if running and not service.is_running:
            # skip non-running services
            continue
        services.append(service)

    return services
get_docker_builds(self, deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
            build = BuildConfiguration(
                key=KSERVE_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_kserve_deployments(self, labels)

Get a list of KServe deployments that match the supplied labels.

Parameters:

Name Type Description Default
labels Dict[str, str]

a dictionary of labels to match against KServe deployments.

required

Returns:

Type Description
List[kserve.V1beta1InferenceService]

A list of KServe deployments that match the supplied labels.

Exceptions:

Type Description
RuntimeError

if an operational failure is encountered while

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_kserve_deployments(
    self, labels: Dict[str, str]
) -> List[V1beta1InferenceService]:
    """Get a list of KServe deployments that match the supplied labels.

    Args:
        labels: a dictionary of labels to match against KServe deployments.

    Returns:
        A list of KServe deployments that match the supplied labels.

    Raises:
        RuntimeError: if an operational failure is encountered while
    """
    label_selector = (
        ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
    )

    namespace = (
        self.config.kubernetes_namespace
        or utils.get_default_target_namespace()
    )

    try:
        response = (
            self.kserve_client.api_instance.list_namespaced_custom_object(
                constants.KSERVE_GROUP,
                constants.KSERVE_V1BETA1_VERSION,
                namespace,
                constants.KSERVE_PLURAL,
                label_selector=label_selector,
            )
        )
    except k8s_client.rest.ApiException as e:
        raise RuntimeError(
            "Exception when retrieving KServe inference services\
            %s\n"
            % e
        )

    # TODO[CRITICAL]: de-serialize each item into a complete
    #   V1beta1InferenceService object recursively using the OpenApi
    #   schema (this doesn't work right now)
    inference_services: List[V1beta1InferenceService] = []
    for item in response.get("items", []):
        snake_case_item = self._camel_to_snake(item)
        inference_service = V1beta1InferenceService(**snake_case_item)
        inference_services.append(inference_service)
    return inference_services
get_model_server_info(service_instance) staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance KServeDeploymentService

KServe deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "KServeDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: KServe deployment service object

    Returns:
        A dictionary containing the model server information.
    """
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "PREDICTION_HOSTNAME": service_instance.prediction_hostname,
        "MODEL_URI": service_instance.config.model_uri,
        "MODEL_NAME": service_instance.config.model_name,
        "KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
    }
start_model_server(self, uuid, timeout=300)

Start a KServe model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

timeout in seconds to wait for the service to become active. . If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

300

Exceptions:

Type Description
NotImplementedError

since we don't support starting KServe model servers

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Start a KServe model deployment server.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to become
            active. . If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: since we don't support starting KServe
            model servers
    """
    raise NotImplementedError(
        "Starting KServe model servers is not implemented"
    )
stop_model_server(self, uuid, timeout=300, force=False)

Stop a KServe model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

timeout in seconds to wait for the service to stop.

300
force bool

if True, force the service to stop.

False

Exceptions:

Type Description
NotImplementedError

stopping on KServe model servers is not supported.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop a KServe model server.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: stopping on KServe model servers is not
            supported.
    """
    raise NotImplementedError(
        "Stopping KServe model servers is not implemented. Try "
        "deleting the KServe model server instead."
    )
KubeClientKServeClient (KServeClient)

KServe client initialized from a Kubernetes client.

This is a workaround for the fact that the native KServe client does not support initialization from an existing Kubernetes client.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KubeClientKServeClient(KServeClient):  # type: ignore[misc]
    """KServe client initialized from a Kubernetes client.

    This is a workaround for the fact that the native KServe client does not
    support initialization from an existing Kubernetes client.
    """

    def __init__(
        self, kube_client: k8s_client.ApiClient, *args: Any, **kwargs: Any
    ) -> None:
        """Initializes the KServe client from a Kubernetes client.

        Args:
            kube_client: pre-configured Kubernetes client.
            *args: standard KServe client positional arguments.
            **kwargs: standard KServe client keyword arguments.
        """
        from kubernetes import client

        self.core_api = client.CoreV1Api(kube_client)
        self.app_api = client.AppsV1Api(kube_client)
        self.api_instance = client.CustomObjectsApi(kube_client)
__init__(self, kube_client, *args, **kwargs) special

Initializes the KServe client from a Kubernetes client.

Parameters:

Name Type Description Default
kube_client kubernetes.client.ApiClient

pre-configured Kubernetes client.

required
*args Any

standard KServe client positional arguments.

()
**kwargs Any

standard KServe client keyword arguments.

{}
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def __init__(
    self, kube_client: k8s_client.ApiClient, *args: Any, **kwargs: Any
) -> None:
    """Initializes the KServe client from a Kubernetes client.

    Args:
        kube_client: pre-configured Kubernetes client.
        *args: standard KServe client positional arguments.
        **kwargs: standard KServe client keyword arguments.
    """
    from kubernetes import client

    self.core_api = client.CoreV1Api(kube_client)
    self.app_api = client.AppsV1Api(kube_client)
    self.api_instance = client.CustomObjectsApi(kube_client)

secret_schemas special

Initialization of Kserve Secret Schemas.

These are secret schemas that can be used to authenticate Kserve to the Artifact Store used to store served ML models.

secret_schemas

Implementation for KServe secret schemas.

KServeAzureSecretSchema (BaseSecretSchema) pydantic-model

KServe Azure Blob Storage credentials.

Attributes:

Name Type Description
azure_client_id Optional[str]

the Azure client ID.

azure_client_secret Optional[str]

the Azure client secret.

azure_tenant_id Optional[str]

the Azure tenant ID.

azure_subscription_id Optional[str]

the Azure subscription ID.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeAzureSecretSchema(BaseSecretSchema):
    """KServe Azure Blob Storage credentials.

    Attributes:
        azure_client_id: the Azure client ID.
        azure_client_secret: the Azure client secret.
        azure_tenant_id: the Azure tenant ID.
        azure_subscription_id: the Azure subscription ID.
    """

    azure_client_id: Optional[str] = None
    azure_client_secret: Optional[str] = None
    azure_tenant_id: Optional[str] = None
    azure_subscription_id: Optional[str] = None
KServeGSSecretSchema (BaseSecretSchema) pydantic-model

KServe GCS credentials.

Attributes:

Name Type Description
google_application_credentials Optional[str]

the GCP application credentials to use, in JSON format.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeGSSecretSchema(BaseSecretSchema):
    """KServe GCS credentials.

    Attributes:
        google_application_credentials: the GCP application credentials to use,
            in JSON format.
    """

    google_application_credentials: Optional[str]
KServeS3SecretSchema (BaseSecretSchema) pydantic-model

KServe S3 credentials.

Attributes:

Name Type Description
aws_access_key_id Optional[str]

the AWS access key ID.

aws_secret_access_key Optional[str]

the AWS secret access key.

s3_endpoint Optional[str]

the S3 endpoint.

s3_region Optional[str]

the S3 region.

s3_use_https Optional[str]

whether to use HTTPS.

s3_verify_ssl Optional[str]

whether to verify SSL.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeS3SecretSchema(BaseSecretSchema):
    """KServe S3 credentials.

    Attributes:
        aws_access_key_id: the AWS access key ID.
        aws_secret_access_key: the AWS secret access key.
        s3_endpoint: the S3 endpoint.
        s3_region: the S3 region.
        s3_use_https: whether to use HTTPS.
        s3_verify_ssl: whether to verify SSL.
    """

    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    s3_endpoint: Optional[str] = None
    s3_region: Optional[str] = None
    s3_use_https: Optional[str] = None
    s3_verify_ssl: Optional[str] = None

services special

Initialization for KServe services.

kserve_deployment

Implementation for the KServe inference service.

KServeDeploymentConfig (ServiceConfig) pydantic-model

KServe deployment service configuration.

Attributes:

Name Type Description
model_uri str

URI of the model (or models) to serve.

model_name str

the name of the model. Multiple versions of the same model should use the same model name. Model name must use only lowercase alphanumeric characters and dashes.

secret_name Optional[str]

the name of the ZenML secret containing credentials required to authenticate to the artifact store.

k8s_secret Optional[str]

the name of the Kubernetes secret to use for the prediction service.

k8s_service_account Optional[str]

the name of the Kubernetes service account to use for the prediction service.

predictor str

the KServe predictor used to serve the model. The

predictor type can be one of the following

tensorflow, pytorch,

replicas int

number of replicas to use for the prediction service.

resources Optional[Dict[str, Any]]

the Kubernetes resources to allocate for the prediction service.

container Optional[Dict[str, Any]]

the container to use for the custom prediction services.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentConfig(ServiceConfig):
    """KServe deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same model
            should use the same model name. Model name must use only lowercase
            alphanumeric characters and dashes.
        secret_name: the name of the ZenML secret containing credentials
            required to authenticate to the artifact store.
        k8s_secret: the name of the Kubernetes secret to use for the prediction
            service.
        k8s_service_account: the name of the Kubernetes service account to use
            for the prediction service.
        predictor: the KServe predictor used to serve the model. The
        predictor type can be one of the following: `tensorflow`, `pytorch`,
        `sklearn`, `xgboost`, `custom`.
        replicas: number of replicas to use for the prediction service.
        resources: the Kubernetes resources to allocate for the prediction service.
        container: the container to use for the custom prediction services.
    """

    model_uri: str = ""
    model_name: str
    secret_name: Optional[str] = None
    k8s_secret: Optional[str] = None
    k8s_service_account: Optional[str] = None
    predictor: str
    replicas: int = 1
    container: Optional[Dict[str, Any]] = None
    resources: Optional[Dict[str, Any]] = None

    @staticmethod
    def sanitize_labels(labels: Dict[str, str]) -> None:
        """Update the label values to be valid Kubernetes labels.

        See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            labels: The labels to sanitize.
        """
        # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
        for key, value in labels.items():
            # Kubernetes labels must be alphanumeric, no longer than
            # 63 characters, and must begin and end with an alphanumeric
            # character ([a-z0-9A-Z])
            labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
                "-_."
            )

    def get_kubernetes_labels(self) -> Dict[str, str]:
        """Generate the labels for the KServe inference CRD from the service configuration.

        These labels are attached to the KServe inference service CRD
        and may be used as label selectors in lookup operations.

        Returns:
            The labels for the KServe inference service CRD.
        """
        labels = {"app": "zenml"}
        if self.pipeline_name:
            labels["zenml.pipeline_name"] = self.pipeline_name
        if self.run_name:
            labels["zenml.run_name"] = self.run_name
        if self.pipeline_step_name:
            labels["zenml.pipeline_step_name"] = self.pipeline_step_name
        if self.model_name:
            labels["zenml.model_name"] = self.model_name
        if self.model_uri:
            labels["zenml.model_uri"] = self.model_uri
        if self.predictor:
            labels["zenml.model_type"] = self.predictor
        self.sanitize_labels(labels)
        return labels

    def get_kubernetes_annotations(self) -> Dict[str, str]:
        """Generate the annotations for the KServe inference CRD the service configuration.

        The annotations are used to store additional information about the
        KServe ZenML service associated with the deployment that is
        not available on the labels. One annotation is particularly important
        is the serialized Service configuration itself, which is used to
        recreate the service configuration from a remote KServe inference
        service CRD.

        Returns:
            The annotations for the KServe inference service CRD.
        """
        annotations = {
            "zenml.service_config": self.json(),
            "zenml.version": __version__,
        }
        return annotations

    @classmethod
    def create_from_deployment(
        cls, deployment: V1beta1InferenceService
    ) -> "KServeDeploymentConfig":
        """Recreate a KServe service from a KServe deployment resource.

        Args:
            deployment: the KServe inference service CRD.

        Returns:
            The KServe ZenML service configuration corresponding to the given
            KServe inference service CRD.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible KServe ZenML service configuration.
        """
        config_data = deployment.metadata.get("annotations").get(
            "zenml.service_config"
        )
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            service_config = cls.parse_raw(config_data)
        except ValidationError as e:
            raise ValueError(
                f"The loaded KServe Inference Service resource contains an "
                f"invalid or incompatible KServe ZenML service configuration: "
                f"{config_data}"
            ) from e
        return service_config
create_from_deployment(deployment) classmethod

Recreate a KServe service from a KServe deployment resource.

Parameters:

Name Type Description Default
deployment kserve.V1beta1InferenceService

the KServe inference service CRD.

required

Returns:

Type Description
KServeDeploymentConfig

The KServe ZenML service configuration corresponding to the given KServe inference service CRD.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe ZenML service configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentConfig":
    """Recreate a KServe service from a KServe deployment resource.

    Args:
        deployment: the KServe inference service CRD.

    Returns:
        The KServe ZenML service configuration corresponding to the given
        KServe inference service CRD.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible KServe ZenML service configuration.
    """
    config_data = deployment.metadata.get("annotations").get(
        "zenml.service_config"
    )
    if not config_data:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )
    try:
        service_config = cls.parse_raw(config_data)
    except ValidationError as e:
        raise ValueError(
            f"The loaded KServe Inference Service resource contains an "
            f"invalid or incompatible KServe ZenML service configuration: "
            f"{config_data}"
        ) from e
    return service_config
get_kubernetes_annotations(self)

Generate the annotations for the KServe inference CRD the service configuration.

The annotations are used to store additional information about the KServe ZenML service associated with the deployment that is not available on the labels. One annotation is particularly important is the serialized Service configuration itself, which is used to recreate the service configuration from a remote KServe inference service CRD.

Returns:

Type Description
Dict[str, str]

The annotations for the KServe inference service CRD.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_annotations(self) -> Dict[str, str]:
    """Generate the annotations for the KServe inference CRD the service configuration.

    The annotations are used to store additional information about the
    KServe ZenML service associated with the deployment that is
    not available on the labels. One annotation is particularly important
    is the serialized Service configuration itself, which is used to
    recreate the service configuration from a remote KServe inference
    service CRD.

    Returns:
        The annotations for the KServe inference service CRD.
    """
    annotations = {
        "zenml.service_config": self.json(),
        "zenml.version": __version__,
    }
    return annotations
get_kubernetes_labels(self)

Generate the labels for the KServe inference CRD from the service configuration.

These labels are attached to the KServe inference service CRD and may be used as label selectors in lookup operations.

Returns:

Type Description
Dict[str, str]

The labels for the KServe inference service CRD.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_labels(self) -> Dict[str, str]:
    """Generate the labels for the KServe inference CRD from the service configuration.

    These labels are attached to the KServe inference service CRD
    and may be used as label selectors in lookup operations.

    Returns:
        The labels for the KServe inference service CRD.
    """
    labels = {"app": "zenml"}
    if self.pipeline_name:
        labels["zenml.pipeline_name"] = self.pipeline_name
    if self.run_name:
        labels["zenml.run_name"] = self.run_name
    if self.pipeline_step_name:
        labels["zenml.pipeline_step_name"] = self.pipeline_step_name
    if self.model_name:
        labels["zenml.model_name"] = self.model_name
    if self.model_uri:
        labels["zenml.model_uri"] = self.model_uri
    if self.predictor:
        labels["zenml.model_type"] = self.predictor
    self.sanitize_labels(labels)
    return labels
sanitize_labels(labels) staticmethod

Update the label values to be valid Kubernetes labels.

See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

Parameters:

Name Type Description Default
labels Dict[str, str]

The labels to sanitize.

required
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Update the label values to be valid Kubernetes labels.

    See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: The labels to sanitize.
    """
    # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
    for key, value in labels.items():
        # Kubernetes labels must be alphanumeric, no longer than
        # 63 characters, and must begin and end with an alphanumeric
        # character ([a-z0-9A-Z])
        labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
            "-_."
        )
KServeDeploymentService (BaseDeploymentService) pydantic-model

A ZenML service that represents a KServe inference service CRD.

Attributes:

Name Type Description
config KServeDeploymentConfig

service configuration.

status ServiceStatus

service status.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentService(BaseDeploymentService):
    """A ZenML service that represents a KServe inference service CRD.

    Attributes:
        config: service configuration.
        status: service status.
    """

    SERVICE_TYPE = ServiceType(
        name="kserve-deployment",
        type="model-serving",
        flavor="kserve",
        description="KServe inference service",
    )

    config: KServeDeploymentConfig
    status: ServiceStatus = Field(default_factory=lambda: ServiceStatus())

    def _get_model_deployer(self) -> "KServeModelDeployer":
        """Get the active KServe model deployer.

        Returns:
            The active KServeModelDeployer.
        """
        from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
            KServeModelDeployer,
        )

        return cast(
            KServeModelDeployer,
            KServeModelDeployer.get_active_model_deployer(),
        )

    def _get_client(self) -> KServeClient:
        """Get the KServe client from the active KServe model deployer.

        Returns:
            The KServe client.
        """
        return self._get_model_deployer().kserve_client

    def _get_namespace(self) -> Optional[str]:
        """Get the Kubernetes namespace from the active KServe model deployer.

        Returns:
            The Kubernetes namespace, or None, if the default namespace is
            used.
        """
        return self._get_model_deployer().config.kubernetes_namespace

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the state of the KServe inference service.

        This method Checks the current operational state of the external KServe
        inference service and translate it into a `ServiceState` value and a printable message.

        This method should be overridden by subclasses that implement concrete service tracking functionality.

        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """
        client = self._get_client()
        namespace = self._get_namespace()

        name = self.crd_name
        try:
            deployment = client.get(name=name, namespace=namespace)
        except RuntimeError:
            return (ServiceState.INACTIVE, "")

        # TODO[MEDIUM]: Implement better operational status checking that also
        #   cover errors
        if "status" not in deployment:
            return (ServiceState.INACTIVE, "No operational status available")
        status = "Unknown"
        for condition in deployment["status"].get("conditions", {}):
            if condition.get("type", "") == "PredictorReady":
                status = condition.get("status", "Unknown")
                if status.lower() == "true":
                    return (
                        ServiceState.ACTIVE,
                        f"Inference service '{name}' is available",
                    )

                elif status.lower() == "false":
                    return (
                        ServiceState.PENDING_STARTUP,
                        f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
                    )
        return (
            ServiceState.PENDING_STARTUP,
            f"Inference service '{name}' still starting up",
        )

    @property
    def crd_name(self) -> str:
        """Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.

        Returns:
            The name of the KServe inference service CRD.
        """
        return (
            self._get_kubernetes_labels().get("zenml.model_name")
            or f"zenml-{str(self.uuid)[:8]}"
        )

    def _get_kubernetes_labels(self) -> Dict[str, str]:
        """Generate the labels for the KServe inference service CRD from the service configuration.

        Returns:
            The labels for the KServe inference service.
        """
        labels = self.config.get_kubernetes_labels()
        labels["zenml.service_uuid"] = str(self.uuid)
        KServeDeploymentConfig.sanitize_labels(labels)
        return labels

    @classmethod
    def create_from_deployment(
        cls, deployment: V1beta1InferenceService
    ) -> "KServeDeploymentService":
        """Recreate the configuration of a KServe Service from a deployed instance.

        Args:
            deployment: the KServe deployment resource.

        Returns:
            The KServe service configuration corresponding to the given
            KServe deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible KServe service configuration.
        """
        config = KServeDeploymentConfig.create_from_deployment(deployment)
        uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
        if not uuid:
            raise ValueError(
                f"The given deployment resource does not contain a valid "
                f"'zenml.service_uuid' label: {deployment}"
            )
        service = cls(uuid=UUID(uuid), config=config)
        service.update_status()
        return service

    def provision(self) -> None:
        """Provision or update remote KServe deployment instance.

        This should then match the current configuration.
        """
        client = self._get_client()
        namespace = self._get_namespace()

        api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
        name = self.crd_name

        # All supported model specs seem to have the same fields
        # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
        if self.config.container is not None:
            predictor_kwargs = {
                "containers": [
                    k8s_client.V1Container(
                        name=self.config.container.get("name"),
                        image=self.config.container.get("image"),
                        command=self.config.container.get("command"),
                        args=self.config.container.get("args"),
                        env=[
                            k8s_client.V1EnvVar(
                                name="STORAGE_URI",
                                value=self.config.container.get("storage_uri"),
                            )
                        ],
                    )
                ],
                "service_account_name": self.config.k8s_service_account,
            }
        else:
            predictor_kwargs = {
                self.config.predictor: V1beta1PredictorExtensionSpec(
                    storage_uri=self.config.model_uri,
                    resources=self.config.resources,
                ),
                "service_account_name": self.config.k8s_service_account,
            }

        isvc = V1beta1InferenceService(
            api_version=api_version,
            kind=constants.KSERVE_KIND,
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                namespace=namespace,
                labels=self._get_kubernetes_labels(),
                annotations=self.config.get_kubernetes_annotations(),
            ),
            spec=V1beta1InferenceServiceSpec(
                predictor=V1beta1PredictorSpec(**predictor_kwargs)
            ),
        )

        # TODO[HIGH]: better error handling when provisioning KServe instances
        try:
            client.get(name=name, namespace=namespace)
            # update the existing deployment
            client.replace(name, isvc, namespace=namespace)
        except RuntimeError:
            client.create(isvc)

    def deprovision(self, force: bool = False) -> None:
        """Deprovisions all resources used by the service.

        Args:
            force: if True, the service will be deprovisioned even if it is
                still in use.

        Raises:
            ValueError: if the service is still in use and force is False.
        """
        client = self._get_client()
        namespace = self._get_namespace()
        name = self.crd_name

        # TODO[HIGH]: catch errors if deleting a KServe instance that is no
        #   longer available
        try:
            client.delete(name=name, namespace=namespace)
        except RuntimeError:
            raise ValueError(
                f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
            )

    def _get_deployment_logs(
        self,
        name: str,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a KServe deployment resource.

        Args:
            name: the name of the KServe deployment to get logs for.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.

        Raises:
            Exception: if an unknown error occurs while fetching the logs.

        Yields:
            The logs of the given deployment.
        """
        client = self._get_client()
        namespace = self._get_namespace()

        logger.debug(f"Retrieving logs for InferenceService resource: {name}")
        try:
            response = client.core_api.list_namespaced_pod(
                namespace=namespace,
                label_selector=f"zenml.service_uuid={self.uuid}",
            )
            logger.debug("Kubernetes API response: %s", response)
            pods = response.items
            if not pods:
                raise Exception(
                    f"The KServe deployment {name} is not currently "
                    f"running: no Kubernetes pods associated with it were found"
                )
            pod = pods[0]
            pod_name = pod.metadata.name

            containers = [c.name for c in pod.spec.containers]
            init_containers = [c.name for c in pod.spec.init_containers]
            container_statuses = {
                c.name: c.started or c.restart_count
                for c in pod.status.container_statuses
            }

            container = "default"
            if container not in containers:
                container = containers[0]

            if not container_statuses[container]:
                container = init_containers[0]

            logger.info(
                f"Retrieving logs for pod: `{pod_name}` and container "
                f"`{container}` in namespace `{namespace}`"
            )
            response = client.core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                container=container,
                follow=follow,
                tail_lines=tail,
                _preload_content=False,
            )
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when fetching logs for InferenceService resource "
                "%s: %s",
                name,
                str(e),
            )
            raise Exception(
                f"Unexpected exception when fetching logs for InferenceService "
                f"resource: {name}"
            ) from e

        try:
            while True:
                line = response.readline().decode("utf-8").rstrip("\n")
                if not line:
                    return
                stop = yield line
                if stop:
                    return
        finally:
            response.release_conn()

    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the logs from the remote KServe inference service instance.

        Args:
            follow: if True, the logs will be streamed as they are written.
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        return self._get_deployment_logs(
            self.crd_name,
            follow=follow,
            tail=tail,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        if not self.is_running:
            return None

        model_deployer = self._get_model_deployer()
        return os.path.join(
            model_deployer.config.base_url,
            "v1/models",
            f"{self.crd_name}:predict",
        )

    @property
    def prediction_hostname(self) -> Optional[str]:
        """The prediction hostname exposed by the prediction service.

        Returns:
            The prediction hostname exposed by the prediction service status
            that will be used in the headers of the prediction request.
        """
        if not self.is_running:
            return None

        namespace = self._get_namespace()

        model_deployer = self._get_model_deployer()
        custom_domain = model_deployer.config.custom_domain or "example.com"
        return f"{self.crd_name}.{namespace}.{custom_domain}"

    def predict(self, request: str) -> Any:
        """Make a prediction using the service.

        Args:
            request: a NumPy array representing the request

        Returns:
            A NumPy array represents the prediction returned by the service.

        Raises:
            Exception: if the service is not yet ready.
            ValueError: if the prediction_url is not set.
        """
        if not self.is_running:
            raise Exception(
                "KServe prediction service is not running. "
                "Please start the service before making predictions."
            )

        if self.prediction_url is None:
            raise ValueError("`self.prediction_url` is not set, cannot post.")
        if self.prediction_hostname is None:
            raise ValueError(
                "`self.prediction_hostname` is not set, cannot post."
            )
        headers = {"Host": self.prediction_hostname}
        if isinstance(request, str):
            request = json.loads(request)
        else:
            raise ValueError("Request must be a json string.")
        response = requests.post(  # nosec
            self.prediction_url,
            headers=headers,
            json={"instances": request},
        )
        response.raise_for_status()
        return response.json()
crd_name: str property readonly

Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.

Returns:

Type Description
str

The name of the KServe inference service CRD.

prediction_hostname: Optional[str] property readonly

The prediction hostname exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction hostname exposed by the prediction service status that will be used in the headers of the prediction request.

prediction_url: Optional[str] property readonly

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if the service is not yet ready.

check_status(self)

Check the state of the KServe inference service.

This method Checks the current operational state of the external KServe inference service and translate it into a ServiceState value and a printable message.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the state of the KServe inference service.

    This method Checks the current operational state of the external KServe
    inference service and translate it into a `ServiceState` value and a printable message.

    This method should be overridden by subclasses that implement concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
    client = self._get_client()
    namespace = self._get_namespace()

    name = self.crd_name
    try:
        deployment = client.get(name=name, namespace=namespace)
    except RuntimeError:
        return (ServiceState.INACTIVE, "")

    # TODO[MEDIUM]: Implement better operational status checking that also
    #   cover errors
    if "status" not in deployment:
        return (ServiceState.INACTIVE, "No operational status available")
    status = "Unknown"
    for condition in deployment["status"].get("conditions", {}):
        if condition.get("type", "") == "PredictorReady":
            status = condition.get("status", "Unknown")
            if status.lower() == "true":
                return (
                    ServiceState.ACTIVE,
                    f"Inference service '{name}' is available",
                )

            elif status.lower() == "false":
                return (
                    ServiceState.PENDING_STARTUP,
                    f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
                )
    return (
        ServiceState.PENDING_STARTUP,
        f"Inference service '{name}' still starting up",
    )
create_from_deployment(deployment) classmethod

Recreate the configuration of a KServe Service from a deployed instance.

Parameters:

Name Type Description Default
deployment kserve.V1beta1InferenceService

the KServe deployment resource.

required

Returns:

Type Description
KServeDeploymentService

The KServe service configuration corresponding to the given KServe deployment resource.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe service configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
    """Recreate the configuration of a KServe Service from a deployed instance.

    Args:
        deployment: the KServe deployment resource.

    Returns:
        The KServe service configuration corresponding to the given
        KServe deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible KServe service configuration.
    """
    config = KServeDeploymentConfig.create_from_deployment(deployment)
    uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
    if not uuid:
        raise ValueError(
            f"The given deployment resource does not contain a valid "
            f"'zenml.service_uuid' label: {deployment}"
        )
    service = cls(uuid=UUID(uuid), config=config)
    service.update_status()
    return service
deprovision(self, force=False)

Deprovisions all resources used by the service.

Parameters:

Name Type Description Default
force bool

if True, the service will be deprovisioned even if it is still in use.

False

Exceptions:

Type Description
ValueError

if the service is still in use and force is False.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: if True, the service will be deprovisioned even if it is
            still in use.

    Raises:
        ValueError: if the service is still in use and force is False.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    # TODO[HIGH]: catch errors if deleting a KServe instance that is no
    #   longer available
    try:
        client.delete(name=name, namespace=namespace)
    except RuntimeError:
        raise ValueError(
            f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
        )
get_logs(self, follow=False, tail=None)

Retrieve the logs from the remote KServe inference service instance.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written.

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the logs from the remote KServe inference service instance.

    Args:
        follow: if True, the logs will be streamed as they are written.
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    return self._get_deployment_logs(
        self.crd_name,
        follow=follow,
        tail=tail,
    )
predict(self, request)

Make a prediction using the service.

Parameters:

Name Type Description Default
request str

a NumPy array representing the request

required

Returns:

Type Description
Any

A NumPy array represents the prediction returned by the service.

Exceptions:

Type Description
Exception

if the service is not yet ready.

ValueError

if the prediction_url is not set.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a NumPy array representing the request

    Returns:
        A NumPy array represents the prediction returned by the service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set.
    """
    if not self.is_running:
        raise Exception(
            "KServe prediction service is not running. "
            "Please start the service before making predictions."
        )

    if self.prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")
    if self.prediction_hostname is None:
        raise ValueError(
            "`self.prediction_hostname` is not set, cannot post."
        )
    headers = {"Host": self.prediction_hostname}
    if isinstance(request, str):
        request = json.loads(request)
    else:
        raise ValueError("Request must be a json string.")
    response = requests.post(  # nosec
        self.prediction_url,
        headers=headers,
        json={"instances": request},
    )
    response.raise_for_status()
    return response.json()
provision(self)

Provision or update remote KServe deployment instance.

This should then match the current configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def provision(self) -> None:
    """Provision or update remote KServe deployment instance.

    This should then match the current configuration.
    """
    client = self._get_client()
    namespace = self._get_namespace()

    api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
    name = self.crd_name

    # All supported model specs seem to have the same fields
    # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
    if self.config.container is not None:
        predictor_kwargs = {
            "containers": [
                k8s_client.V1Container(
                    name=self.config.container.get("name"),
                    image=self.config.container.get("image"),
                    command=self.config.container.get("command"),
                    args=self.config.container.get("args"),
                    env=[
                        k8s_client.V1EnvVar(
                            name="STORAGE_URI",
                            value=self.config.container.get("storage_uri"),
                        )
                    ],
                )
            ],
            "service_account_name": self.config.k8s_service_account,
        }
    else:
        predictor_kwargs = {
            self.config.predictor: V1beta1PredictorExtensionSpec(
                storage_uri=self.config.model_uri,
                resources=self.config.resources,
            ),
            "service_account_name": self.config.k8s_service_account,
        }

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=k8s_client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels=self._get_kubernetes_labels(),
            annotations=self.config.get_kubernetes_annotations(),
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(**predictor_kwargs)
        ),
    )

    # TODO[HIGH]: better error handling when provisioning KServe instances
    try:
        client.get(name=name, namespace=namespace)
        # update the existing deployment
        client.replace(name, isvc, namespace=namespace)
    except RuntimeError:
        client.create(isvc)

steps special

Initialization for KServe steps.

kserve_deployer

Implementation of the KServe Deployer step.

CustomDeployParameters (BaseModel) pydantic-model

Custom model deployer step extra parameters.

Attributes:

Name Type Description
predict_function str

Path to Python file containing predict function.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class CustomDeployParameters(BaseModel):
    """Custom model deployer step extra parameters.

    Attributes:
        predict_function: Path to Python file containing predict function.
    """

    predict_function: str

    @validator("predict_function")
    def predict_function_validate(cls, predict_func_path: str) -> str:
        """Validate predict function.

        Args:
            predict_func_path: predict function path

        Returns:
            predict function path

        Raises:
            ValueError: if predict function path is not valid
            TypeError: if predict function path is not a callable function
        """
        try:
            predict_function = source_utils.load(predict_func_path)
        except AttributeError:
            raise ValueError("Predict function can't be found.")
        if not callable(predict_function):
            raise TypeError("Predict function must be callable.")
        return predict_func_path
predict_function_validate(predict_func_path) classmethod

Validate predict function.

Parameters:

Name Type Description Default
predict_func_path str

predict function path

required

Returns:

Type Description
str

predict function path

Exceptions:

Type Description
ValueError

if predict function path is not valid

TypeError

if predict function path is not a callable function

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
    """Validate predict function.

    Args:
        predict_func_path: predict function path

    Returns:
        predict function path

    Raises:
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
    """
    try:
        predict_function = source_utils.load(predict_func_path)
    except AttributeError:
        raise ValueError("Predict function can't be found.")
    if not callable(predict_function):
        raise TypeError("Predict function must be callable.")
    return predict_func_path
KServeDeployerStepParameters (BaseParameters) pydantic-model

KServe model deployer step parameters.

Attributes:

Name Type Description
service_config KServeDeploymentConfig

KServe deployment service configuration.

torch_serve_params

TorchServe set of parameters to deploy model.

timeout int

Timeout for model deployment.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        torch_serve_params: TorchServe set of parameters to deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
TorchServeParameters (BaseModel) pydantic-model

KServe PyTorch model deployer step configuration.

Attributes:

Name Type Description
model_class str

Path to Python file containing model architecture.

handler str

TorchServe's handler file to handle custom TorchServe inference logic.

extra_files Optional[List[str]]

Comma separated path to extra dependency files.

model_version Optional[str]

Model version.

requirements_file Optional[str]

Path to requirements file.

torch_config Optional[str]

TorchServe configuration file path.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class TorchServeParameters(BaseModel):
    """KServe PyTorch model deployer step configuration.

    Attributes:
        model_class: Path to Python file containing model architecture.
        handler: TorchServe's handler file to handle custom TorchServe inference
            logic.
        extra_files: Comma separated path to extra dependency files.
        model_version: Model version.
        requirements_file: Path to requirements file.
        torch_config: TorchServe configuration file path.
    """

    model_class: str
    handler: str
    extra_files: Optional[List[str]] = None
    requirements_file: Optional[str] = None
    model_version: Optional[str] = "1.0"
    torch_config: Optional[str] = None

    @validator("model_class")
    def model_class_validate(cls, v: str) -> str:
        """Validate model class file path.

        Args:
            v: model class file path

        Returns:
            model class file path

        Raises:
            ValueError: if model class file path is not valid
        """
        if not v:
            raise ValueError("Model class file path is required.")
        if not Client.is_inside_repository(v):
            raise ValueError(
                "Model class file path must be inside the repository."
            )
        return v

    @validator("handler")
    def handler_validate(cls, v: str) -> str:
        """Validate handler.

        Args:
            v: handler file path

        Returns:
            handler file path

        Raises:
            ValueError: if handler file path is not valid
        """
        if v:
            if v in TORCH_HANDLERS:
                return v
            elif Client.is_inside_repository(v):
                return v
            else:
                raise ValueError(
                    "Handler must be one of the TorchServe handlers",
                    "or a file that exists inside the repository.",
                )
        else:
            raise ValueError("Handler is required.")

    @validator("extra_files")
    def extra_files_validate(
        cls, v: Optional[List[str]]
    ) -> Optional[List[str]]:
        """Validate extra files.

        Args:
            v: extra files path

        Returns:
            extra files path

        Raises:
            ValueError: if the extra files path is not valid
        """
        extra_files = []
        if v is not None:
            for file_path in v:
                if Client.is_inside_repository(file_path):
                    extra_files.append(file_path)
                else:
                    raise ValueError(
                        "Extra file path must be inside the repository."
                    )
            return extra_files
        return v

    @validator("torch_config")
    def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
        """Validate torch config file.

        Args:
            v: torch config file path

        Returns:
            torch config file path

        Raises:
            ValueError: if torch config file path is not valid.
        """
        if v:
            if Client.is_inside_repository(v):
                return v
            else:
                raise ValueError(
                    "Torch config file path must be inside the repository."
                )
        return v
extra_files_validate(v) classmethod

Validate extra files.

Parameters:

Name Type Description Default
v Optional[List[str]]

extra files path

required

Returns:

Type Description
Optional[List[str]]

extra files path

Exceptions:

Type Description
ValueError

if the extra files path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("extra_files")
def extra_files_validate(
    cls, v: Optional[List[str]]
) -> Optional[List[str]]:
    """Validate extra files.

    Args:
        v: extra files path

    Returns:
        extra files path

    Raises:
        ValueError: if the extra files path is not valid
    """
    extra_files = []
    if v is not None:
        for file_path in v:
            if Client.is_inside_repository(file_path):
                extra_files.append(file_path)
            else:
                raise ValueError(
                    "Extra file path must be inside the repository."
                )
        return extra_files
    return v
handler_validate(v) classmethod

Validate handler.

Parameters:

Name Type Description Default
v str

handler file path

required

Returns:

Type Description
str

handler file path

Exceptions:

Type Description
ValueError

if handler file path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("handler")
def handler_validate(cls, v: str) -> str:
    """Validate handler.

    Args:
        v: handler file path

    Returns:
        handler file path

    Raises:
        ValueError: if handler file path is not valid
    """
    if v:
        if v in TORCH_HANDLERS:
            return v
        elif Client.is_inside_repository(v):
            return v
        else:
            raise ValueError(
                "Handler must be one of the TorchServe handlers",
                "or a file that exists inside the repository.",
            )
    else:
        raise ValueError("Handler is required.")
model_class_validate(v) classmethod

Validate model class file path.

Parameters:

Name Type Description Default
v str

model class file path

required

Returns:

Type Description
str

model class file path

Exceptions:

Type Description
ValueError

if model class file path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("model_class")
def model_class_validate(cls, v: str) -> str:
    """Validate model class file path.

    Args:
        v: model class file path

    Returns:
        model class file path

    Raises:
        ValueError: if model class file path is not valid
    """
    if not v:
        raise ValueError("Model class file path is required.")
    if not Client.is_inside_repository(v):
        raise ValueError(
            "Model class file path must be inside the repository."
        )
    return v
torch_config_validate(v) classmethod

Validate torch config file.

Parameters:

Name Type Description Default
v Optional[str]

torch config file path

required

Returns:

Type Description
Optional[str]

torch config file path

Exceptions:

Type Description
ValueError

if torch config file path is not valid.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("torch_config")
def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
    """Validate torch config file.

    Args:
        v: torch config file path

    Returns:
        torch config file path

    Raises:
        ValueError: if torch config file path is not valid.
    """
    if v:
        if Client.is_inside_repository(v):
            return v
        else:
            raise ValueError(
                "Torch config file path must be inside the repository."
            )
    return v
kserve_custom_model_deployer_step (_DecoratedStep)

KServe custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Exceptions:

Type Description
ValueError

if the custom deployer parameters is not defined

DoesNotExistException

if no active stack is found

RuntimeError

if the build is missing for the pipeline run

Returns:

Type Description

KServe deployment service

entrypoint(deploy_decision, params, context, model) staticmethod

KServe custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params KServeDeployerStepParameters

parameters for the deployer step

required
model UnmaterializedArtifact

the model artifact to deploy

required
context StepContext

the step context

required

Exceptions:

Type Description
ValueError

if the custom deployer parameters is not defined

DoesNotExistException

if no active stack is found

RuntimeError

if the build is missing for the pipeline run

Returns:

Type Description
KServeDeploymentService

KServe deployment service

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False, extra={KSERVE_CUSTOM_DEPLOYMENT: True})
def kserve_custom_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> KServeDeploymentService:
    """KServe custom model deployer pipeline step.

    This step can be used in a pipeline to implement the
    process required to deploy a custom model with KServe.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Raises:
        ValueError: if the custom deployer parameters is not defined
        DoesNotExistException: if no active stack is found
        RuntimeError: if the build is missing for the pipeline run


    Returns:
        KServe deployment service
    """
    # verify that a custom deployer is defined
    if not params.custom_deploy_parameters:
        raise ValueError(
            "Custom deploy parameter which contains the path of the ",
            "custom predict function is required for custom model deployment.",
        )

    # get the active model deployer
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    step_context = get_step_context()
    pipeline_name = step_context.pipeline.name
    run_name = step_context.pipeline_run.name
    step_name = step_context.step_run.name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.run_name = run_name
    params.service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(KServeDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # entrypoint for starting KServe server deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.kserve.custom_deployer.zenml_custom_model",
        "--model_name",
        params.service_config.model_name,
        "--predict_func",
        params.custom_deploy_parameters.predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not Client().active_stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    pipeline_run = step_context.pipeline_run
    if not pipeline_run.build:
        raise RuntimeError(
            f"Missing build for run {pipeline_run.id}. This is probably "
            "because the build was manually deleted."
        )

    image_name = pipeline_run.build.get_image(
        component_key=KSERVE_DOCKER_IMAGE_KEY, step=step_name
    )

    # copy the model files to a new specific directory for the deployment
    served_model_uri = os.path.join(
        context.get_output_artifact_uri(), "kserve"
    )
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(model)
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment
    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri

    # Prepare container config for custom model deployment
    service_config.container = {
        "name": service_config.model_name,
        "image": image_name,
        "command": entrypoint_command,
        "storage_uri": service_config.model_uri,
    }

    # deploy the service
    service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
        f"    With the hostname: {service.prediction_hostname}."
    )

    return service
kserve_model_deployer_step (_DecoratedStep)

KServe model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Returns:

Type Description

KServe deployment service

entrypoint(deploy_decision, params, context, model) staticmethod

KServe model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params KServeDeployerStepParameters

parameters for the deployer step

required
model UnmaterializedArtifact

the model artifact to deploy

required
context StepContext

the step context

required

Returns:

Type Description
KServeDeploymentService

KServe deployment service

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False)
def kserve_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> KServeDeploymentService:
    """KServe model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for an ML model with KServe.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Returns:
        KServe deployment service
    """
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_context = get_step_context()
    pipeline_name = step_context.pipeline.name
    run_name = step_context.pipeline_run.name
    step_name = step_context.step_run.name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.run_name = run_name
    params.service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(KServeDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # invoke the KServe model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    if params.service_config.predictor == "pytorch":
        # import the prepare function from the step utils
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_torch_service_config,
        )

        # prepare the service config
        service_config = prepare_torch_service_config(
            model_uri=model.uri,
            output_artifact_uri=context.get_output_artifact_uri(),
            params=params,
        )
    else:
        # import the prepare function from the step utils
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_service_config,
        )

        # prepare the service config
        service_config = prepare_service_config(
            model_uri=model.uri,
            output_artifact_uri=context.get_output_artifact_uri(),
            params=params,
        )
    service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
        f"    With the hostname: {service.prediction_hostname}."
    )

    return service

kserve_step_utils

Utility functions used by the KServe deployer step.

TorchModelArchiver (BaseModel) pydantic-model

Model Archiver for PyTorch models.

Attributes:

Name Type Description
model_name str

Model name.

model_version

Model version.

serialized_file str

Serialized model file.

handler str

TorchServe's handler file to handle custom TorchServe inference logic.

extra_files Optional[List[str]]

Comma separated path to extra dependency files.

requirements_file Optional[str]

Path to requirements file.

export_path str

Path to export model.

runtime Optional[str]

Runtime of the model.

force Optional[bool]

Force export of the model.

archive_format Optional[str]

Archive format.

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
class TorchModelArchiver(BaseModel):
    """Model Archiver for PyTorch models.

    Attributes:
        model_name: Model name.
        model_version: Model version.
        serialized_file: Serialized model file.
        handler: TorchServe's handler file to handle custom TorchServe inference logic.
        extra_files: Comma separated path to extra dependency files.
        requirements_file: Path to requirements file.
        export_path: Path to export model.
        runtime: Runtime of the model.
        force: Force export of the model.
        archive_format: Archive format.
    """

    model_name: str
    serialized_file: str
    model_file: str
    handler: str
    export_path: str
    extra_files: Optional[List[str]] = None
    version: Optional[str] = None
    requirements_file: Optional[str] = None
    runtime: Optional[str] = "python"
    force: Optional[bool] = None
    archive_format: Optional[str] = "default"
generate_model_deployer_config(model_name, directory)

Generate a model deployer config.

Parameters:

Name Type Description Default
model_name str

the name of the model

required
directory str

the directory where the model is stored

required

Returns:

Type Description
str

None

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def generate_model_deployer_config(
    model_name: str,
    directory: str,
) -> str:
    """Generate a model deployer config.

    Args:
        model_name: the name of the model
        directory: the directory where the model is stored

    Returns:
        None
    """
    config_lines = [
        "inference_address=http://0.0.0.0:8085",
        "management_address=http://0.0.0.0:8085",
        "metrics_address=http://0.0.0.0:8082",
        "grpc_inference_port=7070",
        "grpc_management_port=7071",
        "enable_metrics_api=true",
        "metrics_format=prometheus",
        "number_of_netty_threads=4",
        "job_queue_size=10",
        "enable_envvars_config=true",
        "install_py_dep_per_model=true",
        "model_store=/mnt/models/model-store",
    ]

    with tempfile.NamedTemporaryFile(
        suffix=".properties", mode="w+", dir=directory, delete=False
    ) as f:
        for line in config_lines:
            f.write(line + "\n")
        f.write(
            f'model_snapshot={{"name":"startup.cfg","modelCount":1,"models":{{"{model_name}":{{"1.0":{{"defaultVersion":true,"marName":"{model_name}.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"maxBatchDelay":10,"responseTimeout":120}}}}}}}}'
        )
    f.close()
    return f.name
is_valid_model_name(model_name)

Checks if the model name is valid.

Parameters:

Name Type Description Default
model_name str

the model name to check

required

Returns:

Type Description
bool

True if the model name is valid, False otherwise.

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def is_valid_model_name(model_name: str) -> bool:
    """Checks if the model name is valid.

    Args:
        model_name: the model name to check

    Returns:
        True if the model name is valid, False otherwise.
    """
    pattern = re.compile("^[a-z0-9-]+$")
    return pattern.match(model_name) is not None
prepare_service_config(model_uri, output_artifact_uri, params)

Prepare the model files for model serving.

This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.

Parameters:

Name Type Description Default
model_uri str

the URI of the model artifact being served

required
output_artifact_uri str

the URI of the output artifact

required
params KServeDeployerStepParameters

the KServe deployer step parameters

required

Returns:

Type Description
KServeDeploymentConfig

The URL to the model is ready for serving.

Exceptions:

Type Description
RuntimeError

if the model files cannot be prepared.

ValidationError

if the model name is invalid.

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        The URL to the model is ready for serving.

    Raises:
        RuntimeError: if the model files cannot be prepared.
        ValidationError: if the model name is invalid.
    """
    served_model_uri = os.path.join(output_artifact_uri, "kserve")
    fileio.makedirs(served_model_uri)

    # TODO [ENG-773]: determine how to formalize how models are organized into
    #   folders and sub-folders depending on the model type/format and the
    #   KServe protocol used to serve the model.

    # TODO [ENG-791]: an auto-detect built-in KServe server implementation
    #   from the model artifact type

    # TODO [ENG-792]: validate the model artifact type against the
    #   supported built-in KServe server implementations
    if params.service_config.predictor == "tensorflow":
        # the TensorFlow server expects model artifacts to be
        # stored in numbered subdirectories, each representing a model
        # version
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
    elif params.service_config.predictor == "sklearn":
        # the sklearn server expects model artifacts to be
        # stored in a file called model.joblib
        model_uri = os.path.join(model_uri, "model")
        if not fileio.exists(model_uri):
            raise RuntimeError(
                f"Expected sklearn model artifact was not found at "
                f"{model_uri}"
            )
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        fileio.copy(model_uri, os.path.join(served_model_uri, "model.joblib"))
    elif not is_valid_model_name(params.service_config.model_name):
        raise ValidationError(
            f"Model name '{params.service_config.model_name}' is invalid. "
            f"The model name can only include lowercase alphanumeric "
            "characters and hyphens. Please rename your model and try again."
        )
    else:
        # default treatment for all other server implementations is to
        # simply reuse the model from the artifact store path where it
        # is originally stored
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        fileio.copy(model_uri, served_model_uri)

    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri
    return service_config
prepare_torch_service_config(model_uri, output_artifact_uri, params)

Prepare the PyTorch model files for model serving.

This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.

Parameters:

Name Type Description Default
model_uri str

the URI of the model artifact being served

required
output_artifact_uri str

the URI of the output artifact

required
params KServeDeployerStepParameters

the KServe deployer step parameters

required

Returns:

Type Description
KServeDeploymentConfig

The URL to the model is ready for serving.

Exceptions:

Type Description
RuntimeError

if the model files cannot be prepared.

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_torch_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the PyTorch model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        The URL to the model is ready for serving.

    Raises:
        RuntimeError: if the model files cannot be prepared.
    """
    deployment_folder_uri = os.path.join(output_artifact_uri, "kserve")
    served_model_uri = os.path.join(deployment_folder_uri, "model-store")
    config_properties_uri = os.path.join(deployment_folder_uri, "config")
    fileio.makedirs(served_model_uri)
    fileio.makedirs(config_properties_uri)

    if params.torch_serve_parameters is None:
        raise RuntimeError("No torch serve parameters provided")
    else:
        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-pytorch-temp-")
        tmp_model_uri = os.path.join(
            str(temp_dir), f"{params.service_config.model_name}.pt"
        )

        # Copy from artifact store to temporary file
        fileio.copy(f"{model_uri}/checkpoint.pt", tmp_model_uri)

        torch_archiver_args = TorchModelArchiver(
            model_name=params.service_config.model_name,
            serialized_file=tmp_model_uri,
            model_file=params.torch_serve_parameters.model_class,
            handler=params.torch_serve_parameters.handler,
            export_path=temp_dir,
            version=params.torch_serve_parameters.model_version,
        )

        manifest = ModelExportUtils.generate_manifest_json(torch_archiver_args)
        package_model(torch_archiver_args, manifest=manifest)

        # Copy from temporary file to artifact store
        archived_model_uri = os.path.join(
            temp_dir, f"{params.service_config.model_name}.mar"
        )
        if not fileio.exists(archived_model_uri):
            raise RuntimeError(
                f"Expected torch archived model artifact was not found at "
                f"{archived_model_uri}"
            )

        # Copy the torch model archive artifact to the model store
        fileio.copy(
            archived_model_uri,
            os.path.join(
                served_model_uri, f"{params.service_config.model_name}.mar"
            ),
        )

        # Get or Generate the config file
        if params.torch_serve_parameters.torch_config:
            # Copy the torch model config to the model store
            fileio.copy(
                params.torch_serve_parameters.torch_config,
                os.path.join(config_properties_uri, "config.properties"),
            )
        else:
            # Generate the config file
            config_file_uri = generate_model_deployer_config(
                model_name=params.service_config.model_name,
                directory=temp_dir,
            )
            # Copy the torch model config to the model store
            fileio.copy(
                config_file_uri,
                os.path.join(config_properties_uri, "config.properties"),
            )

    service_config = params.service_config.copy()
    service_config.model_uri = deployment_folder_uri
    return service_config