Model Deployers
zenml.model_deployers
Model deployers are stack components responsible for online model serving.
Online serving is the process of hosting and loading machine-learning models as part of a managed web service and providing access to the models through an API endpoint, typically over HTTP or gRPC. Once a model is deployed, you can send inference requests to it through the web service's API and receive low-latency responses.
Add a model deployer to your ZenML stack to implement continuous model deployment pipelines that train models and continuously deploy them to a model prediction web service.
When present in a stack, the model deployer also acts as a registry for models that are served with ZenML. You can use the model deployer to list all models that are currently deployed for online inference, optionally filtered by a particular pipeline run or step, and to suspend, resume or delete an external model server managed through ZenML.
base_model_deployer
Base class for all ZenML model deployers.
BaseModelDeployer (StackComponent, ABC)
Base class for all ZenML model deployers.
The model deployer serves three major purposes:
- It contains all the stack-related configuration attributes required to interact with the remote model serving tool, service or platform (e.g. hostnames, URLs, references to credentials, other client-related configuration parameters).
- It implements the continuous deployment logic necessary to deploy models in a way that updates an existing model server that is already serving a previous version of the same model instead of creating a new model server for every new model version (see the deploy_model method). This functionality can be consumed directly from ZenML pipeline steps, but it can also be used outside the pipeline to deploy ad hoc models. It is also usually coupled with a standard model deployer step, implemented by each integration, that hides the details of the deployment process from the user.
- It acts as a ZenML BaseService registry, where every BaseService instance is used as an internal representation of a remote model server (see the find_model_server method). To achieve this, it must be able to re-create the configuration of a BaseService from information that is persisted externally, alongside or even as part of the remote model server configuration itself. For example, for model servers that are implemented as Kubernetes resources, the BaseService instances can be serialized and saved as Kubernetes resource annotations. This allows the model deployer to keep track of all externally running model servers and to re-create their corresponding BaseService instance representations at any given time.
The model deployer also defines methods that implement basic life-cycle management on remote model servers outside the coverage of a pipeline (see stop_model_server, start_model_server and delete_model_server).
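The registry idea in the third point (persist a service configuration next to the model server, then rebuild it later) can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: `ToyServiceConfig`, `ANNOTATION_KEY` and the two helper functions are hypothetical stand-ins, not ZenML or Kubernetes APIs, and the URI is made up.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict

# Hypothetical annotation key; a real integration would choose its own.
ANNOTATION_KEY = "example.org/service-config"


@dataclass
class ToyServiceConfig:
    """Stand-in for a BaseService configuration (not a ZenML class)."""

    model_name: str
    model_uri: str
    pipeline_name: str


def to_annotations(config: ToyServiceConfig) -> Dict[str, str]:
    """Serialize the config into a string-valued annotation mapping."""
    return {ANNOTATION_KEY: json.dumps(asdict(config), sort_keys=True)}


def from_annotations(annotations: Dict[str, str]) -> ToyServiceConfig:
    """Re-create the config from annotations read back from the resource."""
    return ToyServiceConfig(**json.loads(annotations[ANNOTATION_KEY]))


original = ToyServiceConfig("churn", "s3://bucket/churn/3", "training")
restored = from_annotations(to_annotations(original))
print(restored == original)  # True
```

Because the serialized form lives with the external resource, a deployer built along these lines would not need local state to rediscover the servers it manages.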
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployer(StackComponent, ABC):
    """Base class for all ZenML model deployers.

    The model deployer serves three major purposes:

    1. It contains all the stack related configuration attributes required to
    interact with the remote model serving tool, service or platform (e.g.
    hostnames, URLs, references to credentials, other client related
    configuration parameters).

    2. It implements the continuous deployment logic necessary to deploy models
    in a way that updates an existing model server that is already serving a
    previous version of the same model instead of creating a new model server
    for every new model version (see the `deploy_model` method).
    This functionality can be consumed directly from ZenML pipeline steps, but
    it can also be used outside the pipeline to deploy ad hoc models. It is
    also usually coupled with a standard model deployer step, implemented by
    each integration, that hides the details of the deployment process from
    the user.

    3. It acts as a ZenML BaseService registry, where every BaseService instance
    is used as an internal representation of a remote model server (see the
    `find_model_server` method). To achieve this, it must be able to
    re-create the configuration of a BaseService from information that is
    persisted externally, alongside or even as part of the remote model server
    configuration itself. For example, for model servers that are implemented as
    Kubernetes resources, the BaseService instances can be serialized and saved
    as Kubernetes resource annotations. This allows the model deployer to keep
    track of all externally running model servers and to re-create their
    corresponding BaseService instance representations at any given time.
    The model deployer also defines methods that implement basic life-cycle
    management on remote model servers outside the coverage of a pipeline
    (see `stop_model_server`, `start_model_server` and `delete_model_server`).
    """

    NAME: ClassVar[str]
    FLAVOR: ClassVar[Type["BaseModelDeployerFlavor"]]

    @property
    def config(self) -> BaseModelDeployerConfig:
        """Returns the `BaseModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseModelDeployerConfig, self._config)

    @classmethod
    def get_active_model_deployer(cls) -> "BaseModelDeployer":
        """Get the model deployer registered in the active stack.

        Returns:
            The model deployer registered in the active stack.

        Raises:
            TypeError: if a model deployer is not part of the
                active stack.
        """
        flavor: BaseModelDeployerFlavor = cls.FLAVOR()
        client = Client()
        model_deployer = client.active_stack.model_deployer
        if not model_deployer or not isinstance(model_deployer, cls):
            raise TypeError(
                f"The active stack needs to have a {cls.NAME} model "
                f"deployer component registered to be able to deploy models "
                f"with {cls.NAME}. You can create a new stack with "
                f"a {cls.NAME} model deployer component or update your "
                f"active stack to add this component, e.g.:\n\n"
                f" `zenml model-deployer register {flavor.name} "
                f"--flavor={flavor.name} ...`\n"
                f" `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
                f" or:\n"
                f" `zenml stack update -d {flavor.name}`\n\n"
            )
        return model_deployer

    def deploy_model(
        self,
        config: ServiceConfig,
        service_type: ServiceType,
        replace: bool = False,
        continuous_deployment_mode: bool = False,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Deploy a model.

        The `deploy_model` method is the main entry point for deploying models
        using the model deployer. It is used to deploy a model to a model server
        instance that is running on a remote serving platform or service. The
        method is responsible for detecting if there is an existing model server
        instance already serving one or more previous versions of the same model
        and deploying the model to the serving platform or updating the existing
        model server instance to include the new model version. The method
        returns a Service object that is a representation of the external model
        server instance. The Service object must implement basic operational
        state tracking and lifecycle management operations for the model server
        (e.g. start, stop, etc.).

        Args:
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            replace: If True, it will replace any existing model server instances
                that serve the same model. If False, it does not replace any
                existing model server instance.
            continuous_deployment_mode: If True, an existing model server
                deployed by the same pipeline and step for the same model is
                deleted and replaced, regardless of its configuration. If
                False, an existing model server with exactly the same
                configuration is returned as is, and a new one is deployed
                only if none is found.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.
            service_type: The type of the service to deploy.

        Raises:
            RuntimeError: if the model deployment fails.

        Returns:
            The deployment Service object.
        """
        # Instantiate the client
        client = Client()
        if not continuous_deployment_mode:
            # Find existing model server
            services = self.find_model_server(
                config=config.dict(),
                service_type=service_type,
            )
            if len(services) > 0:
                logger.info(
                    f"Existing model server found for {config.name or config.model_name} with the exact same configuration. Returning the existing service named {services[0].config.service_name}."
                )
                return services[0]
        else:
            # Find existing model server
            services = self.find_model_server(
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
                service_type=service_type,
            )
            if len(services) > 0:
                logger.info(
                    f"Existing model server found for {config.pipeline_name} and {config.pipeline_step_name}, since continuous deployment mode is enabled, replacing the existing service named {services[0].config.service_name}."
                )
                service = services[0]
                self.delete_model_server(service.uuid)
        logger.info(
            f"Deploying model server for {config.model_name} with the following configuration: {config.dict()}"
        )
        service_response = client.create_service(
            config=config,
            service_type=service_type,
            model_version_id=get_model_version_id_if_exists(
                config.model_name, config.model_version
            ),
        )
        try:
            service = self.perform_deploy_model(
                id=service_response.id,
                config=config,
                timeout=timeout,
            )
        except Exception as e:
            client.delete_service(service_response.id)
            raise RuntimeError(
                f"Failed to deploy model server for {config.model_name}: {e}"
            ) from e
        # Update the service in store
        client.update_service(
            id=service.uuid,
            name=service.config.service_name,
            service_source=service.dict().get("type"),
            admin_state=service.admin_state,
            status=service.status.dict(),
            endpoint=service.endpoint.dict() if service.endpoint else None,
            # labels=service.config.get_service_labels() # TODO: fix labels in services and config
            prediction_url=service.get_prediction_url(),
            health_check_url=service.get_healthcheck_url(),
        )
        return service

    @abstractmethod
    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to deploy a model.

        Concrete model deployer subclasses must implement the following
        functionality in this method:

        - Detect if there is an existing model server instance already serving
        one or more previous versions of the same model
        - Deploy the model to the serving platform or update the existing model
        server instance to include the new model version
        - Return a Service object that is a representation of the external model
        server instance. The Service must implement basic operational state
        tracking and lifecycle management operations for the model server (e.g.
        start, stop, etc.)

        Args:
            id: UUID of the service that was originally used to deploy the model.
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.

        Returns:
            The deployment Service object.
        """

    @staticmethod
    @abstractmethod
    def get_model_server_info(
        service: BaseService,
    ) -> Dict[str, Optional[str]]:
        """Give implementation specific way to extract relevant model server properties for the user.

        Args:
            service: Integration-specific service instance

        Returns:
            A dictionary containing the relevant model server properties.
        """

    def find_model_server(
        self,
        config: Optional[Dict[str, Any]] = None,
        running: Optional[bool] = None,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        service_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        service_type: Optional[ServiceType] = None,
        type: Optional[str] = None,
        flavor: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
    ) -> List[BaseService]:
        """Find one or more model servers that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_step_name: The name of the pipeline step that was originally used
                to deploy the model.
            pipeline_name: The name of the pipeline that was originally used to deploy
                the model from the model registry.
            model_name: The name of the model that was originally used to deploy
                the model from the model registry.
            model_version: The version of the model that was originally used to
                deploy the model from the model registry.
            service_type: The type of the service to find.
            type: The type of the service to find.
            flavor: The flavor of the service to find.
            pipeline_run_id: The UUID of the pipeline run that was originally used
                to deploy the model.
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            service_name: The name of the service to find.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """
        client = Client()
        service_responses = client.list_services(
            sort_by="desc:created",
            id=service_uuid,
            running=running,
            service_name=service_name,
            pipeline_name=pipeline_name,
            pipeline_step_name=pipeline_step_name,
            model_version_id=get_model_version_id_if_exists(
                model_name, model_version
            ),
            pipeline_run_id=pipeline_run_id,
            config=config,
            type=type or service_type.type if service_type else None,
            flavor=flavor or service_type.flavor if service_type else None,
            hydrate=True,
        )
        services = []
        for service_response in service_responses.items:
            if not service_response.service_source:
                client.delete_service(service_response.id)
                continue
            service = BaseDeploymentService.from_model(service_response)
            service.update_status()
            if service.status.dict() != service_response.status:
                client.update_service(
                    id=service.uuid,
                    admin_state=service.admin_state,
                    status=service.status.dict(),
                    endpoint=service.endpoint.dict()
                    if service.endpoint
                    else None,
                )
            if running and not service.is_running:
                logger.warning(
                    f"Service {service.uuid} is in an unexpected state. "
                    f"Expected running={running}, but found running={service.is_running}."
                )
                continue
            services.append(service)
        return services

    @abstractmethod
    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Abstract method to stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            service: The service to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            updated_service = self.perform_stop_model(service, timeout, force)
            client.update_service(
                id=updated_service.uuid,
                admin_state=updated_service.admin_state,
                status=updated_service.status.dict(),
                endpoint=updated_service.endpoint.dict()
                if updated_service.endpoint
                else None,
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to stop model server with UUID {uuid}: {e}"
            ) from e

    @abstractmethod
    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to start a model server.

        Args:
            service: The service to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.
        """

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Start a model server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            updated_service = self.perform_start_model(service, timeout)
            client.update_service(
                id=updated_service.uuid,
                admin_state=updated_service.admin_state,
                status=updated_service.status.dict(),
                endpoint=updated_service.endpoint.dict()
                if updated_service.endpoint
                else None,
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to start model server with UUID {uuid}: {e}"
            ) from e

    @abstractmethod
    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            service: The service to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            self.perform_delete_model(service, timeout, force)
            client.delete_service(uuid)
        except Exception as e:
            raise RuntimeError(
                f"Failed to delete model server with UUID {uuid}: {e}"
            ) from e

    def get_model_server_logs(
        self,
        uuid: UUID,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a model server.

        Args:
            uuid: UUID of the model server to get the logs of.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that yields the logs of the model server.

        Raises:
            RuntimeError: if the model server is not found.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            raise RuntimeError(f"No model server found with UUID {uuid}")
        return services[0].get_logs(follow=follow, tail=tail)

    def load_service(
        self,
        service_id: UUID,
    ) -> BaseService:
        """Load a service by its ID.

        Args:
            service_id: The ID of the service to load.

        Returns:
            The loaded service.
        """
        client = Client()
        service = client.get_service(service_id)
        return BaseDeploymentService.from_model(service)
config: BaseModelDeployerConfig
property
readonly
Returns the BaseModelDeployerConfig config.
Returns:
Type | Description |
---|---|
BaseModelDeployerConfig | The configuration. |
delete_model_server(self, uuid, timeout=300, force=False)
Delete a model server.
This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by find_model_server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to delete. | required |
timeout | int | Timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
force | bool | If True, force the service to stop. | False |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the model server is not found or the deletion fails. |
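To see why a missing server surfaces as a RuntimeError, note that the method indexes the first search result inside a try block and re-raises any exception. The sketch below reproduces only that control-flow shape with a toy in-memory registry; `REGISTRY`, `find` and `delete` are hypothetical stand-ins and not part of ZenML.

```python
from typing import Dict, List
from uuid import UUID, uuid4

# Toy in-memory registry standing in for the ZenML service store.
REGISTRY: Dict[UUID, str] = {}


def find(service_uuid: UUID) -> List[str]:
    """Return a list with the matching entry, or an empty list."""
    return [REGISTRY[service_uuid]] if service_uuid in REGISTRY else []


def delete(service_uuid: UUID) -> None:
    """Mirror the try/except shape of delete_model_server."""
    try:
        find(service_uuid)[0]  # raises IndexError when nothing matches
        del REGISTRY[service_uuid]
    except Exception as e:
        raise RuntimeError(
            f"Failed to delete model server with UUID {service_uuid}: {e}"
        ) from e


known = uuid4()
REGISTRY[known] = "toy-server"
delete(known)  # succeeds and removes the entry

try:
    delete(uuid4())  # unknown UUID
except RuntimeError as err:
    print(type(err.__cause__).__name__)  # IndexError
```

Chaining with `from e` keeps the original exception available as `__cause__`, which helps tell "not found" apart from a failure in the platform-specific deletion step.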
Source code in zenml/model_deployers/base_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a model server.

    This operation is irreversible. A deleted model server must no longer
    show up in the list of model servers returned by `find_model_server`.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.

    Raises:
        RuntimeError: if the model server is not found.
    """
    client = Client()
    try:
        service = self.find_model_server(service_uuid=uuid)[0]
        self.perform_delete_model(service, timeout, force)
        client.delete_service(uuid)
    except Exception as e:
        raise RuntimeError(
            f"Failed to delete model server with UUID {uuid}: {e}"
        ) from e
deploy_model(self, config, service_type, replace=False, continuous_deployment_mode=False, timeout=300)
Deploy a model.
The deploy_model method is the main entry point for deploying models using the model deployer. It deploys a model to a model server instance running on a remote serving platform or service. The method is responsible for detecting whether an existing model server instance is already serving one or more previous versions of the same model, and for either deploying the model to the serving platform or updating the existing model server instance to include the new model version. It returns a Service object that represents the external model server instance. The Service object must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop, etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | ServiceConfig | Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc. | required |
replace | bool | If True, it will replace any existing model server instances that serve the same model. If False, it does not replace any existing model server instance. | False |
continuous_deployment_mode | bool | If True, an existing model server deployed by the same pipeline and step for the same model is deleted and replaced, regardless of its configuration. If False, an existing model server with exactly the same configuration is returned as is, and a new one is deployed only if none is found. | False |
timeout | int | The maximum time in seconds to wait for the model server to start serving the model. | 300 |
service_type | ServiceType | The type of the service to deploy. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the model deployment fails. |
Returns:
Type | Description |
---|---|
BaseService | The deployment Service object. |
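The two lookup modes of deploy_model can be sketched with an in-memory stand-in. This is a simplified illustration of the branching visible in the source, not the ZenML implementation: `ToyConfig` and `ToyDeployer` are hypothetical, a "server" is just a stored config, the URIs are made up, and the `replace` argument is left out of the sketch.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ToyConfig:
    """Stand-in for ServiceConfig (hypothetical, not the ZenML class)."""

    pipeline_name: str
    pipeline_step_name: str
    model_name: str
    model_uri: str


class ToyDeployer:
    """Keeps 'servers' in a list instead of talking to a real platform."""

    def __init__(self) -> None:
        self.servers: List[ToyConfig] = []

    def deploy(
        self, config: ToyConfig, continuous_deployment_mode: bool = False
    ) -> ToyConfig:
        if not continuous_deployment_mode:
            # Reuse a server only when the configuration matches exactly.
            for server in self.servers:
                if server == config:
                    return server
        else:
            # Match on pipeline, step and model name, then drop the old server.
            key = (config.pipeline_name, config.pipeline_step_name, config.model_name)
            for server in self.servers:
                if (server.pipeline_name, server.pipeline_step_name, server.model_name) == key:
                    self.servers.remove(server)
                    break
        self.servers.append(config)
        return config


v1 = ToyConfig("train", "deployer", "churn", "s3://models/churn/1")
v2 = ToyConfig("train", "deployer", "churn", "s3://models/churn/2")

plain = ToyDeployer()
plain.deploy(v1)
plain.deploy(v1)  # exact match: the existing server is returned
plain.deploy(v2)  # different configuration: a second server is added
print(len(plain.servers))  # 2

continuous = ToyDeployer()
continuous.deploy(v1, continuous_deployment_mode=True)
continuous.deploy(v2, continuous_deployment_mode=True)  # replaces v1
print(len(continuous.servers))  # 1
```

In this sketch, the default mode accumulates one server per distinct configuration, while continuous deployment mode keeps a single server per pipeline, step and model.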
Source code in zenml/model_deployers/base_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    service_type: ServiceType,
    replace: bool = False,
    continuous_deployment_mode: bool = False,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Deploy a model.

    The `deploy_model` method is the main entry point for deploying models
    using the model deployer. It is used to deploy a model to a model server
    instance that is running on a remote serving platform or service. The
    method is responsible for detecting if there is an existing model server
    instance already serving one or more previous versions of the same model
    and deploying the model to the serving platform or updating the existing
    model server instance to include the new model version. The method
    returns a Service object that is a representation of the external model
    server instance. The Service object must implement basic operational
    state tracking and lifecycle management operations for the model server
    (e.g. start, stop, etc.).

    Args:
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        replace: If True, it will replace any existing model server instances
            that serve the same model. If False, it does not replace any
            existing model server instance.
        continuous_deployment_mode: If True, an existing model server
            deployed by the same pipeline and step for the same model is
            deleted and replaced, regardless of its configuration. If
            False, an existing model server with exactly the same
            configuration is returned as is, and a new one is deployed
            only if none is found.
        timeout: The maximum time in seconds to wait for the model server
            to start serving the model.
        service_type: The type of the service to deploy.

    Raises:
        RuntimeError: if the model deployment fails.

    Returns:
        The deployment Service object.
    """
    # Instantiate the client
    client = Client()
    if not continuous_deployment_mode:
        # Find existing model server
        services = self.find_model_server(
            config=config.dict(),
            service_type=service_type,
        )
        if len(services) > 0:
            logger.info(
                f"Existing model server found for {config.name or config.model_name} with the exact same configuration. Returning the existing service named {services[0].config.service_name}."
            )
            return services[0]
    else:
        # Find existing model server
        services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
            service_type=service_type,
        )
        if len(services) > 0:
            logger.info(
                f"Existing model server found for {config.pipeline_name} and {config.pipeline_step_name}, since continuous deployment mode is enabled, replacing the existing service named {services[0].config.service_name}."
            )
            service = services[0]
            self.delete_model_server(service.uuid)
    logger.info(
        f"Deploying model server for {config.model_name} with the following configuration: {config.dict()}"
    )
    service_response = client.create_service(
        config=config,
        service_type=service_type,
        model_version_id=get_model_version_id_if_exists(
            config.model_name, config.model_version
        ),
    )
    try:
        service = self.perform_deploy_model(
            id=service_response.id,
            config=config,
            timeout=timeout,
        )
    except Exception as e:
        client.delete_service(service_response.id)
        raise RuntimeError(
            f"Failed to deploy model server for {config.model_name}: {e}"
        ) from e
    # Update the service in store
    client.update_service(
        id=service.uuid,
        name=service.config.service_name,
        service_source=service.dict().get("type"),
        admin_state=service.admin_state,
        status=service.status.dict(),
        endpoint=service.endpoint.dict() if service.endpoint else None,
        # labels=service.config.get_service_labels() # TODO: fix labels in services and config
        prediction_url=service.get_prediction_url(),
        health_check_url=service.get_healthcheck_url(),
    )
    return service
find_model_server(self, config=None, running=None, service_uuid=None, pipeline_name=None, pipeline_step_name=None, service_name=None, model_name=None, model_version=None, service_type=None, type=None, flavor=None, pipeline_run_id=None)
Find one or more model servers that match the given criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
running | Optional[bool] | If true, only running services will be returned. | None |
service_uuid | Optional[uuid.UUID] | The UUID of the service that was originally used to deploy the model. | None |
pipeline_step_name | Optional[str] | The name of the pipeline step that was originally used to deploy the model. | None |
pipeline_name | Optional[str] | The name of the pipeline that was originally used to deploy the model from the model registry. | None |
model_name | Optional[str] | The name of the model that was originally used to deploy the model from the model registry. | None |
model_version | Optional[str] | The version of the model that was originally used to deploy the model from the model registry. | None |
service_type | Optional[zenml.services.service_type.ServiceType] | The type of the service to find. | None |
type | Optional[str] | The type of the service to find. | None |
flavor | Optional[str] | The flavor of the service to find. | None |
pipeline_run_id | Optional[str] | The UUID of the pipeline run that was originally used to deploy the model. | None |
config | Optional[Dict[str, Any]] | Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc. | None |
service_name | Optional[str] | The name of the service to find. | None |
Returns:
Type | Description |
---|---|
List[zenml.services.service.BaseService] | One or more Service objects representing model servers that match the input search criteria. |
Source code in zenml/model_deployers/base_model_deployer.py
def find_model_server(
self,
config: Optional[Dict[str, Any]] = None,
running: Optional[bool] = None,
service_uuid: Optional[UUID] = None,
pipeline_name: Optional[str] = None,
pipeline_step_name: Optional[str] = None,
service_name: Optional[str] = None,
model_name: Optional[str] = None,
model_version: Optional[str] = None,
service_type: Optional[ServiceType] = None,
type: Optional[str] = None,
flavor: Optional[str] = None,
pipeline_run_id: Optional[str] = None,
) -> List[BaseService]:
"""Abstract method to find one or more a model servers that match the given criteria.
Args:
running: If true, only running services will be returned.
service_uuid: The UUID of the service that was originally used
to deploy the model.
pipeline_step_name: The name of the pipeline step that was originally used
to deploy the model.
pipeline_name: The name of the pipeline that was originally used to deploy
the model from the model registry.
model_name: The name of the model that was originally used to deploy
the model from the model registry.
model_version: The version of the model that was originally used to
deploy the model from the model registry.
service_type: The type of the service to find.
type: The type of the service to find.
flavor: The flavor of the service to find.
pipeline_run_id: The UUID of the pipeline run that was originally used
to deploy the model.
config: Custom Service configuration parameters for the model
deployer. Can include the pipeline name, the run id, the step
name, the model name, the model uri, the model type etc.
service_name: The name of the service to find.
Returns:
One or more Service objects representing model servers that match
the input search criteria.
"""
client = Client()
service_responses = client.list_services(
sort_by="desc:created",
id=service_uuid,
running=running,
service_name=service_name,
pipeline_name=pipeline_name,
pipeline_step_name=pipeline_step_name,
model_version_id=get_model_version_id_if_exists(
model_name, model_version
),
pipeline_run_id=pipeline_run_id,
config=config,
type=type or service_type.type if service_type else None,
flavor=flavor or service_type.flavor if service_type else None,
hydrate=True,
)
services = []
for service_response in service_responses.items:
if not service_response.service_source:
client.delete_service(service_response.id)
continue
service = BaseDeploymentService.from_model(service_response)
service.update_status()
if service.status.dict() != service_response.status:
client.update_service(
id=service.uuid,
admin_state=service.admin_state,
status=service.status.dict(),
endpoint=service.endpoint.dict()
if service.endpoint
else None,
)
if running and not service.is_running:
logger.warning(
f"Service {service.uuid} is in an unexpected state. "
f"Expected running={running}, but found running={service.is_running}."
)
continue
services.append(service)
return services
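The `type` and `flavor` filters combine an explicit argument with a fallback taken from `service_type` in a single expression. The following is a minimal sketch of how operator precedence affects such an expression. `ServiceTypeStub`, `resolve_filter` and `unparenthesized` are hypothetical names used only for illustration and are not part of ZenML.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ServiceTypeStub:
    """Hypothetical stand-in for a service type with two fields."""

    type: str
    flavor: str


def resolve_filter(
    explicit: Optional[str],
    service_type: Optional[ServiceTypeStub],
    attr: str,
) -> Optional[str]:
    """Prefer the explicit value, else fall back to the service type."""
    fallback = getattr(service_type, attr) if service_type else None
    return explicit or fallback


def unparenthesized(
    explicit: Optional[str], service_type: Optional[ServiceTypeStub]
) -> Optional[str]:
    """Same idea without parentheses.

    A conditional expression binds more loosely than `or`, so this is
    evaluated as `(explicit or service_type.type) if service_type else None`.
    """
    return explicit or service_type.type if service_type else None


stub = ServiceTypeStub(type="model-serving", flavor="mlflow")
explicit_only = resolve_filter("custom", None, "type")
fallback_only = resolve_filter(None, stub, "flavor")
dropped = unparenthesized("custom", None)
```

With the grouped form, an explicit value is kept even when no service type is passed. With the ungrouped form, the explicit value is discarded whenever the service type is missing.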
get_active_model_deployer()
classmethod
Get the model deployer registered in the active stack.
Returns:
Type | Description |
---|---|
BaseModelDeployer | The model deployer registered in the active stack. |
Exceptions:
Type | Description |
---|---|
TypeError | if a model deployer is not part of the active stack. |
Source code in zenml/model_deployers/base_model_deployer.py
@classmethod
def get_active_model_deployer(cls) -> "BaseModelDeployer":
"""Get the model deployer registered in the active stack.
Returns:
The model deployer registered in the active stack.
Raises:
TypeError: if a model deployer is not part of the
active stack.
"""
flavor: BaseModelDeployerFlavor = cls.FLAVOR()
client = Client()
model_deployer = client.active_stack.model_deployer
if not model_deployer or not isinstance(model_deployer, cls):
raise TypeError(
f"The active stack needs to have a {cls.NAME} model "
f"deployer component registered to be able to deploy models "
f"with {cls.NAME}. You can create a new stack with "
f"a {cls.NAME} model deployer component or update your "
f"active stack to add this component, e.g.:\n\n"
f" `zenml model-deployer register {flavor.name} "
f"--flavor={flavor.name} ...`\n"
f" `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
f" or:\n"
f" `zenml stack update -d {flavor.name}`\n\n"
)
return model_deployer
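Because the check uses `isinstance(model_deployer, cls)`, calling the classmethod on a subclass also verifies that the active deployer is of that subclass. A minimal sketch of this pattern follows. It uses hypothetical stub classes and passes the active component in as an argument rather than reading it from a client, so it does not reflect the real ZenML classes.

```python
from typing import Optional


class DeployerStub:
    """Hypothetical stand-in for a base model deployer."""

    NAME = "base"

    @classmethod
    def get_active(cls, active: Optional["DeployerStub"]) -> "DeployerStub":
        # Reject a missing component or one of a different flavor.
        if not active or not isinstance(active, cls):
            raise TypeError(
                f"The active stack needs to have a {cls.NAME} model deployer."
            )
        return active


class MLflowStub(DeployerStub):
    NAME = "mlflow"


class SeldonStub(DeployerStub):
    NAME = "seldon"


active_component = MLflowStub()

# Works: an MLflowStub is also a DeployerStub.
generic = DeployerStub.get_active(active_component)

# Fails: the active component is not a SeldonStub.
try:
    SeldonStub.get_active(active_component)
    mismatch_error = None
except TypeError as err:
    mismatch_error = str(err)
```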
get_model_server_info(service)
staticmethod
Give implementation specific way to extract relevant model server properties for the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service | BaseService | Integration-specific service instance | required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] | A dictionary containing the relevant model server properties. |
Source code in zenml/model_deployers/base_model_deployer.py
@staticmethod
@abstractmethod
def get_model_server_info(
service: BaseService,
) -> Dict[str, Optional[str]]:
"""Give implementation specific way to extract relevant model server properties for the user.
Args:
service: Integration-specific service instance
Returns:
A dictionary containing the relevant model server properties.
"""
get_model_server_logs(self, uuid, follow=False, tail=None)
Get the logs of a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to get the logs of. | required |
follow | bool | if True, the logs will be streamed as they are written | False |
tail | Optional[int] | only retrieve the last NUM lines of log output. | None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] | A generator that yields the logs of the model server. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the model server is not found. |
Source code in zenml/model_deployers/base_model_deployer.py
def get_model_server_logs(
self,
uuid: UUID,
follow: bool = False,
tail: Optional[int] = None,
) -> Generator[str, bool, None]:
"""Get the logs of a model server.
Args:
uuid: UUID of the model server to get the logs of.
follow: if True, the logs will be streamed as they are written
tail: only retrieve the last NUM lines of log output.
Returns:
A generator that yields the logs of the model server.
Raises:
RuntimeError: if the model server is not found.
"""
services = self.find_model_server(service_uuid=uuid)
if len(services) == 0:
raise RuntimeError(f"No model server found with UUID {uuid}")
return services[0].get_logs(follow=follow, tail=tail)
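The `tail` parameter limits the output to the last lines of the log. As an illustration only, a service could implement that behavior as sketched below. `read_logs` is a hypothetical helper, not a ZenML function, and the streaming behavior of `follow` is left out.

```python
from collections import deque
from typing import Generator, Iterable, Optional


def read_logs(
    lines: Iterable[str], tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Yield log lines, keeping only the last `tail` lines when it is set."""
    # A bounded deque discards older entries as newer ones are appended.
    selected = deque(lines, maxlen=tail) if tail is not None else lines
    for line in selected:
        yield line


log_lines = ["loading model", "model loaded", "serving on :8000", "request ok"]
last_two = list(read_logs(log_lines, tail=2))
everything = list(read_logs(log_lines))
```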
load_service(self, service_id)
Load a service by its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_id | UUID | The ID of the service to load. | required |
Returns:
Type | Description |
---|---|
BaseService | The loaded service. |
Source code in zenml/model_deployers/base_model_deployer.py
def load_service(
self,
service_id: UUID,
) -> BaseService:
"""Load a service by its ID.
Args:
service_id: The ID of the service to load.
Returns:
The loaded service.
"""
client = Client()
service = client.get_service(service_id)
return BaseDeploymentService.from_model(service)
perform_delete_model(self, service, timeout=300, force=False)
Abstract method to delete a model server.
This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by `find_model_server`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service | BaseService | The service to delete. | required |
timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
force | bool | if True, force the service to stop. | False |
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_delete_model(
self,
service: BaseService,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Abstract method to delete a model server.
This operation is irreversible. A deleted model server must no longer
show up in the list of model servers returned by `find_model_server`.
Args:
service: The service to delete.
timeout: timeout in seconds to wait for the service to stop. If
set to 0, the method will return immediately after
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
"""
perform_deploy_model(self, id, config, timeout=300)
Abstract method to deploy a model.
Concrete model deployer subclasses must implement the following functionality in this method:
- Detect if there is an existing model server instance running serving one or more previous versions of the same model
- Deploy the model to the serving platform or update the existing model server instance to include the new model version
- Return a Service object that is a representation of the external model server instance. The Service must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop, etc.)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id | UUID | UUID of the service that was originally used to deploy the model. | required |
config | ServiceConfig | Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc. | required |
timeout | int | The maximum time in seconds to wait for the model server to start serving the model. | 300 |
Returns:
Type | Description |
---|---|
BaseService | The deployment Service object. |
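The three responsibilities listed above can be sketched with in-memory stand-ins. This is a toy illustration under stated assumptions, not a real integration: `FakeConfig`, `FakeService` and `InMemoryDeployer` are hypothetical classes, and the `timeout` argument is accepted but unused because nothing is actually provisioned.

```python
from dataclasses import dataclass, field
from typing import Dict, List
from uuid import UUID, uuid4


@dataclass
class FakeConfig:
    """Hypothetical stand-in for a service configuration."""

    model_name: str
    model_uri: str


@dataclass
class FakeService:
    """Hypothetical stand-in for a service with basic state tracking."""

    uuid: UUID
    model_name: str
    model_uris: List[str] = field(default_factory=list)
    is_running: bool = False

    def start(self) -> None:
        self.is_running = True


class InMemoryDeployer:
    """Toy deployer that keeps one server per model name."""

    def __init__(self) -> None:
        self._servers: Dict[str, FakeService] = {}

    def perform_deploy_model(
        self, id: UUID, config: FakeConfig, timeout: int = 300
    ) -> FakeService:
        # 1. Detect an existing server for the same model.
        service = self._servers.get(config.model_name)
        if service is None:
            # 2a. No server yet: create one for this model.
            service = FakeService(uuid=id, model_name=config.model_name)
            self._servers[config.model_name] = service
        # 2b. Add the new model version to the new or existing server.
        service.model_uris.append(config.model_uri)
        service.start()
        # 3. Return the object that represents the model server.
        return service


deployer = InMemoryDeployer()
first = deployer.perform_deploy_model(
    uuid4(), FakeConfig("churn", "s3://models/churn/1")
)
second = deployer.perform_deploy_model(
    uuid4(), FakeConfig("churn", "s3://models/churn/2")
)
```

Deploying a second version of the same model reuses the existing server instead of creating a new one, which is the continuous deployment behavior the base class asks for.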
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_deploy_model(
self,
id: UUID,
config: ServiceConfig,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
"""Abstract method to deploy a model.
Concrete model deployer subclasses must implement the following
functionality in this method:
- Detect if there is an existing model server instance running serving
one or more previous versions of the same model
- Deploy the model to the serving platform or update the existing model
server instance to include the new model version
- Return a Service object that is a representation of the external model
server instance. The Service must implement basic operational state
tracking and lifecycle management operations for the model server (e.g.
start, stop, etc.)
Args:
id: UUID of the service that was originally used to deploy the model.
config: Custom Service configuration parameters for the model
deployer. Can include the pipeline name, the run id, the step
name, the model name, the model uri, the model type etc.
timeout: The maximum time in seconds to wait for the model server
to start serving the model.
Returns:
The deployment Service object.
"""
perform_start_model(self, service, timeout=300)
Abstract method to start a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service | BaseService | The service to start. | required |
timeout | int | timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. | 300 |
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_start_model(
self,
service: BaseService,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
"""Abstract method to start a model server.
Args:
service: The service to start.
timeout: timeout in seconds to wait for the service to start. If
set to 0, the method will return immediately after
provisioning the service, without waiting for it to become
active.
"""
perform_stop_model(self, service, timeout=300, force=False)
Abstract method to stop a model server.
This operation should be reversible. A stopped model server should still show up in the list of model servers returned by `find_model_server` and it should be possible to start it again by calling `start_model_server`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service | BaseService | The service to stop. | required |
timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
force | bool | if True, force the service to stop. | False |
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_stop_model(
self,
service: BaseService,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> BaseService:
"""Abstract method to stop a model server.
This operation should be reversible. A stopped model server should still
show up in the list of model servers returned by `find_model_server` and
it should be possible to start it again by calling `start_model_server`.
Args:
service: The service to stop.
timeout: timeout in seconds to wait for the service to stop. If
set to 0, the method will return immediately after
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
"""
start_model_server(self, uuid, timeout=300)
Start a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to start. | required |
timeout | int | timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. | 300 |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the model server is not found. |
Source code in zenml/model_deployers/base_model_deployer.py
def start_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
"""Start a model server.
Args:
uuid: UUID of the model server to start.
timeout: timeout in seconds to wait for the service to start. If
set to 0, the method will return immediately after
provisioning the service, without waiting for it to become
active.
Raises:
RuntimeError: if the model server is not found.
"""
client = Client()
try:
service = self.find_model_server(service_uuid=uuid)[0]
updated_service = self.perform_start_model(service, timeout)
client.update_service(
id=updated_service.uuid,
admin_state=updated_service.admin_state,
status=updated_service.status.dict(),
endpoint=updated_service.endpoint.dict()
if updated_service.endpoint
else None,
)
except Exception as e:
raise RuntimeError(
f"Failed to start model server with UUID {uuid}: {e}"
) from e
stop_model_server(self, uuid, timeout=300, force=False)
Stop a model server.
This operation should be reversible. A stopped model server should still show up in the list of model servers returned by `find_model_server` and it should be possible to start it again by calling `start_model_server`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to stop. | required |
timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
force | bool | if True, force the service to stop. | False |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the model server is not found. |
Source code in zenml/model_deployers/base_model_deployer.py
def stop_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Stop a model server.
This operation should be reversible. A stopped model server should still
show up in the list of model servers returned by `find_model_server` and
it should be possible to start it again by calling `start_model_server`.
Args:
uuid: UUID of the model server to stop.
timeout: timeout in seconds to wait for the service to stop. If
set to 0, the method will return immediately after
deprovisioning the service, without waiting for it to stop.
force: if True, force the service to stop.
Raises:
RuntimeError: if the model server is not found.
"""
client = Client()
try:
service = self.find_model_server(service_uuid=uuid)[0]
updated_service = self.perform_stop_model(service, timeout, force)
client.update_service(
id=updated_service.uuid,
admin_state=updated_service.admin_state,
status=updated_service.status.dict(),
endpoint=updated_service.endpoint.dict()
if updated_service.endpoint
else None,
)
except Exception as e:
raise RuntimeError(
f"Failed to stop model server with UUID {uuid}: {e}"
) from e
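Both `start_model_server` and `stop_model_server` follow the same shape: look up the service, run the lifecycle action, and re-raise any failure as a `RuntimeError`. The sketch below isolates that pattern with plain callables. `run_lifecycle_action` and its parameters are hypothetical, and the step that persists the updated state is omitted.

```python
from typing import Callable, List
from uuid import UUID, uuid4


def run_lifecycle_action(
    uuid: UUID,
    find: Callable[[UUID], List[str]],
    action: Callable[[str], str],
    verb: str,
) -> str:
    """Find a service, apply an action, and wrap any failure."""
    try:
        # Indexing an empty result raises IndexError, which is wrapped below.
        service = find(uuid)[0]
        return action(service)
    except Exception as e:
        raise RuntimeError(
            f"Failed to {verb} model server with UUID {uuid}: {e}"
        ) from e


# A lookup that finds one service, here represented by a plain string.
started = run_lifecycle_action(uuid4(), lambda _: ["svc"], str.upper, "start")

# A lookup that finds nothing: the IndexError surfaces as a RuntimeError.
try:
    run_lifecycle_action(uuid4(), lambda _: [], str.upper, "stop")
    wrapped = None
except RuntimeError as err:
    wrapped = err
```

Chaining with `from e` keeps the original exception available as `__cause__`, so a missing server and a failed action can still be told apart when debugging.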
BaseModelDeployerConfig (StackComponentConfig)
pydantic-model
Base config for all model deployers.
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerConfig(StackComponentConfig):
"""Base config for all model deployers."""
BaseModelDeployerFlavor (Flavor)
Base class for model deployer flavors.
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerFlavor(Flavor):
"""Base class for model deployer flavors."""
@property
def type(self) -> StackComponentType:
"""Returns the flavor type.
Returns:
The flavor type.
"""
return StackComponentType.MODEL_DEPLOYER
@property
def config_class(self) -> Type[BaseModelDeployerConfig]:
"""Returns `BaseModelDeployerConfig` config class.
Returns:
The config class.
"""
return BaseModelDeployerConfig
@property
@abstractmethod
def implementation_class(self) -> Type[BaseModelDeployer]:
"""The class that implements the model deployer."""
config_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig]
property
readonly
Returns `BaseModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig] | The config class. |
implementation_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployer]
property
readonly
The class that implements the model deployer.
type: StackComponentType
property
readonly
Returns the flavor type.
Returns:
Type | Description |
---|---|
StackComponentType | The flavor type. |
get_model_version_id_if_exists(model_name, model_version)
Get the model version id if it exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name | Optional[str] | The name of the model. | required |
model_version | Optional[str] | The version of the model. | required |
Returns:
Type | Description |
---|---|
Optional[uuid.UUID] | The model version id if it exists. |
Source code in zenml/model_deployers/base_model_deployer.py
def get_model_version_id_if_exists(
model_name: Optional[str],
model_version: Optional[str],
) -> Optional[UUID]:
"""Get the model version id if it exists.
Args:
model_name: The name of the model.
model_version: The version of the model.
Returns:
The model version id if it exists.
"""
client = Client()
if model_name:
with contextlib.suppress(KeyError):
return client.get_model_version(
model_name_or_id=model_name,
model_version_name_or_number_or_id=model_version,
).id
return None
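The function relies on `contextlib.suppress(KeyError)` to turn a failed lookup into a `None` result. A minimal sketch of the same control flow against a plain dictionary is shown below. `REGISTRY` and `lookup_version_id` are hypothetical stand-ins for the client call.

```python
import contextlib
from typing import Dict, Optional, Tuple
from uuid import UUID, uuid4

# Hypothetical registry mapping (model name, version) to an ID.
KNOWN_ID = uuid4()
REGISTRY: Dict[Tuple[str, Optional[str]], UUID] = {("churn", "1"): KNOWN_ID}


def lookup_version_id(
    model_name: Optional[str], model_version: Optional[str]
) -> Optional[UUID]:
    """Return the ID for a model version, or None if it cannot be found."""
    if model_name:
        # A KeyError inside the block is swallowed and execution
        # continues after the `with` statement.
        with contextlib.suppress(KeyError):
            return REGISTRY[(model_name, model_version)]
    return None


found = lookup_version_id("churn", "1")
missing = lookup_version_id("churn", "2")
no_name = lookup_version_id(None, "1")
```

Only `KeyError` is suppressed, so any other exception raised during the lookup still propagates to the caller.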