Bentoml
zenml.integrations.bentoml
special
Initialization of the BentoML integration for ZenML.
The BentoML integration allows you to use the BentoML model serving to implement continuous model deployment.
BentoMLIntegration (Integration)
Definition of BentoML integration for ZenML.
Source code in zenml/integrations/bentoml/__init__.py
class BentoMLIntegration(Integration):
"""Definition of BentoML integration for ZenML."""
NAME = BENTOML
REQUIREMENTS = [
"bentoml>=1.0.10",
]
@classmethod
def activate(cls) -> None:
"""Activate the BentoML integration."""
from zenml.integrations.bentoml import materializers # noqa
from zenml.integrations.bentoml import model_deployers # noqa
from zenml.integrations.bentoml import services # noqa
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for BentoML.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.bentoml.flavors import (
BentoMLModelDeployerFlavor,
)
return [BentoMLModelDeployerFlavor]
activate()
classmethod
Activate the BentoML integration.
Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def activate(cls) -> None:
"""Activate the BentoML integration."""
from zenml.integrations.bentoml import materializers # noqa
from zenml.integrations.bentoml import model_deployers # noqa
from zenml.integrations.bentoml import services # noqa
flavors()
classmethod
Declare the stack component flavors for BentoML.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for BentoML.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.bentoml.flavors import (
BentoMLModelDeployerFlavor,
)
return [BentoMLModelDeployerFlavor]
constants
BentoML constants.
flavors
special
BentoML integration flavors.
bentoml_model_deployer_flavor
BentoML model deployer flavor.
BentoMLModelDeployerConfig (BaseModelDeployerConfig)
Configuration for the BentoMLModelDeployer.
Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerConfig(BaseModelDeployerConfig):
"""Configuration for the BentoMLModelDeployer."""
service_path: str = ""
BentoMLModelDeployerFlavor (BaseModelDeployerFlavor)
Flavor for the BentoML model deployer.
Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
"""Flavor for the BentoML model deployer."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
Name of the flavor.
"""
return BENTOML_MODEL_DEPLOYER_FLAVOR
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png"
@property
def config_class(self) -> Type[BentoMLModelDeployerConfig]:
"""Returns `BentoMLModelDeployerConfig` config class.
Returns:
The config class.
"""
return BentoMLModelDeployerConfig
@property
def implementation_class(self) -> Type["BentoMLModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.bentoml.model_deployers import (
BentoMLModelDeployer,
)
return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]
property
readonly
Returns BentoMLModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[BentoMLModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BentoMLModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
materializers
special
Initialization of the BentoML Bento Materializer.
bentoml_bento_materializer
Materializer for BentoML Bento objects.
BentoMaterializer (BaseMaterializer)
Materializer for Bentoml Bento objects.
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
class BentoMaterializer(BaseMaterializer):
"""Materializer for Bentoml Bento objects."""
ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (bento.Bento,)
ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA
def load(self, data_type: Type[bento.Bento]) -> bento.Bento:
"""Read from artifact store and return a Bento object.
Args:
data_type: An bento.Bento type.
Returns:
An bento.Bento object.
"""
with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
# Copy from artifact store to temporary directory
io_utils.copy_dir(self.uri, temp_dir)
# Load the Bento from the temporary directory
imported_bento = Bento.import_from(
os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
)
# Try save the Bento to the local BentoML store
try:
_ = bentoml.get(imported_bento.tag)
except BentoMLException:
imported_bento.save()
return imported_bento
def save(self, bento: bento.Bento) -> None:
"""Write to artifact store.
Args:
bento: An bento.Bento object.
"""
with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
bentoml.export_bento(bento.tag, temp_bento_path)
io_utils.copy_dir(temp_dir, self.uri)
def extract_metadata(
self, bento: bento.Bento
) -> Dict[str, "MetadataType"]:
"""Extract metadata from the given `Bento` object.
Args:
bento: The `Bento` object to extract metadata from.
Returns:
The extracted metadata as a dictionary.
"""
return {
"bento_info_name": bento.info.name,
"bento_info_version": bento.info.version,
"bento_tag_name": bento.tag.name,
"bentoml_version": bento.info.bentoml_version,
}
extract_metadata(self, bento)
Extract metadata from the given Bento
object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bento |
bentoml._internal.bento.bento.Bento |
The |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
The extracted metadata as a dictionary. |
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def extract_metadata(
self, bento: bento.Bento
) -> Dict[str, "MetadataType"]:
"""Extract metadata from the given `Bento` object.
Args:
bento: The `Bento` object to extract metadata from.
Returns:
The extracted metadata as a dictionary.
"""
return {
"bento_info_name": bento.info.name,
"bento_info_version": bento.info.version,
"bento_tag_name": bento.tag.name,
"bentoml_version": bento.info.bentoml_version,
}
load(self, data_type)
Read from artifact store and return a Bento object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[bentoml._internal.bento.bento.Bento] |
An bento.Bento type. |
required |
Returns:
Type | Description |
---|---|
bentoml._internal.bento.bento.Bento |
An bento.Bento object. |
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def load(self, data_type: Type[bento.Bento]) -> bento.Bento:
"""Read from artifact store and return a Bento object.
Args:
data_type: An bento.Bento type.
Returns:
An bento.Bento object.
"""
with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
# Copy from artifact store to temporary directory
io_utils.copy_dir(self.uri, temp_dir)
# Load the Bento from the temporary directory
imported_bento = Bento.import_from(
os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
)
# Try save the Bento to the local BentoML store
try:
_ = bentoml.get(imported_bento.tag)
except BentoMLException:
imported_bento.save()
return imported_bento
save(self, bento)
Write to artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bento |
bentoml._internal.bento.bento.Bento |
An bento.Bento object. |
required |
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def save(self, bento: bento.Bento) -> None:
"""Write to artifact store.
Args:
bento: An bento.Bento object.
"""
with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
bentoml.export_bento(bento.tag, temp_bento_path)
io_utils.copy_dir(temp_dir, self.uri)
model_deployers
special
Initialization of the BentoML Model Deployer.
bentoml_model_deployer
Implementation of the BentoML Model Deployer.
BentoMLModelDeployer (BaseModelDeployer)
BentoML model deployer stack component implementation.
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployer(BaseModelDeployer):
"""BentoML model deployer stack component implementation."""
NAME: ClassVar[str] = "BentoML"
FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = (
BentoMLModelDeployerFlavor
)
_service_path: Optional[str] = None
@property
def config(self) -> BentoMLModelDeployerConfig:
"""Returns the `BentoMLModelDeployerConfig` config.
Returns:
The configuration.
"""
return cast(BentoMLModelDeployerConfig, self._config)
@staticmethod
def get_service_path(id_: UUID) -> str:
"""Get the path where local BentoML service information is stored.
This includes the deployment service configuration, PID and log files
are stored.
Args:
id_: The ID of the BentoML model deployer.
Returns:
The service path.
"""
service_path = os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
create_dir_recursive_if_not_exists(service_path)
return service_path
@property
def local_path(self) -> str:
"""Returns the path to the root directory.
This is where all configurations for BentoML deployment daemon processes
are stored.
If the service path is not set in the config by the user, the path is
set to a local default path according to the component ID.
Returns:
The path to the local service root directory.
"""
if self._service_path is not None:
return self._service_path
if self.config.service_path:
self._service_path = self.config.service_path
else:
self._service_path = self.get_service_path(self.id)
create_dir_recursive_if_not_exists(self._service_path)
return self._service_path
@staticmethod
def get_model_server_info(
service_instance: BaseService,
) -> Dict[str, Optional[str]]:
"""Return implementation specific information on the model server.
Args:
service_instance: BentoML deployment service object
Returns:
A dictionary containing the model server information.
Raises:
ValueError: If the service type is not supported.
"""
if (
service_instance.SERVICE_TYPE.name
== BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
):
service_instance = cast(
BentoMLContainerDeploymentService, service_instance
)
elif (
service_instance.SERVICE_TYPE.name
== BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
):
service_instance = cast(
BentoMLLocalDeploymentService, service_instance
)
else:
raise ValueError(
f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
)
predictions_apis_urls = ""
if service_instance.prediction_apis_urls is not None: # type: ignore
predictions_apis_urls = ", ".join(
[
api
for api in service_instance.prediction_apis_urls # type: ignore
if api is not None
]
)
service_config = service_instance.config
assert isinstance(
service_config,
(BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
)
service_status = service_instance.status
assert isinstance(
service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
)
return {
"HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
"PREDICTION_URL": service_instance.get_prediction_url(),
"BENTO_TAG": service_config.bento_tag,
"MODEL_NAME": service_config.model_name,
"MODEL_URI": service_config.model_uri,
"BENTO_URI": service_config.bento_uri,
"SERVICE_PATH": service_status.runtime_path,
"DAEMON_PID": str(service_status.pid)
if hasattr(service_status, "pid")
else None,
"PREDICTION_APIS_URLS": predictions_apis_urls,
}
def perform_deploy_model(
self,
id: UUID,
config: ServiceConfig,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new BentoML deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new BentoML
deployment server to reflect the model and other configuration
parameters specified in the supplied BentoML service `config`.
* if `replace` is True, this method will first attempt to find an
existing BentoML deployment service that is *equivalent* to the
supplied configuration parameters. Two or more BentoML deployment
services are considered equivalent if they have the same
`pipeline_name`, `pipeline_step_name` and `model_name` configuration
parameters. To put it differently, two BentoML deployment services
are equivalent if they serve versions of the same model deployed by
the same pipeline step. If an equivalent BentoML deployment is found,
it will be updated in place to reflect the new configuration
parameters.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new BentoML deployment
server for each new model version. If multiple equivalent BentoML
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Args:
id: the UUID of the BentoML model deployer.
config: the configuration of the model to be deployed with BentoML.
timeout: the timeout in seconds to wait for the BentoML server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the BentoML
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML BentoML deployment service object that can be used to
interact with the BentoML model http server.
"""
service = self._create_new_service(
id=id, timeout=timeout, config=config
)
logger.info(f"Created a new BentoML deployment service: {service}")
return service
def _clean_up_existing_service(
self,
timeout: int,
force: bool,
existing_service: BaseService,
) -> None:
# stop the older service
existing_service.stop(timeout=timeout, force=force)
# assert that the service is either a BentoMLLocalDeploymentService or a BentoMLContainerDeploymentService
if not isinstance(
existing_service,
(BentoMLLocalDeploymentService, BentoMLContainerDeploymentService),
):
raise ValueError(
f"Unsupported service type: {type(existing_service)}"
)
# delete the old configuration file
if existing_service.status.runtime_path:
shutil.rmtree(existing_service.status.runtime_path)
# the step will receive a config from the user that mentions the number
# of workers etc.the step implementation will create a new config using
# all values from the user and add values like pipeline name, model_uri
def _create_new_service(
self, id: UUID, timeout: int, config: ServiceConfig
) -> BaseService:
"""Creates a new BentoMLDeploymentService.
Args:
id: the ID of the BentoML deployment service to be created or updated.
timeout: the timeout in seconds to wait for the BentoML server
to be provisioned and successfully started or updated.
config: the configuration of the model to be deployed with BentoML.
Returns:
The BentoMLDeploymentService object that can be used to interact
with the BentoML model server.
Raises:
ValueError: If the service type is not supported.
"""
assert isinstance(
config,
(BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
)
# set the root runtime path with the stack component's UUID
config.root_runtime_path = self.local_path
# create a new service for the new model
# if the config is of type BentoMLLocalDeploymentConfig, create a
# BentoMLLocalDeploymentService, otherwise create a
# BentoMLContainerDeploymentService
service: BaseService
if isinstance(config, BentoMLLocalDeploymentConfig):
service = BentoMLLocalDeploymentService(uuid=id, config=config)
elif isinstance(config, BentoMLContainerDeploymentConfig):
service = BentoMLContainerDeploymentService(uuid=id, config=config)
else:
raise ValueError(f"Unsupported service type: {type(config)}")
service.start(timeout=timeout)
return service
def perform_stop_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> BaseService:
"""Method to stop a model server.
Args:
service: The service to stop.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
Returns:
The stopped service.
"""
service.stop(timeout=timeout, force=force)
return service
def perform_start_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Method to start a model server.
Args:
service: The service to start.
timeout: Timeout in seconds to wait for the service to start.
Returns:
The started service.
"""
service.start(timeout=timeout)
return service
def perform_delete_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to delete all configuration of a model server.
Args:
service: The service to delete.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
self._clean_up_existing_service(
existing_service=service, timeout=timeout, force=force
)
config: BentoMLModelDeployerConfig
property
readonly
Returns the BentoMLModelDeployerConfig
config.
Returns:
Type | Description |
---|---|
BentoMLModelDeployerConfig |
The configuration. |
local_path: str
property
readonly
Returns the path to the root directory.
This is where all configurations for BentoML deployment daemon processes are stored.
If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.
Returns:
Type | Description |
---|---|
str |
The path to the local service root directory. |
FLAVOR (BaseModelDeployerFlavor)
Flavor for the BentoML model deployer.
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
"""Flavor for the BentoML model deployer."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
Name of the flavor.
"""
return BENTOML_MODEL_DEPLOYER_FLAVOR
@property
def docs_url(self) -> Optional[str]:
"""A url to point at docs explaining this flavor.
Returns:
A flavor docs url.
"""
return self.generate_default_docs_url()
@property
def sdk_docs_url(self) -> Optional[str]:
"""A url to point at SDK docs explaining this flavor.
Returns:
A flavor SDK docs url.
"""
return self.generate_default_sdk_docs_url()
@property
def logo_url(self) -> str:
"""A url to represent the flavor in the dashboard.
Returns:
The flavor logo.
"""
return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png"
@property
def config_class(self) -> Type[BentoMLModelDeployerConfig]:
"""Returns `BentoMLModelDeployerConfig` config class.
Returns:
The config class.
"""
return BentoMLModelDeployerConfig
@property
def implementation_class(self) -> Type["BentoMLModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.bentoml.model_deployers import (
BentoMLModelDeployer,
)
return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]
property
readonly
Returns BentoMLModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[BentoMLModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BentoMLModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
get_model_server_info(service_instance)
staticmethod
Return implementation specific information on the model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
BaseService |
BentoML deployment service object |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
A dictionary containing the model server information. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the service type is not supported. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_model_server_info(
service_instance: BaseService,
) -> Dict[str, Optional[str]]:
"""Return implementation specific information on the model server.
Args:
service_instance: BentoML deployment service object
Returns:
A dictionary containing the model server information.
Raises:
ValueError: If the service type is not supported.
"""
if (
service_instance.SERVICE_TYPE.name
== BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
):
service_instance = cast(
BentoMLContainerDeploymentService, service_instance
)
elif (
service_instance.SERVICE_TYPE.name
== BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
):
service_instance = cast(
BentoMLLocalDeploymentService, service_instance
)
else:
raise ValueError(
f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
)
predictions_apis_urls = ""
if service_instance.prediction_apis_urls is not None: # type: ignore
predictions_apis_urls = ", ".join(
[
api
for api in service_instance.prediction_apis_urls # type: ignore
if api is not None
]
)
service_config = service_instance.config
assert isinstance(
service_config,
(BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
)
service_status = service_instance.status
assert isinstance(
service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
)
return {
"HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
"PREDICTION_URL": service_instance.get_prediction_url(),
"BENTO_TAG": service_config.bento_tag,
"MODEL_NAME": service_config.model_name,
"MODEL_URI": service_config.model_uri,
"BENTO_URI": service_config.bento_uri,
"SERVICE_PATH": service_status.runtime_path,
"DAEMON_PID": str(service_status.pid)
if hasattr(service_status, "pid")
else None,
"PREDICTION_APIS_URLS": predictions_apis_urls,
}
get_service_path(id_)
staticmethod
Get the path where local BentoML service information is stored.
This includes the deployment service configuration, PID and log files are stored.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
UUID |
The ID of the BentoML model deployer. |
required |
Returns:
Type | Description |
---|---|
str |
The service path. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
"""Get the path where local BentoML service information is stored.
This includes the deployment service configuration, PID and log files
are stored.
Args:
id_: The ID of the BentoML model deployer.
Returns:
The service path.
"""
service_path = os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
create_dir_recursive_if_not_exists(service_path)
return service_path
perform_delete_model(self, service, timeout=60, force=False)
Method to delete all configuration of a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to delete. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
60 |
force |
bool |
If True, force the service to stop. |
False |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_delete_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to delete all configuration of a model server.
Args:
service: The service to delete.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
self._clean_up_existing_service(
existing_service=service, timeout=timeout, force=force
)
perform_deploy_model(self, id, config, timeout=60)
Create a new BentoML deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the replace
argument value:
-
if
replace
is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML serviceconfig
. -
if
replace
is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the samepipeline_name
,pipeline_step_name
andmodel_name
configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.
Callers should set replace
to True if they want a continuous model
deployment workflow that doesn't spin up a new BentoML deployment
server for each new model version. If multiple equivalent BentoML
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
the UUID of the BentoML model deployer. |
required |
config |
ServiceConfig |
the configuration of the model to be deployed with BentoML. |
required |
timeout |
int |
the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start. |
60 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML BentoML deployment service object that can be used to interact with the BentoML model http server. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_deploy_model(
self,
id: UUID,
config: ServiceConfig,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new BentoML deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new BentoML
deployment server to reflect the model and other configuration
parameters specified in the supplied BentoML service `config`.
* if `replace` is True, this method will first attempt to find an
existing BentoML deployment service that is *equivalent* to the
supplied configuration parameters. Two or more BentoML deployment
services are considered equivalent if they have the same
`pipeline_name`, `pipeline_step_name` and `model_name` configuration
parameters. To put it differently, two BentoML deployment services
are equivalent if they serve versions of the same model deployed by
the same pipeline step. If an equivalent BentoML deployment is found,
it will be updated in place to reflect the new configuration
parameters.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new BentoML deployment
server for each new model version. If multiple equivalent BentoML
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Args:
id: the UUID of the BentoML model deployer.
config: the configuration of the model to be deployed with BentoML.
timeout: the timeout in seconds to wait for the BentoML server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the BentoML
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML BentoML deployment service object that can be used to
interact with the BentoML model http server.
"""
service = self._create_new_service(
id=id, timeout=timeout, config=config
)
logger.info(f"Created a new BentoML deployment service: {service}")
return service
perform_start_model(self, service, timeout=60)
Method to start a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to start. |
required |
timeout |
int |
Timeout in seconds to wait for the service to start. |
60 |
Returns:
Type | Description |
---|---|
BaseService |
The started service. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_start_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Method to start a model server.
Args:
service: The service to start.
timeout: Timeout in seconds to wait for the service to start.
Returns:
The started service.
"""
service.start(timeout=timeout)
return service
perform_stop_model(self, service, timeout=60, force=False)
Method to stop a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to stop. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
60 |
force |
bool |
If True, force the service to stop. |
False |
Returns:
Type | Description |
---|---|
BaseService |
The stopped service. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_stop_model(
self,
service: BaseService,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> BaseService:
"""Method to stop a model server.
Args:
service: The service to stop.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
Returns:
The stopped service.
"""
service.stop(timeout=timeout, force=force)
return service
services
special
Initialization for BentoML services.
bentoml_container_deployment
Implementation for the BentoML container deployment service.
BentoMLContainerDeploymentConfig (ContainerServiceConfig)
BentoML container deployment configuration.
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentConfig(ContainerServiceConfig):
"""BentoML container deployment configuration."""
model_name: str
model_uri: str
bento_tag: str
bento_uri: Optional[str] = None
platform: Optional[str] = None
image: str = ""
image_tag: Optional[str] = None
features: Optional[List[str]] = None
file: Optional[str] = None
apis: List[str] = []
working_dir: Optional[str] = None
workers: int = 1
backlog: int = 2048
host: Optional[str] = None
port: Optional[int] = None
BentoMLContainerDeploymentEndpoint (ContainerServiceEndpoint)
A service endpoint exposed by the BentoML container deployment service.
Attributes:
Name | Type | Description |
---|---|---|
config |
BentoMLContainerDeploymentEndpointConfig |
service endpoint configuration |
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentEndpoint(ContainerServiceEndpoint):
"""A service endpoint exposed by the BentoML container deployment service.
Attributes:
config: service endpoint configuration
"""
config: BentoMLContainerDeploymentEndpointConfig
@property
def prediction_url(self) -> Optional[str]:
"""Gets the prediction URL for the endpoint.
Returns:
the prediction URL for the endpoint
"""
uri = self.status.uri
if not uri:
return None
return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str]
property
readonly
Gets the prediction URL for the endpoint.
Returns:
Type | Description |
---|---|
Optional[str] |
the prediction URL for the endpoint |
BentoMLContainerDeploymentEndpointConfig (ContainerServiceEndpointConfig)
BentoML container deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
prediction_url_path |
str |
URI subpath for prediction requests |
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentEndpointConfig(ContainerServiceEndpointConfig):
"""BentoML container deployment service configuration.
Attributes:
prediction_url_path: URI subpath for prediction requests
"""
prediction_url_path: str
BentoMLContainerDeploymentService (ContainerService, BaseDeploymentService)
BentoML container deployment service.
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
class BentoMLContainerDeploymentService(
ContainerService, BaseDeploymentService
):
"""BentoML container deployment service."""
SERVICE_TYPE = ServiceType(
name=BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME,
type="model-serving",
flavor="bentoml",
description="BentoML container prediction service",
logo_url="https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png",
)
config: BentoMLContainerDeploymentConfig
endpoint: BentoMLContainerDeploymentEndpoint
def __init__(
self,
config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the BentoML deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for BentoML
# deployment services
if (
isinstance(config, BentoMLContainerDeploymentConfig)
and "endpoint" not in attrs
):
endpoint = BentoMLContainerDeploymentEndpoint(
config=BentoMLContainerDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
port=config.port or BENTOML_DEFAULT_PORT,
ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
prediction_url_path=BENTOML_PREDICTION_URL_PATH,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
# override the is_running property to check if the bentoml container is running
@property
def is_running(self) -> bool:
"""Check if the service is currently running.
This method will actively poll the external service to get its status
and will return the result.
Returns:
True if the service is running and active (i.e. the endpoints are
responsive, if any are configured), otherwise False.
"""
self.update_status()
return self.status.state == ServiceState.ACTIVE
# override the container start method to use the root user
def _start_container(self) -> None:
"""Start the service docker container associated with this service."""
container = self.container
if container:
# the container exists, check if it is running
if container.status == "running":
logger.debug(
"Container for service '%s' is already running",
self,
)
return
# the container is stopped or in an error state, remove it
logger.debug(
"Removing previous container for service '%s'",
self,
)
container.remove(force=True)
logger.debug("Starting container for service '%s'...", self)
try:
self.docker_client.images.get(self.config.image)
except docker_errors.ImageNotFound:
logger.debug(
"Pulling container image '%s' for service '%s'...",
self.config.image,
self,
)
self.docker_client.images.pull(self.config.image)
self._setup_runtime_path()
ports: Dict[int, Optional[int]] = {}
if self.endpoint:
self.endpoint.prepare_for_start()
if self.endpoint.status.port:
ports[self.endpoint.status.port] = self.endpoint.status.port
command, env = self._get_container_cmd()
volumes = self._get_container_volumes()
try:
container = self.docker_client.containers.run(
name=self.container_id,
image=self.config.image,
entrypoint=command,
detach=True,
volumes=volumes,
environment=env,
remove=False,
auto_remove=False,
ports=ports,
user="root",
labels={
"zenml-service-uuid": str(self.uuid),
},
working_dir="/home/bentoml/bento",
extra_hosts={"host.docker.internal": "host-gateway"},
)
logger.debug(
"Docker container for service '%s' started with ID: %s",
self,
self.container_id,
)
except docker_errors.DockerException as e:
logger.error(
"Docker container for service '%s' failed to start: %s",
self,
e,
)
def _containerize_and_push_bento(self) -> None:
"""Containerize the bento and push it to the container registry.
Raises:
Exception: If the bento containerization fails.
"""
zenml_client = ZenMLClient()
container_registry = zenml_client.active_stack.container_registry
# a tuple of config image and image tag
if self.config.image and self.config.image_tag:
image_tag = (self.config.image, self.config.image_tag)
else:
# if container registry is present in the stack, name the image
# with the container registry uri, else name the image with the bento tag
if container_registry:
image_name = (
f"{container_registry.config.uri}/{self.config.bento_tag}"
)
image_tag = (image_name,) # type: ignore
self.config.image = image_name
else:
# bentoml will use the bento tag as the name of the image
image_tag = (self.config.bento_tag,) # type: ignore
self.config.image = self.config.bento_tag
try:
bentoml.container.build(
bento_tag=self.config.bento_tag,
backend="docker", # hardcoding docker since container service only supports docker
image_tag=image_tag,
features=self.config.features,
file=self.config.file,
platform=self.config.platform,
)
except Exception as e:
logger.error(f"Error containerizing the bento: {e}")
raise e
if container_registry:
logger.info(
f"Pushing bento to container registry {container_registry.config.uri}"
)
# push the bento to the image registry
container_registry.push_image(self.config.image)
else:
logger.warning(
"No container registry found in the active stack. "
"Please add a container registry to your stack to push "
"the bento to an image registry."
)
def provision(self) -> None:
"""Provision the service."""
# containerize the bento
self._containerize_and_push_bento()
# run the container
super().provision()
def run(self) -> None:
"""Start the service.
Raises:
FileNotFoundError: If the bento file is not found.
subprocess.CalledProcessError: If the bentoml serve command fails.
"""
from bentoml._internal.service.loader import load
logger.info("Starting BentoML container deployment service...")
self.endpoint.prepare_for_start()
if self.config.working_dir is None:
if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
self.config.working_dir = os.path.expanduser(
self.config.bento_tag
)
else:
self.config.working_dir = "."
if sys.path[0] != self.config.working_dir:
sys.path.insert(0, self.config.working_dir)
_ = load(bento_identifier=".", working_dir=self.config.working_dir)
# run bentoml serve command inside the container
# Use subprocess for better control and error handling
import subprocess
try:
subprocess.run(["bentoml", "serve"], check=True)
except subprocess.CalledProcessError as e:
logger.error(f"Failed to start BentoML service: {e}")
raise
except FileNotFoundError:
logger.error(
"BentoML command not found. Make sure it's installed and in the PATH."
)
raise
@property
def prediction_url(self) -> Optional[str]:
"""Get the URI where the http server is running.
Returns:
The URI where the http service can be accessed to get more information
about the service and to make predictions.
"""
if not self.is_running:
return None
return self.endpoint.prediction_url
@property
def prediction_apis_urls(self) -> Optional[List[str]]:
"""Get the URI where the prediction api services is answering requests.
Returns:
The URI where the prediction service apis can be contacted to process
HTTP/REST inference requests, or None, if the service isn't running.
"""
if not self.is_running:
return None
if self.config.apis:
return [
f"{self.endpoint.prediction_url}/{api}"
for api in self.config.apis
]
return None
def predict(self, api_endpoint: str, data: Any) -> Any:
"""Make a prediction using the service.
Args:
data: data to make a prediction on
api_endpoint: the api endpoint to make the prediction on
Returns:
The prediction result.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"BentoML prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is not None:
client = Client.from_url(
self.endpoint.prediction_url.replace("http://", "").rstrip("/")
)
result = client.call(api_endpoint, data)
else:
raise ValueError("No endpoint known for prediction.")
return result
is_running: bool
property
readonly
Check if the service is currently running.
This method will actively poll the external service to get its status and will return the result.
Returns:
Type | Description |
---|---|
bool |
True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False. |
prediction_apis_urls: Optional[List[str]]
property
readonly
Get the URI where the prediction api services is answering requests.
Returns:
Type | Description |
---|---|
Optional[List[str]] |
The URI where the prediction service apis can be contacted to process HTTP/REST inference requests, or None, if the service isn't running. |
prediction_url: Optional[str]
property
readonly
Get the URI where the http server is running.
Returns:
Type | Description |
---|---|
Optional[str] |
The URI where the http service can be accessed to get more information about the service and to make predictions. |
__init__(self, config, **attrs)
special
Initialize the BentoML deployment service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Union[zenml.integrations.bentoml.services.bentoml_container_deployment.BentoMLContainerDeploymentConfig, Dict[str, Any]] |
service configuration |
required |
attrs |
Any |
additional attributes to set on the service |
{} |
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def __init__(
self,
config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the BentoML deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for BentoML
# deployment services
if (
isinstance(config, BentoMLContainerDeploymentConfig)
and "endpoint" not in attrs
):
endpoint = BentoMLContainerDeploymentEndpoint(
config=BentoMLContainerDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
port=config.port or BENTOML_DEFAULT_PORT,
ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
prediction_url_path=BENTOML_PREDICTION_URL_PATH,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
model_post_init(/, self, context)
We need to both initialize private attributes and call the user-defined model_post_init method.
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def wrapped_model_post_init(self: BaseModel, context: Any, /) -> None:
"""We need to both initialize private attributes and call the user-defined model_post_init
method.
"""
init_private_attributes(self, context)
original_model_post_init(self, context)
predict(self, api_endpoint, data)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
data to make a prediction on |
required |
api_endpoint |
str |
the api endpoint to make the prediction on |
required |
Returns:
Type | Description |
---|---|
Any |
The prediction result. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not running |
ValueError |
if the prediction endpoint is unknown. |
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def predict(self, api_endpoint: str, data: Any) -> Any:
"""Make a prediction using the service.
Args:
data: data to make a prediction on
api_endpoint: the api endpoint to make the prediction on
Returns:
The prediction result.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"BentoML prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is not None:
client = Client.from_url(
self.endpoint.prediction_url.replace("http://", "").rstrip("/")
)
result = client.call(api_endpoint, data)
else:
raise ValueError("No endpoint known for prediction.")
return result
provision(self)
Provision the service.
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def provision(self) -> None:
"""Provision the service."""
# containerize the bento
self._containerize_and_push_bento()
# run the container
super().provision()
run(self)
Start the service.
Exceptions:
Type | Description |
---|---|
FileNotFoundError |
If the bento file is not found. |
subprocess.CalledProcessError |
If the bentoml serve command fails. |
Source code in zenml/integrations/bentoml/services/bentoml_container_deployment.py
def run(self) -> None:
"""Start the service.
Raises:
FileNotFoundError: If the bento file is not found.
subprocess.CalledProcessError: If the bentoml serve command fails.
"""
from bentoml._internal.service.loader import load
logger.info("Starting BentoML container deployment service...")
self.endpoint.prepare_for_start()
if self.config.working_dir is None:
if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
self.config.working_dir = os.path.expanduser(
self.config.bento_tag
)
else:
self.config.working_dir = "."
if sys.path[0] != self.config.working_dir:
sys.path.insert(0, self.config.working_dir)
_ = load(bento_identifier=".", working_dir=self.config.working_dir)
# run bentoml serve command inside the container
# Use subprocess for better control and error handling
import subprocess
try:
subprocess.run(["bentoml", "serve"], check=True)
except subprocess.CalledProcessError as e:
logger.error(f"Failed to start BentoML service: {e}")
raise
except FileNotFoundError:
logger.error(
"BentoML command not found. Make sure it's installed and in the PATH."
)
raise
bentoml_local_deployment
Implementation for the BentoML local deployment service.
BentoMLDeploymentEndpoint (LocalDaemonServiceEndpoint)
A service endpoint exposed by the BentoML deployment daemon.
Attributes:
Name | Type | Description |
---|---|---|
config |
BentoMLDeploymentEndpointConfig |
service endpoint configuration |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLDeploymentEndpoint(LocalDaemonServiceEndpoint):
"""A service endpoint exposed by the BentoML deployment daemon.
Attributes:
config: service endpoint configuration
"""
config: BentoMLDeploymentEndpointConfig
monitor: HTTPEndpointHealthMonitor
@property
def prediction_url(self) -> Optional[str]:
"""Gets the prediction URL for the endpoint.
Returns:
the prediction URL for the endpoint
"""
uri = self.status.uri
if not uri:
return None
return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str]
property
readonly
Gets the prediction URL for the endpoint.
Returns:
Type | Description |
---|---|
Optional[str] |
the prediction URL for the endpoint |
BentoMLDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig)
BentoML deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
prediction_url_path |
str |
URI subpath for prediction requests |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
"""BentoML deployment service configuration.
Attributes:
prediction_url_path: URI subpath for prediction requests
"""
prediction_url_path: str
BentoMLLocalDeploymentConfig (LocalDaemonServiceConfig)
BentoML model deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
str |
name of the model to deploy |
model_uri |
str |
URI of the model to deploy |
port |
Optional[int] |
port to expose the service on |
bento_tag |
str |
Bento package to deploy. A bento tag is a combination of the name of the bento and its version. |
workers |
int |
number of workers to use |
backlog |
int |
number of requests to queue |
production |
bool |
whether to run in production mode |
working_dir |
str |
working directory for the service |
host |
Optional[str] |
host to expose the service on |
ssl_parameters |
Optional[zenml.integrations.bentoml.services.bentoml_local_deployment.SSLBentoMLParametersConfig] |
SSL parameters for the Bentoml deployment |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLLocalDeploymentConfig(LocalDaemonServiceConfig):
"""BentoML model deployment configuration.
Attributes:
model_name: name of the model to deploy
model_uri: URI of the model to deploy
port: port to expose the service on
bento_tag: Bento package to deploy. A bento tag is a combination of the
name of the bento and its version.
workers: number of workers to use
backlog: number of requests to queue
production: whether to run in production mode
working_dir: working directory for the service
host: host to expose the service on
ssl_parameters: SSL parameters for the Bentoml deployment
"""
model_name: str
model_uri: str
bento_tag: str
bento_uri: Optional[str] = None
apis: List[str] = []
workers: int = 1
port: Optional[int] = None
backlog: int = 2048
production: bool = False
working_dir: str
host: Optional[str] = None
ssl_parameters: Optional[SSLBentoMLParametersConfig] = Field(
default_factory=SSLBentoMLParametersConfig
)
BentoMLLocalDeploymentService (LocalDaemonService, BaseDeploymentService)
BentoML deployment service used to start a local prediction server for BentoML models.
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE |
ClassVar[zenml.services.service_type.ServiceType] |
a service type descriptor with information describing the BentoML deployment service class |
config |
BentoMLLocalDeploymentConfig |
service configuration |
endpoint |
BentoMLDeploymentEndpoint |
optional service endpoint |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class BentoMLLocalDeploymentService(LocalDaemonService, BaseDeploymentService):
"""BentoML deployment service used to start a local prediction server for BentoML models.
Attributes:
SERVICE_TYPE: a service type descriptor with information describing
the BentoML deployment service class
config: service configuration
endpoint: optional service endpoint
"""
SERVICE_TYPE = ServiceType(
name=BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME,
type="model-serving",
flavor="bentoml",
description="BentoML local prediction service",
logo_url="https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/bentoml.png",
)
config: BentoMLLocalDeploymentConfig
endpoint: BentoMLDeploymentEndpoint
def __init__(
self,
config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the BentoML deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for BentoML
# deployment services
if (
isinstance(config, BentoMLLocalDeploymentConfig)
and "endpoint" not in attrs
):
endpoint = BentoMLDeploymentEndpoint(
config=BentoMLDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
port=config.port
if config.port is not None
else BENTOML_DEFAULT_PORT,
ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
prediction_url_path=BENTOML_PREDICTION_URL_PATH,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
def run(self) -> None:
"""Start the service."""
from bentoml import Service
from bentoml._internal.service.loader import load
logger.info(
"Starting BentoML prediction service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
# verify if to deploy in production mode or development mode
logger.info("Running in production mode.")
svc = load(
bento_identifier=self.config.bento_tag,
working_dir=self.config.working_dir or ".",
)
if isinstance(svc, Service):
# bentoml<1.2
from bentoml.serving import serve_http_production
try:
serve_http_production(
self.config.bento_tag,
working_dir=self.config.working_dir,
port=self.config.port,
api_workers=self.config.workers,
host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
backlog=self.config.backlog,
ssl_certfile=ssl_params.ssl_certfile,
ssl_keyfile=ssl_params.ssl_keyfile,
ssl_keyfile_password=ssl_params.ssl_keyfile_password,
ssl_version=ssl_params.ssl_version,
ssl_cert_reqs=ssl_params.ssl_cert_reqs,
ssl_ca_certs=ssl_params.ssl_ca_certs,
ssl_ciphers=ssl_params.ssl_ciphers,
)
except KeyboardInterrupt:
logger.info("Stopping BentoML prediction service...")
else:
# bentoml>=1.2
from _bentoml_impl.server import serve_http # type: ignore
svc.inject_config()
try:
serve_http(
self.config.bento_tag,
working_dir=self.config.working_dir or ".",
host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
port=self.config.port,
backlog=self.config.backlog,
ssl_certfile=ssl_params.ssl_certfile,
ssl_keyfile=ssl_params.ssl_keyfile,
ssl_keyfile_password=ssl_params.ssl_keyfile_password,
ssl_version=ssl_params.ssl_version,
ssl_cert_reqs=ssl_params.ssl_cert_reqs,
ssl_ca_certs=ssl_params.ssl_ca_certs,
ssl_ciphers=ssl_params.ssl_ciphers,
)
except KeyboardInterrupt:
logger.info("Stopping BentoML prediction service...")
@property
def prediction_url(self) -> Optional[str]:
"""Get the URI where the http server is running.
Returns:
The URI where the http service can be accessed to get more information
about the service and to make predictions.
"""
if not self.is_running:
return None
return self.endpoint.prediction_url
@property
def prediction_apis_urls(self) -> Optional[List[str]]:
"""Get the URI where the prediction api services is answering requests.
Returns:
The URI where the prediction service apis can be contacted to process
HTTP/REST inference requests, or None, if the service isn't running.
"""
if not self.is_running:
return None
if self.config.apis:
return [
f"{self.endpoint.prediction_url}/{api}"
for api in self.config.apis
]
return None
def predict(
self, api_endpoint: str, data: "Any", sync: bool = True
) -> "Any":
"""Make a prediction using the service.
Args:
data: data to make a prediction on
api_endpoint: the api endpoint to make the prediction on
sync: if set to False, the prediction will be made asynchronously
Returns:
The prediction result.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"BentoML prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is None:
raise ValueError("No endpoint known for prediction.")
if sync:
client = SyncHTTPClient(self.endpoint.prediction_url)
else:
client = AsyncHTTPClient(self.endpoint.prediction_url)
result = client.call(api_endpoint, data)
return result
prediction_apis_urls: Optional[List[str]]
property
readonly
Get the URI where the prediction api services is answering requests.
Returns:
Type | Description |
---|---|
Optional[List[str]] |
The URI where the prediction service apis can be contacted to process HTTP/REST inference requests, or None, if the service isn't running. |
prediction_url: Optional[str]
property
readonly
Get the URI where the http server is running.
Returns:
Type | Description |
---|---|
Optional[str] |
The URI where the http service can be accessed to get more information about the service and to make predictions. |
__init__(self, config, **attrs)
special
Initialize the BentoML deployment service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Union[zenml.integrations.bentoml.services.bentoml_local_deployment.BentoMLLocalDeploymentConfig, Dict[str, Any]] |
service configuration |
required |
attrs |
Any |
additional attributes to set on the service |
{} |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def __init__(
self,
config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the BentoML deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for BentoML
# deployment services
if (
isinstance(config, BentoMLLocalDeploymentConfig)
and "endpoint" not in attrs
):
endpoint = BentoMLDeploymentEndpoint(
config=BentoMLDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
port=config.port
if config.port is not None
else BENTOML_DEFAULT_PORT,
ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
prediction_url_path=BENTOML_PREDICTION_URL_PATH,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
predict(self, api_endpoint, data, sync=True)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
data to make a prediction on |
required |
api_endpoint |
str |
the api endpoint to make the prediction on |
required |
sync |
bool |
if set to False, the prediction will be made asynchronously |
True |
Returns:
Type | Description |
---|---|
Any |
The prediction result. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not running |
ValueError |
if the prediction endpoint is unknown. |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def predict(
self, api_endpoint: str, data: "Any", sync: bool = True
) -> "Any":
"""Make a prediction using the service.
Args:
data: data to make a prediction on
api_endpoint: the api endpoint to make the prediction on
sync: if set to False, the prediction will be made asynchronously
Returns:
The prediction result.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"BentoML prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is None:
raise ValueError("No endpoint known for prediction.")
if sync:
client = SyncHTTPClient(self.endpoint.prediction_url)
else:
client = AsyncHTTPClient(self.endpoint.prediction_url)
result = client.call(api_endpoint, data)
return result
run(self)
Start the service.
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
def run(self) -> None:
"""Start the service."""
from bentoml import Service
from bentoml._internal.service.loader import load
logger.info(
"Starting BentoML prediction service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
# verify if to deploy in production mode or development mode
logger.info("Running in production mode.")
svc = load(
bento_identifier=self.config.bento_tag,
working_dir=self.config.working_dir or ".",
)
if isinstance(svc, Service):
# bentoml<1.2
from bentoml.serving import serve_http_production
try:
serve_http_production(
self.config.bento_tag,
working_dir=self.config.working_dir,
port=self.config.port,
api_workers=self.config.workers,
host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
backlog=self.config.backlog,
ssl_certfile=ssl_params.ssl_certfile,
ssl_keyfile=ssl_params.ssl_keyfile,
ssl_keyfile_password=ssl_params.ssl_keyfile_password,
ssl_version=ssl_params.ssl_version,
ssl_cert_reqs=ssl_params.ssl_cert_reqs,
ssl_ca_certs=ssl_params.ssl_ca_certs,
ssl_ciphers=ssl_params.ssl_ciphers,
)
except KeyboardInterrupt:
logger.info("Stopping BentoML prediction service...")
else:
# bentoml>=1.2
from _bentoml_impl.server import serve_http # type: ignore
svc.inject_config()
try:
serve_http(
self.config.bento_tag,
working_dir=self.config.working_dir or ".",
host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
port=self.config.port,
backlog=self.config.backlog,
ssl_certfile=ssl_params.ssl_certfile,
ssl_keyfile=ssl_params.ssl_keyfile,
ssl_keyfile_password=ssl_params.ssl_keyfile_password,
ssl_version=ssl_params.ssl_version,
ssl_cert_reqs=ssl_params.ssl_cert_reqs,
ssl_ca_certs=ssl_params.ssl_ca_certs,
ssl_ciphers=ssl_params.ssl_ciphers,
)
except KeyboardInterrupt:
logger.info("Stopping BentoML prediction service...")
SSLBentoMLParametersConfig (BaseModel)
BentoML SSL parameters configuration.
Attributes:
Name | Type | Description |
---|---|---|
ssl_certfile |
Optional[str] |
SSL certificate file |
ssl_keyfile |
Optional[str] |
SSL key file |
ssl_keyfile_password |
Optional[str] |
SSL key file password |
ssl_version |
Optional[int] |
SSL version |
ssl_cert_reqs |
Optional[int] |
SSL certificate requirements |
ssl_ca_certs |
Optional[str] |
SSL CA certificates |
ssl_ciphers |
Optional[str] |
SSL ciphers |
Source code in zenml/integrations/bentoml/services/bentoml_local_deployment.py
class SSLBentoMLParametersConfig(BaseModel):
"""BentoML SSL parameters configuration.
Attributes:
ssl_certfile: SSL certificate file
ssl_keyfile: SSL key file
ssl_keyfile_password: SSL key file password
ssl_version: SSL version
ssl_cert_reqs: SSL certificate requirements
ssl_ca_certs: SSL CA certificates
ssl_ciphers: SSL ciphers
"""
ssl_certfile: Optional[str] = None
ssl_keyfile: Optional[str] = None
ssl_keyfile_password: Optional[str] = None
ssl_version: Optional[int] = None
ssl_cert_reqs: Optional[int] = None
ssl_ca_certs: Optional[str] = None
ssl_ciphers: Optional[str] = None
deployment_type
BentoML Service Deployment Types.
BentoMLDeploymentType (Enum)
BentoML Service Deployment Types.
Source code in zenml/integrations/bentoml/services/deployment_type.py
class BentoMLDeploymentType(Enum):
"""BentoML Service Deployment Types."""
LOCAL = "local"
CONTAINER = "container"
steps
special
Initialization of the BentoML standard interface steps.
bento_builder
Implementation of the BentoML bento builder step.
bentoml_deployer
Implementation of the BentoML model deployer pipeline step.