Model Deployers

zenml.model_deployers special

Model deployers are stack components responsible for online model serving.

Online serving is the process of hosting and loading machine-learning models as part of a managed web service and providing access to the models through an API endpoint, typically over HTTP or gRPC. Once deployed, you can send inference requests to the model through the web service's API and receive low-latency responses.

Add a model deployer to your ZenML stack to implement continuous model deployment pipelines that train models and continuously deploy them to a model prediction web service.

When present in a stack, the model deployer also acts as a registry for models that are served with ZenML. You can use the model deployer to list all models that are currently deployed for online inference, optionally filtered by a particular pipeline run or step, or to suspend, resume or delete an external model server managed through ZenML.

base_model_deployer

Base class for all ZenML model deployers.

BaseModelDeployer (StackComponent, ABC)

Base class for all ZenML model deployers.

The model deployer serves three major purposes:

  1. It contains all the stack-related configuration attributes required to interact with the remote model serving tool, service or platform (e.g. hostnames, URLs, references to credentials, other client-related configuration parameters).

  2. It implements the continuous deployment logic necessary to deploy models in a way that updates an existing model server that is already serving a previous version of the same model instead of creating a new model server for every new model version (see the deploy_model abstract method). This functionality can be consumed directly from ZenML pipeline steps, but it can also be used outside a pipeline to deploy ad hoc models. It is also usually coupled with a standard model deployer step, implemented by each integration, that hides the details of the deployment process from the user.

  3. It acts as a ZenML BaseService registry, where every BaseService instance is used as an internal representation of a remote model server (see the find_model_server abstract method). To achieve this, it must be able to re-create the configuration of a BaseService from information that is persisted externally, alongside or even as part of the remote model server configuration itself. For example, for model servers that are implemented as Kubernetes resources, the BaseService instances can be serialized and saved as Kubernetes resource annotations. This allows the model deployer to keep track of all externally running model servers and to re-create their corresponding BaseService instance representations at any given time. The model deployer also defines methods that implement basic life-cycle management on remote model servers outside the coverage of a pipeline (see stop_model_server, start_model_server and delete_model_server).
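
The third purpose, re-creating a service representation from externally persisted information, can be illustrated with a small round trip. The sketch below is not ZenML code: `ToyServiceConfig`, `ANNOTATION_KEY`, `to_annotations` and `from_annotations` are hypothetical names, and a plain dict stands in for Kubernetes resource annotations, which are string-valued.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict

# Hypothetical annotation key; not a ZenML constant.
ANNOTATION_KEY = "example.io/service-config"


@dataclass(frozen=True)
class ToyServiceConfig:
    """Stand-in for a service configuration (not ZenML's ServiceConfig)."""

    model_name: str
    model_uri: str
    pipeline_name: str


def to_annotations(config: ToyServiceConfig) -> Dict[str, str]:
    """Serialize the config into a string-valued annotations mapping."""
    return {ANNOTATION_KEY: json.dumps(asdict(config), sort_keys=True)}


def from_annotations(annotations: Dict[str, str]) -> ToyServiceConfig:
    """Re-create the config from annotations read back from the resource."""
    return ToyServiceConfig(**json.loads(annotations[ANNOTATION_KEY]))


original = ToyServiceConfig(
    model_name="fraud-detector",
    model_uri="s3://bucket/models/fraud/3",
    pipeline_name="training_pipeline",
)
# Persist alongside the (simulated) remote resource, then rebuild from it.
restored = from_annotations(to_annotations(original))
```

Because everything needed to rebuild the representation lives with the remote resource, a fresh process can rediscover running servers without any local state.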

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployer(StackComponent, ABC):
    """Base class for all ZenML model deployers.

    The model deployer serves three major purposes:

    1. It contains all the stack related configuration attributes required to
    interact with the remote model serving tool, service or platform (e.g.
    hostnames, URLs, references to credentials, other client related
    configuration parameters).

    2. It implements the continuous deployment logic necessary to deploy models
    in a way that updates an existing model server that is already serving a
    previous version of the same model instead of creating a new model server
    for every new model version (see the `deploy_model` abstract method).
    This functionality can be consumed directly from ZenML pipeline steps, but
    it can also be used outside the pipeline to deploy ad hoc models. It is
    also usually coupled with a standard model deployer step, implemented by
    each integration, that hides the details of the deployment process away from
    the user.

    3. It acts as a ZenML BaseService registry, where every BaseService instance
    is used as an internal representation of a remote model server (see the
    `find_model_server` abstract method). To achieve this, it must be able to
    re-create the configuration of a BaseService from information that is
    persisted externally, alongside or even part of the remote model server
    configuration itself. For example, for model servers that are implemented as
    Kubernetes resources, the BaseService instances can be serialized and saved
    as Kubernetes resource annotations. This allows the model deployer to keep
    track of all externally running model servers and to re-create their
    corresponding BaseService instance representations at any given time.
    The model deployer also defines methods that implement basic life-cycle
    management on remote model servers outside the coverage of a pipeline
    (see `stop_model_server`, `start_model_server` and `delete_model_server`).
    """

    NAME: ClassVar[str]
    FLAVOR: ClassVar[Type["BaseModelDeployerFlavor"]]

    @property
    def config(self) -> BaseModelDeployerConfig:
        """Returns the `BaseModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseModelDeployerConfig, self._config)

    @classmethod
    def get_active_model_deployer(cls) -> "BaseModelDeployer":
        """Get the model deployer registered in the active stack.

        Returns:
            The model deployer registered in the active stack.

        Raises:
            TypeError: if a model deployer is not part of the
                active stack.
        """
        flavor: BaseModelDeployerFlavor = cls.FLAVOR()
        client = Client()
        model_deployer = client.active_stack.model_deployer
        if not model_deployer or not isinstance(model_deployer, cls):
            raise TypeError(
                f"The active stack needs to have a {cls.NAME} model "
                f"deployer component registered to be able to deploy models "
                f"with {cls.NAME}. You can create a new stack with "
                f"a {cls.NAME} model deployer component or update your "
                f"active stack to add this component, e.g.:\n\n"
                f"  `zenml model-deployer register {flavor.name} "
                f"--flavor={flavor.name} ...`\n"
                f"  `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
                f"  or:\n"
                f"  `zenml stack update -d {flavor.name}`\n\n"
            )

        return model_deployer

    @abstractmethod
    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to deploy a model.

        Concrete model deployer subclasses must implement the following
        functionality in this method:
        - Detect if there is an existing model server instance running and
        serving one or more previous versions of the same model
        - Deploy the model to the serving platform or update the existing model
        server instance to include the new model version
        - Return a Service object that is a representation of the external model
        server instance. The Service must implement basic operational state
        tracking and lifecycle management operations for the model server (e.g.
        start, stop, etc.)

        Args:
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            replace: If True, it will replace any existing model server instances
                that serve the same model. If False, it does not replace any
                existing model server instance.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.

        Returns:
            The deployment Service object.
        """

    @staticmethod
    @abstractmethod
    def get_model_server_info(
        service: BaseService,
    ) -> Dict[str, Optional[str]]:
        """Give an implementation-specific way to extract relevant model server properties for the user.

        Args:
            service: Integration-specific service instance

        Returns:
            A dictionary containing the relevant model server properties.
        """

    @abstractmethod
    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
    ) -> List[BaseService]:
        """Abstract method to find one or more model servers that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            model_type: the implementation specific type/format of the deployed
                model.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """

    @abstractmethod
    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    @abstractmethod
    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Abstract method to start a model server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.
        """

    @abstractmethod
    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def get_model_server_logs(
        self,
        uuid: UUID,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a model server.

        Args:
            uuid: UUID of the model server to get the logs of.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that yields the logs of the model server.

        Raises:
            RuntimeError: if the model server is not found.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            raise RuntimeError(f"No model server found with UUID {uuid}")
        return services[0].get_logs(follow=follow, tail=tail)

    def get_step_run_metadata(
        self, info: "StepRunInfo"
    ) -> Dict[str, "MetadataType"]:
        """Get component- and step-specific metadata after a step ran.

        For model deployers, this extracts the prediction URL of the deployed
        model.

        Args:
            info: Info about the step that was executed.

        Returns:
            A dictionary of metadata.
        """
        existing_services = self.find_model_server(
            pipeline_run_id=info.run_name,
        )
        if existing_services:
            existing_service = existing_services[0]
            if (
                isinstance(existing_service, BaseDeploymentService)
                and existing_service.is_running
            ):
                deployed_model_url = existing_service.prediction_url
                return {METADATA_DEPLOYED_MODEL_URL: Uri(deployed_model_url)}
        return {}
config: BaseModelDeployerConfig property readonly

Returns the BaseModelDeployerConfig config.

Returns:

| Type | Description |
| --- | --- |
| BaseModelDeployerConfig | The configuration. |

delete_model_server(self, uuid, timeout=300, force=False)

Abstract method to delete a model server.

This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by find_model_server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uuid | UUID | UUID of the model server to delete. | required |
| timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
| force | bool | if True, force the service to stop. | False |

Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to delete a model server.

    This operation is irreversible. A deleted model server must no longer
    show up in the list of model servers returned by `find_model_server`.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
deploy_model(self, config, replace=False, timeout=300)

Abstract method to deploy a model.

Concrete model deployer subclasses must implement the following functionality in this method:

- Detect if there is an existing model server instance running and serving one or more previous versions of the same model
- Deploy the model to the serving platform or update the existing model server instance to include the new model version
- Return a Service object that is a representation of the external model server instance. The Service must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | ServiceConfig | Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc. | required |
| replace | bool | If True, it will replace any existing model server instances that serve the same model. If False, it does not replace any existing model server instance. | False |
| timeout | int | The maximum time in seconds to wait for the model server to start serving the model. | 300 |

Returns:

| Type | Description |
| --- | --- |
| BaseService | The deployment Service object. |
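
The three responsibilities listed for `deploy_model` can be sketched with an in-memory stand-in. This is one possible reading of the `replace` flag, not the behavior of any particular ZenML integration: `ToyServer` and `ToyDeployer` are hypothetical classes, and the simplified signature takes a model name and version instead of a `ServiceConfig`.

```python
from dataclasses import dataclass, field
from typing import Dict, List
from uuid import UUID, uuid4


@dataclass
class ToyServer:
    """Stand-in for a service object; not ZenML's BaseService."""

    model_name: str
    versions: List[str]
    uuid: UUID = field(default_factory=uuid4)
    is_running: bool = True


class ToyDeployer:
    """In-memory stand-in for a model deployer (hypothetical)."""

    def __init__(self) -> None:
        self.servers: Dict[UUID, ToyServer] = {}

    def deploy_model(
        self, model_name: str, version: str, replace: bool = False
    ) -> ToyServer:
        # Step 1: detect servers already serving this model.
        existing = [
            s for s in self.servers.values() if s.model_name == model_name
        ]
        if replace and existing:
            # Step 2a: update the first match in place and drop any extras.
            server, extras = existing[0], existing[1:]
            for extra in extras:
                del self.servers[extra.uuid]
            server.versions.append(version)
            server.is_running = True
            return server
        # Step 2b: nothing to reuse, or reuse not requested; create a server.
        server = ToyServer(model_name=model_name, versions=[version])
        self.servers[server.uuid] = server
        # Step 3: return the object that represents the model server.
        return server


deployer = ToyDeployer()
first = deployer.deploy_model("churn", "v1")
second = deployer.deploy_model("churn", "v2", replace=True)
third = deployer.deploy_model("churn", "v3", replace=False)
```

Here `second` is the same server object as `first`, now also serving `v2`, while `third` is a separate server created alongside it.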

Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Abstract method to deploy a model.

    Concrete model deployer subclasses must implement the following
    functionality in this method:
    - Detect if there is an existing model server instance running and
    serving one or more previous versions of the same model
    - Deploy the model to the serving platform or update the existing model
    server instance to include the new model version
    - Return a Service object that is a representation of the external model
    server instance. The Service must implement basic operational state
    tracking and lifecycle management operations for the model server (e.g.
    start, stop, etc.)

    Args:
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        replace: If True, it will replace any existing model server instances
            that serve the same model. If False, it does not replace any
            existing model server instance.
        timeout: The maximum time in seconds to wait for the model server
            to start serving the model.

    Returns:
        The deployment Service object.
    """
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)

Abstract method to find one or more model servers that match the given criteria.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| running | bool | If true, only running services will be returned. | False |
| service_uuid | Optional[uuid.UUID] | The UUID of the service that was originally used to deploy the model. | None |
| pipeline_name | Optional[str] | name of the pipeline that the deployed model was part of. | None |
| pipeline_run_id | Optional[str] | ID of the pipeline run which the deployed model was part of. | None |
| pipeline_step_name | Optional[str] | the name of the pipeline model deployment step that deployed the model. | None |
| model_name | Optional[str] | the name of the deployed model. | None |
| model_uri | Optional[str] | URI of the deployed model. | None |
| model_type | Optional[str] | the implementation specific type/format of the deployed model. | None |

Returns:

| Type | Description |
| --- | --- |
| List[zenml.services.service.BaseService] | One or more Service objects representing model servers that match the input search criteria. |
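
Since every search criterion defaults to `None`, a natural implementation treats an unset criterion as "no filter" and combines the rest with a logical AND. The sketch below assumes exactly that; `ToyRecord` and `find_servers` are hypothetical, and only a subset of the real parameters is modelled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ToyRecord:
    """Stand-in for a tracked model server (hypothetical)."""

    pipeline_name: str
    pipeline_step_name: str
    model_name: str
    is_running: bool


def find_servers(
    records: List[ToyRecord],
    running: bool = False,
    pipeline_name: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
) -> List[ToyRecord]:
    """Return the records that satisfy every criterion that was given."""
    criteria = {
        "pipeline_name": pipeline_name,
        "pipeline_step_name": pipeline_step_name,
        "model_name": model_name,
    }
    return [
        record
        for record in records
        # `running=True` excludes stopped servers; False applies no filter.
        if (record.is_running or not running)
        # A criterion left as None matches any value.
        and all(
            wanted is None or getattr(record, attr) == wanted
            for attr, wanted in criteria.items()
        )
    ]


records = [
    ToyRecord("train", "deployer", "churn", is_running=True),
    ToyRecord("train", "deployer", "fraud", is_running=False),
    ToyRecord("nightly", "deployer", "churn", is_running=True),
]
everything = find_servers(records)
running_only = find_servers(records, running=True)
churn_from_train = find_servers(
    records, pipeline_name="train", model_name="churn"
)
```

Calling the function with no criteria lists every tracked server, including stopped ones, which is what makes a later restart possible.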

Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Abstract method to find one or more model servers that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_name: name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model was
            part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: the implementation specific type/format of the deployed
            model.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.
    """
get_active_model_deployer() classmethod

Get the model deployer registered in the active stack.

Returns:

| Type | Description |
| --- | --- |
| BaseModelDeployer | The model deployer registered in the active stack. |

Exceptions:

| Type | Description |
| --- | --- |
| TypeError | if a model deployer is not part of the active stack. |
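
Because the check uses `isinstance(model_deployer, cls)`, the outcome depends on which class the method is called on: a subclass only accepts an active deployer of its own kind. A minimal sketch of that behavior follows, with hypothetical classes and a class attribute `active` standing in for the active stack.

```python
from typing import ClassVar, Optional


class ToyBaseDeployer:
    """Stand-in base class; `active` plays the role of the active stack."""

    NAME: ClassVar[str] = "toy"
    active: ClassVar[Optional["ToyBaseDeployer"]] = None

    @classmethod
    def get_active(cls) -> "ToyBaseDeployer":
        candidate = ToyBaseDeployer.active
        # Reject both "no deployer at all" and "deployer of another kind".
        if candidate is None or not isinstance(candidate, cls):
            raise TypeError(
                f"The active stack needs a {cls.NAME} model deployer."
            )
        return candidate


class AlphaDeployer(ToyBaseDeployer):
    NAME = "alpha"


class BetaDeployer(ToyBaseDeployer):
    NAME = "beta"


ToyBaseDeployer.active = AlphaDeployer()
found = AlphaDeployer.get_active()

try:
    BetaDeployer.get_active()
    beta_error = None
except TypeError as exc:
    beta_error = str(exc)
```

Calling `get_active` on the base class accepts any deployer, whereas calling it on `BetaDeployer` fails while an `AlphaDeployer` is active.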

Source code in zenml/model_deployers/base_model_deployer.py
@classmethod
def get_active_model_deployer(cls) -> "BaseModelDeployer":
    """Get the model deployer registered in the active stack.

    Returns:
        The model deployer registered in the active stack.

    Raises:
        TypeError: if a model deployer is not part of the
            active stack.
    """
    flavor: BaseModelDeployerFlavor = cls.FLAVOR()
    client = Client()
    model_deployer = client.active_stack.model_deployer
    if not model_deployer or not isinstance(model_deployer, cls):
        raise TypeError(
            f"The active stack needs to have a {cls.NAME} model "
            f"deployer component registered to be able to deploy models "
            f"with {cls.NAME}. You can create a new stack with "
            f"a {cls.NAME} model deployer component or update your "
            f"active stack to add this component, e.g.:\n\n"
            f"  `zenml model-deployer register {flavor.name} "
            f"--flavor={flavor.name} ...`\n"
            f"  `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
            f"  or:\n"
            f"  `zenml stack update -d {flavor.name}`\n\n"
        )

    return model_deployer
get_model_server_info(service) staticmethod

Give an implementation-specific way to extract relevant model server properties for the user.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| service | BaseService | Integration-specific service instance | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Optional[str]] | A dictionary containing the relevant model server properties. |

Source code in zenml/model_deployers/base_model_deployer.py
@staticmethod
@abstractmethod
def get_model_server_info(
    service: BaseService,
) -> Dict[str, Optional[str]]:
    """Give an implementation-specific way to extract relevant model server properties for the user.

    Args:
        service: Integration-specific service instance

    Returns:
        A dictionary containing the relevant model server properties.
    """
get_model_server_logs(self, uuid, follow=False, tail=None)

Get the logs of a model server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uuid | UUID | UUID of the model server to get the logs of. | required |
| follow | bool | if True, the logs will be streamed as they are written | False |
| tail | Optional[int] | only retrieve the last NUM lines of log output. | None |

Returns:

| Type | Description |
| --- | --- |
| Generator[str, bool, NoneType] | A generator that yields the logs of the model server. |

Exceptions:

| Type | Description |
| --- | --- |
| RuntimeError | if the model server is not found. |
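
Two details of this method are easy to miss: the lookup failure is raised when the method is called, not when the generator is first consumed, and `tail` limits output to the last lines. The sketch below illustrates both under simplifying assumptions: `get_logs` is a hypothetical function, a dict of lists stands in for the server registry, and `follow` is left out.

```python
from collections import deque
from typing import Dict, Generator, List, Optional
from uuid import UUID, uuid4


def get_logs(
    store: Dict[UUID, List[str]], uuid: UUID, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Look up a server eagerly, then stream its log lines lazily."""
    if uuid not in store:
        # Raised at call time because this is a plain function that
        # returns a generator, not a generator function itself.
        raise RuntimeError(f"No model server found with UUID {uuid}")

    def _stream() -> Generator[str, bool, None]:
        lines = store[uuid]
        # A bounded deque keeps only the last `tail` lines.
        selected = lines if tail is None else deque(lines, maxlen=tail)
        yield from selected

    return _stream()


server_id = uuid4()
store = {server_id: ["boot", "load model", "ready", "request 1"]}
last_two = list(get_logs(store, server_id, tail=2))
all_lines = list(get_logs(store, server_id))

try:
    get_logs(store, uuid4())
    missing_error = None
except RuntimeError as exc:
    missing_error = str(exc)
```

Callers can therefore wrap just the call in a `try` block and iterate over the result afterwards.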

Source code in zenml/model_deployers/base_model_deployer.py
def get_model_server_logs(
    self,
    uuid: UUID,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a model server.

    Args:
        uuid: UUID of the model server to get the logs of.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that yields the logs of the model server.

    Raises:
        RuntimeError: if the model server is not found.
    """
    services = self.find_model_server(service_uuid=uuid)
    if len(services) == 0:
        raise RuntimeError(f"No model server found with UUID {uuid}")
    return services[0].get_logs(follow=follow, tail=tail)
get_step_run_metadata(self, info)

Get component- and step-specific metadata after a step ran.

For model deployers, this extracts the prediction URL of the deployed model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| info | StepRunInfo | Info about the step that was executed. | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, MetadataType] | A dictionary of metadata. |

Source code in zenml/model_deployers/base_model_deployer.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    For model deployers, this extracts the prediction URL of the deployed
    model.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata.
    """
    existing_services = self.find_model_server(
        pipeline_run_id=info.run_name,
    )
    if existing_services:
        existing_service = existing_services[0]
        if (
            isinstance(existing_service, BaseDeploymentService)
            and existing_service.is_running
        ):
            deployed_model_url = existing_service.prediction_url
            return {METADATA_DEPLOYED_MODEL_URL: Uri(deployed_model_url)}
    return {}
start_model_server(self, uuid, timeout=300)

Abstract method to start a model server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uuid | UUID | UUID of the model server to start. | required |
| timeout | int | timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. | 300 |

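
The `timeout` semantics described above (wait up to N seconds, or return immediately when set to 0) can be sketched as a small polling helper. This is an illustration only: `wait_until` and `SlowStarter` are hypothetical, and real integrations may wait on their platform in other ways.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool], timeout: int, poll_interval: float = 0.01
) -> bool:
    """Poll `condition` for up to `timeout` seconds; 0 means do not wait."""
    if timeout <= 0:
        # Return immediately, reporting whatever the current state is.
        return condition()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_interval)
    # One last check once the deadline has passed.
    return condition()


class SlowStarter:
    """Hypothetical server that becomes active on the third status check."""

    def __init__(self) -> None:
        self.checks = 0

    def is_active(self) -> bool:
        self.checks += 1
        return self.checks >= 3


waited = SlowStarter()
became_active = wait_until(waited.is_active, timeout=1)

not_waited = SlowStarter()
active_immediately = wait_until(not_waited.is_active, timeout=0)
```

With `timeout=0` the caller gets control back after a single status check and has to poll the server state separately if it needs to know when the server is ready.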
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Abstract method to start a model server.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to start. If
            set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.
    """
stop_model_server(self, uuid, timeout=300, force=False)

Abstract method to stop a model server.

This operation should be reversible. A stopped model server should still show up in the list of model servers returned by find_model_server and it should be possible to start it again by calling start_model_server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| uuid | UUID | UUID of the model server to stop. | required |
| timeout | int | timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. | 300 |
| force | bool | if True, force the service to stop. | False |

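
The contract that distinguishes stopping from deleting (a stopped server stays listed and can be restarted, a deleted one disappears) can be shown with a toy registry. `ToyRegistry` and its methods are hypothetical stand-ins, not ZenML classes.

```python
from typing import Dict, List
from uuid import UUID, uuid4


class ToyRegistry:
    """In-memory stand-in for a deployer's server registry (hypothetical)."""

    def __init__(self) -> None:
        # Maps each tracked server to its running state.
        self._running: Dict[UUID, bool] = {}

    def add(self) -> UUID:
        uuid = uuid4()
        self._running[uuid] = True
        return uuid

    def find(self, running: bool = False) -> List[UUID]:
        return [u for u, up in self._running.items() if up or not running]

    def stop(self, uuid: UUID) -> None:
        self._running[uuid] = False  # still tracked, so it can be restarted

    def start(self, uuid: UUID) -> None:
        self._running[uuid] = True

    def delete(self, uuid: UUID) -> None:
        del self._running[uuid]  # no longer tracked; cannot be restarted


registry = ToyRegistry()
server = registry.add()

registry.stop(server)
listed_after_stop = registry.find()
running_after_stop = registry.find(running=True)

registry.start(server)
running_after_start = registry.find(running=True)

registry.delete(server)
listed_after_delete = registry.find()
```

After `stop` the server still appears in an unfiltered search, and only `delete` removes it from the listing.
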
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to stop a model server.

    This operation should be reversible. A stopped model server should still
    show up in the list of model servers returned by `find_model_server` and
    it should be possible to start it again by calling `start_model_server`.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """

BaseModelDeployerConfig (StackComponentConfig) pydantic-model

Base config for all model deployers.

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerConfig(StackComponentConfig):
    """Base config for all model deployers."""

BaseModelDeployerFlavor (Flavor)

Base class for model deployer flavors.

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerFlavor(Flavor):
    """Base class for model deployer flavors."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.MODEL_DEPLOYER

    @property
    def config_class(self) -> Type[BaseModelDeployerConfig]:
        """Returns `BaseModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return BaseModelDeployerConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseModelDeployer]:
        """The class that implements the model deployer."""
config_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig] property readonly

Returns BaseModelDeployerConfig config class.

Returns:

| Type | Description |
| --- | --- |
| Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig] | The config class. |

implementation_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployer] property readonly

The class that implements the model deployer.

type: StackComponentType property readonly

Returns the flavor type.

Returns:

| Type | Description |
| --- | --- |
| StackComponentType | The flavor type. |
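
The flavor pattern above fixes the component type in the base class, supplies a default config class, and leaves the implementation class abstract. The sketch below mirrors that shape with stand-in classes only: every class name is hypothetical, and a plain string replaces `StackComponentType.MODEL_DEPLOYER`.

```python
from abc import ABC, abstractmethod
from typing import Type


class ToyConfig:
    """Stand-in for a base deployer config."""


class ToyDeployer:
    """Stand-in for a base deployer implementation."""


class ToyFlavor(ABC):
    """Stand-in base flavor: fixed type, overridable classes."""

    @property
    def type(self) -> str:
        return "model_deployer"

    @property
    def config_class(self) -> Type[ToyConfig]:
        return ToyConfig

    @property
    @abstractmethod
    def name(self) -> str:
        """Name under which the flavor is registered."""

    @property
    @abstractmethod
    def implementation_class(self) -> Type[ToyDeployer]:
        """The class that implements the deployer."""


class LocalConfig(ToyConfig):
    port: int = 8000


class LocalDeployer(ToyDeployer):
    pass


class LocalFlavor(ToyFlavor):
    """Concrete flavor that fills in the abstract members."""

    @property
    def name(self) -> str:
        return "local"

    @property
    def config_class(self) -> Type[ToyConfig]:
        return LocalConfig

    @property
    def implementation_class(self) -> Type[ToyDeployer]:
        return LocalDeployer


flavor = LocalFlavor()
```

The base flavor itself cannot be instantiated because its abstract properties are unimplemented, which forces each integration to name its implementation class.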