Skip to content

Model Deployers

zenml.model_deployers special

Model deployers are stack components responsible for online model serving.

Online serving is the process of hosting and loading machine-learning models as part of a managed web service and providing access to the models through an API endpoint like HTTP or GRPC. Once deployed, you can send inference requests to the model through the web service's API and receive fast, low-latency responses.

Add a model deployer to your ZenML stack to be able to implement continuous model deployment pipelines that train models and continuously deploy them to a model prediction web service.

When present in a stack, the model deployer also acts as a registry for models that are served with ZenML. You can use the model deployer to list all models that are currently deployed for online inference or filtered according to a particular pipeline run or step, or to suspend, resume or delete an external model server managed through ZenML.

base_model_deployer

Base class for all ZenML model deployers.

BaseModelDeployer (StackComponent, ABC)

Base class for all ZenML model deployers.

The model deployer serves three major purposes:

  1. It contains all the stack related configuration attributes required to interact with the remote model serving tool, service or platform (e.g. hostnames, URLs, references to credentials, other client related configuration parameters).

  2. It implements the continuous deployment logic necessary to deploy models in a way that updates an existing model server that is already serving a previous version of the same model instead of creating a new model server for every new model version (see the deploy_model abstract method). This functionality can be consumed directly from ZenML pipeline steps, but it can also be used outside the pipeline to deploy ad hoc models. It is also usually coupled with a standard model deployer step, implemented by each integration, that hides the details of the deployment process away from the user.

  3. It acts as a ZenML BaseService registry, where every BaseService instance is used as an internal representation of a remote model server (see the find_model_server abstract method). To achieve this, it must be able to re-create the configuration of a BaseService from information that is persisted externally, alongside or even part of the remote model server configuration itself. For example, for model servers that are implemented as Kubernetes resources, the BaseService instances can be serialized and saved as Kubernetes resource annotations. This allows the model deployer to keep track of all externally running model servers and to re-create their corresponding BaseService instance representations at any given time. The model deployer also defines methods that implement basic life-cycle management on remote model servers outside the coverage of a pipeline (see stop_model_server, start_model_server and delete_model_server).

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployer(StackComponent, ABC):
    """Base class for all ZenML model deployers.

    The model deployer serves three major purposes:

    1. It contains all the stack related configuration attributes required to
    interact with the remote model serving tool, service or platform (e.g.
    hostnames, URLs, references to credentials, other client related
    configuration parameters).

    2. It implements the continuous deployment logic necessary to deploy models
    in a way that updates an existing model server that is already serving a
    previous version of the same model instead of creating a new model server
    for every new model version (see the `deploy_model` abstract method).
    This functionality can be consumed directly from ZenML pipeline steps, but
    it can also be used outside the pipeline to deploy ad hoc models. It is
    also usually coupled with a standard model deployer step, implemented by
    each integration, that hides the details of the deployment process away from
    the user.

    3. It acts as a ZenML BaseService registry, where every BaseService instance
    is used as an internal representation of a remote model server (see the
    `find_model_server` abstract method). To achieve this, it must be able to
    re-create the configuration of a BaseService from information that is
    persisted externally, alongside or even part of the remote model server
    configuration itself. For example, for model servers that are implemented as
    Kubernetes resources, the BaseService instances can be serialized and saved
    as Kubernetes resource annotations. This allows the model deployer to keep
    track of all externally running model servers and to re-create their
    corresponding BaseService instance representations at any given time.
    The model deployer also defines methods that implement basic life-cycle
    management on remote model servers outside the coverage of a pipeline
    (see `stop_model_server`, `start_model_server` and `delete_model_server`).
    """

    NAME: ClassVar[str]
    FLAVOR: ClassVar[Type["BaseModelDeployerFlavor"]]

    @property
    def config(self) -> BaseModelDeployerConfig:
        """Returns the `BaseModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseModelDeployerConfig, self._config)

    @classmethod
    def get_active_model_deployer(cls) -> "BaseModelDeployer":
        """Get the model deployer registered in the active stack.

        Returns:
            The model deployer registered in the active stack.

        Raises:
            TypeError: if a model deployer is not part of the
                active stack.
        """
        flavor: BaseModelDeployerFlavor = cls.FLAVOR()
        client = Client()
        model_deployer = client.active_stack.model_deployer
        if not model_deployer or not isinstance(model_deployer, cls):
            raise TypeError(
                f"The active stack needs to have a {cls.NAME} model "
                f"deployer component registered to be able deploy models "
                f"with {cls.NAME}. You can create a new stack with "
                f"a {cls.NAME} model deployer component or update your "
                f"active stack to add this component, e.g.:\n\n"
                f"  `zenml model-deployer register {flavor.name} "
                f"--flavor={flavor.name} ...`\n"
                f"  `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
                f"  or:\n"
                f"  `zenml stack update -d {flavor.name}`\n\n"
            )

        return model_deployer

    def deploy_model(
        self,
        config: ServiceConfig,
        service_type: ServiceType,
        replace: bool = False,
        continuous_deployment_mode: bool = False,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Deploy a model.

        the deploy_model method is the main entry point for deploying models
        using the model deployer. It is used to deploy a model to a model server
        instance that is running on a remote serving platform or service. The
        method is responsible for detecting if there is an existing model server
        instance running serving one or more previous versions of the same model
        and deploying the model to the serving platform or updating the existing
        model server instance to include the new model version. The method
        returns a Service object that is a representation of the external model
        server instance. The Service object must implement basic operational
        state tracking and lifecycle management operations for the model server
        (e.g. start, stop, etc.).

        Args:
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            replace: If True, it will replace any existing model server instances
                that serve the same model. If False, it does not replace any
                existing model server instance.
            continuous_deployment_mode: If True, it will replace any existing
                model server instances that serve the same model, regardless of
                the configuration. If False, it will only replace existing model
                server instances that serve the same model if the configuration
                is exactly the same.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.
            service_type: The type of the service to deploy. If not provided,
                the default service type of the model deployer will be used.

        Raises:
            RuntimeError: if the model deployment fails.

        Returns:
            The deployment Service object.
        """
        # Instantiate the client
        client = Client()
        if not continuous_deployment_mode:
            # Find existing model server
            services = self.find_model_server(
                config=config.dict(),
                service_type=service_type,
            )
            if len(services) > 0:
                logger.info(
                    f"Existing model server found for {config.name or config.model_name} with the exact same configuration. Returning the existing service named {services[0].config.service_name}."
                )
                return services[0]
        else:
            # Find existing model server
            services = self.find_model_server(
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
                service_type=service_type,
            )
            if len(services) > 0:
                logger.info(
                    f"Existing model server found for {config.pipeline_name} and {config.pipeline_step_name}, since continuous deployment mode is enabled, replacing the existing service named {services[0].config.service_name}."
                )
                service = services[0]
                self.delete_model_server(service.uuid)
        logger.info(
            f"Deploying model server for {config.model_name} with the following configuration: {config.dict()}"
        )
        service_response = client.create_service(
            config=config,
            service_type=service_type,
            model_version_id=get_model_version_id_if_exists(
                config.model_name, config.model_version
            ),
        )
        try:
            service = self.perform_deploy_model(
                id=service_response.id,
                config=config,
                timeout=timeout,
            )
        except Exception as e:
            client.delete_service(service_response.id)
            raise RuntimeError(
                f"Failed to deploy model server for {config.model_name}: {e}"
            ) from e
        # Update the service in store
        client.update_service(
            id=service.uuid,
            name=service.config.service_name,
            service_source=service.dict().get("type"),
            admin_state=service.admin_state,
            status=service.status.dict(),
            endpoint=service.endpoint.dict() if service.endpoint else None,
            # labels=service.config.get_service_labels()  # TODO: fix labels in services and config
            prediction_url=service.get_prediction_url(),
            health_check_url=service.get_healthcheck_url(),
        )
        return service

    @abstractmethod
    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to deploy a model.

        Concrete model deployer subclasses must implement the following
        functionality in this method:
        - Detect if there is an existing model server instance running serving
        one or more previous versions of the same model
        - Deploy the model to the serving platform or update the existing model
        server instance to include the new model version
        - Return a Service object that is a representation of the external model
        server instance. The Service must implement basic operational state
        tracking and lifecycle management operations for the model server (e.g.
        start, stop, etc.)

        Args:
            id: UUID of the service that was originally used to deploy the model.
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            timeout: The maximum time in seconds to wait for the model server
                to start serving the model.

        Returns:
            The deployment Service object.
        """

    @staticmethod
    @abstractmethod
    def get_model_server_info(
        service: BaseService,
    ) -> Dict[str, Optional[str]]:
        """Give implementation specific way to extract relevant model server properties for the user.

        Args:
            service: Integration-specific service instance

        Returns:
            A dictionary containing the relevant model server properties.
        """

    def find_model_server(
        self,
        config: Optional[Dict[str, Any]] = None,
        running: Optional[bool] = None,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        service_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_version: Optional[str] = None,
        service_type: Optional[ServiceType] = None,
        type: Optional[str] = None,
        flavor: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
    ) -> List[BaseService]:
        """Abstract method to find one or more a model servers that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_step_name: The name of the pipeline step that was originally used
                to deploy the model.
            pipeline_name: The name of the pipeline that was originally used to deploy
                the model from the model registry.
            model_name: The name of the model that was originally used to deploy
                the model from the model registry.
            model_version: The version of the model that was originally used to
                deploy the model from the model registry.
            service_type: The type of the service to find.
            type: The type of the service to find.
            flavor: The flavor of the service to find.
            pipeline_run_id: The UUID of the pipeline run that was originally used
                to deploy the model.
            config: Custom Service configuration parameters for the model
                deployer. Can include the pipeline name, the run id, the step
                name, the model name, the model uri, the model type etc.
            service_name: The name of the service to find.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """
        client = Client()
        service_responses = client.list_services(
            sort_by="desc:created",
            id=service_uuid,
            running=running,
            service_name=service_name,
            pipeline_name=pipeline_name,
            pipeline_step_name=pipeline_step_name,
            model_version_id=get_model_version_id_if_exists(
                model_name, model_version
            ),
            pipeline_run_id=pipeline_run_id,
            config=config,
            type=type or service_type.type if service_type else None,
            flavor=flavor or service_type.flavor if service_type else None,
            hydrate=True,
        )
        services = []
        for service_response in service_responses.items:
            if not service_response.service_source:
                client.delete_service(service_response.id)
                continue
            service = BaseDeploymentService.from_model(service_response)
            service.update_status()
            if service.status.dict() != service_response.status:
                client.update_service(
                    id=service.uuid,
                    admin_state=service.admin_state,
                    status=service.status.dict(),
                    endpoint=service.endpoint.dict()
                    if service.endpoint
                    else None,
                )
            if running and not service.is_running:
                logger.warning(
                    f"Service {service.uuid} is in an unexpected state. "
                    f"Expected running={running}, but found running={service.is_running}."
                )
                continue
            services.append(service)
        return services

    @abstractmethod
    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Abstract method to stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            service: The service to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to stop a model server.

        This operation should be reversible. A stopped model server should still
        show up in the list of model servers returned by `find_model_server` and
        it should be possible to start it again by calling `start_model_server`.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            updated_service = self.perform_stop_model(service, timeout, force)
            client.update_service(
                id=updated_service.uuid,
                admin_state=updated_service.admin_state,
                status=updated_service.status.dict(),
                endpoint=updated_service.endpoint.dict()
                if updated_service.endpoint
                else None,
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to stop model server with UUID {uuid}: {e}"
            ) from e

    @abstractmethod
    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Abstract method to start a model server.

        Args:
            service: The service to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.
        """

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Abstract method to start a model server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to start. If
                set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            updated_service = self.perform_start_model(service, timeout)
            client.update_service(
                id=updated_service.uuid,
                admin_state=updated_service.admin_state,
                status=updated_service.status.dict(),
                endpoint=updated_service.endpoint.dict()
                if updated_service.endpoint
                else None,
            )
        except Exception as e:
            raise RuntimeError(
                f"Failed to start model server with UUID {uuid}: {e}"
            ) from e

    @abstractmethod
    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            service: The service to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Abstract method to delete a model server.

        This operation is irreversible. A deleted model server must no longer
        show up in the list of model servers returned by `find_model_server`.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.

        Raises:
            RuntimeError: if the model server is not found.
        """
        client = Client()
        try:
            service = self.find_model_server(service_uuid=uuid)[0]
            self.perform_delete_model(service, timeout, force)
            client.delete_service(uuid)
        except Exception as e:
            raise RuntimeError(
                f"Failed to delete model server with UUID {uuid}: {e}"
            ) from e

    def get_model_server_logs(
        self,
        uuid: UUID,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a model server.

        Args:
            uuid: UUID of the model server to get the logs of.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that yields the logs of the model server.

        Raises:
            RuntimeError: if the model server is not found.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            raise RuntimeError(f"No model server found with UUID {uuid}")
        return services[0].get_logs(follow=follow, tail=tail)

    def load_service(
        self,
        service_id: UUID,
    ) -> BaseService:
        """Load a service from a URI.

        Args:
            service_id: The ID of the service to load.

        Returns:
            The loaded service.
        """
        client = Client()
        service = client.get_service(service_id)
        return BaseDeploymentService.from_model(service)
config: BaseModelDeployerConfig property readonly

Returns the BaseModelDeployerConfig config.

Returns:

Type Description
BaseModelDeployerConfig

The configuration.

delete_model_server(self, uuid, timeout=300, force=False)

Abstract method to delete a model server.

This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by find_model_server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False

Exceptions:

Type Description
RuntimeError

if the model server is not found.

Source code in zenml/model_deployers/base_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to delete a model server.

    This operation is irreversible. A deleted model server must no longer
    show up in the list of model servers returned by `find_model_server`.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.

    Raises:
        RuntimeError: if the model server is not found.
    """
    client = Client()
    try:
        service = self.find_model_server(service_uuid=uuid)[0]
        self.perform_delete_model(service, timeout, force)
        client.delete_service(uuid)
    except Exception as e:
        raise RuntimeError(
            f"Failed to delete model server with UUID {uuid}: {e}"
        ) from e
deploy_model(self, config, service_type, replace=False, continuous_deployment_mode=False, timeout=300)

Deploy a model.

the deploy_model method is the main entry point for deploying models using the model deployer. It is used to deploy a model to a model server instance that is running on a remote serving platform or service. The method is responsible for detecting if there is an existing model server instance running serving one or more previous versions of the same model and deploying the model to the serving platform or updating the existing model server instance to include the new model version. The method returns a Service object that is a representation of the external model server instance. The Service object must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop, etc.).

Parameters:

Name Type Description Default
config ServiceConfig

Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc.

required
replace bool

If True, it will replace any existing model server instances that serve the same model. If False, it does not replace any existing model server instance.

False
continuous_deployment_mode bool

If True, it will replace any existing model server instances that serve the same model, regardless of the configuration. If False, it will only replace existing model server instances that serve the same model if the configuration is exactly the same.

False
timeout int

The maximum time in seconds to wait for the model server to start serving the model.

300
service_type ServiceType

The type of the service to deploy. If not provided, the default service type of the model deployer will be used.

required

Exceptions:

Type Description
RuntimeError

if the model deployment fails.

Returns:

Type Description
BaseService

The deployment Service object.

Source code in zenml/model_deployers/base_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    service_type: ServiceType,
    replace: bool = False,
    continuous_deployment_mode: bool = False,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Deploy a model.

    the deploy_model method is the main entry point for deploying models
    using the model deployer. It is used to deploy a model to a model server
    instance that is running on a remote serving platform or service. The
    method is responsible for detecting if there is an existing model server
    instance running serving one or more previous versions of the same model
    and deploying the model to the serving platform or updating the existing
    model server instance to include the new model version. The method
    returns a Service object that is a representation of the external model
    server instance. The Service object must implement basic operational
    state tracking and lifecycle management operations for the model server
    (e.g. start, stop, etc.).

    Args:
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        replace: If True, it will replace any existing model server instances
            that serve the same model. If False, it does not replace any
            existing model server instance.
        continuous_deployment_mode: If True, it will replace any existing
            model server instances that serve the same model, regardless of
            the configuration. If False, it will only replace existing model
            server instances that serve the same model if the configuration
            is exactly the same.
        timeout: The maximum time in seconds to wait for the model server
            to start serving the model.
        service_type: The type of the service to deploy. If not provided,
            the default service type of the model deployer will be used.

    Raises:
        RuntimeError: if the model deployment fails.

    Returns:
        The deployment Service object.
    """
    # Instantiate the client
    client = Client()
    if not continuous_deployment_mode:
        # Find existing model server
        services = self.find_model_server(
            config=config.dict(),
            service_type=service_type,
        )
        if len(services) > 0:
            logger.info(
                f"Existing model server found for {config.name or config.model_name} with the exact same configuration. Returning the existing service named {services[0].config.service_name}."
            )
            return services[0]
    else:
        # Find existing model server
        services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
            service_type=service_type,
        )
        if len(services) > 0:
            logger.info(
                f"Existing model server found for {config.pipeline_name} and {config.pipeline_step_name}, since continuous deployment mode is enabled, replacing the existing service named {services[0].config.service_name}."
            )
            service = services[0]
            self.delete_model_server(service.uuid)
    logger.info(
        f"Deploying model server for {config.model_name} with the following configuration: {config.dict()}"
    )
    service_response = client.create_service(
        config=config,
        service_type=service_type,
        model_version_id=get_model_version_id_if_exists(
            config.model_name, config.model_version
        ),
    )
    try:
        service = self.perform_deploy_model(
            id=service_response.id,
            config=config,
            timeout=timeout,
        )
    except Exception as e:
        client.delete_service(service_response.id)
        raise RuntimeError(
            f"Failed to deploy model server for {config.model_name}: {e}"
        ) from e
    # Update the service in store
    client.update_service(
        id=service.uuid,
        name=service.config.service_name,
        service_source=service.dict().get("type"),
        admin_state=service.admin_state,
        status=service.status.dict(),
        endpoint=service.endpoint.dict() if service.endpoint else None,
        # labels=service.config.get_service_labels()  # TODO: fix labels in services and config
        prediction_url=service.get_prediction_url(),
        health_check_url=service.get_healthcheck_url(),
    )
    return service
find_model_server(self, config=None, running=None, service_uuid=None, pipeline_name=None, pipeline_step_name=None, service_name=None, model_name=None, model_version=None, service_type=None, type=None, flavor=None, pipeline_run_id=None)

Abstract method to find one or more a model servers that match the given criteria.

Parameters:

Name Type Description Default
running Optional[bool]

If true, only running services will be returned.

None
service_uuid Optional[uuid.UUID]

The UUID of the service that was originally used to deploy the model.

None
pipeline_step_name Optional[str]

The name of the pipeline step that was originally used to deploy the model.

None
pipeline_name Optional[str]

The name of the pipeline that was originally used to deploy the model from the model registry.

None
model_name Optional[str]

The name of the model that was originally used to deploy the model from the model registry.

None
model_version Optional[str]

The version of the model that was originally used to deploy the model from the model registry.

None
service_type Optional[zenml.services.service_type.ServiceType]

The type of the service to find.

None
type Optional[str]

The type of the service to find.

None
flavor Optional[str]

The flavor of the service to find.

None
pipeline_run_id Optional[str]

The UUID of the pipeline run that was originally used to deploy the model.

None
config Optional[Dict[str, Any]]

Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc.

None
service_name Optional[str]

The name of the service to find.

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Service objects representing model servers that match the input search criteria.

Source code in zenml/model_deployers/base_model_deployer.py
def find_model_server(
    self,
    config: Optional[Dict[str, Any]] = None,
    running: Optional[bool] = None,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    service_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_version: Optional[str] = None,
    service_type: Optional[ServiceType] = None,
    type: Optional[str] = None,
    flavor: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
) -> List[BaseService]:
    """Abstract method to find one or more a model servers that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_step_name: The name of the pipeline step that was originally used
            to deploy the model.
        pipeline_name: The name of the pipeline that was originally used to deploy
            the model from the model registry.
        model_name: The name of the model that was originally used to deploy
            the model from the model registry.
        model_version: The version of the model that was originally used to
            deploy the model from the model registry.
        service_type: The type of the service to find.
        type: The type of the service to find.
        flavor: The flavor of the service to find.
        pipeline_run_id: The UUID of the pipeline run that was originally used
            to deploy the model.
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        service_name: The name of the service to find.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.
    """
    client = Client()
    service_responses = client.list_services(
        sort_by="desc:created",
        id=service_uuid,
        running=running,
        service_name=service_name,
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_version_id=get_model_version_id_if_exists(
            model_name, model_version
        ),
        pipeline_run_id=pipeline_run_id,
        config=config,
        type=type or service_type.type if service_type else None,
        flavor=flavor or service_type.flavor if service_type else None,
        hydrate=True,
    )
    services = []
    for service_response in service_responses.items:
        if not service_response.service_source:
            client.delete_service(service_response.id)
            continue
        service = BaseDeploymentService.from_model(service_response)
        service.update_status()
        if service.status.dict() != service_response.status:
            client.update_service(
                id=service.uuid,
                admin_state=service.admin_state,
                status=service.status.dict(),
                endpoint=service.endpoint.dict()
                if service.endpoint
                else None,
            )
        if running and not service.is_running:
            logger.warning(
                f"Service {service.uuid} is in an unexpected state. "
                f"Expected running={running}, but found running={service.is_running}."
            )
            continue
        services.append(service)
    return services
get_active_model_deployer() classmethod

Get the model deployer registered in the active stack.

Returns:

Type Description
BaseModelDeployer

The model deployer registered in the active stack.

Exceptions:

Type Description
TypeError

if a model deployer is not part of the active stack.

Source code in zenml/model_deployers/base_model_deployer.py
@classmethod
def get_active_model_deployer(cls) -> "BaseModelDeployer":
    """Get the model deployer registered in the active stack.

    Returns:
        The model deployer registered in the active stack.

    Raises:
        TypeError: if a model deployer is not part of the
            active stack.
    """
    flavor: BaseModelDeployerFlavor = cls.FLAVOR()
    client = Client()
    model_deployer = client.active_stack.model_deployer
    if not model_deployer or not isinstance(model_deployer, cls):
        raise TypeError(
            f"The active stack needs to have a {cls.NAME} model "
            f"deployer component registered to be able deploy models "
            f"with {cls.NAME}. You can create a new stack with "
            f"a {cls.NAME} model deployer component or update your "
            f"active stack to add this component, e.g.:\n\n"
            f"  `zenml model-deployer register {flavor.name} "
            f"--flavor={flavor.name} ...`\n"
            f"  `zenml stack register <STACK-NAME> -d {flavor.name} ...`\n"
            f"  or:\n"
            f"  `zenml stack update -d {flavor.name}`\n\n"
        )

    return model_deployer
get_model_server_info(service) staticmethod

Give implementation specific way to extract relevant model server properties for the user.

Parameters:

Name Type Description Default
service BaseService

Integration-specific service instance

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the relevant model server properties.

Source code in zenml/model_deployers/base_model_deployer.py
@staticmethod
@abstractmethod
def get_model_server_info(
    service: BaseService,
) -> Dict[str, Optional[str]]:
    """Give implementation specific way to extract relevant model server properties for the user.

    Args:
        service: Integration-specific service instance

    Returns:
        A dictionary containing the relevant model server properties.
    """
get_model_server_logs(self, uuid, follow=False, tail=None)

Get the logs of a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to get the logs of.

required
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that yields the logs of the model server.

Exceptions:

Type Description
RuntimeError

if the model server is not found.

Source code in zenml/model_deployers/base_model_deployer.py
def get_model_server_logs(
    self,
    uuid: UUID,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a model server.

    Args:
        uuid: UUID of the model server to get the logs of.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that yields the logs of the model server.

    Raises:
        RuntimeError: if the model server is not found.
    """
    services = self.find_model_server(service_uuid=uuid)
    if len(services) == 0:
        raise RuntimeError(f"No model server found with UUID {uuid}")
    return services[0].get_logs(follow=follow, tail=tail)
load_service(self, service_id)

Load a service from a URI.

Parameters:

Name Type Description Default
service_id UUID

The ID of the service to load.

required

Returns:

Type Description
BaseService

The loaded service.

Source code in zenml/model_deployers/base_model_deployer.py
def load_service(
    self,
    service_id: UUID,
) -> BaseService:
    """Load a service from a URI.

    Args:
        service_id: The ID of the service to load.

    Returns:
        The loaded service.
    """
    client = Client()
    service = client.get_service(service_id)
    return BaseDeploymentService.from_model(service)
perform_delete_model(self, service, timeout=300, force=False)

Abstract method to delete a model server.

This operation is irreversible. A deleted model server must no longer show up in the list of model servers returned by find_model_server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to delete a model server.

    This operation is irreversible. A deleted model server must no longer
    show up in the list of model servers returned by `find_model_server`.

    Args:
        service: The service to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
perform_deploy_model(self, id, config, timeout=300)

Abstract method to deploy a model.

Concrete model deployer subclasses must implement the following functionality in this method: - Detect if there is an existing model server instance running serving one or more previous versions of the same model - Deploy the model to the serving platform or update the existing model server instance to include the new model version - Return a Service object that is a representation of the external model server instance. The Service must implement basic operational state tracking and lifecycle management operations for the model server (e.g. start, stop, etc.)

Parameters:

Name Type Description Default
id UUID

UUID of the service that was originally used to deploy the model.

required
config ServiceConfig

Custom Service configuration parameters for the model deployer. Can include the pipeline name, the run id, the step name, the model name, the model uri, the model type etc.

required
timeout int

The maximum time in seconds to wait for the model server to start serving the model.

300

Returns:

Type Description
BaseService

The deployment Service object.

Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Abstract method to deploy a model.

    Concrete model deployer subclasses must implement the following
    functionality in this method:
    - Detect if there is an existing model server instance running serving
    one or more previous versions of the same model
    - Deploy the model to the serving platform or update the existing model
    server instance to include the new model version
    - Return a Service object that is a representation of the external model
    server instance. The Service must implement basic operational state
    tracking and lifecycle management operations for the model server (e.g.
    start, stop, etc.)

    Args:
        id: UUID of the service that was originally used to deploy the model.
        config: Custom Service configuration parameters for the model
            deployer. Can include the pipeline name, the run id, the step
            name, the model name, the model uri, the model type etc.
        timeout: The maximum time in seconds to wait for the model server
            to start serving the model.

    Returns:
        The deployment Service object.
    """
perform_start_model(self, service, timeout=300)

Abstract method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

300
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Abstract method to start a model server.

    Args:
        service: The service to start.
        timeout: timeout in seconds to wait for the service to start. If
            set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.
    """
perform_stop_model(self, service, timeout=300, force=False)

Abstract method to stop a model server.

This operation should be reversible. A stopped model server should still show up in the list of model servers returned by find_model_server and it should be possible to start it again by calling start_model_server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False
Source code in zenml/model_deployers/base_model_deployer.py
@abstractmethod
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Abstract method to stop a model server.

    This operation should be reversible. A stopped model server should still
    show up in the list of model servers returned by `find_model_server` and
    it should be possible to start it again by calling `start_model_server`.

    Args:
        service: The service to stop.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
start_model_server(self, uuid, timeout=300)

Abstract method to start a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

timeout in seconds to wait for the service to start. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

300

Exceptions:

Type Description
RuntimeError

if the model server is not found.

Source code in zenml/model_deployers/base_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Abstract method to start a model server.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to start. If
            set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        RuntimeError: if the model server is not found.
    """
    client = Client()
    try:
        service = self.find_model_server(service_uuid=uuid)[0]
        updated_service = self.perform_start_model(service, timeout)
        client.update_service(
            id=updated_service.uuid,
            admin_state=updated_service.admin_state,
            status=updated_service.status.dict(),
            endpoint=updated_service.endpoint.dict()
            if updated_service.endpoint
            else None,
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to start model server with UUID {uuid}: {e}"
        ) from e
stop_model_server(self, uuid, timeout=300, force=False)

Abstract method to stop a model server.

This operation should be reversible. A stopped model server should still show up in the list of model servers returned by find_model_server and it should be possible to start it again by calling start_model_server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False

Exceptions:

Type Description
RuntimeError

if the model server is not found.

Source code in zenml/model_deployers/base_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Abstract method to stop a model server.

    This operation should be reversible. A stopped model server should still
    show up in the list of model servers returned by `find_model_server` and
    it should be possible to start it again by calling `start_model_server`.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.

    Raises:
        RuntimeError: if the model server is not found.
    """
    client = Client()
    try:
        service = self.find_model_server(service_uuid=uuid)[0]
        updated_service = self.perform_stop_model(service, timeout, force)
        client.update_service(
            id=updated_service.uuid,
            admin_state=updated_service.admin_state,
            status=updated_service.status.dict(),
            endpoint=updated_service.endpoint.dict()
            if updated_service.endpoint
            else None,
        )
    except Exception as e:
        raise RuntimeError(
            f"Failed to stop model server with UUID {uuid}: {e}"
        ) from e

BaseModelDeployerConfig (StackComponentConfig) pydantic-model

Base config for all model deployers.

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerConfig(StackComponentConfig):
    """Base config for all model deployers."""

BaseModelDeployerFlavor (Flavor)

Base class for model deployer flavors.

Source code in zenml/model_deployers/base_model_deployer.py
class BaseModelDeployerFlavor(Flavor):
    """Base class for model deployer flavors."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.MODEL_DEPLOYER

    @property
    def config_class(self) -> Type[BaseModelDeployerConfig]:
        """Returns `BaseModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return BaseModelDeployerConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[BaseModelDeployer]:
        """The class that implements the model deployer."""
config_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig] property readonly

Returns BaseModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.model_deployers.base_model_deployer.BaseModelDeployerConfig]

The config class.

implementation_class: Type[zenml.model_deployers.base_model_deployer.BaseModelDeployer] property readonly

The class that implements the model deployer.

type: StackComponentType property readonly

Returns the flavor type.

Returns:

Type Description
StackComponentType

The flavor type.

get_model_version_id_if_exists(model_name, model_version)

Get the model version id if it exists.

Parameters:

Name Type Description Default
model_name Optional[str]

The name of the model.

required
model_version Optional[str]

The version of the model.

required

Returns:

Type Description
Optional[uuid.UUID]

The model version id if it exists.

Source code in zenml/model_deployers/base_model_deployer.py
def get_model_version_id_if_exists(
    model_name: Optional[str],
    model_version: Optional[str],
) -> Optional[UUID]:
    """Get the model version id if it exists.

    Args:
        model_name: The name of the model.
        model_version: The version of the model.

    Returns:
        The model version id if it exists.
    """
    client = Client()
    if model_name:
        with contextlib.suppress(KeyError):
            return client.get_model_version(
                model_name_or_id=model_name,
                model_version_name_or_number_or_id=model_version,
            ).id
    return None