Model Deployers
zenml.model_deployers
Model deployers are stack components responsible for online model serving.
Online serving is the process of hosting and loading machine-learning models as part of a managed web service and providing access to the models through an API endpoint, for example over HTTP or gRPC. Once a model is deployed, you can send inference requests to it through the web service's API and receive low-latency responses.
Add a model deployer to your ZenML stack to implement continuous model deployment pipelines that train models and continuously deploy them to a model prediction web service.
When present in a stack, the model deployer also acts as a registry for models that are served with ZenML. You can use the model deployer to list all models that are currently deployed for online inference, to filter that list by a particular pipeline run or step, or to suspend, resume or delete an external model server managed through ZenML.
base_model_deployer
Base class for all ZenML model deployers.
BaseModelDeployer (StackComponent, ABC)
Base class for all ZenML model deployers.
The model deployer serves three major purposes:
1. It contains all the stack-related configuration attributes required to interact with the remote model serving tool, service or platform (e.g. hostnames, URLs, references to credentials, other client-related configuration parameters).
2. It implements the continuous deployment logic necessary to deploy models in a way that updates an existing model server that is already serving a previous version of the same model instead of creating a new model server for every new model version (see the `deploy_model` abstract method). This functionality can be consumed directly from ZenML pipeline steps, but it can also be used outside the pipeline to deploy ad hoc models. It is also usually coupled with a standard model deployer step, implemented by each integration, that hides the details of the deployment process away from the user.
3. It acts as a ZenML BaseService registry, where every BaseService instance is used as an internal representation of a remote model server (see the `find_model_server` abstract method). To achieve this, it must be able to re-create the configuration of a BaseService from information that is persisted externally, alongside or even as part of the remote model server configuration itself. For example, for model servers that are implemented as Kubernetes resources, the BaseService instances can be serialized and saved as Kubernetes resource annotations. This allows the model deployer to keep track of all externally running model servers and to re-create their corresponding BaseService instance representations at any given time.
The model deployer also defines methods that implement basic life-cycle management on remote model servers outside the coverage of a pipeline (see `stop_model_server`, `start_model_server` and `delete_model_server`).
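The third purpose, re-creating a service representation from externally persisted information, can be illustrated with a minimal sketch. Everything here is hypothetical and independent of ZenML: `ToyServiceConfig`, the annotation key and the model URI are illustrative stand-ins, and a plain dictionary simulates Kubernetes resource annotations.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass
class ToyServiceConfig:
    """Hypothetical stand-in for a service configuration (not a ZenML class)."""

    model_name: str
    model_uri: str
    pipeline_name: str


# Hypothetical annotation key; a real integration would choose its own.
ANNOTATION_KEY = "example.org/service-config"


def save_to_annotations(config: ToyServiceConfig, annotations: Dict[str, str]) -> None:
    """Serialize the config and store it alongside the (simulated) server."""
    annotations[ANNOTATION_KEY] = json.dumps(asdict(config))


def load_from_annotations(annotations: Dict[str, str]) -> ToyServiceConfig:
    """Re-create the config purely from the persisted annotation."""
    return ToyServiceConfig(**json.loads(annotations[ANNOTATION_KEY]))


# A plain dict simulates the metadata of an externally running model server.
resource_annotations: Dict[str, str] = {}
original = ToyServiceConfig("churn", "s3://bucket/churn/3", "training")
save_to_annotations(original, resource_annotations)
restored = load_from_annotations(resource_annotations)
```

Because the serialized configuration lives with the server itself, the registry needs no separate database: it can rebuild its view by reading the annotations of whatever servers currently exist.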
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployer(StackComponent, ABC):
    """Base class for all ZenML model deployers.

    The model deployer serves three major purposes:

    1. It contains all the stack related configuration attributes required to
    interact with the remote model serving tool, service or platform (e.g.
    hostnames, URLs, references to credentials, other client related
    configuration parameters).

    2. It implements the continuous deployment logic necessary to deploy models
    in a way that updates an existing model server that is already serving a
    previous version of the same model instead of creating a new model server
    for every new model version (see the `deploy_model` abstract method).
    This functionality can be consumed directly from ZenML pipeline steps, but
    it can also be used outside the pipeline to deploy ad hoc models. It is
    also usually coupled with a standard model deployer step, implemented by
    each integration, that hides the details of the deployment process away from
    the user.

    3. It acts as a ZenML BaseService registry, where every BaseService instance
    is used as an internal representation of a remote model server (see the
    `find_model_server` abstract method). To achieve this, it must be able to
    re-create the configuration of a BaseService from information that is
    persisted externally, alongside or even part of the remote model server
    configuration itself. For example, for model servers that are implemented as
    Kubernetes resources, the BaseService instances can be serialized and saved
    as Kubernetes resource annotations. This allows the model deployer to keep
    track of all externally running model servers and to re-create their
    corresponding BaseService instance representations at any given time.
    The model deployer also defines methods that implement basic life-cycle
    management on remote model servers outside the coverage of a pipeline
    (see `stop_model_server`, `start_model_server` and `delete_model_server`).
    """

    NAME: ClassVar[str]
    FLAVOR: ClassVar[Type["BaseModelDeployerFlavor"]]

    @property
    def config(self) -> BaseModelDeployerConfig:
        """Returns the `BaseModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseModelDeployerConfig, self._config)

    @classmethod
    def get_active_model_deployer(cls) -> "BaseModelDeployer":
        """Get the model deployer registered in the active stack.

        Returns:
            The model deployer registered in the active stack.

        Raises:
            TypeError: if a model deployer is not part of the
                active stack.
        """
        flavor: BaseModelDeployerFlavor = cls.FLAVOR()
        client = Client()
        model_deployer = client.active_stack.model_deployer
        if not model_deployer or not isinstance(model_deployer, cls):
            raise TypeError(
                f"The active stack needs to have a {cls.NAME} model "
                f"deployer component registered to be able to deploy models "
                f"with {cls.NAME}. You can create a new stack with "
                f"a {cls.NAME} model deployer component or update your "
                f"active stack to add this component, e.g.:\n\n"
                f" `zenml model-deployer register {flavor.name} "
                f"--flavor={flavor.name} ...`\n"
                f" `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
                f" or:\n"
                f" `zenml stack update -d {flavor.name}`\n\n"
            )
        return model_deployer

    @abstractmethod
    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to deploy a model.

        Concrete model deployer subclasses must implement the following
        functionality in this method:
        - Detect if there is an existing model server instance running serving
        one or more previous versions of the same model
        - Deploy the model to the serving platform or update the existing model
        server instance to include the new model version
        - Return a Service object that is a representation of the external model
        server instance. The Service must implement basic operational state
        tracking and lifecycle management operations for the model server (e.g.
        start, stop, etc.)

        Args:
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            replace: If True, it will replace any existing model server instances
                that serve the same model. If False, it does not replace any
                existing model server instance.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.

        Returns:
            The deployment Service object.
        """

    @staticmethod
    @abstractmethod
    def get_model_server_info(
        service: BaseService,
    ) -> Dict[str, Optional[str]]:
        """Give an implementation-specific way to extract relevant model server properties for the user.

        Args:
            service: Integration-specific service instance

        Returns:
            A dictionary containing the relevant model server properties.
        """

    @abstractmethod
    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        run_name: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
    ) -> List[BaseService]:
        """Abstract method to find one or more model servers that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            run_name: Name of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            model_type: the implementation specific type/format of the deployed
                model.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """

    @abstractmethod
    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    @abstractmethod
    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Abstract method to start a model server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.
        """

    @abstractmethod
    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def get_model_server_logs(
        self,
        uuid: UUID,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a model server.

        Args:
            uuid: UUID of the model server to get the logs of.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that yields the logs of the model server.

        Raises:
            RuntimeError: if the model server is not found.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            raise RuntimeError(f"No model server found with UUID {uuid}")
        return services[0].get_logs(follow=follow, tail=tail)

    def get_step_run_metadata(
        self, info: "StepRunInfo"
    ) -> Dict[str, "MetadataType"]:
        """Get component- and step-specific metadata after a step ran.

        For model deployers, this extracts the prediction URL of the deployed
        model.

        Args:
            info: Info about the step that was executed.

        Returns:
            A dictionary of metadata.
        """
        existing_services = self.find_model_server(
            run_name=info.run_name,
        )
        if existing_services:
            existing_service = existing_services[0]
            if (
                isinstance(existing_service, BaseDeploymentService)
                and existing_service.is_running
            ):
                deployed_model_url = existing_service.prediction_url
                return {METADATA_DEPLOYED_MODEL_URL: Uri(deployed_model_url)}
        return {}
config: BaseModelDeployerConfig
property
readonly
Returns the `BaseModelDeployerConfig` config.
Returns:
| Type | Description |
|---|---|
| `BaseModelDeployerConfig` | The configuration. |
delete_model_server(self, uuid, timeout=300, force=False)
Abstract method to delete a model server.
This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by `find_model_server`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uuid | `UUID` | UUID of the model server to delete. | required |
| timeout | `int` | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | `300` |
| force | `bool` | if True, force the service to stop. | `False` |
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to delete a model server.

    This operation is irreversible. A deleted model server must no longer
    show up in the list of model servers returned by `find_model_server`.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
deploy_model(self, config, replace=False, timeout=300)
Abstract method to deploy a model.
Concrete model deployer subclasses must implement the following functionality in this method:
- Detect if there is an existing model server instance running that serves one or more previous versions of the same model.
- Deploy the model to the serving platform or update the existing model server instance to include the new model version.
- Return a Service object that is a representation of the external model server instance. The Service must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop, etc.).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | `ServiceConfig` | Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc. | required |
| replace | `bool` | If True, it will replace any existing model server instances that serve the same model. If False, it does not replace any existing model server instance. | `False` |
| timeout | `int` | The maximum time in seconds to wait for the model server to start serving the model. | `300` |
Returns:
| Type | Description |
|---|---|
| `BaseService` | The deployment Service object. |
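The update-or-create behaviour described for `deploy_model` can be sketched with an in-memory stand-in. This is only one possible reading of the contract, not the logic of any ZenML integration: `ToyServer`, `ToyDeployer` and the model URIs are hypothetical, and the sketch assumes that `replace=True` updates the existing server in place while `replace=False` always creates a new one.

```python
from dataclasses import dataclass, field
from typing import Dict
from uuid import UUID, uuid4


@dataclass
class ToyServer:
    """Hypothetical representation of a running model server."""

    model_name: str
    model_uri: str
    uuid: UUID = field(default_factory=uuid4)
    running: bool = True


class ToyDeployer:
    """Hypothetical in-memory deployer; not a ZenML class."""

    def __init__(self) -> None:
        self.servers: Dict[UUID, ToyServer] = {}

    def deploy_model(
        self, model_name: str, model_uri: str, replace: bool = False
    ) -> ToyServer:
        # Step 1: detect servers already serving a version of this model.
        existing = [s for s in self.servers.values() if s.model_name == model_name]
        if replace and existing:
            # Step 2a: update the existing server instead of creating a new one.
            server = existing[0]
            server.model_uri = model_uri
            server.running = True
            return server
        # Step 2b: no replacement requested (or nothing to replace): create one.
        server = ToyServer(model_name, model_uri)
        self.servers[server.uuid] = server
        # Step 3: return the object that represents the server.
        return server


deployer = ToyDeployer()
first = deployer.deploy_model("churn", "models/churn/1")
second = deployer.deploy_model("churn", "models/churn/2", replace=True)
third = deployer.deploy_model("churn", "models/churn/3")
```

In this sketch `second` is the same object as `first`, now pointing at the newer model version, while `third` is a separate server because `replace` was left at its default.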
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Abstract method to deploy a model.

    Concrete model deployer subclasses must implement the following
    functionality in this method:
    - Detect if there is an existing model server instance running serving
    one or more previous versions of the same model
    - Deploy the model to the serving platform or update the existing model
    server instance to include the new model version
    - Return a Service object that is a representation of the external model
    server instance. The Service must implement basic operational state
    tracking and lifecycle management operations for the model server (e.g.
    start, stop, etc.)

    Args:
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        replace: If True, it will replace any existing model server instances
            that serve the same model. If False, it does not replace any
            existing model server instance.
        timeout: The maximum time in seconds to wait for the model server
            to start serving the model.

    Returns:
        The deployment Service object.
    """
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, run_name=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)
Abstract method to find one or more model servers that match the given criteria.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| running | `bool` | If true, only running services will be returned. | `False` |
| service_uuid | `Optional[uuid.UUID]` | The UUID of the service that was originally used to deploy the model. | `None` |
| pipeline_name | `Optional[str]` | name of the pipeline that the deployed model was part of. | `None` |
| run_name | `Optional[str]` | Name of the pipeline run which the deployed model was part of. | `None` |
| pipeline_step_name | `Optional[str]` | the name of the pipeline model deployment step that deployed the model. | `None` |
| model_name | `Optional[str]` | the name of the deployed model. | `None` |
| model_uri | `Optional[str]` | URI of the deployed model. | `None` |
| model_type | `Optional[str]` | the implementation specific type/format of the deployed model. | `None` |
Returns:
| Type | Description |
|---|---|
| `List[zenml.services.service.BaseService]` | One or more Service objects representing model servers that match the input search criteria. |
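The parameter table suggests a filter in which every criterion left at `None` is ignored and `running=True` narrows the result to running servers. A minimal sketch of that matching rule follows; it assumes the criteria are combined with a logical AND, which the source does not state explicitly, and `ToyRecord` and `find_servers` are hypothetical names, not ZenML APIs.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToyRecord:
    """Hypothetical record describing one deployed model server."""

    model_name: str
    pipeline_name: str
    run_name: str
    running: bool


def find_servers(
    records: List[ToyRecord],
    running: bool = False,
    pipeline_name: Optional[str] = None,
    run_name: Optional[str] = None,
    model_name: Optional[str] = None,
) -> List[ToyRecord]:
    """Return the records that match every criterion that is not None."""
    criteria = {
        "pipeline_name": pipeline_name,
        "run_name": run_name,
        "model_name": model_name,
    }
    matches = []
    for record in records:
        # running=False means "do not filter on state", not "only stopped".
        if running and not record.running:
            continue
        if all(
            value is None or getattr(record, key) == value
            for key, value in criteria.items()
        ):
            matches.append(record)
    return matches


records = [
    ToyRecord("churn", "training", "run-1", True),
    ToyRecord("churn", "training", "run-2", False),
    ToyRecord("fraud", "scoring", "run-3", True),
]
all_servers = find_servers(records)
running_churn = find_servers(records, running=True, model_name="churn")
```

Calling the function with no criteria returns every record, which matches the registry use case of listing all deployed models.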
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    run_name: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Abstract method to find one or more model servers that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_name: name of the pipeline that the deployed model was part
            of.
        run_name: Name of the pipeline run which the deployed model was
            part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: the implementation specific type/format of the deployed
            model.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.
    """
get_active_model_deployer()
classmethod
Get the model deployer registered in the active stack.
Returns:
| Type | Description |
|---|---|
| `BaseModelDeployer` | The model deployer registered in the active stack. |
Exceptions:
| Type | Description |
|---|---|
| `TypeError` | if a model deployer is not part of the active stack. |
Source code in zenml/model_deployers/base_model_deployer.py
@classmethod
def get_active_model_deployer(cls) -> "BaseModelDeployer":
    """Get the model deployer registered in the active stack.

    Returns:
        The model deployer registered in the active stack.

    Raises:
        TypeError: if a model deployer is not part of the
            active stack.
    """
    flavor: BaseModelDeployerFlavor = cls.FLAVOR()
    client = Client()
    model_deployer = client.active_stack.model_deployer
    if not model_deployer or not isinstance(model_deployer, cls):
        raise TypeError(
            f"The active stack needs to have a {cls.NAME} model "
            f"deployer component registered to be able to deploy models "
            f"with {cls.NAME}. You can create a new stack with "
            f"a {cls.NAME} model deployer component or update your "
            f"active stack to add this component, e.g.:\n\n"
            f" `zenml model-deployer register {flavor.name} "
            f"--flavor={flavor.name} ...`\n"
            f" `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
            f" or:\n"
            f" `zenml stack update -d {flavor.name}`\n\n"
        )
    return model_deployer
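The `isinstance(model_deployer, cls)` check means the classmethod only succeeds when it is called on a class that the active deployer is an instance of. The sketch below isolates that pattern with hypothetical classes; a class attribute named `active` stands in for the active stack's model deployer slot, and none of these names are ZenML APIs.

```python
from typing import ClassVar, Optional


class ToyDeployer:
    """Hypothetical base class; `active` simulates the active stack's slot."""

    NAME: ClassVar[str] = "toy"
    active: ClassVar[Optional["ToyDeployer"]] = None

    @classmethod
    def get_active(cls) -> "ToyDeployer":
        deployer = ToyDeployer.active
        # Fail both when nothing is registered and when the registered
        # deployer is of a different flavor than the calling class.
        if deployer is None or not isinstance(deployer, cls):
            raise TypeError(f"The active stack has no {cls.NAME} model deployer.")
        return deployer


class LocalDeployer(ToyDeployer):
    NAME = "local"


class RemoteDeployer(ToyDeployer):
    NAME = "remote"


ToyDeployer.active = LocalDeployer()
found = LocalDeployer.get_active()

try:
    RemoteDeployer.get_active()
    error_message = None
except TypeError as exc:
    error_message = str(exc)
```

Calling the method on the base class accepts any registered deployer, while calling it on a subclass additionally asserts the flavor.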
get_model_server_info(service)
staticmethod
Give an implementation-specific way to extract relevant model server properties for the user.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| service | `BaseService` | Integration-specific service instance | required |
Returns:
| Type | Description |
|---|---|
| `Dict[str, Optional[str]]` | A dictionary containing the relevant model server properties. |
Source code in zenml/model_deployers/base_model_deployer.py
@staticmethod
@abstractmethod
def get_model_server_info(
    service: BaseService,
) -> Dict[str, Optional[str]]:
    """Give an implementation-specific way to extract relevant model server properties for the user.

    Args:
        service: Integration-specific service instance

    Returns:
        A dictionary containing the relevant model server properties.
    """
get_model_server_logs(self, uuid, follow=False, tail=None)
Get the logs of a model server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uuid | `UUID` | UUID of the model server to get the logs of. | required |
| follow | `bool` | if True, the logs will be streamed as they are written | `False` |
| tail | `Optional[int]` | only retrieve the last NUM lines of log output. | `None` |
Returns:
| Type | Description |
|---|---|
| `Generator[str, bool, NoneType]` | A generator that yields the logs of the model server. |
Exceptions:
| Type | Description |
|---|---|
| `RuntimeError` | if the model server is not found. |
Source code in zenml/model_deployers/base_model_deployer.py
def get_model_server_logs(
    self,
    uuid: UUID,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a model server.

    Args:
        uuid: UUID of the model server to get the logs of.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that yields the logs of the model server.

    Raises:
        RuntimeError: if the model server is not found.
    """
    services = self.find_model_server(service_uuid=uuid)
    if len(services) == 0:
        raise RuntimeError(f"No model server found with UUID {uuid}")
    return services[0].get_logs(follow=follow, tail=tail)
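Note that the method above is a regular function that returns a generator, so the `RuntimeError` for an unknown server is raised when the method is called rather than when the first log line is requested. The following sketch reproduces that shape with hypothetical helpers (`iter_tail`, `get_logs` and a dictionary of log lines keyed by a server ID); it covers only `tail`, since `follow` depends on a live log source.

```python
from typing import Dict, Generator, List, Optional


def iter_tail(
    lines: List[str], tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Yield all lines, or only the last `tail` lines when `tail` is set."""
    if tail is None:
        selected = lines
    elif tail <= 0:
        # Guard explicitly: lines[-0:] would return the whole list.
        selected = []
    else:
        selected = lines[-tail:]
    for line in selected:
        yield line


def get_logs(
    registry: Dict[str, List[str]], server_id: str, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Look up the server eagerly, then hand back a lazy log generator."""
    if server_id not in registry:
        raise RuntimeError(f"No model server found with ID {server_id}")
    return iter_tail(registry[server_id], tail=tail)


registry = {"server-a": ["starting", "loading model", "ready"]}
last_two = list(get_logs(registry, "server-a", tail=2))

try:
    get_logs(registry, "server-b")  # raises here, without iterating
    raised = False
except RuntimeError:
    raised = True
```

If `get_logs` itself contained a `yield`, the lookup error would be deferred until the caller started iterating, which is usually harder to debug.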
get_step_run_metadata(self, info)
Get component- and step-specific metadata after a step ran.
For model deployers, this extracts the prediction URL of the deployed model.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| info | `StepRunInfo` | Info about the step that was executed. | required |
Returns:
| Type | Description |
|---|---|
| `Dict[str, MetadataType]` | A dictionary of metadata. |
Source code in zenml/model_deployers/base_model_deployer.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    For model deployers, this extracts the prediction URL of the deployed
    model.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata.
    """
    existing_services = self.find_model_server(
        run_name=info.run_name,
    )
    if existing_services:
        existing_service = existing_services[0]
        if (
            isinstance(existing_service, BaseDeploymentService)
            and existing_service.is_running
        ):
            deployed_model_url = existing_service.prediction_url
            return {METADATA_DEPLOYED_MODEL_URL: Uri(deployed_model_url)}
    return {}
start_model_server(self, uuid, timeout=300)
Abstract method to start a model server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uuid | `UUID` | UUID of the model server to start. | required |
| timeout | `int` | timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. | `300` |
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Abstract method to start a model server.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to start. If
            set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.
    """
stop_model_server(self, uuid, timeout=300, force=False)
Abstract method to stop a model server.
This operation should be reversible. A stopped model server should still show up in the list of model servers returned by `find_model_server` and it should be possible to start it again by calling `start_model_server`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| uuid | `UUID` | UUID of the model server to stop. | required |
| timeout | `int` | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | `300` |
| force | `bool` | if True, force the service to stop. | `False` |
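The difference between the reversible `stop_model_server` and the irreversible `delete_model_server` can be made concrete with a small in-memory sketch. `ToyRegistry` and its methods are hypothetical and ignore timeouts and the `force` flag; they only model which servers remain discoverable after each operation.

```python
from typing import Dict, List
from uuid import UUID, uuid4


class ToyRegistry:
    """Hypothetical registry that maps server UUIDs to a running flag."""

    def __init__(self) -> None:
        self._running: Dict[UUID, bool] = {}

    def add(self) -> UUID:
        server_id = uuid4()
        self._running[server_id] = True
        return server_id

    def find(self, running: bool = False) -> List[UUID]:
        # With running=False every known server is listed, stopped or not.
        return [
            server_id
            for server_id, is_running in self._running.items()
            if is_running or not running
        ]

    def stop(self, server_id: UUID) -> None:
        # Reversible: the entry stays in the registry.
        self._running[server_id] = False

    def start(self, server_id: UUID) -> None:
        self._running[server_id] = True

    def delete(self, server_id: UUID) -> None:
        # Irreversible: the entry is removed and can no longer be found.
        del self._running[server_id]


registry = ToyRegistry()
server_id = registry.add()

registry.stop(server_id)
listed_after_stop = registry.find()
running_after_stop = registry.find(running=True)

registry.start(server_id)
running_after_start = registry.find(running=True)

registry.delete(server_id)
listed_after_delete = registry.find()
```

A stopped server still appears in an unfiltered listing and can be started again, whereas a deleted server disappears from every listing.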
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to stop a model server.

    This operation should be reversible. A stopped model server should still
    show up in the list of model servers returned by `find_model_server` and
    it should be possible to start it again by calling `start_model_server`.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
BaseModelDeployerConfig (StackComponentConfig)
pydantic-model
Base config for all model deployers.
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerConfig(StackComponentConfig):
    """Base config for all model deployers."""
BaseModelDeployerFlavor (Flavor)
Base class for model deployer flavors.
Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerFlavor(Flavor):
    """Base class for model deployer flavors."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.MODEL_DEPLOYER

    @property
    def config_class(self) -> Type[BaseModelDeployerConfig]:
        """Returns `BaseModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return BaseModelDeployerConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseModelDeployer]:
        """The class that implements the model deployer."""
config_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig]
property
readonly
Returns the `BaseModelDeployerConfig` config class.
Returns:
| Type | Description |
|---|---|
| `Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig]` | The config class. |
implementation_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployer]
property
readonly
The class that implements the model deployer.
type: StackComponentType
property
readonly
Returns the flavor type.
Returns:
| Type | Description |
|---|---|
| `StackComponentType` | The flavor type. |
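The flavor class fixes the component type and leaves `implementation_class` abstract, so each concrete flavor must name the class that implements it. The sketch below shows that pattern in isolation using Python's `abc` module; `ToyFlavor`, `LocalToyFlavor` and the deployer classes are hypothetical and the type is reduced to a plain string.

```python
from abc import ABC, abstractmethod
from typing import Type


class ToyDeployer:
    """Hypothetical deployer base class (not a ZenML class)."""


class LocalToyDeployer(ToyDeployer):
    """Hypothetical concrete deployer."""


class ToyFlavor(ABC):
    """Hypothetical flavor base: fixed type, abstract implementation class."""

    @property
    def type(self) -> str:
        return "model_deployer"

    @property
    @abstractmethod
    def implementation_class(self) -> Type[ToyDeployer]:
        """The class that implements the deployer for this flavor."""


class LocalToyFlavor(ToyFlavor):
    @property
    def implementation_class(self) -> Type[ToyDeployer]:
        return LocalToyDeployer


flavor = LocalToyFlavor()
# The flavor acts as a factory handle: it yields the class to instantiate.
deployer = flavor.implementation_class()
```

Because `implementation_class` is abstract, instantiating `ToyFlavor` directly raises a `TypeError`, which forces every flavor to declare its implementation.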