Seldon
zenml.integrations.seldon
special
Initialization of the Seldon integration.
The Seldon Core integration allows you to use the Seldon Core model serving platform to implement continuous model deployment.
SeldonIntegration (Integration)
Definition of Seldon Core integration for ZenML.
Source code in zenml/integrations/seldon/__init__.py
class SeldonIntegration(Integration):
    """Seldon Core integration for ZenML.

    Declares the integration name, its pip requirements and the stack
    component flavors it contributes.
    """

    NAME = SELDON
    REQUIREMENTS = ["kubernetes==18.20.0"]

    @classmethod
    def activate(cls) -> None:
        """Activate the integration by importing its submodules.

        The imports are performed only for their side effects.
        """
        from zenml.integrations.seldon import (  # noqa
            secret_schemas,
            services,
        )

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for Seldon Core.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported lazily, inside the method, as in the rest of this class.
        from zenml.integrations.seldon.flavors import (
            SeldonModelDeployerFlavor,
        )

        seldon_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
        return seldon_flavors
activate()
classmethod
Activate the Seldon Core integration.
Source code in zenml/integrations/seldon/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Seldon Core integration.

    Imports the `secret_schemas` and `services` submodules; the imports
    are performed only for their side effects.
    """
    from zenml.integrations.seldon import (  # noqa
        secret_schemas,
        services,
    )
flavors()
classmethod
Declare the stack component flavors for the Seldon Core.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/seldon/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for Seldon Core.

    Returns:
        List of stack component flavors for this integration.
    """
    # Imported lazily, inside the method, rather than at module import time.
    from zenml.integrations.seldon.flavors import (
        SeldonModelDeployerFlavor,
    )

    seldon_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
    return seldon_flavors
constants
Seldon constants.
custom_deployer
special
Initialization of ZenML custom deployer.
zenml_custom_model
Implements a custom model for the Seldon integration.
ZenMLCustomModel
Custom model class for ZenML and Seldon.
This class is used to implement a custom model for the Seldon Core integration, which is used as the main entry point for custom code execution.
Attributes:
Name | Type | Description |
---|---|---|
name |
The name of the model. |
|
model_uri |
The URI of the model. |
|
predict_func |
The predict function of the model. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel:
    """Custom model class for ZenML and Seldon.

    This class is used to implement a custom model for the Seldon Core
    integration, which is used as the main entry point for custom code
    execution.

    Attributes:
        name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
        model: The loaded model, or None until `load` succeeds.
        ready: Whether `load` has completed successfully.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: Source path of the predict function; it is
                resolved into a callable with `source_utils.load`.
        """
        self.name = model_name
        self.model_uri = model_uri
        self.predict_func = source_utils.load(predict_func)
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        Loads the model from `model_uri` with `load_model_from_metadata`
        and sets the ready flag to True. Any failure is logged and reported
        through the return value instead of being raised.

        Returns:
            True if the model was loaded successfully, False otherwise.
        """
        try:
            from zenml.artifacts.utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            return False
        self.ready = True
        return self.ready

    def predict(
        self,
        X: Array_Like,
        features_names: Optional[List[str]],
        **kwargs: Any,
    ) -> Array_Like:
        """Predict the given request.

        The main predict function of the model. This function is called by
        the Seldon Core server when a request is received. Then inside this
        function, the user-defined predict function is called with the
        loaded model and the request.

        Args:
            X: The request to predict.
            features_names: The names of the features (not used here).
            **kwargs: Additional arguments (not used here).

        Returns:
            A dictionary holding the output of the user-defined predict
            function under the `predictions` key.

        Raises:
            Exception: If the user-defined predict function fails; the
                original error is attached as the cause.
            NotImplementedError: If no predict function is set.
        """
        if self.predict_func is None:
            raise NotImplementedError("Predict function is not implemented")

        try:
            predictions = self.predict_func(self.model, X)
        except Exception as e:
            # Chain the original error so its traceback is not lost.
            raise Exception(f"Failed to predict: {e}") from e

        # The result is always wrapped in a dict here, so no further type
        # check of the return value is needed.
        return {"predictions": predictions}
__init__(self, model_name, model_uri, predict_func)
special
Initializes a ZenMLCustomModel object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name |
str |
The name of the model. |
required |
model_uri |
str |
The URI of the model. |
required |
predict_func |
str |
The predict function of the model. |
required |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Set up a ZenMLCustomModel in its not-yet-loaded state.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: Source path of the predict function; it is resolved
            into a callable with `source_utils.load`.
    """
    self.name, self.model_uri = model_name, model_uri
    self.predict_func = source_utils.load(predict_func)
    # Nothing is loaded yet; `load` fills these in.
    self.model, self.ready = None, False
load(self)
Load the model.
This function loads the model into memory and sets the ready flag to True. The model is loaded using the materializer, by saving the information of the artifact to a file at the preparing time and loading it again at the prediction time by the materializer.
Returns:
Type | Description |
---|---|
bool |
True if the model was loaded successfully, False otherwise. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model.

    Loads the model from `model_uri` with `load_model_from_metadata` and
    marks the instance as ready. Any failure is logged and reported through
    the return value instead of being raised.

    Returns:
        True if the model was loaded successfully, False otherwise.
    """
    try:
        from zenml.artifacts.utils import load_model_from_metadata

        loaded_model = load_model_from_metadata(self.model_uri)
        self.model = loaded_model
    except Exception as load_error:
        logger.error(f"Failed to load model: {load_error}")
        return False
    else:
        self.ready = True
        return self.ready
predict(self, X, features_names, **kwargs)
Predict the given request.
The main predict function of the model. This function is called by the Seldon Core server when a request is received. Then inside this function, the user-defined predict function is called.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
Union[numpy.ndarray[Any, Any], List[Any], str, bytes, Dict[str, Any]] |
The request to predict in a dictionary. |
required |
features_names |
Optional[List[str]] |
The names of the features. |
required |
**kwargs |
Any |
Additional arguments. |
{} |
Returns:
Type | Description |
---|---|
Union[numpy.ndarray[Any, Any], List[Any], str, bytes, Dict[str, Any]] |
The prediction dictionary. |
Exceptions:
Type | Description |
---|---|
Exception |
If function could not be called. |
NotImplementedError |
If no predict function is set. |
TypeError |
If the prediction is not a dictionary. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def predict(
    self,
    X: Array_Like,
    features_names: Optional[List[str]],
    **kwargs: Any,
) -> Array_Like:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    Seldon Core server when a request is received. Then inside this
    function, the user-defined predict function is called with the loaded
    model and the request.

    Args:
        X: The request to predict.
        features_names: The names of the features (not used here).
        **kwargs: Additional arguments (not used here).

    Returns:
        A dictionary holding the output of the user-defined predict
        function under the `predictions` key.

    Raises:
        Exception: If the user-defined predict function fails; the original
            error is attached as the cause.
        NotImplementedError: If no predict function is set.
    """
    if self.predict_func is None:
        raise NotImplementedError("Predict function is not implemented")

    try:
        predictions = self.predict_func(self.model, X)
    except Exception as e:
        # Chain the original error so its traceback is not lost.
        raise Exception(f"Failed to predict: {e}") from e

    # The result is always wrapped in a dict here, so no further type check
    # of the return value is needed.
    return {"predictions": predictions}
flavors
special
Seldon integration flavors.
seldon_model_deployer_flavor
Seldon model deployer flavor.
SeldonModelDeployerConfig (BaseModelDeployerConfig)
Config for the Seldon Model Deployer.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context |
Optional[str] |
the Kubernetes context to use to contact the remote Seldon Core installation. If not specified, the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is ignored. |
kubernetes_namespace |
Optional[str] |
the Kubernetes namespace where the Seldon Core deployment servers are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is mandatory. |
base_url |
str |
the base URL of the Kubernetes ingress used to expose the Seldon Core deployment servers. |
secret |
Optional[str] |
the name of a ZenML secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored - see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials). |
kubernetes_secret_name |
Optional[str] |
the name of the Kubernetes secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored) - This is used when the secret is not managed by ZenML and is already present in the Kubernetes cluster. |
Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerConfig(BaseModelDeployerConfig):
    """Config for the Seldon Model Deployer.

    Attributes:
        kubernetes_context: the Kubernetes context to use to contact the remote
            Seldon Core installation. If not specified, the current
            configuration is used. Depending on where the Seldon model deployer
            is being used, this can be either a locally active context or an
            in-cluster Kubernetes configuration (if running inside a pod).
            If the model deployer stack component is linked to a Kubernetes
            service connector, this field is ignored.
        kubernetes_namespace: the Kubernetes namespace where the Seldon Core
            deployment servers are provisioned and managed by ZenML. If not
            specified, the namespace set in the current configuration is used.
            Depending on where the Seldon model deployer is being used, this can
            be either the current namespace configured in the locally active
            context or the namespace in the context of which the pod is running
            (if running inside a pod).
            If the model deployer stack component is linked to a Kubernetes
            service connector, this field is mandatory.
        base_url: the base URL of the Kubernetes ingress used to expose the
            Seldon Core deployment servers.
        secret: the name of a ZenML secret containing the credentials used by
            Seldon Core storage initializers to authenticate to the Artifact
            Store (i.e. the storage backend where models are stored - see
            https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).
        kubernetes_secret_name: the name of the Kubernetes secret containing
            the credentials used by Seldon Core storage initializers to
            authenticate to the Artifact Store (i.e. the storage backend where
            models are stored) - This is used when the secret is not managed by
            ZenML and is already present in the Kubernetes cluster.
    """

    kubernetes_context: Optional[str] = None
    kubernetes_namespace: Optional[str] = None
    base_url: str  # TODO: unused?
    # Explicit `None` defaults keep these two fields optional regardless of
    # how the validation library treats `Optional[...]` without a default.
    secret: Optional[str] = None
    kubernetes_secret_name: Optional[
        str
    ] = None  # TODO: Add full documentation section on this
SeldonModelDeployerFlavor (BaseModelDeployerFlavor)
Seldon Core model deployer flavor.
Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Seldon Core model deployer."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The name of the flavor.
        """
        flavor_name = SELDON_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Service connector requirements for this flavor.

        Used to filter the available service connector types down to those
        compatible with this flavor.

        Returns:
            Requirements asking for a Kubernetes cluster resource type.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the docs explaining this flavor.

        Returns:
            The default generated flavor docs URL.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK docs explaining this flavor.

        Returns:
            The default generated flavor SDK docs URL.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """URL of the logo representing the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/seldon.png"
        return logo

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Config class of this flavor.

        Returns:
            The `SeldonModelDeployerConfig` class.
        """
        config_cls: Type[SeldonModelDeployerConfig] = SeldonModelDeployerConfig
        return config_cls

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Implementation class of this flavor.

        Returns:
            The `SeldonModelDeployer` class.
        """
        # Imported lazily, only when the implementation is requested.
        from zenml.integrations.seldon.model_deployers import (
            SeldonModelDeployer,
        )

        implementation: Type["SeldonModelDeployer"] = SeldonModelDeployer
        return implementation
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]
property
readonly
Returns the `SeldonModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SeldonModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SeldonModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
model_deployers
special
Initialization of the Seldon Model Deployer.
seldon_model_deployer
Implementation of the Seldon Model Deployer.
SeldonModelDeployer (BaseModelDeployer)
Seldon Core model deployer stack component implementation.
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployer(BaseModelDeployer):
"""Seldon Core model deployer stack component implementation."""
NAME: ClassVar[str] = "Seldon Core"
FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = SeldonModelDeployerFlavor
_client: Optional[SeldonClient] = None
@property
def config(self) -> SeldonModelDeployerConfig:
    """Typed view of this component's configuration.

    Returns:
        The stored configuration, cast to `SeldonModelDeployerConfig`.
    """
    typed_config = cast(SeldonModelDeployerConfig, self._config)
    return typed_config
@property
def validator(self) -> Optional[StackValidator]:
    """Ensures there is an image builder in the stack.

    Returns:
        A `StackValidator` instance requiring an image builder component.
    """
    required_components = {StackComponentType.IMAGE_BUILDER}
    return StackValidator(required_components=required_components)
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information that might be relevant to the user.
Args:
service_instance: Instance of a SeldonDeploymentService
Returns:
Model server information.
"""
return {
"PREDICTION_URL": service_instance.prediction_url,
"MODEL_URI": service_instance.config.model_uri,
"MODEL_NAME": service_instance.config.model_name,
"SELDON_DEPLOYMENT": service_instance.seldon_deployment_name,
}
@property
def seldon_client(self) -> SeldonClient:
    """Get the Seldon Core client associated with this model deployer.

    The client is cached on the instance and rebuilt when no client exists
    yet or when the linked connector has expired.

    Returns:
        The Seldon Core client.

    Raises:
        RuntimeError: If the Kubernetes namespace is not configured when
            using a service connector to deploy models with Seldon Core,
            or if the connector does not return a Kubernetes API client.
    """
    from kubernetes import client as k8s_client

    # Refresh the client also if the connector has expired
    if self._client and not self.connector_has_expired():
        return self._client

    kube_client: Optional[k8s_client.ApiClient] = None
    linked_connector = self.get_connector()
    if linked_connector:
        if not self.config.kubernetes_namespace:
            raise RuntimeError(
                "The Kubernetes namespace must be explicitly configured in "
                "the stack component when using a service connector to "
                "deploy models with Seldon Core."
            )
        connected = linked_connector.connect()
        if not isinstance(connected, k8s_client.ApiClient):
            raise RuntimeError(
                f"Expected a k8s_client.ApiClient while trying to use the "
                f"linked connector, but got {type(connected)}."
            )
        kube_client = connected

    fresh_client = SeldonClient(
        context=self.config.kubernetes_context,
        namespace=self.config.kubernetes_namespace,
        kube_client=kube_client,
    )
    self._client = fresh_client
    return fresh_client
@property
def kubernetes_secret_name(self) -> str:
    """Get the Kubernetes secret name associated with this model deployer.

    A pre-existing Kubernetes secret configured for this model deployer
    takes precedence and is shared by all of its Seldon Core deployments.

    Otherwise the name is derived from the ID of the active artifact
    store: the same model deployer may be combined with different artifact
    stores at the same time, and each one may need its own credentials.

    Returns:
        The name of a Kubernetes secret to be used with Seldon Core
        deployments.
    """
    preconfigured_name = self.config.kubernetes_secret_name
    if preconfigured_name:
        return preconfigured_name

    store_id = Client().active_stack.artifact_store.id
    raw_name = f"zenml-seldon-core-{store_id}"
    # Collapse every run of characters outside [0-9a-zA-Z-] into a dash.
    sanitized = re.sub(r"[^0-9a-zA-Z-]+", "-", raw_name)
    return sanitized.strip("-").lower()
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    One build is requested for every step whose extra configuration has
    the Seldon custom deployment flag set to exactly `True`.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    return [
        BuildConfiguration(
            key=SELDON_DOCKER_IMAGE_KEY,
            settings=step.config.docker_settings,
            step_name=step_name,
        )
        for step_name, step in deployment.step_configurations.items()
        if step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
    ]
def _create_or_update_kubernetes_secret(self) -> Optional[str]:
    """Create or update the Kubernetes secret used to access the artifact store.

    The credentials are taken from the first source that is configured:

    1. a pre-existing Kubernetes secret named in the model deployer
       configuration (returned as is, nothing is created);
    2. a ZenML secret referenced in the model deployer configuration;
    3. the credentials of the active artifact store, converted with
       `_convert_artifact_store_secret`.

    Returns:
        The name of the Kubernetes secret to use.

    Raises:
        RuntimeError: if the configured ZenML secret cannot be found.
    """
    # if a Kubernetes secret was explicitly configured in the model
    # deployer, use that instead of creating a new one
    if self.config.kubernetes_secret_name:
        logger.warning(
            "Your Seldon Core model deployer is configured to use a "
            "pre-existing Kubernetes secret that holds credentials needed "
            "to access the artifact store. The authentication method is "
            "deprecated and will be removed in a future release. Please "
            "remove this attribute by running `zenml model-deployer "
            f"remove-attribute {self.name} --kubernetes_secret_name` and "
            "configure credentials for the artifact store stack component "
            "instead. The Seldon Core model deployer will use those "
            "credentials to authenticate to the artifact store "
            "automatically."
        )
        return self.config.kubernetes_secret_name

    # if a ZenML secret reference was configured in the model deployer,
    # create a Kubernetes secret from that
    if self.config.secret:
        logger.warning(
            "Your Seldon Core model deployer is configured to use a "
            "ZenML secret that holds credentials needed to access the "
            "artifact store. The recommended authentication method is to "
            "configure credentials for the artifact store stack component "
            "instead. The Seldon Core model deployer will use those "
            "credentials to authenticate to the artifact store "
            "automatically."
        )

        try:
            zenml_secret = Client().get_secret_by_name_and_scope(
                name=self.config.secret,
            )
        except KeyError as e:
            # Chain the lookup error so its traceback is preserved.
            raise RuntimeError(
                f"The ZenML secret '{self.config.secret}' specified in the "
                f"Seldon Core Model Deployer configuration was not found "
                f"in the secrets store: {e}."
            ) from e

        self.seldon_client.create_or_update_secret(
            self.kubernetes_secret_name, zenml_secret.secret_values
        )

    else:
        # if no ZenML secret was configured, try to convert the credentials
        # configured for the artifact store, if any are included, into
        # the format expected by Seldon Core
        converted_secret = self._convert_artifact_store_secret()

        self.seldon_client.create_or_update_secret(
            self.kubernetes_secret_name, converted_secret.get_values()
        )

    return self.kubernetes_secret_name
def _convert_artifact_store_secret(self) -> BaseSecretSchema:
    """Convert the credentials configured for the artifact store into a ZenML secret.

    Supports the `s3`, `gcp` and `azure` artifact store flavors. When a
    supported artifact store has no usable credentials, a warning is logged
    and a secret relying on implicit authentication is returned.

    Returns:
        The ZenML secret.

    Raises:
        RuntimeError: if the credentials cannot be converted, i.e. the
            Azure connection string cannot be parsed or the artifact store
            flavor is not supported.
    """
    artifact_store = Client().active_stack.artifact_store

    zenml_secret: BaseSecretSchema

    if artifact_store.flavor == "s3":
        from zenml.integrations.s3.artifact_stores import S3ArtifactStore

        assert isinstance(artifact_store, S3ArtifactStore)

        (
            aws_access_key_id,
            aws_secret_access_key,
            aws_session_token,
        ) = artifact_store.get_credentials()

        if aws_access_key_id and aws_secret_access_key:
            # Convert the credentials into the format expected by Seldon
            # Core
            zenml_secret = SeldonS3SecretSchema(
                rclone_config_s3_access_key_id=aws_access_key_id,
                rclone_config_s3_secret_access_key=aws_secret_access_key,
                rclone_config_s3_session_token=aws_session_token,
            )
            if (
                artifact_store.config.client_kwargs
                and "endpoint_url" in artifact_store.config.client_kwargs
            ):
                zenml_secret.rclone_config_s3_endpoint = (
                    artifact_store.config.client_kwargs["endpoint_url"]
                )
                # Assume minio is the provider if endpoint is set
                zenml_secret.rclone_config_s3_provider = "Minio"

            return zenml_secret

        logger.warning(
            "No credentials are configured for the active S3 artifact "
            "store. The Seldon Core model deployer will assume an "
            "implicit form of authentication is available in the "
            "target Kubernetes cluster, but the served model may not "
            "be able to access the model artifacts."
        )

        # Assume implicit in-cluster IAM authentication
        return SeldonS3SecretSchema(rclone_config_s3_env_auth=True)

    elif artifact_store.flavor == "gcp":
        from zenml.integrations.gcp.artifact_stores import GCPArtifactStore

        assert isinstance(artifact_store, GCPArtifactStore)

        gcp_credentials = artifact_store.get_credentials()

        if gcp_credentials:
            # Convert the credentials into the format expected by Seldon
            # Core
            if isinstance(gcp_credentials, dict):
                if gcp_credentials.get("type") == "service_account":
                    return SeldonGSSecretSchema(
                        rclone_config_gs_service_account_credentials=json.dumps(
                            gcp_credentials
                        ),
                    )
                elif gcp_credentials.get("type") == "authorized_user":
                    return SeldonGSSecretSchema(
                        rclone_config_gs_client_id=gcp_credentials.get(
                            "client_id"
                        ),
                        rclone_config_gs_client_secret=gcp_credentials.get(
                            "client_secret"
                        ),
                        rclone_config_gs_token=json.dumps(
                            dict(
                                refresh_token=gcp_credentials.get(
                                    "refresh_token"
                                )
                            )
                        ),
                    )
            else:
                # Connector token-based authentication
                return SeldonGSSecretSchema(
                    rclone_config_gs_token=json.dumps(
                        dict(
                            access_token=gcp_credentials.token,
                        )
                    ),
                )

        logger.warning(
            "No credentials are configured for the active GCS artifact "
            "store. The Seldon Core model deployer will assume an "
            "implicit form of authentication is available in the "
            "target Kubernetes cluster, but the served model may not "
            "be able to access the model artifacts."
        )
        return SeldonGSSecretSchema(rclone_config_gs_anonymous=False)

    elif artifact_store.flavor == "azure":
        from zenml.integrations.azure.artifact_stores import (
            AzureArtifactStore,
        )

        assert isinstance(artifact_store, AzureArtifactStore)

        azure_credentials = artifact_store.get_credentials()

        if azure_credentials:
            # Convert the credentials into the format expected by Seldon
            # Core
            if azure_credentials.connection_string is not None:
                try:
                    # We need to extract the account name and key from the
                    # connection string. Empty segments (e.g. produced by
                    # a trailing ';') are skipped instead of failing the
                    # whole parse.
                    tokens = azure_credentials.connection_string.split(";")
                    token_dict = dict(
                        token.split("=", maxsplit=1)
                        for token in tokens
                        if token.strip()
                    )
                    account_name = token_dict["AccountName"]
                    account_key = token_dict["AccountKey"]
                except (KeyError, ValueError) as e:
                    raise RuntimeError(
                        "The Azure connection string configured for the "
                        "artifact store does not have the expected format."
                    ) from e

                return SeldonAzureSecretSchema(
                    rclone_config_az_account=account_name,
                    rclone_config_az_key=account_key,
                )

            if azure_credentials.sas_token is not None:
                return SeldonAzureSecretSchema(
                    rclone_config_az_sas_url=azure_credentials.sas_token,
                )

            if (
                azure_credentials.account_name is not None
                and azure_credentials.account_key is not None
            ):
                return SeldonAzureSecretSchema(
                    rclone_config_az_account=azure_credentials.account_name,
                    rclone_config_az_key=azure_credentials.account_key,
                )

            if (
                azure_credentials.client_id is not None
                and azure_credentials.client_secret is not None
                and azure_credentials.tenant_id is not None
                and azure_credentials.account_name is not None
            ):
                # The storage account name is required by this branch's
                # condition, so it is forwarded together with the service
                # principal credentials.
                return SeldonAzureSecretSchema(
                    rclone_config_az_account=azure_credentials.account_name,
                    rclone_config_az_client_id=azure_credentials.client_id,
                    rclone_config_az_client_secret=azure_credentials.client_secret,
                    rclone_config_az_tenant=azure_credentials.tenant_id,
                )

        logger.warning(
            "No credentials are configured for the active Azure "
            "artifact store. The Seldon Core model deployer will "
            "assume an implicit form of authentication is available "
            "in the target Kubernetes cluster, but the served model "
            "may not be able to access the model artifacts."
        )
        return SeldonAzureSecretSchema(rclone_config_az_env_auth=True)

    raise RuntimeError(
        "The Seldon Core model deployer doesn't know how to configure "
        f"credentials automatically for the `{artifact_store.flavor}` "
        "active artifact store flavor. "
        "Please use one of the supported artifact stores (S3, GCP or "
        "Azure) or specify a ZenML secret in the model deployer "
        "configuration that holds the credentials required to access "
        "the model artifacts."
    )
def _delete_kubernetes_secret(self, secret_name: str) -> None:
    """Delete a Kubernetes secret associated with this model deployer.

    The secret is deleted only when no Seldon Core deployment still
    references it. The secret pre-configured in the model deployer
    configuration is never deleted.

    Args:
        secret_name: The name of the Kubernetes secret to delete.
    """
    if secret_name == self.config.kubernetes_secret_name:
        return

    # check whether any of the Seldon Core deployments is currently
    # configured to use this secret
    still_in_use = any(
        cast(SeldonDeploymentConfig, server.config).secret_name
        == secret_name
        for server in self.find_model_server()
    )
    if not still_in_use:
        self.seldon_client.delete_secret(secret_name)
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create and start a Seldon Core deployment for the given configuration.

    # noqa: DAR402

    The supplied configuration is treated as a `SeldonDeploymentConfig`.
    If it does not name a Kubernetes secret, one is created or updated via
    `_create_or_update_kubernetes_secret`. A new `SeldonDeploymentService`
    is then created with the given `id` and started.

    The whole operation is tracked as a `MODEL_DEPLOYED` analytics event
    whose metadata holds the store type, the flavor of each component in
    the active stack and whether this is a custom code deployment.

    Args:
        id: the UUID of the model server to deploy.
        config: the configuration of the model to be deployed with Seldon
            Core.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        seldon_config = cast(SeldonDeploymentConfig, config)

        # fall back to the secret derived from the model deployer settings
        # when the deployment configuration does not name one explicitly
        seldon_config.secret_name = (
            seldon_config.secret_name
            or self._create_or_update_kubernetes_secret()
        )

        service = SeldonDeploymentService(uuid=id, config=seldon_config)
        logger.info(f"Creating a new Seldon deployment service: {service}")

        # starting the service provisions the Seldon Core deployment
        # server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # telemetry: describe the active stack and tell pure model
        # deployments apart from custom code deployments
        active_stack = Client().active_stack
        flavor_by_component = {
            component_type.value: component.flavor
            for component_type, component in active_stack.components.items()
        }
        metadata = {"store_type": Client().zen_store.type.value}
        metadata.update(flavor_by_component)
        metadata["is_custom_code_deployment"] = (
            seldon_config.is_custom_deployment
        )
        analytics_handler.metadata = metadata

    return service
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Reject a request to stop a Seldon Core model server.

    The operation is unsupported: this method raises unconditionally and
    never looks at its arguments.

    Args:
        service: The service that was asked to stop (unused).
        timeout: timeout in seconds to wait for the service to stop
            (unused).
        force: if True, the stop would be forced (unused).

    Raises:
        NotImplementedError: always, because stopping Seldon Core model
            servers is not supported.
    """
    message = (
        "Stopping Seldon Core model servers is not implemented. "
        "Try deleting the Seldon Core model server instead."
    )
    raise NotImplementedError(message)
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Reject a request to start a Seldon Core model deployment server.

    The operation is unsupported: this method raises unconditionally and
    never looks at its arguments.

    Args:
        service: The service that was asked to start (unused).
        timeout: timeout in seconds to wait for the service to become
            active (unused).

    Raises:
        NotImplementedError: always, since starting Seldon Core model
            servers is not supported.
    """
    message = "Starting Seldon Core model servers is not implemented"
    raise NotImplementedError(message)
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Tear down a Seldon Core model deployment server.

    The service is stopped first. Afterwards, if its configuration names
    a Kubernetes secret, that name is passed to
    `_delete_kubernetes_secret`.

    Args:
        service: The service to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    seldon_service = cast(SeldonDeploymentService, service)
    seldon_service.stop(timeout=timeout, force=force)

    secret_name = seldon_service.config.secret_name
    if not secret_name:
        return

    # The secret stores the authentication information for the Seldon Core
    # model server storage initializer. The helper is expected to remove it
    # only if no other Seldon Core model servers are using it.
    self._delete_kubernetes_secret(secret_name)
config: SeldonModelDeployerConfig
property
readonly
Returns the SeldonModelDeployerConfig
config.
Returns:
Type | Description |
---|---|
SeldonModelDeployerConfig |
The configuration. |
kubernetes_secret_name: str
property
readonly
Get the Kubernetes secret name associated with this model deployer.
If a pre-existing Kubernetes secret is configured for this model deployer, that name is returned to be used by all Seldon Core deployments associated with this model deployer.
Otherwise, a Kubernetes secret name is generated based on the ID of the active artifact store. The reason for this is that the same model deployer may be used to deploy models in combination with different artifact stores at the same time, and each artifact store may require different credentials to be accessed.
Returns:
Type | Description |
---|---|
str |
The name of a Kubernetes secret to be used with Seldon Core deployments. |
seldon_client: SeldonClient
property
readonly
Get the Seldon Core client associated with this model deployer.
Returns:
Type | Description |
---|---|
SeldonClient |
The Seldon Core client. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the Kubernetes namespace is not configured when using a service connector to deploy models with Seldon Core. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Ensures there is a container registry and image builder in the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A StackValidator instance. |
FLAVOR (BaseModelDeployerFlavor)
Seldon Core model deployer flavor.
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor definition for the Seldon Core model deployer."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The name of the flavor.
        """
        return SELDON_MODEL_DEPLOYER_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Resource requirements for compatible service connectors.

        The requirements are used to filter the available service connector
        types down to those compatible with this flavor; here they ask for a
        Kubernetes cluster resource type.

        Returns:
            Requirements for compatible service connectors, if a service
            connector is required for this flavor.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=KUBERNETES_CLUSTER_RESOURCE_TYPE,
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the documentation page explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK documentation page explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """URL of the image representing the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com"
            "/model_deployer/seldon.png"
        )

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `SeldonModelDeployerConfig` config class.
        """
        return SeldonModelDeployerConfig

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # Imported at call time rather than at module import time.
        from zenml.integrations.seldon.model_deployers import (
            SeldonModelDeployer,
        )

        return SeldonModelDeployer
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]
property
readonly
Returns SeldonModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[SeldonModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SeldonModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
get_docker_builds(self, deployment)
Gets the Docker builds required for the component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment for which to get the builds. |
required |
Returns:
Type | Description |
---|---|
List[BuildConfiguration] |
The required Docker builds. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Collect the Docker builds this component needs for a deployment.

    One build configuration is produced for every step whose `extra`
    settings flag it as a Seldon custom deployment.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    return [
        BuildConfiguration(
            key=SELDON_DOCKER_IMAGE_KEY,
            settings=step.config.docker_settings,
            step_name=step_name,
        )
        for step_name, step in deployment.step_configurations.items()
        # Only the literal boolean True opts a step in; other truthy
        # values are ignored.
        if step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
    ]
get_model_server_info(service_instance)
staticmethod
Return implementation specific information that might be relevant to the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
SeldonDeploymentService |
Instance of a SeldonDeploymentService |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
Model server information. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
    """Summarize implementation specific details of a model server.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        Model server information: the prediction URL, the model URI, the
        model name and the Seldon deployment name.
    """
    info: Dict[str, Optional[str]] = {}
    info["PREDICTION_URL"] = service_instance.prediction_url
    info["MODEL_URI"] = service_instance.config.model_uri
    info["MODEL_NAME"] = service_instance.config.model_name
    info["SELDON_DEPLOYMENT"] = service_instance.seldon_deployment_name
    return info
perform_delete_model(self, service, timeout=300, force=False)
Delete a Seldon Core model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to delete. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Tear down a Seldon Core model deployment server.

    The service is stopped first. Afterwards, if its configuration names
    a Kubernetes secret, that name is passed to
    `_delete_kubernetes_secret`.

    Args:
        service: The service to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    seldon_service = cast(SeldonDeploymentService, service)
    seldon_service.stop(timeout=timeout, force=force)

    secret_name = seldon_service.config.secret_name
    if not secret_name:
        return

    # The secret stores the authentication information for the Seldon Core
    # model server storage initializer. The helper is expected to remove it
    # only if no other Seldon Core model servers are using it.
    self._delete_kubernetes_secret(secret_name)
perform_deploy_model(self, id, config, timeout=300)
Create a new Seldon Core deployment or update an existing one.
noqa: DAR402
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace` argument value:
- if `replace` is False, calling this method will create a new Seldon Core deployment server to reflect the model and other configuration parameters specified in the supplied Seldon deployment `config`.
- if `replace` is True, this method will first attempt to find an existing Seldon Core deployment that is equivalent to the supplied configuration parameters. Two or more Seldon Core deployments are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two Seldon Core deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent Seldon Core deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing Seldon Core deployment to retain its prediction URL while performing a rolling update to serve a new model version.
Callers should set `replace` to True if they want a continuous model deployment workflow that doesn't spin up a new Seldon Core deployment server for each new model version. If multiple equivalent Seldon Core deployments are found, the most recently created deployment is selected to be updated and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
the UUID of the model server to deploy. |
required |
config |
ServiceConfig |
the configuration of the model to be deployed with Seldon Core. |
required |
timeout |
int |
the timeout in seconds to wait for the Seldon Core server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Seldon Core server is provisioned, without waiting for it to fully start. |
300 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML Seldon Core deployment service object that can be used to interact with the remote Seldon Core server. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if a Seldon Core client error is encountered while provisioning the Seldon Core deployment server. |
RuntimeError |
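if `timeout` is set to a positive value that is exceeded while waiting for the Seldon Core deployment server to start, or if an operational failure is encountered before it reaches a ready state. |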
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Seldon Core deployment or update an existing one.

    # noqa: DAR402

    A `SeldonDeploymentService` is built from `id` and `config` and then
    started, which provisions the Seldon Core deployment server and waits
    for it to reach a ready state. If `config` does not already name a
    Kubernetes secret, the result of `_create_or_update_kubernetes_secret`
    is stored on it first. The whole operation is tracked as a
    `MODEL_DEPLOYED` analytics event carrying stack metadata.

    NOTE(review): this docstring historically described two modes selected
    by a `replace` argument, which this signature does not have, so the
    selection presumably happens in the caller (confirm). With `replace`
    False a new Seldon Core deployment server is created for the supplied
    `config`. With `replace` True an existing *equivalent* deployment
    (same `pipeline_name`, `pipeline_step_name` and `model_name`) is
    updated in place, which lets it keep its prediction URL during a
    rolling update to a new model version. If several equivalent
    deployments are found, the most recently created one is updated and
    the others are deleted.

    Args:
        id: the UUID of the model server to deploy.
        config: the configuration of the model to be deployed with Seldon
            Core.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        seldon_config = cast(SeldonDeploymentConfig, config)

        # An explicitly configured Kubernetes secret wins; otherwise try to
        # create one from the ZenML secret configured for the model
        # deployer.
        secret_name = seldon_config.secret_name
        if not secret_name:
            secret_name = self._create_or_update_kubernetes_secret()
        seldon_config.secret_name = secret_name

        service = SeldonDeploymentService(uuid=id, config=seldon_config)
        logger.info(f"Creating a new Seldon deployment service: {service}")

        # Starting the service provisions the Seldon Core deployment server
        # and waits for it to reach a ready state.
        service.start(timeout=timeout)

        # Telemetry: record the flavor of every stack component and whether
        # this is a pure model or a custom code deployment.
        stack_metadata = {}
        active_stack = Client().active_stack
        for component_type, component in active_stack.components.items():
            stack_metadata[component_type.value] = component.flavor

        metadata = {"store_type": Client().zen_store.type.value}
        metadata.update(stack_metadata)
        metadata["is_custom_code_deployment"] = (
            seldon_config.is_custom_deployment
        )
        analytics_handler.metadata = metadata

    return service
perform_start_model(self, service, timeout=300)
Start a Seldon Core model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to start. |
required |
timeout |
int |
timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. |
300 |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
since we don't support starting Seldon Core model servers |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Reject a request to start a Seldon Core model deployment server.

    The operation is unsupported: this method raises unconditionally and
    never looks at its arguments.

    Args:
        service: The service that was asked to start (unused).
        timeout: timeout in seconds to wait for the service to become
            active (unused).

    Raises:
        NotImplementedError: always, since starting Seldon Core model
            servers is not supported.
    """
    message = "Starting Seldon Core model servers is not implemented"
    raise NotImplementedError(message)
perform_stop_model(self, service, timeout=300, force=False)
Stop a Seldon Core model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to stop. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
stopping Seldon Core model servers is not supported. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Reject a request to stop a Seldon Core model server.

    The operation is unsupported: this method raises unconditionally and
    never looks at its arguments.

    Args:
        service: The service that was asked to stop (unused).
        timeout: timeout in seconds to wait for the service to stop
            (unused).
        force: if True, the stop would be forced (unused).

    Raises:
        NotImplementedError: always, because stopping Seldon Core model
            servers is not supported.
    """
    message = (
        "Stopping Seldon Core model servers is not implemented. "
        "Try deleting the Seldon Core model server instead."
    )
    raise NotImplementedError(message)
secret_schemas
special
Initialization for the Seldon secret schemas.
These are secret schemas that can be used to authenticate Seldon to the Artifact Store used to store served ML models.
secret_schemas
Implementation for Seldon secret schemas.
SeldonAzureSecretSchema (BaseSecretSchema)
Seldon Azure Blob Storage credentials.
Based on: https://rclone.org/azureblob/
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_az_type |
Literal['azureblob'] |
the rclone config type. Must be set to "azureblob" for this schema. |
rclone_config_az_env_auth |
bool |
read credentials from runtime (environment variables or MSI). |
rclone_config_az_account |
Optional[str] |
storage Account Name. Leave blank to use SAS URL or MSI. |
rclone_config_az_key |
Optional[str] |
storage Account Key. Leave blank to use SAS URL or MSI. |
rclone_config_az_sas_url |
Optional[str] |
SAS URL for container level access only. Leave blank if using account/key or MSI. |
rclone_config_az_use_msi |
bool |
use a managed service identity to authenticate (only works in Azure). |
rclone_config_az_client_secret |
Optional[str] |
client secret for service principal authentication. |
rclone_config_az_client_id |
Optional[str] |
client id for service principal authentication. |
rclone_config_az_tenant |
Optional[str] |
tenant id for service principal authentication. |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonAzureSecretSchema(BaseSecretSchema):
    """Seldon Azure Blob Storage credentials.

    Based on: https://rclone.org/azureblob/

    Every field carries the `rclone_config_az_` prefix. All credential
    fields are optional and default to None; both boolean switches default
    to False.

    Attributes:
        rclone_config_az_type: the rclone config type. Must be set to
            "azureblob" for this schema.
        rclone_config_az_env_auth: read credentials from runtime
            (environment variables or MSI).
        rclone_config_az_account: storage Account Name. Leave blank to
            use SAS URL or MSI.
        rclone_config_az_key: storage Account Key. Leave blank to
            use SAS URL or MSI.
        rclone_config_az_sas_url: SAS URL for container level access
            only. Leave blank if using account/key or MSI.
        rclone_config_az_use_msi: use a managed service identity to
            authenticate (only works in Azure).
        rclone_config_az_client_secret: client secret for service
            principal authentication.
        rclone_config_az_client_id: client id for service principal
            authentication.
        rclone_config_az_tenant: tenant id for service principal
            authentication.
    """

    rclone_config_az_type: Literal["azureblob"] = "azureblob"
    rclone_config_az_env_auth: bool = False
    rclone_config_az_account: Optional[str] = None
    rclone_config_az_key: Optional[str] = None
    rclone_config_az_sas_url: Optional[str] = None
    rclone_config_az_use_msi: bool = False
    rclone_config_az_client_secret: Optional[str] = None
    rclone_config_az_client_id: Optional[str] = None
    rclone_config_az_tenant: Optional[str] = None
SeldonGSSecretSchema (BaseSecretSchema)
Seldon GCS credentials.
Based on: https://rclone.org/googlecloudstorage/
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_gs_type |
Literal['google cloud storage'] |
the rclone config type. Must be set to "google cloud storage" for this schema. |
rclone_config_gs_client_id |
Optional[str] |
OAuth client id. |
rclone_config_gs_client_secret |
Optional[str] |
OAuth client secret. |
rclone_config_gs_token |
Optional[str] |
OAuth Access Token as a JSON blob. |
rclone_config_gs_project_number |
Optional[str] |
project number. |
rclone_config_gs_service_account_credentials |
Optional[str] |
service account credentials JSON blob. |
rclone_config_gs_anonymous |
bool |
access public buckets and objects without credentials. Set to True if you just want to download files and don't configure credentials. |
rclone_config_gs_auth_url |
Optional[str] |
auth server URL. |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonGSSecretSchema(BaseSecretSchema):
    """Seldon GCS credentials.

    Based on: https://rclone.org/googlecloudstorage/

    Every field carries the `rclone_config_gs_` prefix. All credential
    fields are optional and default to None; `rclone_config_gs_anonymous`
    defaults to False.

    Attributes:
        rclone_config_gs_type: the rclone config type. Must be set to "google
            cloud storage" for this schema.
        rclone_config_gs_client_id: OAuth client id.
        rclone_config_gs_client_secret: OAuth client secret.
        rclone_config_gs_token: OAuth Access Token as a JSON blob.
        rclone_config_gs_project_number: project number.
        rclone_config_gs_service_account_credentials: service account
            credentials JSON blob.
        rclone_config_gs_anonymous: access public buckets and objects without
            credentials. Set to True if you just want to download files and
            don't configure credentials.
        rclone_config_gs_auth_url: auth server URL.
        rclone_config_gs_token_url: token server URL (presumably the rclone
            `token_url` option; confirm against the rclone GCS docs).
    """

    rclone_config_gs_type: Literal["google cloud storage"] = (
        "google cloud storage"
    )
    rclone_config_gs_client_id: Optional[str] = None
    rclone_config_gs_client_secret: Optional[str] = None
    rclone_config_gs_project_number: Optional[str] = None
    rclone_config_gs_service_account_credentials: Optional[str] = None
    rclone_config_gs_anonymous: bool = False
    rclone_config_gs_token: Optional[str] = None
    rclone_config_gs_auth_url: Optional[str] = None
    rclone_config_gs_token_url: Optional[str] = None
SeldonS3SecretSchema (BaseSecretSchema)
Seldon S3 credentials.
Based on: https://rclone.org/s3/#amazon-s3
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_s3_type |
Literal['s3'] |
the rclone config type. Must be set to "s3" for this schema. |
rclone_config_s3_provider |
str |
the S3 provider (e.g. aws, ceph, minio). |
rclone_config_s3_env_auth |
bool |
get AWS credentials from EC2/ECS meta data (i.e. with IAM roles configuration). Only applies if access_key_id and secret_access_key are blank. |
rclone_config_s3_access_key_id |
Optional[str] |
AWS Access Key ID. |
rclone_config_s3_secret_access_key |
Optional[str] |
AWS Secret Access Key. |
rclone_config_s3_session_token |
Optional[str] |
AWS Session Token. |
rclone_config_s3_region |
Optional[str] |
region to connect to. |
rclone_config_s3_endpoint |
Optional[str] |
S3 API endpoint. |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonS3SecretSchema(BaseSecretSchema):
    """Seldon S3 credentials.

    Based on: https://rclone.org/s3/#amazon-s3

    Every field carries the `rclone_config_s3_` prefix. The provider
    defaults to "aws", `rclone_config_s3_env_auth` defaults to False and
    all remaining credential fields are optional and default to None.

    Attributes:
        rclone_config_s3_type: the rclone config type. Must be set to "s3" for
            this schema.
        rclone_config_s3_provider: the S3 provider (e.g. aws, ceph, minio).
        rclone_config_s3_env_auth: get AWS credentials from EC2/ECS meta data
            (i.e. with IAM roles configuration). Only applies if access_key_id
            and secret_access_key are blank.
        rclone_config_s3_access_key_id: AWS Access Key ID.
        rclone_config_s3_secret_access_key: AWS Secret Access Key.
        rclone_config_s3_session_token: AWS Session Token.
        rclone_config_s3_region: region to connect to.
        rclone_config_s3_endpoint: S3 API endpoint.
    """

    rclone_config_s3_type: Literal["s3"] = "s3"
    rclone_config_s3_provider: str = "aws"
    rclone_config_s3_env_auth: bool = False
    rclone_config_s3_access_key_id: Optional[str] = None
    rclone_config_s3_secret_access_key: Optional[str] = None
    rclone_config_s3_session_token: Optional[str] = None
    rclone_config_s3_region: Optional[str] = None
    rclone_config_s3_endpoint: Optional[str] = None
seldon_client
Implementation of the Seldon client for ZenML.
SeldonClient
A client for interacting with Seldon Deployments.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClient:
"""A client for interacting with Seldon Deployments."""
def __init__(
    self,
    context: Optional[str],
    namespace: Optional[str],
    kube_client: Optional[k8s_client.ApiClient] = None,
):
    """Set up a Seldon Core client.

    The namespace is stored before the Kubernetes clients are initialized
    because that initialization reads it.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
        kube_client: a Kubernetes client to use.
    """
    self._namespace = namespace
    self._initialize_k8s_clients(
        context=context,
        kube_client=kube_client,
    )
def _initialize_k8s_clients(
    self,
    context: Optional[str],
    kube_client: Optional[k8s_client.ApiClient] = None,
) -> None:
    """Initialize the Kubernetes clients.

    When `kube_client` is supplied, both API clients are built on top of it
    and no configuration is loaded. Otherwise the in-cluster configuration
    is tried first, falling back to the local kube config for `context`.

    Args:
        context: a Kubernetes configuration context to use.
        kube_client: a Kubernetes client to use.

    Raises:
        SeldonClientError: if Kubernetes configuration could not be loaded,
            or if no namespace is configured when running outside of a
            cluster.
    """
    if kube_client:
        # Initialize the Seldon client using the provided Kubernetes
        # client, if supplied.
        self._core_api = k8s_client.CoreV1Api(kube_client)
        self._custom_objects_api = k8s_client.CustomObjectsApi(kube_client)
        return

    try:
        k8s_config.load_incluster_config()
        if not self._namespace:
            # load the namespace in the context of which the
            # current pod is running; the context manager guarantees the
            # file handle is closed instead of leaking until collection
            with open(
                "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
            ) as namespace_file:
                self._namespace = namespace_file.read()
    except k8s_config.config_exception.ConfigException:
        # Not running inside a cluster: the namespace cannot be discovered
        # and must have been configured explicitly.
        if not self._namespace:
            raise SeldonClientError(
                "The Kubernetes namespace must be explicitly "
                "configured when running outside of a cluster."
            )
        try:
            k8s_config.load_kube_config(
                context=context, persist_config=False
            )
        except k8s_config.config_exception.ConfigException as e:
            raise SeldonClientError(
                "Could not load the Kubernetes configuration"
            ) from e

    self._core_api = k8s_client.CoreV1Api()
    self._custom_objects_api = k8s_client.CustomObjectsApi()
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Rewrite label values in place so they are valid Kubernetes labels.

    See:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: the labels to sanitize.
    """
    for label_name in list(labels):
        # Kubernetes labels must be alphanumeric, no longer than
        # 63 characters, and must begin and end with an alphanumeric
        # character ([a-z0-9A-Z]): collapse each run of disallowed
        # characters into "_", truncate, then trim the edges.
        cleaned = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", labels[label_name])
        labels[label_name] = cleaned[:63].strip("-_.")
@property
def namespace(self) -> str:
    """Kubernetes namespace in use by the client.

    Returns:
        The Kubernetes namespace in use by the client.

    Raises:
        RuntimeError: if the namespace has not been configured.
    """
    if self._namespace:
        return self._namespace
    # shouldn't happen if the client is initialized, but the explicit check
    # is needed to appease the mypy type checker
    raise RuntimeError("The Kubernetes namespace is not configured")
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    try:
        logger.debug(f"Creating SeldonDeployment resource: {deployment}")

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        body_deploy = deployment.model_dump(exclude_none=True)
        response = self._custom_objects_api.create_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            body=body_deploy,
            # a poll timeout of 0 means "no request timeout"
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s", str(e)
        )
        if e.status == 409:
            # HTTP 409 Conflict: the resource name is already taken. Chain
            # the API error explicitly, like the generic error below.
            raise SeldonDeploymentExistsError(
                f"A deployment with the name {deployment.name} "
                f"already exists in namespace {self._namespace}"
            ) from e
        raise SeldonClientError(
            "Exception when creating SeldonDeployment resource"
        ) from e

    # Poll in 5 second steps until the deployment leaves the pending state
    # or the time budget is used up.
    created_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and created_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        created_deployment = self.get_deployment(name=deployment.name)
    return created_deployment
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Remove a Seldon Core deployment resource managed by ZenML.

    Args:
        name: the name of the Seldon Core deployment resource to delete.
        force: if True, the deployment deletion will be forced (the graceful
            period will be set to zero).
        poll_timeout: the maximum time to wait for the deployment to be
            deleted. If set to 0, the function will return immediately
            without checking the deployment status. If a timeout
            occurs and the deployment still exists, this method will
            return and no exception will be raised.

    Raises:
        SeldonClientError: if an unknown error occurs during the deployment
            removal.
    """
    try:
        logger.debug(f"Deleting SeldonDeployment resource: {name}")

        # `get_deployment` doubles as a check that the deployment exists
        # and is managed by ZenML; it raises a
        # SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=name)

        request_timeout = poll_timeout or None
        grace_period = 0 if force else None
        response = self._custom_objects_api.delete_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
            _request_timeout=request_timeout,
            grace_period_seconds=grace_period,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from e

    # Poll in 5 second steps until the deployment can no longer be found or
    # the time budget is used up.
    remaining = poll_timeout
    while remaining > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            return
        time.sleep(5)
        remaining -= 5
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    The deployment is marked as managed by ZenML, looked up with
    `get_deployment` (which raises a SeldonDeploymentNotFoundError if it
    does not exist or is not managed by ZenML; that error propagates to
    the caller), and then patched in place.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending, it will be returned
            anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)

        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.model_dump(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        # Report the operation that actually failed (an update, not a
        # creation) so the error is not misleading to callers.
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e

    # Re-read the deployment every 5 seconds until it leaves the pending
    # state or the timeout budget is used up.
    updated_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and updated_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)
    return updated_deployment
def get_deployment(self, name: str) -> SeldonDeployment:
    """Fetch a ZenML-managed Seldon Core deployment resource by its name.

    Args:
        name: the name of the Seldon Core deployment resource to fetch.

    Returns:
        The Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the deployment resource cannot
            be found, cannot be parsed, or is not managed by ZenML.
        SeldonClientError: if an unknown error occurs while fetching
            the deployment.
    """
    logger.debug(f"Retrieving SeldonDeployment resource: {name}")
    try:
        api_response = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
    except k8s_client.rest.ApiException as api_error:
        # A 404 is the expected "does not exist" answer; anything else is
        # reported as a generic client error.
        if api_error.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from api_error
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(api_error),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from api_error

    logger.debug("Seldon Core API response: %s", api_response)

    try:
        parsed = SeldonDeployment(**api_response)
    except ValidationError as validation_error:
        # A resource that does not match the model is treated as missing.
        logger.error(
            "Invalid Seldon Core deployment resource: %s\n%s",
            str(validation_error),
            str(api_response),
        )
        raise SeldonDeploymentNotFoundError(
            f"SeldonDeployment resource {name} could not be parsed"
        )

    # Only Seldon deployments managed by ZenML are returned
    if parsed.is_managed_by_zenml():
        return parsed
    raise SeldonDeploymentNotFoundError(
        f"Seldon Deployment {name} is not managed by ZenML"
    )
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    The `app=zenml` label is always added to the label selector. The
    dictionaries passed by the caller are not modified.

    Args:
        name: optional name of the deployment resource to find. When set,
            it replaces any selector passed in `fields` with a single
            `metadata.name` field selector.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.
        Items that cannot be parsed are logged and left out.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    # Copy the selectors so that adding the ZenML label below does not
    # leak into the dictionaries owned by the caller.
    fields = dict(fields) if fields else {}
    labels = dict(labels) if labels else {}
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels["app"] = "zenml"
    if name:
        fields = {"metadata.name": name}
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    # `labels` always holds at least the `app` entry at this point.
    label_selector = ",".join(f"{k}={v}" for k, v in labels.items())
    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
        # Tolerate a missing or null `items` entry for both the log
        # statement and the loop.
        items = response.get("items") or []
        logger.debug("Seldon Core API returned %s items", len(items))
        deployments = []
        for item in items:
            try:
                deployments.append(SeldonDeployment(**item))
            except ValidationError as e:
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(item),
                )
        return deployments
    except k8s_client.rest.ApiException as e:
        # Three placeholders need three arguments: both selectors and the
        # API error itself.
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector or "",
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    Logs are read from the first pod carrying the
    `seldon-deployment-id=<name>` label. The container named `default`
    is preferred, falling back to the pod's first container. If that
    container has neither started nor restarted, and the pod has init
    containers, the first init container is read instead.

    Because this is a generator function, no Kubernetes call is made
    until the first item is requested.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Yields:
        The next log line, without its trailing newline. Blank log lines
        are yielded as empty strings. Sending a truthy value into the
        generator stops it.

    Raises:
        SeldonClientError: if no pods are found for the deployment or an
            unknown error occurs while fetching the logs.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        containers = [c.name for c in pod.spec.containers]
        # Both of these attributes may be None (no init containers, or
        # statuses not reported yet), so fall back to empty sequences.
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }

        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to the first init container when there is one
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e

    try:
        while True:
            raw_line = response.readline()
            # Only an empty read marks the end of the stream. A blank log
            # line arrives as b"\n" and must not stop the iteration.
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        # Always hand the HTTP connection back, even if the consumer
        # abandons the generator early.
        response.release_conn()
def create_or_update_secret(
    self,
    name: str,
    secret_values: Dict[str, Any],
) -> None:
    """Create or update a Kubernetes Secret resource.

    Keys are upper-cased and values are converted with `str()` and
    base64-encoded. Entries whose value is None are left out. The Secret
    is labelled `app=zenml`.

    Args:
        name: the name of the Secret resource to create.
        secret_values: secret key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
        k8s_client.rest.ApiException: re-raised internally for errors
            other than 404; it is always converted to a
            SeldonClientError before it reaches the caller.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")

        secret_data = {
            k.upper(): base64.b64encode(str(v).encode("utf-8")).decode(
                "ascii"
            )
            for k, v in secret_values.items()
            if v is not None
        }

        secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=secret_data,
        )

        try:
            # check if the secret is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error
                raise
            self._core_api.create_namespaced_secret(
                namespace=self._namespace,
                body=secret,
            )
        # Do not log the API response: it echoes the Secret object,
        # including the (merely base64-encoded) secret values.
        logger.debug("Secret resource %s created or updated", name)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
def delete_secret(
    self,
    name: str,
) -> None:
    """Remove a Kubernetes Secret resource managed by ZenML.

    A Secret that is already gone (HTTP 404) is not treated as an error.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        SeldonClientError: if an unknown error occurs during the removal
            of the secret.
    """
    logger.debug(f"Deleting Secret resource: {name}")
    try:
        api_response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
    except k8s_client.rest.ApiException as api_error:
        # A 404 means the secret is no longer present: nothing to do.
        if api_error.status != 404:
            logger.error(
                "Exception when deleting Secret resource %s: %s",
                name,
                str(api_error),
            )
            raise SeldonClientError(
                f"Exception when deleting Secret resource {name}"
            ) from api_error
    else:
        logger.debug("Kubernetes API response: %s", api_response)
namespace: str
property
readonly
Returns the Kubernetes namespace in use by the client.
Returns:
Type | Description |
---|---|
str |
The Kubernetes namespace in use by the client. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the namespace has not been configured. |
__init__(self, context, namespace, kube_client=None)
special
Initialize a Seldon Core client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
Optional[str] |
the Kubernetes context to use. |
required |
namespace |
Optional[str] |
the Kubernetes namespace to use. |
required |
kube_client |
Optional[kubernetes.client.ApiClient] |
a Kubernetes client to use. |
None |
Source code in zenml/integrations/seldon/seldon_client.py
def __init__(
    self,
    context: Optional[str],
    namespace: Optional[str],
    kube_client: Optional[k8s_client.ApiClient] = None,
):
    """Initialize a Seldon Core client.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
        kube_client: a Kubernetes client to use.
    """
    # Remember the namespace; the API calls made by the other methods
    # pass it as their `namespace` argument.
    self._namespace = namespace
    # NOTE(review): presumably sets up the `_core_api` and
    # `_custom_objects_api` handles used by the other methods; the helper
    # is defined elsewhere, confirm there.
    self._initialize_k8s_clients(context=context, kube_client=kube_client)
create_deployment(self, deployment, poll_timeout=0)
Create a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource to create |
required |
poll_timeout |
int |
the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised. |
0 |
Returns:
Type | Description |
---|---|
SeldonDeployment |
the created Seldon Core deployment resource with updated status. |
Exceptions:
Type | Description |
---|---|
SeldonDeploymentExistsError |
if a deployment with the same name already exists. |
SeldonClientError |
if an unknown error occurs during the creation of the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a new Seldon Core deployment resource.

    The deployment is marked as managed by ZenML before it is submitted,
    so that it can later be told apart from deployments that were not
    created by ZenML.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time (in seconds) to wait for the
            deployment to become available or to fail. With 0, the
            function returns immediately without checking the deployment
            status. If the timeout elapses while the deployment is still
            pending, it is returned anyway and no exception is raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    logger.debug(f"Creating SeldonDeployment resource: {deployment}")
    deployment.mark_as_managed_by_zenml()
    body_deploy = deployment.model_dump(exclude_none=True)
    request_timeout = poll_timeout if poll_timeout else None

    try:
        api_response = (
            self._custom_objects_api.create_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                body=body_deploy,
                _request_timeout=request_timeout,
            )
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as api_error:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s",
            str(api_error),
        )
        # HTTP 409 (conflict) means the name is already taken.
        if api_error.status != 409:
            raise SeldonClientError(
                "Exception when creating SeldonDeployment resource"
            ) from api_error
        raise SeldonDeploymentExistsError(
            f"A deployment with the name {deployment.name} "
            f"already exists in namespace {self._namespace}"
        )

    # Re-read the deployment every 5 seconds until it is no longer
    # pending or the timeout budget is used up.
    remaining = poll_timeout
    current = self.get_deployment(name=deployment.name)
    while remaining > 0:
        if not current.is_pending():
            break
        time.sleep(5)
        remaining -= 5
        current = self.get_deployment(name=deployment.name)
    return current
create_or_update_secret(self, name, secret_values)
Create or update a Kubernetes Secret resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Secret resource to create. |
required |
secret_values |
Dict[str, Any] |
secret key-values that should be stored in the Secret resource. |
required |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the creation of the secret. |
k8s_client.rest.ApiException |
unexpected error. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_or_update_secret(
    self,
    name: str,
    secret_values: Dict[str, Any],
) -> None:
    """Create or update a Kubernetes Secret resource.

    Keys are upper-cased and values are converted with `str()` and
    base64-encoded. Entries whose value is None are left out. The Secret
    is labelled `app=zenml`.

    Args:
        name: the name of the Secret resource to create.
        secret_values: secret key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
        k8s_client.rest.ApiException: re-raised internally for errors
            other than 404; it is always converted to a
            SeldonClientError before it reaches the caller.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")

        secret_data = {
            k.upper(): base64.b64encode(str(v).encode("utf-8")).decode(
                "ascii"
            )
            for k, v in secret_values.items()
            if v is not None
        }

        secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=secret_data,
        )

        try:
            # check if the secret is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error
                raise
            self._core_api.create_namespaced_secret(
                namespace=self._namespace,
                body=secret,
            )
        # Do not log the API response: it echoes the Secret object,
        # including the (merely base64-encoded) secret values.
        logger.debug("Secret resource %s created or updated", name)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
delete_deployment(self, name, force=False, poll_timeout=0)
Delete a Seldon Core deployment resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment resource to delete. |
required |
force |
bool |
if True, the deployment deletion will be forced (the grace period will be set to zero). |
False |
poll_timeout |
int |
the maximum time to wait for the deployment to be deleted. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment still exists, this method will return and no exception will be raised. |
0 |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the deployment removal. |
Source code in zenml/integrations/seldon/seldon_client.py
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Remove a ZenML-managed Seldon Core deployment resource.

    Before deleting, the deployment is looked up with `get_deployment`,
    which raises a SeldonDeploymentNotFoundError if the resource does not
    exist or is not managed by ZenML; that error is not handled here.

    Args:
        name: the name of the Seldon Core deployment resource to delete.
        force: if True, the deletion is forced by requesting a grace
            period of zero seconds.
        poll_timeout: the maximum time (in seconds) to wait for the
            deployment to disappear. With 0, the call returns right after
            the delete request. If the timeout elapses while the
            deployment still exists, the method returns without raising.

    Raises:
        SeldonClientError: if the Kubernetes API reports an error while
            deleting the deployment.
    """
    # A poll timeout of 0 means "no request timeout" for the API call.
    request_timeout = poll_timeout if poll_timeout else None
    grace_period = 0 if force else None

    try:
        logger.debug(f"Deleting SeldonDeployment resource: {name}")
        # Existence / ownership check: raises
        # SeldonDeploymentNotFoundError when the deployment is missing or
        # not managed by ZenML.
        self.get_deployment(name=name)
        api_response = (
            self._custom_objects_api.delete_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
                _request_timeout=request_timeout,
                grace_period_seconds=grace_period,
            )
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as api_error:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(api_error),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from api_error

    # Poll in 5 second steps until the resource is gone or time runs out.
    remaining = poll_timeout
    while remaining > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            break
        time.sleep(5)
        remaining -= 5
delete_secret(self, name)
Delete a Kubernetes Secret resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Kubernetes Secret resource to delete. |
required |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the removal of the secret. |
Source code in zenml/integrations/seldon/seldon_client.py
def delete_secret(
    self,
    name: str,
) -> None:
    """Remove a Kubernetes Secret resource managed by ZenML.

    A Secret that is already gone (HTTP 404) is not treated as an error.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        SeldonClientError: if an unknown error occurs during the removal
            of the secret.
    """
    logger.debug(f"Deleting Secret resource: {name}")
    try:
        api_response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
    except k8s_client.rest.ApiException as api_error:
        # A 404 means the secret is no longer present: nothing to do.
        if api_error.status != 404:
            logger.error(
                "Exception when deleting Secret resource %s: %s",
                name,
                str(api_error),
            )
            raise SeldonClientError(
                f"Exception when deleting Secret resource {name}"
            ) from api_error
    else:
        logger.debug("Kubernetes API response: %s", api_response)
find_deployments(self, name=None, labels=None, fields=None)
Find all ZenML-managed Seldon Core deployment resources matching the given criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Optional[str] |
optional name of the deployment resource to find. |
None |
fields |
Optional[Dict[str, str]] |
optional selector to restrict the list of returned Seldon deployments by their fields. Defaults to everything. |
None |
labels |
Optional[Dict[str, str]] |
optional selector to restrict the list of returned Seldon deployments by their labels. Defaults to everything. |
None |
Returns:
Type | Description |
---|---|
List[zenml.integrations.seldon.seldon_client.SeldonDeployment] |
List of Seldon Core deployments that match the given criteria. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while fetching the deployments. |
Source code in zenml/integrations/seldon/seldon_client.py
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    The `app=zenml` label is always added to the label selector. The
    dictionaries passed by the caller are not modified.

    Args:
        name: optional name of the deployment resource to find. When set,
            it replaces any selector passed in `fields` with a single
            `metadata.name` field selector.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.
        Items that cannot be parsed are logged and left out.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    # Copy the selectors so that adding the ZenML label below does not
    # leak into the dictionaries owned by the caller.
    fields = dict(fields) if fields else {}
    labels = dict(labels) if labels else {}
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels["app"] = "zenml"
    if name:
        fields = {"metadata.name": name}
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    # `labels` always holds at least the `app` entry at this point.
    label_selector = ",".join(f"{k}={v}" for k, v in labels.items())
    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
        # Tolerate a missing or null `items` entry for both the log
        # statement and the loop.
        items = response.get("items") or []
        logger.debug("Seldon Core API returned %s items", len(items))
        deployments = []
        for item in items:
            try:
                deployments.append(SeldonDeployment(**item))
            except ValidationError as e:
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(item),
                )
        return deployments
    except k8s_client.rest.ApiException as e:
        # Three placeholders need three arguments: both selectors and the
        # API error itself.
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector or "",
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e
get_deployment(self, name)
Get a ZenML managed Seldon Core deployment resource by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment resource to fetch. |
required |
Returns:
Type | Description |
---|---|
SeldonDeployment |
The Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
SeldonDeploymentNotFoundError |
if the deployment resource cannot be found or is not managed by ZenML. |
SeldonClientError |
if an unknown error occurs while fetching the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment(self, name: str) -> SeldonDeployment:
    """Fetch a ZenML-managed Seldon Core deployment resource by its name.

    Args:
        name: the name of the Seldon Core deployment resource to fetch.

    Returns:
        The Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the deployment resource cannot
            be found, cannot be parsed, or is not managed by ZenML.
        SeldonClientError: if an unknown error occurs while fetching
            the deployment.
    """
    logger.debug(f"Retrieving SeldonDeployment resource: {name}")
    try:
        api_response = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
    except k8s_client.rest.ApiException as api_error:
        # A 404 is the expected "does not exist" answer; anything else is
        # reported as a generic client error.
        if api_error.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from api_error
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(api_error),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from api_error

    logger.debug("Seldon Core API response: %s", api_response)

    try:
        parsed = SeldonDeployment(**api_response)
    except ValidationError as validation_error:
        # A resource that does not match the model is treated as missing.
        logger.error(
            "Invalid Seldon Core deployment resource: %s\n%s",
            str(validation_error),
            str(api_response),
        )
        raise SeldonDeploymentNotFoundError(
            f"SeldonDeployment resource {name} could not be parsed"
        )

    # Only Seldon deployments managed by ZenML are returned
    if parsed.is_managed_by_zenml():
        return parsed
    raise SeldonDeploymentNotFoundError(
        f"Seldon Deployment {name} is not managed by ZenML"
    )
get_deployment_logs(self, name, follow=False, tail=None)
Get the logs of a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment to get logs for. |
required |
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Yields:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
The next log line. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while fetching the logs. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    Logs are read from the first pod carrying the
    `seldon-deployment-id=<name>` label. The container named `default`
    is preferred, falling back to the pod's first container. If that
    container has neither started nor restarted, and the pod has init
    containers, the first init container is read instead.

    Because this is a generator function, no Kubernetes call is made
    until the first item is requested.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Yields:
        The next log line, without its trailing newline. Blank log lines
        are yielded as empty strings. Sending a truthy value into the
        generator stops it.

    Raises:
        SeldonClientError: if no pods are found for the deployment or an
            unknown error occurs while fetching the logs.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        containers = [c.name for c in pod.spec.containers]
        # Both of these attributes may be None (no init containers, or
        # statuses not reported yet), so fall back to empty sequences.
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }

        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to the first init container when there is one
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e

    try:
        while True:
            raw_line = response.readline()
            # Only an empty read marks the end of the stream. A blank log
            # line arrives as b"\n" and must not stop the iteration.
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        # Always hand the HTTP connection back, even if the consumer
        # abandons the generator early.
        response.release_conn()
sanitize_labels(labels)
staticmethod
Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Parameters:
Name | Type | Description | Default |
---|---|---|---|
labels |
Dict[str, str] |
the labels to sanitize. |
required |
Source code in zenml/integrations/seldon/seldon_client.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
"""Update the label values to be valid Kubernetes labels.
See:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
labels: the labels to sanitize.
"""
for key, value in labels.items():
# Kubernetes labels must be alphanumeric, no longer than
# 63 characters, and must begin and end with an alphanumeric
# character ([a-z0-9A-Z])
labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
"-_."
)
update_deployment(self, deployment, poll_timeout=0)
Update a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource to update |
required |
poll_timeout |
int |
the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised. |
0 |
Returns:
Type | Description |
---|---|
SeldonDeployment |
the updated Seldon Core deployment resource with updated status. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while updating the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    The deployment is marked as managed by ZenML, looked up with
    `get_deployment` (which raises a SeldonDeploymentNotFoundError if it
    does not exist or is not managed by ZenML; that error propagates to
    the caller), and then patched in place.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending, it will be returned
            anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)

        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.model_dump(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        # Report the operation that actually failed (an update, not a
        # creation) so the error is not misleading to callers.
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e

    # Re-read the deployment every 5 seconds until it leaves the pending
    # state or the timeout budget is used up.
    updated_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and updated_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)
    return updated_deployment
SeldonClientError (Exception)
Base exception class for all exceptions raised by the SeldonClient.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientError(Exception):
    """Root of the SeldonClient exception hierarchy.

    All exceptions raised by the SeldonClient derive from this class.
    """
SeldonClientTimeout (SeldonClientError)
Raised when the Seldon client timed out while waiting for a resource to reach the expected status.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientTimeout(SeldonClientError):
    """Signals a timeout in the Seldon client.

    Raised when the client gave up waiting for a resource to reach the
    expected status.
    """
SeldonDeployment (BaseModel)
A Seldon Core deployment CRD.
This is a Pydantic representation of some of the fields in the Seldon Core CRD (documented here: https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).
Note that not all fields are represented, only those that are relevant to the ZenML integration. The fields that are not represented are silently ignored when the Seldon Deployment is created or updated from an external SeldonDeployment CRD representation.
Attributes:
Name | Type | Description |
---|---|---|
kind |
Literal['SeldonDeployment'] |
Kubernetes kind field. |
apiVersion |
Literal['machinelearning.seldon.io/v1'] |
Kubernetes apiVersion field. |
metadata |
SeldonDeploymentMetadata |
Kubernetes metadata field. |
spec |
SeldonDeploymentSpec |
Seldon Deployment spec entry. |
status |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatus] |
Seldon Deployment status. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeployment(BaseModel):
    """Pydantic model of a Seldon Core deployment CRD.

    Only the subset of Seldon Core CRD fields that the ZenML integration
    relies on is modelled here (the CRD itself is documented at
    https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).
    Fields that have no counterpart in this model are silently dropped when
    the Seldon Deployment is created or updated from an external
    SeldonDeployment CRD representation.

    Attributes:
        kind: Kubernetes kind field.
        apiVersion: Kubernetes apiVersion field.
        metadata: Kubernetes metadata field.
        spec: Seldon Deployment spec entry.
        status: Seldon Deployment status.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    kind: Literal["SeldonDeployment"] = "SeldonDeployment"
    apiVersion: Literal["machinelearning.seldon.io/v1"] = (
        "machinelearning.seldon.io/v1"
    )
    metadata: SeldonDeploymentMetadata
    spec: SeldonDeploymentSpec
    status: Optional[SeldonDeploymentStatus] = None

    def __str__(self) -> str:
        """Render the Seldon Deployment as indented JSON.

        Fields whose value is None are left out of the output.

        Returns:
            A string representation of the Seldon Deployment.
        """
        payload = self.model_dump(exclude_none=True)
        return json.dumps(payload, indent=4)

    @classmethod
    def build(
        cls,
        name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_name: Optional[str] = None,
        implementation: Optional[str] = None,
        parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None,
        engineResources: Optional[SeldonResourceRequirements] = None,
        secret_name: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        annotations: Optional[Dict[str, str]] = None,
        is_custom_deployment: Optional[bool] = False,
        spec: Optional[Dict[Any, Any]] = None,
        serviceAccountName: Optional[str] = None,
    ) -> "SeldonDeployment":
        """Assemble a minimal Seldon Deployment with a single predictor.

        Args:
            name: The name of the Seldon Deployment. When empty or omitted,
                a name of the form `zenml-<current time>` is generated.
            model_uri: The URI of the model. Only used for built-in
                (non-custom) deployments.
            model_name: The name of the model, used as the predictor name.
            implementation: The implementation of the model. Only used for
                built-in (non-custom) deployments.
            parameters: The predictor graph parameters.
            engineResources: The resources to be allocated to the model.
            secret_name: The name of the Kubernetes secret containing
                environment variable values (e.g. with credentials for the
                artifact store) to use with the deployment service. Only
                used for built-in (non-custom) deployments.
            labels: A dictionary of labels to apply to the Seldon Deployment.
            annotations: A dictionary of annotations to apply to the Seldon
                Deployment.
            is_custom_deployment: Whether the Seldon Deployment is a custom
                or a built-in one.
            spec: A Kubernetes pod spec to use for the Seldon Deployment.
                Only used for custom deployments.
            serviceAccountName: The name of the service account to associate
                with the predictive unit container.

        Returns:
            A minimal SeldonDeployment object built from the provided
            parameters.
        """
        if not name:
            name = f"zenml-{time.time()}"
        labels = {} if labels is None else labels
        annotations = {} if annotations is None else annotations

        # Settings shared by the custom and the built-in predictive unit.
        unit_kwargs: Dict[str, Any] = {
            "name": "classifier",
            "type": SeldonDeploymentPredictiveUnitType.MODEL,
            "parameters": parameters,
            "serviceAccountName": serviceAccountName,
        }
        if not is_custom_deployment:
            # Built-in servers are told where the model lives, which
            # implementation serves it and which secret provides the env.
            unit_kwargs.update(
                modelUri=model_uri or "",
                implementation=implementation or "",
                envSecretRefName=secret_name,
            )
        graph = SeldonDeploymentPredictiveUnit(**unit_kwargs)

        predictor_kwargs: Dict[str, Any] = {
            "name": model_name or "",
            "graph": graph,
            "engineResources": engineResources,
        }
        if is_custom_deployment:
            # Custom deployments carry their own pod spec instead.
            predictor_kwargs["componentSpecs"] = [
                SeldonDeploymentComponentSpecs(
                    spec=spec
                    # TODO [HIGH]: Add support for other component types (e.g. graph)
                )
            ]
        predictors = [SeldonDeploymentPredictor(**predictor_kwargs)]

        deployment_metadata = SeldonDeploymentMetadata(
            name=name, labels=labels, annotations=annotations
        )
        deployment_spec = SeldonDeploymentSpec(
            name=name, predictors=predictors
        )
        return SeldonDeployment(
            metadata=deployment_metadata,
            spec=deployment_spec,
        )

    def is_managed_by_zenml(self) -> bool:
        """Tell whether ZenML manages this Seldon Deployment.

        By convention, SeldonDeployment instances managed by ZenML carry an
        `app` label whose value is `zenml`.

        Returns:
            True if the Seldon Deployment is managed by ZenML, False
            otherwise.
        """
        app_label = self.metadata.labels.get("app")
        return app_label == "zenml"

    def mark_as_managed_by_zenml(self) -> None:
        """Flag this Seldon Deployment as managed by ZenML.

        By convention, SeldonDeployment instances managed by ZenML carry an
        `app` label whose value is `zenml`; this sets that label in place.
        """
        deployment_labels = self.metadata.labels
        deployment_labels["app"] = "zenml"

    @property
    def name(self) -> str:
        """Name of this Seldon Deployment.

        Shortcut for `self.metadata.name`.

        Returns:
            The name of this Seldon Deployment.
        """
        metadata = self.metadata
        return metadata.name

    @property
    def state(self) -> SeldonDeploymentStatusState:
        """Current state of the Seldon Deployment.

        Returns:
            The state recorded in the status, or UNKNOWN when no status is
            set.
        """
        current_status = self.status
        if current_status:
            return current_status.state
        return SeldonDeploymentStatusState.UNKNOWN

    def is_pending(self) -> bool:
        """Tell whether the Seldon Deployment is still being created.

        Returns:
            True if the Seldon Deployment is pending, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.CREATING

    def is_available(self) -> bool:
        """Tell whether the Seldon Deployment is available.

        Returns:
            True if the Seldon Deployment is available, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.AVAILABLE

    def is_failed(self) -> bool:
        """Tell whether the Seldon Deployment has failed.

        Returns:
            True if the Seldon Deployment is failed, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.FAILED

    def get_error(self) -> Optional[str]:
        """Describe the error of a failed Seldon Deployment.

        Returns:
            The status description if the deployment is in the failed state,
            otherwise None.
        """
        if not self.status or not self.is_failed():
            return None
        return self.status.description

    def get_pending_message(self) -> Optional[str]:
        """Describe why the Seldon Deployment is not ready yet.

        Returns:
            The message of the first `Ready` condition whose status is
            false, or None if there is no status, no conditions, or no such
            condition.
        """
        if not self.status or not self.status.conditions:
            return None
        unmet_ready_messages = (
            condition.message
            for condition in self.status.conditions
            if condition.type == "Ready" and not condition.status
        )
        return next(unmet_ready_messages, None)
name: str
property
readonly
Returns the name of this Seldon Deployment.
This is just a shortcut for `self.metadata.name`.
Returns:
Type | Description |
---|---|
str |
The name of this Seldon Deployment. |
state: SeldonDeploymentStatusState
property
readonly
The state of the Seldon Deployment.
Returns:
Type | Description |
---|---|
SeldonDeploymentStatusState |
The state of the Seldon Deployment. |
__str__(self)
special
Returns a string representation of the Seldon Deployment.
Returns:
Type | Description |
---|---|
str |
A string representation of the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def __str__(self) -> str:
    """Render the Seldon Deployment as indented JSON.

    Fields whose value is None are left out of the output.

    Returns:
        A string representation of the Seldon Deployment.
    """
    payload = self.model_dump(exclude_none=True)
    return json.dumps(payload, indent=4)
build(name=None, model_uri=None, model_name=None, implementation=None, parameters=None, engineResources=None, secret_name=None, labels=None, annotations=None, is_custom_deployment=False, spec=None, serviceAccountName=None)
classmethod
Build a basic Seldon Deployment object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Optional[str] |
The name of the Seldon Deployment. If not explicitly passed, a unique name is autogenerated. |
None |
model_uri |
Optional[str] |
The URI of the model. |
None |
model_name |
Optional[str] |
The name of the model. |
None |
implementation |
Optional[str] |
The implementation of the model. |
None |
parameters |
Optional[List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictorParameter]] |
The predictor graph parameters. |
None |
engineResources |
Optional[zenml.integrations.seldon.seldon_client.SeldonResourceRequirements] |
The resources to be allocated to the model. |
None |
secret_name |
Optional[str] |
The name of the Kubernetes secret containing environment variable values (e.g. with credentials for the artifact store) to use with the deployment service. |
None |
labels |
Optional[Dict[str, str]] |
A dictionary of labels to apply to the Seldon Deployment. |
None |
annotations |
Optional[Dict[str, str]] |
A dictionary of annotations to apply to the Seldon Deployment. |
None |
spec |
Optional[Dict[Any, Any]] |
A Kubernetes pod spec to use for the Seldon Deployment. |
None |
is_custom_deployment |
Optional[bool] |
Whether the Seldon Deployment is a custom or a built-in one. |
False |
serviceAccountName |
Optional[str] |
The name of the service account to associate with the predictive unit container. |
None |
Returns:
Type | Description |
---|---|
SeldonDeployment |
A minimal SeldonDeployment object built from the provided parameters. |
Source code in zenml/integrations/seldon/seldon_client.py
@classmethod
def build(
    cls,
    name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_name: Optional[str] = None,
    implementation: Optional[str] = None,
    parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None,
    engineResources: Optional[SeldonResourceRequirements] = None,
    secret_name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    is_custom_deployment: Optional[bool] = False,
    spec: Optional[Dict[Any, Any]] = None,
    serviceAccountName: Optional[str] = None,
) -> "SeldonDeployment":
    """Assemble a minimal Seldon Deployment with a single predictor.

    Args:
        name: The name of the Seldon Deployment. When empty or omitted,
            a name of the form `zenml-<current time>` is generated.
        model_uri: The URI of the model. Only used for built-in
            (non-custom) deployments.
        model_name: The name of the model, used as the predictor name.
        implementation: The implementation of the model. Only used for
            built-in (non-custom) deployments.
        parameters: The predictor graph parameters.
        engineResources: The resources to be allocated to the model.
        secret_name: The name of the Kubernetes secret containing
            environment variable values (e.g. with credentials for the
            artifact store) to use with the deployment service. Only used
            for built-in (non-custom) deployments.
        labels: A dictionary of labels to apply to the Seldon Deployment.
        annotations: A dictionary of annotations to apply to the Seldon
            Deployment.
        is_custom_deployment: Whether the Seldon Deployment is a custom
            or a built-in one.
        spec: A Kubernetes pod spec to use for the Seldon Deployment. Only
            used for custom deployments.
        serviceAccountName: The name of the service account to associate
            with the predictive unit container.

    Returns:
        A minimal SeldonDeployment object built from the provided
        parameters.
    """
    if not name:
        name = f"zenml-{time.time()}"
    labels = {} if labels is None else labels
    annotations = {} if annotations is None else annotations

    # Settings shared by the custom and the built-in predictive unit.
    unit_kwargs: Dict[str, Any] = {
        "name": "classifier",
        "type": SeldonDeploymentPredictiveUnitType.MODEL,
        "parameters": parameters,
        "serviceAccountName": serviceAccountName,
    }
    if not is_custom_deployment:
        # Built-in servers are told where the model lives, which
        # implementation serves it and which secret provides the env.
        unit_kwargs.update(
            modelUri=model_uri or "",
            implementation=implementation or "",
            envSecretRefName=secret_name,
        )
    graph = SeldonDeploymentPredictiveUnit(**unit_kwargs)

    predictor_kwargs: Dict[str, Any] = {
        "name": model_name or "",
        "graph": graph,
        "engineResources": engineResources,
    }
    if is_custom_deployment:
        # Custom deployments carry their own pod spec instead.
        predictor_kwargs["componentSpecs"] = [
            SeldonDeploymentComponentSpecs(
                spec=spec
                # TODO [HIGH]: Add support for other component types (e.g. graph)
            )
        ]
    predictors = [SeldonDeploymentPredictor(**predictor_kwargs)]

    deployment_metadata = SeldonDeploymentMetadata(
        name=name, labels=labels, annotations=annotations
    )
    deployment_spec = SeldonDeploymentSpec(name=name, predictors=predictors)
    return SeldonDeployment(
        metadata=deployment_metadata,
        spec=deployment_spec,
    )
get_error(self)
Get a message describing the error, if in an error state.
Returns:
Type | Description |
---|---|
Optional[str] |
A message describing the error, if in an error state, otherwise None. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_error(self) -> Optional[str]:
    """Describe the error of a failed Seldon Deployment.

    Returns:
        The status description if the deployment is in the failed state,
        otherwise None.
    """
    if not self.status or not self.is_failed():
        return None
    return self.status.description
get_pending_message(self)
Get a message describing the pending conditions of the Seldon Deployment.
Returns:
Type | Description |
---|---|
Optional[str] |
A message describing the pending condition of the Seldon Deployment, or None, if no conditions are pending. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_pending_message(self) -> Optional[str]:
    """Describe why the Seldon Deployment is not ready yet.

    Returns:
        The message of the first `Ready` condition whose status is false,
        or None if there is no status, no conditions, or no such condition.
    """
    if not self.status or not self.status.conditions:
        return None
    unmet_ready_messages = (
        condition.message
        for condition in self.status.conditions
        if condition.type == "Ready" and not condition.status
    )
    return next(unmet_ready_messages, None)
is_available(self)
Checks if the Seldon Deployment is in an available state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is available, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_available(self) -> bool:
    """Tell whether the Seldon Deployment is available.

    Returns:
        True if the Seldon Deployment is available, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.AVAILABLE
is_failed(self)
Checks if the Seldon Deployment is in a failed state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is failed, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_failed(self) -> bool:
    """Tell whether the Seldon Deployment has failed.

    Returns:
        True if the Seldon Deployment is failed, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.FAILED
is_managed_by_zenml(self)
Checks if this Seldon Deployment is managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app`
label value to `zenml`.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is managed by ZenML, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_managed_by_zenml(self) -> bool:
    """Tell whether ZenML manages this Seldon Deployment.

    By convention, SeldonDeployment instances managed by ZenML carry an
    `app` label whose value is `zenml`.

    Returns:
        True if the Seldon Deployment is managed by ZenML, False
        otherwise.
    """
    app_label = self.metadata.labels.get("app")
    return app_label == "zenml"
is_pending(self)
Checks if the Seldon Deployment is in a pending state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is pending, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_pending(self) -> bool:
    """Tell whether the Seldon Deployment is still being created.

    Returns:
        True if the Seldon Deployment is pending, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.CREATING
mark_as_managed_by_zenml(self)
Marks this Seldon Deployment as managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app`
label value to `zenml`.
Source code in zenml/integrations/seldon/seldon_client.py
def mark_as_managed_by_zenml(self) -> None:
    """Flag this Seldon Deployment as managed by ZenML.

    By convention, SeldonDeployment instances managed by ZenML carry an
    `app` label whose value is `zenml`; this sets that label in place.
    """
    deployment_labels = self.metadata.labels
    deployment_labels["app"] = "zenml"
SeldonDeploymentComponentSpecs (BaseModel)
Component specs for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
spec |
Optional[Dict[str, Any]] |
the component spec. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentComponentSpecs(BaseModel):
    """One component spec entry of a Seldon Deployment.

    Attributes:
        spec: the component spec.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    spec: Optional[Dict[str, Any]] = None
SeldonDeploymentExistsError (SeldonClientError)
Raised when a SeldonDeployment resource cannot be created because a resource with the same name already exists.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentExistsError(SeldonClientError):
    """Signals a name clash when creating a SeldonDeployment resource.

    Raised when the resource cannot be created because a resource with the
    same name already exists.
    """
SeldonDeploymentMetadata (BaseModel)
Metadata for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the Seldon Deployment. |
labels |
Dict[str, str] |
Kubernetes labels for the Seldon Deployment. |
annotations |
Dict[str, str] |
Kubernetes annotations for the Seldon Deployment. |
creationTimestamp |
Optional[str] |
the creation timestamp of the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentMetadata(BaseModel):
    """Kubernetes metadata block of a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        labels: Kubernetes labels for the Seldon Deployment. Defaults to an
            empty dictionary.
        annotations: Kubernetes annotations for the Seldon Deployment.
            Defaults to an empty dictionary.
        creationTimestamp: the creation timestamp of the Seldon Deployment.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    name: str
    # default_factory gives every instance its own fresh dictionary
    labels: Dict[str, str] = Field(default_factory=dict)
    annotations: Dict[str, str] = Field(default_factory=dict)
    creationTimestamp: Optional[str] = None
SeldonDeploymentNotFoundError (SeldonClientError)
Raised when a particular SeldonDeployment resource is not found or is not managed by ZenML.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentNotFoundError(SeldonClientError):
    """Signals a missing or unmanaged SeldonDeployment resource.

    Raised when a particular SeldonDeployment resource is not found or is
    not managed by ZenML.
    """
SeldonDeploymentPredictiveUnit (BaseModel)
Seldon Deployment predictive unit.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the predictive unit. |
type |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnitType] |
predictive unit type. |
implementation |
Optional[str] |
the Seldon Core implementation used to serve the model. |
modelUri |
Optional[str] |
URI of the model (or models) to serve. |
serviceAccountName |
Optional[str] |
the name of the service account to associate with the predictive unit container. |
envSecretRefName |
Optional[str] |
the name of a Kubernetes secret that contains environment variables (e.g. credentials) to be configured for the predictive unit container. |
children |
Optional[List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnit]] |
a list of child predictive units that together make up the model serving graph. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnit(BaseModel):
    """A node of the Seldon Deployment serving graph.

    Attributes:
        name: the name of the predictive unit.
        type: predictive unit type. Defaults to `MODEL`.
        implementation: the Seldon Core implementation used to serve the
            model.
        modelUri: URI of the model (or models) to serve.
        parameters: optional list of parameters for the predictive unit.
        serviceAccountName: the name of the service account to associate
            with the predictive unit container.
        envSecretRefName: the name of a Kubernetes secret that contains
            environment variables (e.g. credentials) to be configured for
            the predictive unit container.
        children: a list of child predictive units that together make up
            the model serving graph.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    name: str
    type: Optional[
        SeldonDeploymentPredictiveUnitType
    ] = SeldonDeploymentPredictiveUnitType.MODEL
    implementation: Optional[str] = None
    modelUri: Optional[str] = None
    parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None
    serviceAccountName: Optional[str] = None
    envSecretRefName: Optional[str] = None
    # self-reference: child units have the same shape as their parent
    children: Optional[List["SeldonDeploymentPredictiveUnit"]] = None
SeldonDeploymentPredictiveUnitType (StrEnum)
Predictive unit types for a Seldon Deployment.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnitType(StrEnum):
"""Predictive unit types for a Seldon Deployment."""
UNKNOWN_TYPE = "UNKNOWN_TYPE"
ROUTER = "ROUTER"
COMBINER = "COMBINER"
MODEL = "MODEL"
TRANSFORMER = "TRANSFORMER"
OUTPUT_TRANSFORMER = "OUTPUT_TRANSFORMER"
SeldonDeploymentPredictor (BaseModel)
Seldon Deployment predictor.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the predictor. |
replicas |
int |
the number of pod replicas for the predictor. |
graph |
SeldonDeploymentPredictiveUnit |
the serving graph composed of one or more predictive units. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictor(BaseModel):
    """A predictor entry of a Seldon Deployment.

    Attributes:
        name: the name of the predictor.
        replicas: the number of pod replicas for the predictor. Defaults
            to 1.
        graph: the serving graph composed of one or more predictive units.
        engineResources: optional resource requirements; when omitted, a
            default `SeldonResourceRequirements` instance is created.
        componentSpecs: optional list of component specs for the predictor.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    name: str
    replicas: int = 1
    graph: SeldonDeploymentPredictiveUnit
    engineResources: Optional[SeldonResourceRequirements] = Field(
        default_factory=SeldonResourceRequirements
    )
    componentSpecs: Optional[List[SeldonDeploymentComponentSpecs]] = None
SeldonDeploymentPredictorParameter (BaseModel)
Parameter for Seldon Deployment predictor.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
parameter name |
type |
str |
parameter type, can be INT, FLOAT, DOUBLE, STRING, BOOL |
value |
str |
parameter value |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictorParameter(BaseModel):
    """A single parameter passed to a Seldon Deployment predictor.

    All three fields are strings and default to the empty string.

    Attributes:
        name: parameter name
        type: parameter type, can be INT, FLOAT, DOUBLE, STRING, BOOL
        value: parameter value
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    name: str = ""
    type: str = ""
    value: str = ""
SeldonDeploymentSpec (BaseModel)
Spec for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the Seldon Deployment. |
protocol |
Optional[str] |
the API protocol used for the Seldon Deployment. |
predictors |
List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictor] |
a list of predictors that make up the serving graph. |
replicas |
int |
the default number of pod replicas used for the predictors. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentSpec(BaseModel):
    """The `spec` section of a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        protocol: the API protocol used for the Seldon Deployment.
        predictors: a list of predictors that make up the serving graph.
        replicas: the default number of pod replicas used for the
            predictors. Defaults to 1.
    """

    model_config = ConfigDict(
        # run validation again whenever an attribute is assigned
        validate_assignment=True,
        # drop CRD attributes that are not declared on this model
        extra="ignore",
    )

    name: str
    protocol: Optional[str] = None
    predictors: List[SeldonDeploymentPredictor]
    replicas: int = 1
SeldonDeploymentStatus (BaseModel)
The status of a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
state |
SeldonDeploymentStatusState |
the current state of the Seldon Deployment. |
description |
Optional[str] |
a human-readable description of the current state. |
replicas |
Optional[int] |
the current number of running pod replicas |
address |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusAddress] |
the address where the Seldon Deployment API can be accessed. |
conditions |
List[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusCondition] |
the list of Kubernetes conditions for the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatus(BaseModel):
    """The status of a Seldon Deployment.

    Attributes:
        state: the current state of the Seldon Deployment. Defaults to
            `UNKNOWN`.
        description: a human-readable description of the current state.
        replicas: the current number of running pod replicas
        address: the address where the Seldon Deployment API can be
            accessed.
        conditions: the list of Kubernetes conditions for the Seldon
            Deployment. Defaults to an empty list when the status carries
            no conditions.
    """

    state: SeldonDeploymentStatusState = SeldonDeploymentStatusState.UNKNOWN
    description: Optional[str] = None
    replicas: Optional[int] = None
    address: Optional[SeldonDeploymentStatusAddress] = None
    # A status payload without a `conditions` key used to fail validation
    # because the field was required. Defaulting to an empty list lets such
    # a status parse, and empty conditions are already handled by
    # `SeldonDeployment.get_pending_message`.
    # NOTE(review): assumes the operator may omit `conditions` (e.g. early in
    # the resource lifecycle) - confirm against the CRD schema.
    conditions: List[SeldonDeploymentStatusCondition] = Field(
        default_factory=list
    )
    model_config = ConfigDict(
        # validate attribute assignments
        validate_assignment=True,
        # Ignore extra attributes from the CRD that are not reflected here
        extra="ignore",
    )
SeldonDeploymentStatusAddress (BaseModel)
The status address for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
url |
str |
the URL where the Seldon Deployment API can be accessed internally. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusAddress(BaseModel):
    """Address entry found in the status of a Seldon Deployment.

    Attributes:
        url: the URL where the Seldon Deployment API can be accessed
            internally.
    """

    url: str
SeldonDeploymentStatusCondition (BaseModel)
The Kubernetes status condition entry for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
type |
str |
Type of runtime condition. |
status |
bool |
Status of the condition. |
reason |
Optional[str] |
Brief CamelCase string containing reason for the condition's last transition. |
message |
Optional[str] |
Human-readable message indicating details about last transition. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusCondition(BaseModel):
    """One Kubernetes status condition of a Seldon Deployment.

    Attributes:
        type: Type of runtime condition.
        status: Status of the condition.
        reason: Brief CamelCase string containing reason for the
            condition's last transition.
        message: Human-readable message indicating details about last
            transition.
    """

    type: str
    # NOTE(review): declared as a boolean; presumably the cluster reports
    # this as a string - verify how a non-boolean value such as "Unknown"
    # is meant to be handled.
    status: bool
    reason: Optional[str] = None
    message: Optional[str] = None
SeldonDeploymentStatusState (StrEnum)
Possible state values for a Seldon Deployment.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusState(StrEnum):
"""Possible state values for a Seldon Deployment."""
UNKNOWN = "Unknown"
AVAILABLE = "Available"
CREATING = "Creating"
FAILED = "Failed"
SeldonResourceRequirements (BaseModel)
Resource requirements for a Seldon deployed model.
Attributes:
Name | Type | Description |
---|---|---|
limits |
Dict[str, str] |
an upper limit of resources to be used by the model |
requests |
Dict[str, str] |
resources requested by the model |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonResourceRequirements(BaseModel):
    """Resource limits and requests of a Seldon deployed model.

    Both fields default to an empty dictionary.

    Attributes:
        limits: an upper limit of resources to be used by the model
        requests: resources requested by the model
    """

    # default_factory gives every instance its own fresh dictionary
    limits: Dict[str, str] = Field(default_factory=dict)
    requests: Dict[str, str] = Field(default_factory=dict)
create_seldon_core_custom_spec(model_uri, custom_docker_image, secret_name, command, container_registry_secret_name=None)
Create a custom pod spec for the seldon core container.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri |
Optional[str] |
The URI of the model to load. |
required |
custom_docker_image |
Optional[str] |
The docker image to use. |
required |
secret_name |
Optional[str] |
The name of the Kubernetes secret to use. |
required |
command |
Optional[List[str]] |
The command to run in the container. |
required |
container_registry_secret_name |
Optional[str] |
The name of the secret to use for docker image pull. |
None |
Returns:
Type | Description |
---|---|
kubernetes.client.V1PodSpec |
A pod spec for the seldon core container. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_seldon_core_custom_spec(
    model_uri: Optional[str],
    custom_docker_image: Optional[str],
    secret_name: Optional[str],
    command: Optional[List[str]],
    container_registry_secret_name: Optional[str] = None,
) -> k8s_client.V1PodSpec:
    """Build the pod spec used for a custom Seldon Core deployment.

    The pod consists of an init container and a `classifier` container that
    share one empty-dir volume mounted at `/mnt/models`. The init container
    receives the model URI and the mount path as arguments; the classifier
    container mounts the same volume read-only.

    Args:
        model_uri: The URI of the model to load.
        custom_docker_image: The docker image to use.
        secret_name: The name of the Kubernetes secret to use. When set, it
            is exposed to the init container as an environment source.
        command: The command to run in the container.
        container_registry_secret_name: The name of the secret to use for
            docker image pull.

    Returns:
        A pod spec for the seldon core container, passed through
        `api.sanitize_for_serialization` before being returned.
    """
    shared_volume_name = "classifier-provision-location"
    models_mount_path = "/mnt/models"

    models_volume = k8s_client.V1Volume(
        name=shared_volume_name,
        empty_dir={},
    )

    model_initializer = k8s_client.V1Container(
        name="classifier-model-initializer",
        image="seldonio/rclone-storage-initializer:1.14.0-dev",
        image_pull_policy="IfNotPresent",
        args=[model_uri, models_mount_path],
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name=shared_volume_name, mount_path=models_mount_path
            )
        ],
    )
    if secret_name:
        # The secret is mandatory for the init container once configured.
        secret_source = k8s_client.V1SecretEnvSource(
            name=secret_name, optional=False
        )
        model_initializer.env_from = [
            k8s_client.V1EnvFromSource(secret_ref=secret_source)
        ]

    classifier = k8s_client.V1Container(
        name="classifier",
        image=custom_docker_image,
        image_pull_policy="IfNotPresent",
        command=command,
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name=shared_volume_name,
                mount_path=models_mount_path,
                read_only=True,
            )
        ],
        ports=[
            k8s_client.V1ContainerPort(container_port=port)
            for port in (5000, 9000)
        ],
    )

    # Image pull secrets are only added when a registry secret is provided.
    pod_spec_kwargs = {
        "volumes": [models_volume],
        "init_containers": [model_initializer],
        "containers": [classifier],
    }
    if container_registry_secret_name:
        pod_spec_kwargs["image_pull_secrets"] = [
            k8s_client.V1LocalObjectReference(
                name=container_registry_secret_name
            )
        ]
    pod_spec = k8s_client.V1PodSpec(**pod_spec_kwargs)

    return api.sanitize_for_serialization(pod_spec)
services
special
Initialization for Seldon services.
seldon_deployment
Implementation for the Seldon Deployment step.
SeldonDeploymentConfig (ServiceConfig)
Seldon Core deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_uri |
str |
URI of the model (or models) to serve. |
model_name |
str |
the name of the model. Multiple versions of the same model should use the same model name. |
implementation |
str |
the Seldon Core implementation used to serve the model.
The implementation type can be one of the following: `TENSORFLOW_SERVER`, `SKLEARN_SERVER`, `XGBOOST_SERVER`, `custom`. |
replicas |
int |
number of replicas to use for the prediction service. |
secret_name |
Optional[str] |
the name of a Kubernetes secret containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store). |
model_metadata |
Dict[str, Any] |
optional model metadata information (see https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html). |
extra_args |
Dict[str, Any] |
additional arguments to pass to the Seldon Core deployment resource configuration. |
is_custom_deployment |
Optional[bool] |
whether the deployment is a custom deployment |
spec |
Optional[Dict[Any, Any]] |
custom Kubernetes resource specification for the Seldon Core |
serviceAccountName |
Optional[str] |
The name of the Service Account applied to the deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentConfig(ServiceConfig):
    """Seldon Core deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same model
            should use the same model name.
        implementation: the Seldon Core implementation used to serve the model.
            The implementation type can be one of the following: `TENSORFLOW_SERVER`,
            `SKLEARN_SERVER`, `XGBOOST_SERVER`, `custom`.
        parameters: optional list of predictor parameters forwarded to the
            Seldon Core deployment when it is built.
        resources: optional resource requirements forwarded to the Seldon
            Core deployment when it is built.
        replicas: number of replicas to use for the prediction service.
        secret_name: the name of a Kubernetes secret containing additional
            configuration parameters for the Seldon Core deployment (e.g.
            credentials to access the Artifact Store).
        model_metadata: optional model metadata information (see
            https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html).
        extra_args: additional arguments to pass to the Seldon Core deployment
            resource configuration.
        is_custom_deployment: whether the deployment is a custom deployment
        spec: custom Kubernetes resource specification for the Seldon Core
        serviceAccountName: The name of the Service Account applied to the deployment.
    """

    model_uri: str = ""
    model_name: str = "default"
    # TODO [ENG-775]: have an enum of all supported Seldon Core implementations
    implementation: str
    # NOTE(review): `parameters`, `resources` and `secret_name` are Optional
    # but have no default; under pydantic v2 semantics that presumably makes
    # them required (nullable) fields - confirm this is intended.
    parameters: Optional[List[SeldonDeploymentPredictorParameter]]
    resources: Optional[SeldonResourceRequirements]
    replicas: int = 1
    secret_name: Optional[str]
    model_metadata: Dict[str, Any] = Field(default_factory=dict)
    extra_args: Dict[str, Any] = Field(default_factory=dict)
    is_custom_deployment: Optional[bool] = False
    spec: Optional[Dict[Any, Any]] = Field(default_factory=dict)
    serviceAccountName: Optional[str] = None

    def get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Generate labels for the Seldon Core deployment from the service configuration.

        These labels are attached to the Seldon Core deployment resource
        and may be used as label selectors in lookup operations.

        Returns:
            The labels for the Seldon Core deployment.
        """
        labels: Dict[str, str] = {}
        if self.pipeline_name:
            labels["zenml.pipeline_name"] = self.pipeline_name
        if self.pipeline_step_name:
            labels["zenml.pipeline_step_name"] = self.pipeline_step_name
        if self.model_name:
            labels["zenml.model_name"] = self.model_name
        if self.model_uri:
            labels["zenml.model_uri"] = self.model_uri
        if self.implementation:
            labels["zenml.model_type"] = self.implementation
        if self.extra_args:
            for key, value in self.extra_args.items():
                # extra_args values are typed `Any`, but label values must be
                # strings; `str()` is a no-op for values that already are.
                labels[f"zenml.{key}"] = str(value)
        # Sanitizes the mapping in place (return value is not used).
        SeldonClient.sanitize_labels(labels)
        return labels

    def get_seldon_deployment_annotations(self) -> Dict[str, str]:
        """Generate annotations for the Seldon Core deployment from the service configuration.

        The annotations are used to store additional information about the
        Seldon Core service that is associated with the deployment that is
        not available in the labels. One annotation particularly important
        is the serialized Service configuration itself, which is used to
        recreate the service configuration from a remote Seldon deployment.

        Returns:
            The annotations for the Seldon Core deployment.
        """
        annotations = {
            # Read back by `create_from_deployment` to rebuild this config.
            "zenml.service_config": self.model_dump_json(),
            "zenml.version": __version__,
        }
        return annotations

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentConfig":
        """Recreate the configuration of a Seldon Core Service from a deployed instance.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service configuration corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations, or it contains an invalid or
                incompatible Seldon Core service configuration.
        """
        config_data = deployment.metadata.annotations.get(
            "zenml.service_config"
        )
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            service_config = cls.model_validate_json(config_data)
        except ValidationError as e:
            # Surface validation problems as ValueError, keeping the cause.
            raise ValueError(
                f"The loaded Seldon Core deployment resource contains an "
                f"invalid or incompatible Seldon Core service configuration: "
                f"{config_data}"
            ) from e
        return service_config
create_from_deployment(deployment)
classmethod
Recreate the configuration of a Seldon Core Service from a deployed instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource. |
required |
Returns:
Type | Description |
---|---|
SeldonDeploymentConfig |
The Seldon Core service configuration corresponding to the given Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected annotations, or it contains an invalid or incompatible Seldon Core service configuration. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentConfig":
    """Rebuild a service configuration from a deployed Seldon Core resource.

    The configuration is read from the `zenml.service_config` annotation
    of the deployment and parsed as JSON.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service configuration corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations, or it contains an invalid or
            incompatible Seldon Core service configuration.
    """
    serialized = deployment.metadata.annotations.get("zenml.service_config")
    if not serialized:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )

    try:
        return cls.model_validate_json(serialized)
    except ValidationError as err:
        # Re-raise as ValueError while preserving the original cause.
        raise ValueError(
            f"The loaded Seldon Core deployment resource contains an "
            f"invalid or incompatible Seldon Core service configuration: "
            f"{serialized}"
        ) from err
get_seldon_deployment_annotations(self)
Generate annotations for the Seldon Core deployment from the service configuration.
The annotations are used to store additional information about the Seldon Core service that is associated with the deployment that is not available in the labels. One annotation particularly important is the serialized Service configuration itself, which is used to recreate the service configuration from a remote Seldon deployment.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The annotations for the Seldon Core deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_annotations(self) -> Dict[str, str]:
    """Build the annotations attached to the Seldon Core deployment.

    Two annotations are produced: the JSON-serialized service
    configuration (used to recreate the configuration from a remote
    Seldon deployment) and the ZenML version.

    Returns:
        The annotations for the Seldon Core deployment.
    """
    serialized_config = self.model_dump_json()
    return {
        "zenml.service_config": serialized_config,
        "zenml.version": __version__,
    }
get_seldon_deployment_labels(self)
Generate labels for the Seldon Core deployment from the service configuration.
These labels are attached to the Seldon Core deployment resource and may be used as label selectors in lookup operations.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The labels for the Seldon Core deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_labels(self) -> Dict[str, str]:
    """Generate labels for the Seldon Core deployment from the service configuration.

    These labels are attached to the Seldon Core deployment resource
    and may be used as label selectors in lookup operations. Only
    configuration values that are set (truthy) produce a label.

    Returns:
        The labels for the Seldon Core deployment.
    """
    labels: Dict[str, str] = {}
    if self.pipeline_name:
        labels["zenml.pipeline_name"] = self.pipeline_name
    if self.pipeline_step_name:
        labels["zenml.pipeline_step_name"] = self.pipeline_step_name
    if self.model_name:
        labels["zenml.model_name"] = self.model_name
    if self.model_uri:
        labels["zenml.model_uri"] = self.model_uri
    if self.implementation:
        labels["zenml.model_type"] = self.implementation
    if self.extra_args:
        for key, value in self.extra_args.items():
            # extra_args values are typed `Any`, but label values must be
            # strings; `str()` is a no-op for values that already are.
            labels[f"zenml.{key}"] = str(value)
    # Sanitizes the mapping in place (return value is not used).
    SeldonClient.sanitize_labels(labels)
    return labels
SeldonDeploymentService (BaseDeploymentService)
A service that represents a Seldon Core deployment server.
Attributes:
Name | Type | Description |
---|---|---|
config |
SeldonDeploymentConfig |
service configuration. |
status |
SeldonDeploymentServiceStatus |
service status. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentService(BaseDeploymentService):
    """A service that represents a Seldon Core deployment server.

    Attributes:
        config: service configuration.
        status: service status.
    """

    SERVICE_TYPE = ServiceType(
        name="seldon-deployment",
        type="model-serving",
        flavor="seldon",
        description="Seldon Core prediction service",
        logo_url="https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/seldon.png",
    )

    config: SeldonDeploymentConfig
    status: SeldonDeploymentServiceStatus = Field(
        default_factory=lambda: SeldonDeploymentServiceStatus()
    )

    def _get_client(self) -> SeldonClient:
        """Get the Seldon Core client from the active Seldon Core model deployer.

        Returns:
            The Seldon Core client.
        """
        # Imported here rather than at module level, as in the original.
        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        model_deployer = cast(
            SeldonModelDeployer,
            SeldonModelDeployer.get_active_model_deployer(),
        )
        return model_deployer.seldon_client

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the Seldon Core deployment.

        Returns:
            The operational state of the Seldon Core deployment and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            deployment = client.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            # No remote resource exists for this service.
            return (ServiceState.INACTIVE, "")

        if deployment.is_available():
            return (
                ServiceState.ACTIVE,
                f"Seldon Core deployment '{name}' is available",
            )

        if deployment.is_failed():
            return (
                ServiceState.ERROR,
                f"Seldon Core deployment '{name}' failed: "
                f"{deployment.get_error()}",
            )

        # Neither available nor failed: report it as still starting up.
        pending_message = deployment.get_pending_message() or ""
        return (
            ServiceState.PENDING_STARTUP,
            "Seldon Core deployment is being created: " + pending_message,
        )

    @property
    def seldon_deployment_name(self) -> str:
        """Get the name of the Seldon Core deployment.

        It should return the one that uniquely corresponds to this service instance.

        Returns:
            The name of the Seldon Core deployment.
        """
        return f"zenml-{str(self.uuid)}"

    def _get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Generate the labels for the Seldon Core deployment from the service configuration.

        Returns:
            The labels for the Seldon Core deployment.
        """
        labels = self.config.get_seldon_deployment_labels()
        # Read back by `create_from_deployment` to restore the service UUID.
        labels["zenml.service_uuid"] = str(self.uuid)
        SeldonClient.sanitize_labels(labels)
        return labels

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentService":
        """Recreate a Seldon Core service from a Seldon Core deployment resource.

        It should then update their operational status.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected service_uuid label.
        """
        config = SeldonDeploymentConfig.create_from_deployment(deployment)
        uuid = deployment.metadata.labels.get("zenml.service_uuid")
        if not uuid:
            raise ValueError(
                f"The given deployment resource does not contain a valid "
                f"'zenml.service_uuid' label: {deployment}"
            )
        service = cls(uuid=UUID(uuid), config=config)
        service.update_status()
        return service

    def provision(self) -> None:
        """Provision or update remote Seldon Core deployment instance.

        This should then match the current configuration.
        """
        client = self._get_client()

        name = self.seldon_deployment_name

        deployment = SeldonDeployment.build(
            name=name,
            model_uri=self.config.model_uri,
            model_name=self.config.model_name,
            implementation=self.config.implementation,
            parameters=self.config.parameters,
            engineResources=self.config.resources,
            secret_name=self.config.secret_name,
            labels=self._get_seldon_deployment_labels(),
            annotations=self.config.get_seldon_deployment_annotations(),
            is_custom_deployment=self.config.is_custom_deployment,
            spec=self.config.spec,
            serviceAccountName=self.config.serviceAccountName,
        )
        # The replica count is set both on the deployment spec and on its
        # first predictor.
        deployment.spec.replicas = self.config.replicas
        deployment.spec.predictors[0].replicas = self.config.replicas

        # check if the Seldon deployment already exists
        try:
            client.get_deployment(name=name)
            # update the existing deployment
            client.update_deployment(deployment)
        except SeldonDeploymentNotFoundError:
            # create the deployment
            client.create_deployment(deployment=deployment)

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the remote Seldon Core deployment instance.

        Args:
            force: if True, the remote deployment instance will be
                forcefully deprovisioned.
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            client.delete_deployment(name=name, force=force)
        except SeldonDeploymentNotFoundError:
            # Nothing to delete: the deployment is already gone.
            pass

    def get_logs(
        self,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a Seldon Core model deployment.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        return self._get_client().get_deployment_logs(
            self.seldon_deployment_name,
            follow=follow,
            tail=tail,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        import posixpath

        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        if not self.is_running:
            return None
        namespace = self._get_client().namespace
        model_deployer = cast(
            SeldonModelDeployer,
            SeldonModelDeployer.get_active_model_deployer(),
        )
        # URLs always use "/" separators. `posixpath.join` behaves like
        # `os.path.join` on POSIX but does not emit backslashes on Windows.
        return posixpath.join(
            model_deployer.config.base_url,
            "seldon",
            namespace,
            self.seldon_deployment_name,
            "api/v0.1/predictions",
        )

    def predict(self, request: str) -> Any:
        """Make a prediction using the service.

        Args:
            request: a JSON string; it is decoded and sent to the service as
                the `ndarray` payload of the prediction request.

        Returns:
            The JSON-decoded body of the response returned by the service.

        Raises:
            Exception: if the service is not yet ready.
            ValueError: if the prediction_url is not set, or if the request
                is not a string (invalid JSON also surfaces as a ValueError,
                since `json.JSONDecodeError` derives from it).
        """
        if not self.is_running:
            raise Exception(
                "Seldon prediction service is not running. "
                "Please start the service before making predictions."
            )

        # Read the property once so the URL that was checked for None is the
        # same one that is posted to.
        url = self.prediction_url
        if url is None:
            raise ValueError("`self.prediction_url` is not set, cannot post.")

        if not isinstance(request, str):
            raise ValueError("Request must be a json string.")
        payload = json.loads(request)

        response = requests.post(  # nosec
            url,
            json={"data": {"ndarray": payload}},
        )
        # Turn HTTP error statuses into exceptions before decoding the body.
        response.raise_for_status()
        return response.json()
prediction_url: Optional[str]
property
readonly
The prediction URI exposed by the prediction service.
Returns:
Type | Description |
---|---|
Optional[str] |
The prediction URI exposed by the prediction service, or None if the service is not yet ready. |
seldon_deployment_name: str
property
readonly
Get the name of the Seldon Core deployment.
It should return the one that uniquely corresponds to this service instance.
Returns:
Type | Description |
---|---|
str |
The name of the Seldon Core deployment. |
check_status(self)
Check the current operational state of the Seldon Core deployment.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] |
The operational state of the Seldon Core deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered). |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Report the current operational state of the Seldon Core deployment.

    Returns:
        A pair made of the operational state of the Seldon Core deployment
        and a message with additional information about that state (e.g.
        a description of the error, if one is encountered).
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name

    try:
        resource = seldon_client.get_deployment(name=deployment_name)
    except SeldonDeploymentNotFoundError:
        # No remote resource exists for this service.
        return ServiceState.INACTIVE, ""

    if resource.is_available():
        state = ServiceState.ACTIVE
        message = f"Seldon Core deployment '{deployment_name}' is available"
    elif resource.is_failed():
        state = ServiceState.ERROR
        message = (
            f"Seldon Core deployment '{deployment_name}' failed: "
            f"{resource.get_error()}"
        )
    else:
        # Neither available nor failed: report it as still starting up.
        state = ServiceState.PENDING_STARTUP
        details = resource.get_pending_message() or ""
        message = "Seldon Core deployment is being created: " + details
    return state, message
create_from_deployment(deployment)
classmethod
Recreate a Seldon Core service from a Seldon Core deployment resource.
It should then update their operational status.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource. |
required |
Returns:
Type | Description |
---|---|
SeldonDeploymentService |
The Seldon Core service corresponding to the given Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected service_uuid label. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentService":
    """Rebuild a Seldon Core service from a Seldon Core deployment resource.

    The service configuration is restored from the deployment, the service
    UUID is read from the `zenml.service_uuid` label, and the status of
    the resulting service is refreshed before it is returned.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected service_uuid label.
    """
    service_config = SeldonDeploymentConfig.create_from_deployment(deployment)
    service_uuid = deployment.metadata.labels.get("zenml.service_uuid")

    if service_uuid:
        restored = cls(uuid=UUID(service_uuid), config=service_config)
        restored.update_status()
        return restored

    raise ValueError(
        f"The given deployment resource does not contain a valid "
        f"'zenml.service_uuid' label: {deployment}"
    )
deprovision(self, force=False)
Deprovision the remote Seldon Core deployment instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
if True, the remote deployment instance will be forcefully deprovisioned. |
False |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Delete the remote Seldon Core deployment backing this service.

    A deployment that no longer exists is silently ignored.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name

    try:
        seldon_client.delete_deployment(name=deployment_name, force=force)
    except SeldonDeploymentNotFoundError:
        # Already gone, so there is nothing left to deprovision.
        return
get_logs(self, follow=False, tail=None)
Get the logs of a Seldon Core model deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_logs(
    self,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of the Seldon Core model deployment.

    The call is delegated to the Seldon Core client, using the deployment
    name of this service.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    log_stream = seldon_client.get_deployment_logs(
        deployment_name, follow=follow, tail=tail
    )
    return log_stream
predict(self, request)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
str |
a JSON string encoding the request data; it is decoded and sent to the service as the `ndarray` payload |
required |
Returns:
Type | Description |
---|---|
Any |
The JSON-decoded body of the response returned by the service. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not yet ready. |
ValueError |
if the prediction_url is not set, or if the request is not a JSON string. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON string; it is decoded and sent to the service as
            the `ndarray` payload of the prediction request.

    Returns:
        The JSON-decoded body of the response returned by the service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set, or if the request
            is not a string (invalid JSON also surfaces as a ValueError,
            since `json.JSONDecodeError` derives from it).
    """
    if not self.is_running:
        raise Exception(
            "Seldon prediction service is not running. "
            "Please start the service before making predictions."
        )

    # Read the property once so the URL that was checked for None is the
    # same one that is posted to.
    url = self.prediction_url
    if url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")

    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    payload = json.loads(request)

    response = requests.post(  # nosec
        url,
        json={"data": {"ndarray": payload}},
    )
    # Turn HTTP error statuses into exceptions before decoding the body.
    response.raise_for_status()
    return response.json()
provision(self)
Provision or update remote Seldon Core deployment instance.
This should then match the current configuration.
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def provision(self) -> None:
    """Create the remote Seldon Core deployment, or update it if it exists.

    The deployment resource is built from the current service
    configuration, so the remote instance ends up matching it.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    cfg = self.config

    build_kwargs = {
        "name": deployment_name,
        "model_uri": cfg.model_uri,
        "model_name": cfg.model_name,
        "implementation": cfg.implementation,
        "parameters": cfg.parameters,
        "engineResources": cfg.resources,
        "secret_name": cfg.secret_name,
        "labels": self._get_seldon_deployment_labels(),
        "annotations": cfg.get_seldon_deployment_annotations(),
        "is_custom_deployment": cfg.is_custom_deployment,
        "spec": cfg.spec,
        "serviceAccountName": cfg.serviceAccountName,
    }
    resource = SeldonDeployment.build(**build_kwargs)

    # The replica count is set both on the deployment spec and on its
    # first predictor.
    resource.spec.replicas = cfg.replicas
    resource.spec.predictors[0].replicas = cfg.replicas

    try:
        # Probe for an existing deployment; if found, update it in place.
        seldon_client.get_deployment(name=deployment_name)
        seldon_client.update_deployment(resource)
    except SeldonDeploymentNotFoundError:
        # Not deployed yet: create it from scratch.
        seldon_client.create_deployment(deployment=resource)
SeldonDeploymentServiceStatus (ServiceStatus)
Seldon Core deployment service status.
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentServiceStatus(ServiceStatus):
    """Seldon Core deployment service status.

    Declares no fields or methods of its own; everything comes from
    `ServiceStatus`.
    """
steps
special
Initialization for Seldon steps.
seldon_deployer
Implementation of the Seldon Deployer step.