
zenml.integrations.bentoml special

Initialization of the BentoML integration for ZenML.

The BentoML integration allows you to use BentoML model serving to implement continuous model deployment.
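
For example, a minimal sketch of activating the integration by hand (ZenML normally does this for you; `check_installation` is assumed here from the `Integration` base class):

from zenml.integrations.bentoml import BentoMLIntegration

# Verify the integration requirements (bentoml>=1.0.10) are installed,
# then register its materializers, model deployers and services.
if BentoMLIntegration.check_installation():
    BentoMLIntegration.activate()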

BentoMLIntegration (Integration)

Definition of BentoML integration for ZenML.

Source code in zenml/integrations/bentoml/__init__.py
class BentoMLIntegration(Integration):
    """Definition of BentoML integration for ZenML."""

    NAME = BENTOML
    REQUIREMENTS = [
        "bentoml>=1.0.10",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the BentoML integration."""
        from zenml.integrations.bentoml import materializers  # noqa
        from zenml.integrations.bentoml import model_deployers  # noqa
        from zenml.integrations.bentoml import services  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for BentoML.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.bentoml.flavors import (
            BentoMLModelDeployerFlavor,
        )

        return [BentoMLModelDeployerFlavor]

activate() classmethod

Activate the BentoML integration.

Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the BentoML integration."""
    from zenml.integrations.bentoml import materializers  # noqa
    from zenml.integrations.bentoml import model_deployers  # noqa
    from zenml.integrations.bentoml import services  # noqa

flavors() classmethod

Declare the stack component flavors for BentoML.

Returns:

    List[Type[zenml.stack.flavor.Flavor]]: List of stack component flavors for this integration.

Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for BentoML.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.bentoml.flavors import (
        BentoMLModelDeployerFlavor,
    )

    return [BentoMLModelDeployerFlavor]
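
As an illustrative sketch, the declared flavors can be inspected like this (assuming the integration requirements are installed):

from zenml.integrations.bentoml import BentoMLIntegration

# Each entry is a Flavor subclass; instantiating it exposes its name.
for flavor_class in BentoMLIntegration.flavors():
    print(flavor_class().name)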

constants

BentoML constants.

flavors special

BentoML integration flavors.

bentoml_model_deployer_flavor

BentoML model deployer flavor.

BentoMLModelDeployerConfig (BaseModelDeployerConfig)

Configuration for the BentoMLModelDeployer.

Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the BentoMLModelDeployer."""

    service_path: str = ""
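
A hedged sketch of constructing this configuration directly; the service path value is purely illustrative:

from zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor import (
    BentoMLModelDeployerConfig,
)

# An empty service_path (the default) lets the deployer derive a local path
# from its component ID; a custom path is shown here only for illustration.
config = BentoMLModelDeployerConfig(service_path="/tmp/zenml-bentoml-services")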
BentoMLModelDeployerFlavor (BaseModelDeployerFlavor)

Flavor for the BentoML model deployer.

Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return BENTOML_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png"

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Returns `BentoMLModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return BentoMLModelDeployerConfig

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.bentoml.model_deployers import (
            BentoMLModelDeployer,
        )

        return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] property readonly

Returns BentoMLModelDeployerConfig config class.

Returns:

    Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]: The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

    Optional[str]: A flavor docs url.

implementation_class: Type[BentoMLModelDeployer] property readonly

Implementation class for this flavor.

Returns:

    Type[BentoMLModelDeployer]: The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

    str: The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

    str: Name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

    Optional[str]: A flavor SDK docs url.

materializers special

Initialization of the BentoML Bento Materializer.

bentoml_bento_materializer

Materializer for BentoML Bento objects.

BentoMaterializer (BaseMaterializer)

Materializer for BentoML Bento objects.

Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
class BentoMaterializer(BaseMaterializer):
    """Materializer for Bentoml Bento objects."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (bento.Bento,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def load(self, data_type: Type[bento.Bento]) -> bento.Bento:
        """Read from artifact store and return a Bento object.

        Args:
            data_type: A bento.Bento type.

        Returns:
            A bento.Bento object.
        """
        with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
            # Copy from artifact store to temporary directory
            io_utils.copy_dir(self.uri, temp_dir)

            # Load the Bento from the temporary directory
            imported_bento = Bento.import_from(
                os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
            )

            # Save the Bento to the local BentoML store if it is not already there
            try:
                _ = bentoml.get(imported_bento.tag)
            except BentoMLException:
                imported_bento.save()
            return imported_bento

    def save(self, bento: bento.Bento) -> None:
        """Write to artifact store.

        Args:
            bento: A bento.Bento object.
        """
        with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
            temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
            bentoml.export_bento(bento.tag, temp_bento_path)
            io_utils.copy_dir(temp_dir, self.uri)

    def extract_metadata(
        self, bento: bento.Bento
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `Bento` object.

        Args:
            bento: The `Bento` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        return {
            "bento_info_name": bento.info.name,
            "bento_info_version": bento.info.version,
            "bento_tag_name": bento.tag.name,
            "bentoml_version": bento.info.bentoml_version,
        }
extract_metadata(self, bento)

Extract metadata from the given Bento object.

Parameters:

    bento (bentoml._internal.bento.bento.Bento): The Bento object to extract metadata from. Required.

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def extract_metadata(
    self, bento: bento.Bento
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `Bento` object.

    Args:
        bento: The `Bento` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return {
        "bento_info_name": bento.info.name,
        "bento_info_version": bento.info.version,
        "bento_tag_name": bento.tag.name,
        "bentoml_version": bento.info.bentoml_version,
    }
load(self, data_type)

Read from artifact store and return a Bento object.

Parameters:

    data_type (Type[bentoml._internal.bento.bento.Bento]): A bento.Bento type. Required.

Returns:

    bentoml._internal.bento.bento.Bento: A bento.Bento object.

Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def load(self, data_type: Type[bento.Bento]) -> bento.Bento:
    """Read from artifact store and return a Bento object.

    Args:
        data_type: A bento.Bento type.

    Returns:
        A bento.Bento object.
    """
    with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.uri, temp_dir)

        # Load the Bento from the temporary directory
        imported_bento = Bento.import_from(
            os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        )

        # Save the Bento to the local BentoML store if it is not already there
        try:
            _ = bentoml.get(imported_bento.tag)
        except BentoMLException:
            imported_bento.save()
        return imported_bento
save(self, bento)

Write to artifact store.

Parameters:

    bento (bentoml._internal.bento.bento.Bento): A bento.Bento object. Required.
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def save(self, bento: bento.Bento) -> None:
    """Write to artifact store.

    Args:
        bento: A bento.Bento object.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        bentoml.export_bento(bento.tag, temp_bento_path)
        io_utils.copy_dir(temp_dir, self.uri)
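
In practice this materializer is picked up automatically whenever a step returns a bento.Bento. A minimal sketch, where the tag my_service:latest is a hypothetical Bento assumed to already exist in the local BentoML store:

import bentoml
from bentoml._internal.bento import bento
from zenml import step

@step
def fetch_bento() -> bento.Bento:
    # Returning a Bento from a step lets ZenML persist it to the
    # artifact store via BentoMaterializer.save().
    return bentoml.get("my_service:latest")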

model_deployers special

Initialization of the BentoML Model Deployer.

bentoml_model_deployer

Implementation of the BentoML Model Deployer.

BentoMLModelDeployer (BaseModelDeployer)

BentoML model deployer stack component implementation.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployer(BaseModelDeployer):
    """BentoML model deployer stack component implementation."""

    NAME: ClassVar[str] = "BentoML"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = (
        BentoMLModelDeployerFlavor
    )

    _service_path: Optional[str] = None

    @property
    def config(self) -> BentoMLModelDeployerConfig:
        """Returns the `BentoMLModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BentoMLModelDeployerConfig, self._config)

    @staticmethod
    def get_service_path(id_: UUID) -> str:
        """Get the path where local BentoML service information is stored.

        This is where the deployment service configuration, PID and log files
        are stored.

        Args:
            id_: The ID of the BentoML model deployer.

        Returns:
            The service path.
        """
        service_path = os.path.join(
            GlobalConfiguration().local_stores_path,
            str(id_),
        )
        create_dir_recursive_if_not_exists(service_path)
        return service_path

    @property
    def local_path(self) -> str:
        """Returns the path to the root directory.

        This is where all configurations for BentoML deployment daemon processes
        are stored.

        If the service path is not set in the config by the user, the path is
        set to a local default path according to the component ID.

        Returns:
            The path to the local service root directory.
        """
        if self._service_path is not None:
            return self._service_path

        if self.config.service_path:
            self._service_path = self.config.service_path
        else:
            self._service_path = self.get_service_path(self.id)

        create_dir_recursive_if_not_exists(self._service_path)
        return self._service_path

    @staticmethod
    def get_model_server_info(
        service_instance: BaseService,
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: BentoML deployment service object

        Returns:
            A dictionary containing the model server information.

        Raises:
            ValueError: If the service type is not supported.
        """
        if (
            service_instance.SERVICE_TYPE.name
            == BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
        ):
            service_instance = cast(
                BentoMLContainerDeploymentService, service_instance
            )
        elif (
            service_instance.SERVICE_TYPE.name
            == BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
        ):
            service_instance = cast(
                BentoMLLocalDeploymentService, service_instance
            )
        else:
            raise ValueError(
                f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
            )

        predictions_apis_urls = ""
        if service_instance.prediction_apis_urls is not None:  # type: ignore
            predictions_apis_urls = ", ".join(
                [
                    api
                    for api in service_instance.prediction_apis_urls  # type: ignore
                    if api is not None
                ]
            )

        service_config = service_instance.config
        assert isinstance(
            service_config,
            (BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
        )

        service_status = service_instance.status
        assert isinstance(
            service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
        )

        return {
            "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
            "PREDICTION_URL": service_instance.get_prediction_url(),
            "BENTO_TAG": service_config.bento_tag,
            "MODEL_NAME": service_config.model_name,
            "MODEL_URI": service_config.model_uri,
            "BENTO_URI": service_config.bento_uri,
            "SERVICE_PATH": service_status.runtime_path,
            "DAEMON_PID": str(service_status.pid)
            if hasattr(service_status, "pid")
            else None,
            "PREDICTION_APIS_URLS": predictions_apis_urls,
        }

    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new BentoML deployment service or update an existing one.

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new BentoML
            deployment server to reflect the model and other configuration
            parameters specified in the supplied BentoML service `config`.

          * if `replace` is True, this method will first attempt to find an
            existing BentoML deployment service that is *equivalent* to the
            supplied configuration parameters. Two or more BentoML deployment
            services are considered equivalent if they have the same
            `pipeline_name`, `pipeline_step_name` and `model_name` configuration
            parameters. To put it differently, two BentoML deployment services
            are equivalent if they serve versions of the same model deployed by
            the same pipeline step. If an equivalent BentoML deployment is found,
            it will be updated in place to reflect the new configuration
            parameters.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new BentoML deployment
        server for each new model version. If multiple equivalent BentoML
        deployment servers are found, one is selected at random to be updated
        and the others are deleted.

        Args:
            id: the UUID of the BentoML model deployer.
            config: the configuration of the model to be deployed with BentoML.
            timeout: the timeout in seconds to wait for the BentoML server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the BentoML
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML BentoML deployment service object that can be used to
            interact with the BentoML model http server.
        """
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(f"Created a new BentoML deployment service: {service}")
        return service

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: BaseService,
    ) -> None:
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

        # assert that the service is either a BentoMLLocalDeploymentService or a BentoMLContainerDeploymentService
        if not isinstance(
            existing_service,
            (BentoMLLocalDeploymentService, BentoMLContainerDeploymentService),
        ):
            raise ValueError(
                f"Unsupported service type: {type(existing_service)}"
            )

        # delete the old configuration file
        if existing_service.status.runtime_path:
            shutil.rmtree(existing_service.status.runtime_path)

    # The step will receive a config from the user that mentions the number
    # of workers etc. The step implementation will create a new config using
    # all values from the user and add values like pipeline name and model_uri.
    def _create_new_service(
        self, id: UUID, timeout: int, config: ServiceConfig
    ) -> BaseService:
        """Creates a new BentoMLDeploymentService.

        Args:
            id: the ID of the BentoML deployment service to be created or updated.
            timeout: the timeout in seconds to wait for the BentoML server
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with BentoML.

        Returns:
            The BentoMLDeploymentService object that can be used to interact
            with the BentoML model server.

        Raises:
            ValueError: If the service type is not supported.
        """
        assert isinstance(
            config,
            (BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
        )
        # set the root runtime path with the stack component's UUID
        config.root_runtime_path = self.local_path
        # create a new service for the new model
        # if the config is of type BentoMLLocalDeploymentConfig, create a
        # BentoMLLocalDeploymentService, otherwise create a
        # BentoMLContainerDeploymentService
        service: BaseService
        if isinstance(config, BentoMLLocalDeploymentConfig):
            service = BentoMLLocalDeploymentService(uuid=id, config=config)
        elif isinstance(config, BentoMLContainerDeploymentConfig):
            service = BentoMLContainerDeploymentService(uuid=id, config=config)
        else:
            raise ValueError(f"Unsupported service type: {type(config)}")
        service.start(timeout=timeout)

        return service

    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Method to stop a model server.

        Args:
            service: The service to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.

        Returns:
            The stopped service.
        """
        service.stop(timeout=timeout, force=force)
        return service

    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Method to start a model server.

        Args:
            service: The service to start.
            timeout: Timeout in seconds to wait for the service to start.

        Returns:
            The started service.
        """
        service.start(timeout=timeout)
        return service

    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        Args:
            service: The service to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        self._clean_up_existing_service(
            existing_service=service, timeout=timeout, force=force
        )
config: BentoMLModelDeployerConfig property readonly

Returns the BentoMLModelDeployerConfig config.

Returns:

    BentoMLModelDeployerConfig: The configuration.

local_path: str property readonly

Returns the path to the root directory.

This is where all configurations for BentoML deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

    str: The path to the local service root directory.

FLAVOR (BaseModelDeployerFlavor)

Flavor for the BentoML model deployer.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return BENTOML_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png"

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Returns `BentoMLModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return BentoMLModelDeployerConfig

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.bentoml.model_deployers import (
            BentoMLModelDeployer,
        )

        return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] property readonly

Returns BentoMLModelDeployerConfig config class.

Returns:

    Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]: The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

    Optional[str]: A flavor docs url.

implementation_class: Type[BentoMLModelDeployer] property readonly

Implementation class for this flavor.

Returns:

    Type[BentoMLModelDeployer]: The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

    str: The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

    str: Name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

    Optional[str]: A flavor SDK docs url.

get_model_server_info(service_instance) staticmethod

Return implementation specific information on the model server.

Parameters:

    service_instance (BaseService): BentoML deployment service object. Required.

Returns:

    Dict[str, Optional[str]]: A dictionary containing the model server information.

Exceptions:

    ValueError: If the service type is not supported.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_model_server_info(
    service_instance: BaseService,
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: BentoML deployment service object

    Returns:
        A dictionary containing the model server information.

    Raises:
        ValueError: If the service type is not supported.
    """
    if (
        service_instance.SERVICE_TYPE.name
        == BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLContainerDeploymentService, service_instance
        )
    elif (
        service_instance.SERVICE_TYPE.name
        == BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLLocalDeploymentService, service_instance
        )
    else:
        raise ValueError(
            f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
        )

    predictions_apis_urls = ""
    if service_instance.prediction_apis_urls is not None:  # type: ignore
        predictions_apis_urls = ", ".join(
            [
                api
                for api in service_instance.prediction_apis_urls  # type: ignore
                if api is not None
            ]
        )

    service_config = service_instance.config
    assert isinstance(
        service_config,
        (BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
    )

    service_status = service_instance.status
    assert isinstance(
        service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
    )

    return {
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "BENTO_TAG": service_config.bento_tag,
        "MODEL_NAME": service_config.model_name,
        "MODEL_URI": service_config.model_uri,
        "BENTO_URI": service_config.bento_uri,
        "SERVICE_PATH": service_status.runtime_path,
        "DAEMON_PID": str(service_status.pid)
        if hasattr(service_status, "pid")
        else None,
        "PREDICTION_APIS_URLS": predictions_apis_urls,
    }
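
A usage sketch, assuming service is a deployed BentoML service previously obtained from this deployer:

# The returned dictionary uses the keys shown above.
info = BentoMLModelDeployer.get_model_server_info(service)
print(info["PREDICTION_URL"], info["BENTO_TAG"])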
get_service_path(id_) staticmethod

Get the path where local BentoML service information is stored.

This is where the deployment service configuration, PID and log files are stored.

Parameters:

    id_ (UUID): The ID of the BentoML model deployer. Required.

Returns:

    str: The service path.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local BentoML service information is stored.

    This is where the deployment service configuration, PID and log files
    are stored.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    service_path = os.path.join(
        GlobalConfiguration().local_stores_path,
        str(id_),
    )
    create_dir_recursive_if_not_exists(service_path)
    return service_path
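
For illustration, the derived path for a given deployer ID (a random UUID here) could be inspected as follows:

from uuid import uuid4

# Resolves to <local_stores_path>/<deployer-id> and creates it if missing.
path = BentoMLModelDeployer.get_service_path(uuid4())
print(path)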
perform_delete_model(self, service, timeout=60, force=False)

Method to delete all configuration of a model server.

Parameters:

    service (BaseService): The service to delete. Required.
    timeout (int): Timeout in seconds to wait for the service to stop. Default: 60.
    force (bool): If True, force the service to stop. Default: False.
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(self, id, config, timeout=60)

Create a new BentoML deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML service config.

  • if replace is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new BentoML deployment server for each new model version. If multiple equivalent BentoML deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

    id (UUID): the UUID of the BentoML model deployer. Required.
    config (ServiceConfig): the configuration of the model to be deployed with BentoML. Required.
    timeout (int): the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start. Default: 60.

Returns:

    BaseService: The ZenML BentoML deployment service object that can be used to interact with the BentoML model http server.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, one is selected at random to be updated
    and the others are deleted.

    Args:
        id: the UUID of the BentoML model deployer.
        config: the configuration of the model to be deployed with BentoML.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    service = self._create_new_service(
        id=id, timeout=timeout, config=config
    )
    logger.info(f"Created a new BentoML deployment service: {service}")
    return service
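
A minimal sketch of calling this hook directly; in normal use it is invoked through the base deployer's deployment workflow, and every name and value below is an illustrative assumption:

from uuid import uuid4

from zenml.client import Client
from zenml.integrations.bentoml.services import BentoMLLocalDeploymentConfig

deployer = Client().active_stack.model_deployer  # assumes a BentoML deployer in the active stack
config = BentoMLLocalDeploymentConfig(
    model_name="my_model",          # hypothetical model name
    model_uri="s3://bucket/model",  # hypothetical model URI
    bento_tag="my_service:latest",  # hypothetical bento tag
    working_dir=".",
)
service = deployer.perform_deploy_model(id=uuid4(), config=config, timeout=120)
print(service.prediction_url)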
perform_start_model(self, service, timeout=60)

Method to start a model server.

Parameters:

    service (BaseService): The service to start. Required.
    timeout (int): Timeout in seconds to wait for the service to start. Default: 60.

Returns:

    BaseService: The started service.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(self, service, timeout=60, force=False)

Method to stop a model server.

Parameters:

    service (BaseService): The service to stop. Required.
    timeout (int): Timeout in seconds to wait for the service to stop. Default: 60.
    force (bool): If True, force the service to stop. Default: False.

Returns:

    BaseService: The stopped service.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
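
Taken together, a hedged lifecycle sketch for a service deployed with this component (deployer and service as in the deployment example above):

# Stop the daemon or container behind the service, then bring it back up.
service = deployer.perform_stop_model(service, timeout=60)
service = deployer.perform_start_model(service, timeout=60)

# Tear the server down and remove its runtime configuration entirely.
deployer.perform_delete_model(service, timeout=60, force=True)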

services special

Initialization for BentoML services.

bentoml_container_deployment

Implementation for the BentoML container deployment service.

BentoMLContainerDeploymentConfig (ContainerServiceConfig)

BentoML container deployment configuration.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentConfig(ContainerServiceConfig):
    """BentoML container deployment configuration."""

    model_name: str
    model_uri: str
    bento_tag: str
    bento_uri: Optional[str] = None
    platform: Optional[str] = None
    image: str = ""
    image_tag: Optional[str] = None
    features: Optional[List[str]] = None
    file: Optional[str] = None
    apis: List[str] = []
    working_dir: Optional[str] = None
    workers: int = 1
    backlog: int = 2048
    host: Optional[str] = None
    port: Optional[int] = None
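
An illustrative construction of this configuration; the tag, image and port values are assumptions:

from zenml.integrations.bentoml.services.bentoml_container_deployment import (
    BentoMLContainerDeploymentConfig,
)

config = BentoMLContainerDeploymentConfig(
    model_name="my_model",
    model_uri="s3://bucket/model",
    bento_tag="my_service:latest",
    image="my-registry.io/my_service",  # optional; defaults to a name derived from the bento tag
    image_tag="latest",
    port=3000,
)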
BentoMLContainerDeploymentEndpoint (ContainerServiceEndpoint)

A service endpoint exposed by the BentoML container deployment service.

Attributes:

    config (BentoMLContainerDeploymentEndpointConfig): service endpoint configuration

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentEndpoint(ContainerServiceEndpoint):
    """A service endpoint exposed by the BentoML container deployment service.

    Attributes:
        config: service endpoint configuration
    """

    config: BentoMLContainerDeploymentEndpointConfig

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint
        """
        uri = self.status.uri
        if not uri:
            return None
        return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

    Optional[str]: the prediction URL for the endpoint

BentoMLContainerDeploymentEndpointConfig (ContainerServiceEndpointConfig)

BentoML container deployment service configuration.

Attributes:

    prediction_url_path (str): URI subpath for prediction requests

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentEndpointConfig(ContainerServiceEndpointConfig):
    """BentoML container deployment service configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests
    """

    prediction_url_path: str
BentoMLContainerDeploymentService (ContainerService, BaseDeploymentService)

BentoML container deployment service.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentService(
    ContainerService, BaseDeploymentService
):
    """BentoML container deployment service."""

    SERVICE_TYPE = ServiceType(
        name=BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME,
        type="model-serving",
        flavor="bentoml",
        description="BentoML container prediction service",
        logo_url="https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png",
    )

    config: BentoMLContainerDeploymentConfig
    endpoint: BentoMLContainerDeploymentEndpoint

    def __init__(
        self,
        config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Initialize the BentoML deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        # ensure that the endpoint is created before the service is initialized
        # TODO [ENG-700]: implement a service factory or builder for BentoML
        #   deployment services
        if (
            isinstance(config, BentoMLContainerDeploymentConfig)
            and "endpoint" not in attrs
        ):
            endpoint = BentoMLContainerDeploymentEndpoint(
                config=BentoMLContainerDeploymentEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                    port=config.port or BENTOML_DEFAULT_PORT,
                    ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                    prediction_url_path=BENTOML_PREDICTION_URL_PATH,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    # override the is_running property to check if the bentoml container is running
    @property
    def is_running(self) -> bool:
        """Check if the service is currently running.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is running and active (i.e. the endpoints are
            responsive, if any are configured), otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE

    # override the container start method to use the root user
    def _start_container(self) -> None:
        """Start the service docker container associated with this service."""
        container = self.container

        if container:
            # the container exists, check if it is running
            if container.status == "running":
                logger.debug(
                    "Container for service '%s' is already running",
                    self,
                )
                return

            # the container is stopped or in an error state, remove it
            logger.debug(
                "Removing previous container for service '%s'",
                self,
            )
            container.remove(force=True)

        logger.debug("Starting container for service '%s'...", self)

        try:
            self.docker_client.images.get(self.config.image)
        except docker_errors.ImageNotFound:
            logger.debug(
                "Pulling container image '%s' for service '%s'...",
                self.config.image,
                self,
            )
            self.docker_client.images.pull(self.config.image)

        self._setup_runtime_path()

        ports: Dict[int, Optional[int]] = {}
        if self.endpoint:
            self.endpoint.prepare_for_start()
            if self.endpoint.status.port:
                ports[self.endpoint.status.port] = self.endpoint.status.port

        command, env = self._get_container_cmd()
        volumes = self._get_container_volumes()

        try:
            container = self.docker_client.containers.run(
                name=self.container_id,
                image=self.config.image,
                entrypoint=command,
                detach=True,
                volumes=volumes,
                environment=env,
                remove=False,
                auto_remove=False,
                ports=ports,
                user="root",
                labels={
                    "zenml-service-uuid": str(self.uuid),
                },
                working_dir="/home/bentoml/bento",
                extra_hosts={"host.docker.internal": "host-gateway"},
            )

            logger.debug(
                "Docker container for service '%s' started with ID: %s",
                self,
                self.container_id,
            )
        except docker_errors.DockerException as e:
            logger.error(
                "Docker container for service '%s' failed to start: %s",
                self,
                e,
            )

    def _containerize_and_push_bento(self) -> None:
        """Containerize the bento and push it to the container registry.

        Raises:
            Exception: If the bento containerization fails.
        """
        zenml_client = ZenMLClient()
        container_registry = zenml_client.active_stack.container_registry
        # a tuple of config image and image tag
        if self.config.image and self.config.image_tag:
            image_tag = (self.config.image, self.config.image_tag)
        else:
            # if container registry is present in the stack, name the image
            # with the container registry uri, else name the image with the bento tag
            if container_registry:
                image_name = (
                    f"{container_registry.config.uri}/{self.config.bento_tag}"
                )
                image_tag = (image_name,)  # type: ignore
                self.config.image = image_name
            else:
                # bentoml will use the bento tag as the name of the image
                image_tag = (self.config.bento_tag,)  # type: ignore
                self.config.image = self.config.bento_tag
        try:
            bentoml.container.build(
                bento_tag=self.config.bento_tag,
                backend="docker",  # hardcoding docker since container service only supports docker
                image_tag=image_tag,
                features=self.config.features,
                file=self.config.file,
                platform=self.config.platform,
            )

        except Exception as e:
            logger.error(f"Error containerizing the bento: {e}")
            raise e

        if container_registry:
            logger.info(
                f"Pushing bento to container registry {container_registry.config.uri}"
            )
            # push the bento to the image registry
            container_registry.push_image(self.config.image)
        else:
            logger.warning(
                "No container registry found in the active stack. "
                "Please add a container registry to your stack to push "
                "the bento to an image registry."
            )

    def provision(self) -> None:
        """Provision the service."""
        # containerize the bento
        self._containerize_and_push_bento()
        # run the container
        super().provision()

    def run(self) -> None:
        """Start the service.

        Raises:
            FileNotFoundError: If the bento file is not found.
            subprocess.CalledProcessError: If the bentoml serve command fails.
        """
        from bentoml._internal.service.loader import load

        logger.info("Starting BentoML container deployment service...")

        self.endpoint.prepare_for_start()

        if self.config.working_dir is None:
            if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
                self.config.working_dir = os.path.expanduser(
                    self.config.bento_tag
                )
            else:
                self.config.working_dir = "."
        if sys.path[0] != self.config.working_dir:
            sys.path.insert(0, self.config.working_dir)

        _ = load(bento_identifier=".", working_dir=self.config.working_dir)
        # run bentoml serve command inside the container
        # Use subprocess for better control and error handling
        import subprocess

        try:
            subprocess.run(["bentoml", "serve"], check=True)
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to start BentoML service: {e}")
            raise
        except FileNotFoundError:
            logger.error(
                "BentoML command not found. Make sure it's installed and in the PATH."
            )
            raise

    @property
    def prediction_url(self) -> Optional[str]:
        """Get the URI where the http server is running.

        Returns:
            The URI where the http service can be accessed to get more information
            about the service and to make predictions.
        """
        if not self.is_running:
            return None
        return self.endpoint.prediction_url

    @property
    def prediction_apis_urls(self) -> Optional[List[str]]:
        """Get the URI where the prediction api services is answering requests.

        Returns:
            The URIs where the prediction service APIs can be contacted to process
            HTTP/REST inference requests, or None if the service isn't running.
        """
        if not self.is_running:
            return None

        if self.config.apis:
            return [
                f"{self.endpoint.prediction_url}/{api}"
                for api in self.config.apis
            ]
        return None

    def predict(self, api_endpoint: str, data: Any) -> Any:
        """Make a prediction using the service.

        Args:
            data: data to make a prediction on
            api_endpoint: the api endpoint to make the prediction on

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "BentoML prediction service is not running. "
                "Please start the service before making predictions."
            )
        if self.endpoint.prediction_url is not None:
            client = Client.from_url(
                self.endpoint.prediction_url.replace("http://", "").rstrip("/")
            )
            result = client.call(api_endpoint, data)
        else:
            raise ValueError("No endpoint known for prediction.")
        return result
is_running: bool property readonly

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

    bool: True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

prediction_apis_urls: Optional[List[str]] property readonly

Get the URIs where the prediction API services are answering requests.

Returns:

    Optional[List[str]]: The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running.

prediction_url: Optional[str] property readonly

Get the URI where the http server is running.

Returns:

    Optional[str]: The URI where the http service can be accessed to get more information about the service and to make predictions.

__init__(self, config, **attrs) special

Initialize the BentoML deployment service.

Parameters:

    config (Union[zenml.integrations.bentoml.services.bentoml_container_deployment.BentoMLContainerDeploymentConfig, Dict[str, Any]]): service configuration. Required.
    attrs (Any): additional attributes to set on the service. Default: {}.
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def __init__(
    self,
    config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLContainerDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLContainerDeploymentEndpoint(
            config=BentoMLContainerDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port or BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
model_post_init(self, context, /)

We need to both initialize private attributes and call the user-defined model_post_init method.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
    """We need to both initialize private attributes and call the user-defined model_post_init
    method.
    """
    init_private_attributes(self, context)
    original_model_post_init(self, context)
predict(self, api_endpoint, data)

Make a prediction using the service.

Parameters:

    data (Any): data to make a prediction on. Required.
    api_endpoint (str): the api endpoint to make the prediction on. Required.

Returns:

    Any: The prediction result.

Exceptions:

    Exception: if the service is not running
    ValueError: if the prediction endpoint is unknown.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def predict(self, api_endpoint: str, data: Any) -> Any:
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is not None:
        client = Client.from_url(
            self.endpoint.prediction_url.replace("http://", "").rstrip("/")
        )
        result = client.call(api_endpoint, data)
    else:
        raise ValueError("No endpoint known for prediction.")
    return result
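
A usage sketch, where the endpoint name "predict" and the payload shape are hypothetical and depend on the bento's service definition:

# Calls the named API endpoint on the running BentoML server.
result = service.predict("predict", {"instances": [[1.0, 2.0, 3.0]]})
print(result)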
provision(self)

Provision the service.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def provision(self) -> None:
    """Provision the service."""
    # containerize the bento
    self._containerize_and_push_bento()
    # run the container
    super().provision()
run(self)

Start the service.

Exceptions:

    FileNotFoundError: If the bento file is not found.
    subprocess.CalledProcessError: If the bentoml serve command fails.

Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def run(self) -> None:
    """Start the service.

    Raises:
        FileNotFoundError: If the bento file is not found.
        subprocess.CalledProcessError: If the bentoml serve command fails.
    """
    from bentoml._internal.service.loader import load

    logger.info("Starting BentoML container deployment service...")

    self.endpoint.prepare_for_start()

    if self.config.working_dir is None:
        if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
            self.config.working_dir = os.path.expanduser(
                self.config.bento_tag
            )
        else:
            self.config.working_dir = "."
    if sys.path[0] != self.config.working_dir:
        sys.path.insert(0, self.config.working_dir)

    _ = load(bento_identifier=".", working_dir=self.config.working_dir)
    # run bentoml serve command inside the container
    # Use subprocess for better control and error handling
    import subprocess

    try:
        subprocess.run(["bentoml", "serve"], check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to start BentoML service: {e}")
        raise
    except FileNotFoundError:
        logger.error(
            "BentoML command not found. Make sure it's installed and in the PATH."
        )
        raise

bentoml_local_deployment

Implementation for the BentoML local deployment service.

BentoMLDeploymentEndpoint (LocalDaemonServiceEndpoint)

A service endpoint exposed by the BentoML deployment daemon.

Attributes:

    config (BentoMLDeploymentEndpointConfig): service endpoint configuration

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLDeploymentEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the BentoML deployment daemon.

    Attributes:
        config: service endpoint configuration
    """

    config: BentoMLDeploymentEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint
        """
        uri = self.status.uri
        if not uri:
            return None
        return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

| Type | Description |
|------|-------------|
| Optional[str] | the prediction URL for the endpoint |

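As an aside, the property composes the URL with os.path.join, which happens to work for URLs because both use forward slashes; a standalone illustration with made-up values:

import os

uri = "http://127.0.0.1:3000"  # endpoint status URI (made up)
prediction_url_path = "predict"  # hypothetical subpath
print(os.path.join(uri, prediction_url_path))  # http://127.0.0.1:3000/predict
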
BentoMLDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig)

BentoML deployment service configuration.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| prediction_url_path | str | URI subpath for prediction requests |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
    """BentoML deployment service configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests
    """

    prediction_url_path: str
BentoMLLocalDeploymentConfig (LocalDaemonServiceConfig)

BentoML model deployment configuration.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| model_name | str | name of the model to deploy |
| model_uri | str | URI of the model to deploy |
| port | Optional[int] | port to expose the service on |
| bento_tag | str | Bento package to deploy; a bento tag combines the name of the bento and its version |
| workers | int | number of workers to use |
| backlog | int | number of requests to queue |
| production | bool | whether to run in production mode |
| working_dir | str | working directory for the service |
| host | Optional[str] | host to expose the service on |
| ssl_parameters | Optional[zenml.integrations.bentoml.services.bentoml_local_deployment.SSLBentoMLParametersConfig] | SSL parameters for the BentoML deployment |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLLocalDeploymentConfig(LocalDaemonServiceConfig):
    """BentoML model deployment configuration.

    Attributes:
        model_name: name of the model to deploy
        model_uri: URI of the model to deploy
        port: port to expose the service on
        bento_tag: Bento package to deploy. A bento tag is a combination of the
            name of the bento and its version.
        workers: number of workers to use
        backlog: number of requests to queue
        production: whether to run in production mode
        working_dir: working directory for the service
        host: host to expose the service on
        ssl_parameters: SSL parameters for the BentoML deployment
    """

    model_name: str
    model_uri: str
    bento_tag: str
    bento_uri: Optional[str] = None
    apis: List[str] = []
    workers: int = 1
    port: Optional[int] = None
    backlog: int = 2048
    production: bool = False
    working_dir: str
    host: Optional[str] = None
    ssl_parameters: Optional[SSLBentoMLParametersConfig] = Field(
        default_factory=SSLBentoMLParametersConfig
    )
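
A configuration sketch follows; the tag, model name, and URI are placeholders rather than values from this module, and only a subset of fields is shown.

# A minimal sketch of building a local deployment configuration.
from zenml.integrations.bentoml.services.bentoml_local_deployment import (
    BentoMLLocalDeploymentConfig,
)

config = BentoMLLocalDeploymentConfig(
    model_name="iris_classifier",  # placeholder model name
    model_uri="s3://my-bucket/models/iris",  # placeholder model URI
    bento_tag="iris_classifier:1.0.0",  # "<bento name>:<version>"
    working_dir=".",
    port=3001,
    workers=2,
    apis=["classify"],  # hypothetical API endpoint name
)
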
BentoMLLocalDeploymentService (LocalDaemonService, BaseDeploymentService)

BentoML deployment service used to start a local prediction server for BentoML models.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the BentoML deployment service class |
| config | BentoMLLocalDeploymentConfig | service configuration |
| endpoint | BentoMLDeploymentEndpoint | optional service endpoint |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLLocalDeploymentService(LocalDaemonService, BaseDeploymentService):
    """BentoML deployment service used to start a local prediction server for BentoML models.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the BentoML deployment service class
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name=BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME,
        type="model-serving",
        flavor="bentoml",
        description="BentoML local prediction service",
        logo_url="https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png",
    )

    config: BentoMLLocalDeploymentConfig
    endpoint: BentoMLDeploymentEndpoint

    def __init__(
        self,
        config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Initialize the BentoML deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        # ensure that the endpoint is created before the service is initialized
        # TODO [ENG-700]: implement a service factory or builder for BentoML
        #   deployment services
        if (
            isinstance(config, BentoMLLocalDeploymentConfig)
            and "endpoint" not in attrs
        ):
            endpoint = BentoMLDeploymentEndpoint(
                config=BentoMLDeploymentEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                    port=config.port
                    if config.port is not None
                    else BENTOML_DEFAULT_PORT,
                    ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                    prediction_url_path=BENTOML_PREDICTION_URL_PATH,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Start the service."""
        from bentoml import Service
        from bentoml._internal.service.loader import load

        logger.info(
            "Starting BentoML prediction service as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()
        ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
        # NOTE: the service is always started in HTTP production mode
        logger.info("Running in production mode.")
        svc = load(
            bento_identifier=self.config.bento_tag,
            working_dir=self.config.working_dir or ".",
        )

        if isinstance(svc, Service):
            # bentoml<1.2
            from bentoml.serving import serve_http_production

            try:
                serve_http_production(
                    self.config.bento_tag,
                    working_dir=self.config.working_dir,
                    port=self.config.port,
                    api_workers=self.config.workers,
                    host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                    backlog=self.config.backlog,
                    ssl_certfile=ssl_params.ssl_certfile,
                    ssl_keyfile=ssl_params.ssl_keyfile,
                    ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                    ssl_version=ssl_params.ssl_version,
                    ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                    ssl_ca_certs=ssl_params.ssl_ca_certs,
                    ssl_ciphers=ssl_params.ssl_ciphers,
                )
            except KeyboardInterrupt:
                logger.info("Stopping BentoML prediction service...")
        else:
            # bentoml>=1.2
            from _bentoml_impl.server import serve_http  # type: ignore

            svc.inject_config()
            try:
                serve_http(
                    self.config.bento_tag,
                    working_dir=self.config.working_dir or ".",
                    host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                    port=self.config.port,
                    backlog=self.config.backlog,
                    ssl_certfile=ssl_params.ssl_certfile,
                    ssl_keyfile=ssl_params.ssl_keyfile,
                    ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                    ssl_version=ssl_params.ssl_version,
                    ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                    ssl_ca_certs=ssl_params.ssl_ca_certs,
                    ssl_ciphers=ssl_params.ssl_ciphers,
                )
            except KeyboardInterrupt:
                logger.info("Stopping BentoML prediction service...")

    @property
    def prediction_url(self) -> Optional[str]:
        """Get the URI where the http server is running.

        Returns:
            The URI where the http service can be accessed to get more information
            about the service and to make predictions.
        """
        if not self.is_running:
            return None
        return self.endpoint.prediction_url

    @property
    def prediction_apis_urls(self) -> Optional[List[str]]:
        """Get the URI where the prediction api services is answering requests.

        Returns:
            The URI where the prediction service apis can be contacted to process
            HTTP/REST inference requests, or None, if the service isn't running.
        """
        if not self.is_running:
            return None

        if self.config.apis:
            return [
                f"{self.endpoint.prediction_url}/{api}"
                for api in self.config.apis
            ]
        return None

    def predict(
        self, api_endpoint: str, data: "Any", sync: bool = True
    ) -> "Any":
        """Make a prediction using the service.

        Args:
            data: data to make a prediction on
            api_endpoint: the api endpoint to make the prediction on
            sync: if set to False, the prediction will be made asynchronously

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "BentoML prediction service is not running. "
                "Please start the service before making predictions."
            )
        if self.endpoint.prediction_url is None:
            raise ValueError("No endpoint known for prediction.")
        if sync:
            client = SyncHTTPClient(self.endpoint.prediction_url)
        else:
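            # NOTE: AsyncHTTPClient.call is a coroutine function, so the value
            # returned below is an un-awaited coroutine for the caller to await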
            client = AsyncHTTPClient(self.endpoint.prediction_url)
        result = client.call(api_endpoint, data)
        return result
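
A lifecycle sketch for this service follows, assuming a config built as in the configuration example above and a matching Bento available locally; the timeouts and API name are illustrative.

service = BentoMLLocalDeploymentService(config)
service.start(timeout=60)  # daemonize and wait for the endpoint to come up
print(service.prediction_url)
result = service.predict("classify", [[5.1, 3.5, 1.4, 0.2]])  # placeholder API
service.stop(timeout=60)
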
prediction_apis_urls: Optional[List[str]] property readonly

Get the URIs where the prediction API services answer requests.

Returns:

| Type | Description |
|------|-------------|
| Optional[List[str]] | The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running. |

prediction_url: Optional[str] property readonly

Get the URI where the http server is running.

Returns:

| Type | Description |
|------|-------------|
| Optional[str] | The URI where the http service can be accessed to get more information about the service and to make predictions. |

__init__(self, config, **attrs) special

Initialize the BentoML deployment service.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| config | Union[zenml.integrations.bentoml.services.bentoml_local_deployment.BentoMLLocalDeploymentConfig, Dict[str, Any]] | service configuration | required |
| attrs | Any | additional attributes to set on the service | {} |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def __init__(
    self,
    config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLLocalDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLDeploymentEndpoint(
            config=BentoMLDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port
                if config.port is not None
                else BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
predict(self, api_endpoint, data, sync=True)

Make a prediction using the service.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | Any | data to make a prediction on | required |
| api_endpoint | str | the api endpoint to make the prediction on | required |
| sync | bool | if set to False, the prediction will be made asynchronously | True |

Returns:

| Type | Description |
|------|-------------|
| Any | The prediction result. |

Exceptions:

| Type | Description |
|------|-------------|
| Exception | if the service is not running |
| ValueError | if the prediction endpoint is unknown. |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def predict(
    self, api_endpoint: str, data: "Any", sync: bool = True
) -> "Any":
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on
        sync: if set to False, the prediction will be made asynchronously

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is None:
        raise ValueError("No endpoint known for prediction.")
    if sync:
        client = SyncHTTPClient(self.endpoint.prediction_url)
    else:
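        # NOTE: AsyncHTTPClient.call is a coroutine function, so the value
        # returned below is an un-awaited coroutine for the caller to await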
        client = AsyncHTTPClient(self.endpoint.prediction_url)
    result = client.call(api_endpoint, data)
    return result
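
A short sketch of the two calling modes, reusing the service from the lifecycle example above; the endpoint name and payload are placeholders, and the async behavior described is an assumption based on the client used.

data = [[5.1, 3.5, 1.4, 0.2]]  # placeholder payload

# Blocking call through the synchronous BentoML HTTP client:
result = service.predict("classify", data, sync=True)

# With sync=False the asynchronous client is used instead; assuming its
# call() returns an un-awaited coroutine, the caller drives it from
# async code:
import asyncio

async def main() -> None:
    result = await service.predict("classify", data, sync=False)

asyncio.run(main())
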
run(self)

Start the service.

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def run(self) -> None:
    """Start the service."""
    from bentoml import Service
    from bentoml._internal.service.loader import load

    logger.info(
        "Starting BentoML prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()
    ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
    # NOTE: the service is always started in HTTP production mode
    logger.info("Running in production mode.")
    svc = load(
        bento_identifier=self.config.bento_tag,
        working_dir=self.config.working_dir or ".",
    )

    if isinstance(svc, Service):
        # bentoml<1.2
        from bentoml.serving import serve_http_production

        try:
            serve_http_production(
                self.config.bento_tag,
                working_dir=self.config.working_dir,
                port=self.config.port,
                api_workers=self.config.workers,
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
    else:
        # bentoml>=1.2
        from _bentoml_impl.server import serve_http  # type: ignore

        svc.inject_config()
        try:
            serve_http(
                self.config.bento_tag,
                working_dir=self.config.working_dir or ".",
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                port=self.config.port,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
SSLBentoMLParametersConfig (BaseModel)

BentoML SSL parameters configuration.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| ssl_certfile | Optional[str] | SSL certificate file |
| ssl_keyfile | Optional[str] | SSL key file |
| ssl_keyfile_password | Optional[str] | SSL key file password |
| ssl_version | Optional[int] | SSL version |
| ssl_cert_reqs | Optional[int] | SSL certificate requirements |
| ssl_ca_certs | Optional[str] | SSL CA certificates |
| ssl_ciphers | Optional[str] | SSL ciphers |

Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class SSLBentoMLParametersConfig(BaseModel):
    """BentoML SSL parameters configuration.

    Attributes:
        ssl_certfile: SSL certificate file
        ssl_keyfile: SSL key file
        ssl_keyfile_password: SSL key file password
        ssl_version: SSL version
        ssl_cert_reqs: SSL certificate requirements
        ssl_ca_certs: SSL CA certificates
        ssl_ciphers: SSL ciphers
    """

    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[int] = None
    ssl_cert_reqs: Optional[int] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
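
A sketch of enabling TLS for a local deployment follows; the file paths are placeholders, and the integer-valued fields map naturally onto constants from Python's ssl module.

import ssl

ssl_params = SSLBentoMLParametersConfig(
    ssl_certfile="/etc/certs/server.crt",  # placeholder path
    ssl_keyfile="/etc/certs/server.key",  # placeholder path
    ssl_cert_reqs=ssl.CERT_NONE,  # do not require client certificates
)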

deployment_type

BentoML Service Deployment Types.

BentoMLDeploymentType (Enum)

BentoML Service Deployment Types.

Source code in zenml/integrations/bentoml/services/deployment_type.py
class BentoMLDeploymentType(Enum):
    """BentoML Service Deployment Types."""

    LOCAL = "local"
    CONTAINER = "container"

steps special

Initialization of the BentoML standard interface steps.

bento_builder

Implementation of the BentoML bento builder step.

bentoml_deployer

Implementation of the BentoML model deployer pipeline step.