BentoML

zenml.integrations.bentoml

Initialization of the BentoML integration for ZenML.

The BentoML integration allows you to use BentoML model serving to implement continuous model deployment.
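
A minimal usage sketch, assuming the integration's requirements are installed, that activates the integration and inspects the flavor it contributes:

from zenml.integrations.bentoml import (
    BENTOML_MODEL_DEPLOYER_FLAVOR,
    BentoMLIntegration,
)

# Register the BentoML materializers, model deployers and services with ZenML.
BentoMLIntegration.activate()

# The integration contributes a single model deployer flavor named "bentoml".
for flavor_class in BentoMLIntegration.flavors():
    assert flavor_class().name == BENTOML_MODEL_DEPLOYER_FLAVOR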

Attributes

BENTOML = 'bentoml' module-attribute

BENTOML_MODEL_DEPLOYER_FLAVOR = 'bentoml' module-attribute

Classes

BentoMLIntegration

Bases: Integration

Definition of BentoML integration for ZenML.

Functions
activate() -> None classmethod

Activate the BentoML integration.

Source code in src/zenml/integrations/bentoml/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the BentoML integration."""
    from zenml.integrations.bentoml import materializers  # noqa
    from zenml.integrations.bentoml import model_deployers  # noqa
    from zenml.integrations.bentoml import services  # noqa
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for BentoML.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/bentoml/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for BentoML.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.bentoml.flavors import (
        BentoMLModelDeployerFlavor,
    )

    return [BentoMLModelDeployerFlavor]

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
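A short sketch of converting a built-in flavor to its request model; the argument values here are illustrative:

from zenml.integrations.bentoml.flavors import BentoMLModelDeployerFlavor

# Build the request model that ZenML uses to register this flavor.
request = BentoMLModelDeployerFlavor().to_model(
    integration="bentoml",  # illustrative integration name
    is_custom=False,  # built-in integration flavor, not a custom one
)
print(request.name, request.type, request.source)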

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
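A sketch of guarding activation on this check, mirroring what the integration framework does:

from zenml.integrations.bentoml import BentoMLIntegration

if BentoMLIntegration.check_installation():
    BentoMLIntegration.activate()
else:
    print(
        "Missing requirements for the BentoML integration: "
        f"{BentoMLIntegration.get_requirements()}"
    )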
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

constants

BentoML constants.

flavors

BentoML integration flavors.

Classes
BentoMLModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig

Configuration for the BentoMLModelDeployer.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
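A hedged sketch of the secret-reference behavior described above; the `service_path` attribute is an assumption, used only for illustration:

from zenml.integrations.bentoml.flavors import BentoMLModelDeployerConfig

# Plain string attributes may be set to secret references of the form
# {{secret_name.key}}; attributes with custom pydantic validators may not.
# `service_path` is assumed here to be a plain string attribute.
config = BentoMLModelDeployerConfig(
    service_path="{{bentoml_secrets.service_path}}",
    warn_about_plain_text_secrets=True,
)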
BentoMLModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Flavor for the BentoML model deployer.

Attributes
config_class: Type[BentoMLModelDeployerConfig] property

Returns BentoMLModelDeployerConfig config class.

Returns:

Type Description
Type[BentoMLModelDeployerConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[BentoMLModelDeployer] property

Implementation class for this flavor.

Returns:

Type Description
Type[BentoMLModelDeployer]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

Modules
bentoml_model_deployer_flavor

BentoML model deployer flavor.


materializers

Initialization of the BentoML Bento Materializer.

Modules
bentoml_bento_materializer

Materializer for BentoML Bento objects.

Classes
BentoMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer for BentoML Bento objects.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
Functions
extract_metadata(bento: bento.Bento) -> Dict[str, MetadataType]

Extract metadata from the given Bento object.

Parameters:

Name Type Description Default
bento Bento

The Bento object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in src/zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def extract_metadata(
    self, bento: bento.Bento
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `Bento` object.

    Args:
        bento: The `Bento` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return {
        "bento_info_name": bento.info.name,
        "bento_info_version": bento.info.version,
        "bento_tag_name": bento.tag.name,
        "bentoml_version": bento.info.bentoml_version,
    }
load(data_type: Type[bento.Bento]) -> bento.Bento

Read from artifact store and return a Bento object.

Parameters:

Name Type Description Default
data_type Type[Bento]

A bento.Bento type.

required

Returns:

Type Description
Bento

A bento.Bento object.

Source code in src/zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def load(self, data_type: Type[bento.Bento]) -> bento.Bento:
    """Read from artifact store and return a Bento object.

    Args:
        data_type: A bento.Bento type.

    Returns:
        A bento.Bento object.
    """
    with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.uri, temp_dir)

        # Load the Bento from the temporary directory
        imported_bento = Bento.import_from(
            os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        )

        # Try to save the Bento to the local BentoML store
        try:
            _ = bentoml.get(imported_bento.tag)
        except BentoMLException:
            imported_bento.save()
        return imported_bento
save(bento: bento.Bento) -> None

Write to artifact store.

Parameters:

Name Type Description Default
bento Bento

A bento.Bento object.

required
Source code in src/zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def save(self, bento: bento.Bento) -> None:
    """Write to artifact store.

    Args:
        bento: A bento.Bento object.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        bentoml.export_bento(bento.tag, temp_bento_path)
        io_utils.copy_dir(temp_dir, self.uri)
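A sketch of typical usage under these semantics, assuming a standard BentoML build: a step that returns a `Bento` is persisted by this materializer via `save()` and re-imported into the local BentoML store on `load()`. The bentofile path is illustrative.

import bentoml
from zenml import step

@step
def build_bento() -> bentoml.Bento:
    # The returned Bento is exported to the artifact store by BentoMaterializer.
    return bentoml.bentos.build_bentofile("bentofile.yaml")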

model_deployers

Initialization of the BentoML Model Deployer.

Classes
BentoMLModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

BentoML model deployer stack component implementation.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: BentoMLModelDeployerConfig property

Returns the BentoMLModelDeployerConfig config.

Returns:

Type Description
BentoMLModelDeployerConfig

The configuration.

local_path: str property

Returns the path to the root directory.

This is where all configurations for BentoML deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

Type Description
str

The path to the local service root directory.

Functions
get_model_server_info(service_instance: BaseService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance BaseService

BentoML deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Raises:

Type Description
ValueError

If the service type is not supported.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_model_server_info(
    service_instance: BaseService,
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: BentoML deployment service object

    Returns:
        A dictionary containing the model server information.

    Raises:
        ValueError: If the service type is not supported.
    """
    if (
        service_instance.SERVICE_TYPE.name
        == BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLContainerDeploymentService, service_instance
        )
    elif (
        service_instance.SERVICE_TYPE.name
        == BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLLocalDeploymentService, service_instance
        )
    else:
        raise ValueError(
            f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
        )

    predictions_apis_urls = ""
    if service_instance.prediction_apis_urls is not None:  # type: ignore
        predictions_apis_urls = ", ".join(
            [
                api
                for api in service_instance.prediction_apis_urls  # type: ignore
                if api is not None
            ]
        )

    service_config = service_instance.config
    assert isinstance(
        service_config,
        (BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
    )

    service_status = service_instance.status
    assert isinstance(
        service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
    )

    return {
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "BENTO_TAG": service_config.bento_tag,
        "MODEL_NAME": service_config.model_name,
        "MODEL_URI": service_config.model_uri,
        "BENTO_URI": service_config.bento_uri,
        "SERVICE_PATH": service_status.runtime_path,
        "DAEMON_PID": str(service_status.pid)
        if hasattr(service_status, "pid")
        else None,
        "PREDICTION_APIS_URLS": predictions_apis_urls,
    }
get_service_path(id_: UUID) -> str staticmethod

Get the path where local BentoML service information is stored.

This is where the deployment service configuration, PID, and log files are stored.

Parameters:

Name Type Description Default
id_ UUID

The ID of the BentoML model deployer.

required

Returns:

Type Description
str

The service path.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local BentoML service information is stored.

    This is where the deployment service configuration, PID and log files
    are stored.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    service_path = os.path.join(
        GlobalConfiguration().local_stores_path,
        str(id_),
    )
    create_dir_recursive_if_not_exists(service_path)
    return service_path
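A minimal sketch of resolving the local service root for a deployer component ID; note that the call also creates the directory if it does not exist:

from uuid import UUID

from zenml.integrations.bentoml.model_deployers import BentoMLModelDeployer

# Illustrative component ID; the path lives under the local stores path.
component_id = UUID("00000000-0000-0000-0000-000000000000")
print(BentoMLModelDeployer.get_service_path(component_id))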
perform_delete_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_SERVICE_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False
Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT) -> BaseService

Create a new BentoML deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML service config.

  • if replace is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new BentoML deployment server for each new model version. If multiple equivalent BentoML deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

Name Type Description Default
id UUID

the UUID of the BentoML model deployer.

required
config ServiceConfig

the configuration of the model to be deployed with BentoML.

required
timeout int

the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start.

DEFAULT_SERVICE_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML BentoML deployment service object that can be used to interact with the BentoML model HTTP server.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, one is selected at random to be updated
    and the others are deleted.

    Args:
        id: the UUID of the BentoML model deployer.
        config: the configuration of the model to be deployed with BentoML.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    service = self._create_new_service(
        id=id, timeout=timeout, config=config
    )
    logger.info(f"Created a new BentoML deployment service: {service}")
    return service
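A hedged sketch of how this method might be reached; `get_active_model_deployer` and the config fields below are assumptions based on names referenced elsewhere on this page:

from uuid import uuid4

from zenml.integrations.bentoml.model_deployers import BentoMLModelDeployer
from zenml.integrations.bentoml.services import BentoMLLocalDeploymentConfig

deployer = BentoMLModelDeployer.get_active_model_deployer()  # assumed helper
service = deployer.perform_deploy_model(
    id=uuid4(),
    config=BentoMLLocalDeploymentConfig(
        model_name="my_model",  # assumed field, referenced on this page
        bento_tag="my_bento:latest",  # assumed field, referenced on this page
    ),
    timeout=120,
)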
perform_start_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

DEFAULT_SERVICE_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The started service.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_SERVICE_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
Modules
bentoml_model_deployer

Implementation of the BentoML Model Deployer.

Classes
BentoMLModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

BentoML model deployer stack component implementation.

Source code in src/zenml/stack/stack_component.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: BentoMLModelDeployerConfig property

Returns the BentoMLModelDeployerConfig config.

Returns:

Type Description
BentoMLModelDeployerConfig

The configuration.

local_path: str property

Returns the path to the root directory.

This is where all configurations for BentoML deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

Type Description
str

The path to the local service root directory.

Functions
get_model_server_info(service_instance: BaseService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance BaseService

BentoML deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Raises:

Type Description
ValueError

If the service type is not supported.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
@staticmethod
def get_model_server_info(
    service_instance: BaseService,
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: BentoML deployment service object

    Returns:
        A dictionary containing the model server information.

    Raises:
        ValueError: If the service type is not supported.
    """
    if (
        service_instance.SERVICE_TYPE.name
        == BENTOML_CONTAINER_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLContainerDeploymentService, service_instance
        )
    elif (
        service_instance.SERVICE_TYPE.name
        == BENTOML_LOCAL_DEPLOYMENT_SERVICE_NAME
    ):
        service_instance = cast(
            BentoMLLocalDeploymentService, service_instance
        )
    else:
        raise ValueError(
            f"Unsupported service type: {service_instance.SERVICE_TYPE.name}"
        )

    predictions_apis_urls = ""
    if service_instance.prediction_apis_urls is not None:  # type: ignore
        predictions_apis_urls = ", ".join(
            [
                api
                for api in service_instance.prediction_apis_urls  # type: ignore
                if api is not None
            ]
        )

    service_config = service_instance.config
    assert isinstance(
        service_config,
        (BentoMLLocalDeploymentConfig, BentoMLContainerDeploymentConfig),
    )

    service_status = service_instance.status
    assert isinstance(
        service_status, (ContainerServiceStatus, LocalDaemonServiceStatus)
    )

    return {
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "BENTO_TAG": service_config.bento_tag,
        "MODEL_NAME": service_config.model_name,
        "MODEL_URI": service_config.model_uri,
        "BENTO_URI": service_config.bento_uri,
        "SERVICE_PATH": service_status.runtime_path,
        "DAEMON_PID": str(service_status.pid)
        if hasattr(service_status, "pid")
        else None,
        "PREDICTION_APIS_URLS": predictions_apis_urls,
    }
get_service_path(id_: UUID) -> str staticmethod

Get the path where local BentoML service information is stored.

This includes the deployment service configuration, PID and log files are stored.

Parameters:

Name Type Description Default
id_ UUID

The ID of the BentoML model deployer.

required

Returns:

Type Description
str

The service path.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local BentoML service information is stored.

    This includes the deployment service configuration, PID and log files
    are stored.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    service_path = os.path.join(
        GlobalConfiguration().local_stores_path,
        str(id_),
    )
    create_dir_recursive_if_not_exists(service_path)
    return service_path
perform_delete_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_SERVICE_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False
Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT) -> BaseService

Create a new BentoML deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML service config.

  • if replace is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new BentoML deployment server for each new model version. If multiple equivalent BentoML deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

Name Type Description Default
id UUID

the UUID of the BentoML model deployer.

required
config ServiceConfig

the configuration of the model to be deployed with BentoML.

required
timeout int

the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start.

DEFAULT_SERVICE_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML BentoML deployment service object that can be used to

BaseService

interact with the BentoML model http server.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, one is selected at random to be updated
    and the others are deleted.

    Args:
        id: the UUID of the BentoML model deployer.
        config: the configuration of the model to be deployed with BentoML.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    service = self._create_new_service(
        id=id, timeout=timeout, config=config
    )
    logger.info(f"Created a new BentoML deployment service: {service}")
    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

DEFAULT_SERVICE_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The started service.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_SERVICE_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in src/zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
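
Both methods simply delegate to the service object, so the start/stop lifecycle can be sketched directly on any `BaseService` handle (for example, one returned by the deployer's `find_model_server`); the 60-second timeout is an arbitrary illustration.

# Mirrors perform_stop_model / perform_start_model above, which forward
# straight to the service: stop gracefully, then bring it back up.
service.stop(timeout=60, force=False)
service.start(timeout=60)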
Functions

services

Initialization for BentoML services.

Classes
BentoMLContainerDeploymentConfig(**data: Any)

Bases: ContainerServiceConfig

BentoML container deployment configuration.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
BentoMLContainerDeploymentService(config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]], **attrs: Any)

Bases: ContainerService, BaseDeploymentService

BentoML container deployment service.

Initialize the BentoML deployment service.

Parameters:

Name Type Description Default
config Union[BentoMLContainerDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def __init__(
    self,
    config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLContainerDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLContainerDeploymentEndpoint(
            config=BentoMLContainerDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port or BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
Attributes
is_running: bool property

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

prediction_apis_urls: Optional[List[str]] property

Get the URIs where the prediction API services are answering requests.

Returns:

Type Description
Optional[List[str]]

The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running.

prediction_url: Optional[str] property

Get the URI where the http server is running.

Returns:

Type Description
Optional[str]

The URI where the http service can be accessed to get more information about the service and to make predictions.

Functions
predict(api_endpoint: str, data: Any) -> Any

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required
api_endpoint str

the api endpoint to make the prediction on

required

Returns:

Type Description
Any

The prediction result.

Raises:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def predict(self, api_endpoint: str, data: Any) -> Any:
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is not None:
        client = Client.from_url(
            self.endpoint.prediction_url.replace("http://", "").rstrip("/")
        )
        result = client.call(api_endpoint, data)
    else:
        raise ValueError("No endpoint known for prediction.")
    return result
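
As a hedged usage sketch, assume a running container deployment whose BentoML service defines an API named `classify`; the endpoint name and the input shape depend entirely on the user's service definition.

import numpy as np

# `api_endpoint` must match an API exposed by the Bento's service;
# "classify" and the feature vector below are illustrative only.
prediction = service.predict(
    api_endpoint="classify",
    data=np.array([[5.1, 3.5, 1.4, 0.2]]),
)
print(prediction)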
provision() -> None

Provision the service.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def provision(self) -> None:
    """Provision the service."""
    # containerize the bento
    self._containerize_and_push_bento()
    # run the container
    super().provision()
run() -> None

Start the service.

Raises:

Type Description
FileNotFoundError

If the bento file is not found.

CalledProcessError

If the bentoml serve command fails.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def run(self) -> None:
    """Start the service.

    Raises:
        FileNotFoundError: If the bento file is not found.
        subprocess.CalledProcessError: If the bentoml serve command fails.
    """
    from bentoml._internal.service.loader import load

    logger.info("Starting BentoML container deployment service...")

    self.endpoint.prepare_for_start()

    if self.config.working_dir is None:
        if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
            self.config.working_dir = os.path.expanduser(
                self.config.bento_tag
            )
        else:
            self.config.working_dir = "."
    if sys.path[0] != self.config.working_dir:
        sys.path.insert(0, self.config.working_dir)

    _ = load(bento_identifier=".", working_dir=self.config.working_dir)
    # run bentoml serve command inside the container
    # Use subprocess for better control and error handling
    import subprocess

    try:
        subprocess.run(["bentoml", "serve"], check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to start BentoML service: {e}")
        raise
    except FileNotFoundError:
        logger.error(
            "BentoML command not found. Make sure it's installed and in the PATH."
        )
        raise
BentoMLDeploymentType

Bases: Enum

BentoML Service Deployment Types.

BentoMLLocalDeploymentConfig(**data: Any)

Bases: LocalDaemonServiceConfig

BentoML model deployment configuration.

Attributes:

Name Type Description
model_name str

name of the model to deploy

model_uri str

URI of the model to deploy

port Optional[int]

port to expose the service on

bento_tag str

Bento package to deploy. A bento tag is a combination of the name of the bento and its version.

workers int

number of workers to use

backlog int

number of requests to queue

production bool

whether to run in production mode

working_dir str

working directory for the service

host Optional[str]

host to expose the service on

ssl_parameters Optional[SSLBentoMLParametersConfig]

SSL parameters for the BentoML deployment

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
BentoMLLocalDeploymentService(config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]], **attrs: Any)

Bases: LocalDaemonService, BaseDeploymentService

BentoML deployment service used to start a local prediction server for BentoML models.

Attributes:

Name Type Description
SERVICE_TYPE

a service type descriptor with information describing the BentoML deployment service class

config BentoMLLocalDeploymentConfig

service configuration

endpoint BentoMLDeploymentEndpoint

optional service endpoint

Initialize the BentoML deployment service.

Parameters:

Name Type Description Default
config Union[BentoMLLocalDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def __init__(
    self,
    config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLLocalDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLDeploymentEndpoint(
            config=BentoMLDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port
                if config.port is not None
                else BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
Attributes
prediction_apis_urls: Optional[List[str]] property

Get the URIs where the prediction API services are answering requests.

Returns:

Type Description
Optional[List[str]]

The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running.

prediction_url: Optional[str] property

Get the URI where the http server is running.

Returns:

Type Description
Optional[str]

The URI where the http service can be accessed to get more information about the service and to make predictions.

Functions
predict(api_endpoint: str, data: Any, sync: bool = True) -> Any

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required
api_endpoint str

the api endpoint to make the prediction on

required
sync bool

if set to False, the prediction will be made asynchronously

True

Returns:

Type Description
Any

The prediction result.

Raises:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def predict(
    self, api_endpoint: str, data: "Any", sync: bool = True
) -> "Any":
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on
        sync: if set to False, the prediction will be made asynchronously

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is None:
        raise ValueError("No endpoint known for prediction.")
    if sync:
        client = SyncHTTPClient(self.endpoint.prediction_url)
    else:
        client = AsyncHTTPClient(self.endpoint.prediction_url)
    result = client.call(api_endpoint, data)
    return result
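
A short sketch of the sync/async switch, under the same assumptions as the container example above (a running service with a hypothetical `classify` endpoint): with `sync=False` the call is routed through BentoML's `AsyncHTTPClient` instead of the synchronous client.

payload = [[5.1, 3.5, 1.4, 0.2]]  # illustrative input

# Synchronous call: blocks until the prediction is returned.
result = service.predict(api_endpoint="classify", data=payload)

# Asynchronous call: the returned value is whatever AsyncHTTPClient's
# `call` yields, so it typically needs to be awaited by the caller.
async_result = service.predict(api_endpoint="classify", data=payload, sync=False)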
run() -> None

Start the service.

Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def run(self) -> None:
    """Start the service."""
    from bentoml import Service
    from bentoml._internal.service.loader import load

    logger.info(
        "Starting BentoML prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()
    ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
    # verify if to deploy in production mode or development mode
    logger.info("Running in production mode.")
    svc = load(
        bento_identifier=self.config.bento_tag,
        working_dir=self.config.working_dir or ".",
    )

    if isinstance(svc, Service):
        # bentoml<1.2
        from bentoml.serving import serve_http_production

        try:
            serve_http_production(
                self.config.bento_tag,
                working_dir=self.config.working_dir,
                port=self.config.port,
                api_workers=self.config.workers,
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
    else:
        # bentoml>=1.2
        from _bentoml_impl.server import serve_http  # type: ignore

        svc.inject_config()
        try:
            serve_http(
                self.config.bento_tag,
                working_dir=self.config.working_dir or ".",
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                port=self.config.port,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
Modules
bentoml_container_deployment

Implementation for the BentoML container deployment service.

Classes
BentoMLContainerDeploymentConfig(**data: Any)

Bases: ContainerServiceConfig

BentoML container deployment configuration.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
BentoMLContainerDeploymentEndpoint(*args: Any, **kwargs: Any)

Bases: ContainerServiceEndpoint

A service endpoint exposed by the BentoML container deployment service.

Attributes:

Name Type Description
config BentoMLContainerDeploymentEndpointConfig

service endpoint configuration

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
prediction_url: Optional[str] property

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

BentoMLContainerDeploymentEndpointConfig

Bases: ContainerServiceEndpointConfig

BentoML container deployment endpoint configuration.

Attributes:

Name Type Description
prediction_url_path str

URI subpath for prediction requests

BentoMLContainerDeploymentService(config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]], **attrs: Any)

Bases: ContainerService, BaseDeploymentService

BentoML container deployment service.

Initialize the BentoML deployment service.

Parameters:

Name Type Description Default
config Union[BentoMLContainerDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def __init__(
    self,
    config: Union[BentoMLContainerDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLContainerDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLContainerDeploymentEndpoint(
            config=BentoMLContainerDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port or BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
Attributes
is_running: bool property

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

prediction_apis_urls: Optional[List[str]] property

Get the URIs where the prediction API services are answering requests.

Returns:

Type Description
Optional[List[str]]

The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running.

prediction_url: Optional[str] property

Get the URI where the http server is running.

Returns:

Type Description
Optional[str]

The URI where the http service can be accessed to get more information about the service and to make predictions.

Functions
predict(api_endpoint: str, data: Any) -> Any

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required
api_endpoint str

the api endpoint to make the prediction on

required

Returns:

Type Description
Any

The prediction result.

Raises:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def predict(self, api_endpoint: str, data: Any) -> Any:
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is not None:
        client = Client.from_url(
            self.endpoint.prediction_url.replace("http://", "").rstrip("/")
        )
        result = client.call(api_endpoint, data)
    else:
        raise ValueError("No endpoint known for prediction.")
    return result
provision() -> None

Provision the service.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def provision(self) -> None:
    """Provision the service."""
    # containerize the bento
    self._containerize_and_push_bento()
    # run the container
    super().provision()
run() -> None

Start the service.

Raises:

Type Description
FileNotFoundError

If the bento file is not found.

CalledProcessError

If the bentoml serve command fails.

Source code in src/zenml/integrations/bentoml/services/bentoml_container_deployment.py
def run(self) -> None:
    """Start the service.

    Raises:
        FileNotFoundError: If the bento file is not found.
        subprocess.CalledProcessError: If the bentoml serve command fails.
    """
    from bentoml._internal.service.loader import load

    logger.info("Starting BentoML container deployment service...")

    self.endpoint.prepare_for_start()

    if self.config.working_dir is None:
        if os.path.isdir(os.path.expanduser(self.config.bento_tag)):
            self.config.working_dir = os.path.expanduser(
                self.config.bento_tag
            )
        else:
            self.config.working_dir = "."
    if sys.path[0] != self.config.working_dir:
        sys.path.insert(0, self.config.working_dir)

    _ = load(bento_identifier=".", working_dir=self.config.working_dir)
    # run bentoml serve command inside the container
    # Use subprocess for better control and error handling
    import subprocess

    try:
        subprocess.run(["bentoml", "serve"], check=True)
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to start BentoML service: {e}")
        raise
    except FileNotFoundError:
        logger.error(
            "BentoML command not found. Make sure it's installed and in the PATH."
        )
        raise
Functions
bentoml_local_deployment

Implementation for the BentoML local deployment service.

Classes
BentoMLDeploymentEndpoint(*args: Any, **kwargs: Any)

Bases: LocalDaemonServiceEndpoint

A service endpoint exposed by the BentoML deployment daemon.

Attributes:

Name Type Description
config BentoMLDeploymentEndpointConfig

service endpoint configuration

Source code in src/zenml/services/service_endpoint.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the service endpoint.

    Args:
        *args: positional arguments.
        **kwargs: keyword arguments.
    """
    super().__init__(*args, **kwargs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
prediction_url: Optional[str] property

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

BentoMLDeploymentEndpointConfig

Bases: LocalDaemonServiceEndpointConfig

BentoML deployment endpoint configuration.

Attributes:

Name Type Description
prediction_url_path str

URI subpath for prediction requests

BentoMLLocalDeploymentConfig(**data: Any)

Bases: LocalDaemonServiceConfig

BentoML model deployment configuration.

Attributes:

Name Type Description
model_name str

name of the model to deploy

model_uri str

URI of the model to deploy

port Optional[int]

port to expose the service on

bento_tag str

Bento package to deploy. A bento tag is a combination of the name of the bento and its version.

workers int

number of workers to use

backlog int

number of requests to queue

production bool

whether to run in production mode

working_dir str

working directory for the service

host Optional[str]

host to expose the service on

ssl_parameters Optional[SSLBentoMLParametersConfig]

SSL parameters for the BentoML deployment

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
BentoMLLocalDeploymentService(config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]], **attrs: Any)

Bases: LocalDaemonService, BaseDeploymentService

BentoML deployment service used to start a local prediction server for BentoML models.

Attributes:

Name Type Description
SERVICE_TYPE

a service type descriptor with information describing the BentoML deployment service class

config BentoMLLocalDeploymentConfig

service configuration

endpoint BentoMLDeploymentEndpoint

optional service endpoint

Initialize the BentoML deployment service.

Parameters:

Name Type Description Default
config Union[BentoMLLocalDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def __init__(
    self,
    config: Union[BentoMLLocalDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if (
        isinstance(config, BentoMLLocalDeploymentConfig)
        and "endpoint" not in attrs
    ):
        endpoint = BentoMLDeploymentEndpoint(
            config=BentoMLDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port
                if config.port is not None
                else BENTOML_DEFAULT_PORT,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            ),
            monitor=HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            ),
        )
        attrs["endpoint"] = endpoint
    super().__init__(config=config, **attrs)
Attributes
prediction_apis_urls: Optional[List[str]] property

Get the URIs where the prediction API services are answering requests.

Returns:

Type Description
Optional[List[str]]

The URIs where the prediction service APIs can be contacted to process HTTP/REST inference requests, or None if the service isn't running.

prediction_url: Optional[str] property

Get the URI where the http server is running.

Returns:

Type Description
Optional[str]

The URI where the http service can be accessed to get more information about the service and to make predictions.

Functions
predict(api_endpoint: str, data: Any, sync: bool = True) -> Any

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required
api_endpoint str

the api endpoint to make the prediction on

required
sync bool

if set to False, the prediction will be made asynchronously

True

Returns:

Type Description
Any

The prediction result.

Raises:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def predict(
    self, api_endpoint: str, data: "Any", sync: bool = True
) -> "Any":
    """Make a prediction using the service.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on
        sync: if set to False, the prediction will be made asynchronously

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    if self.endpoint.prediction_url is None:
        raise ValueError("No endpoint known for prediction.")
    if sync:
        client = SyncHTTPClient(self.endpoint.prediction_url)
    else:
        client = AsyncHTTPClient(self.endpoint.prediction_url)
    result = client.call(api_endpoint, data)
    return result
run() -> None

Start the service.

Source code in src/zenml/integrations/bentoml/services/bentoml_local_deployment.py
def run(self) -> None:
    """Start the service."""
    from bentoml import Service
    from bentoml._internal.service.loader import load

    logger.info(
        "Starting BentoML prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()
    ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
    # verify if to deploy in production mode or development mode
    logger.info("Running in production mode.")
    svc = load(
        bento_identifier=self.config.bento_tag,
        working_dir=self.config.working_dir or ".",
    )

    if isinstance(svc, Service):
        # bentoml<1.2
        from bentoml.serving import serve_http_production

        try:
            serve_http_production(
                self.config.bento_tag,
                working_dir=self.config.working_dir,
                port=self.config.port,
                api_workers=self.config.workers,
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
    else:
        # bentoml>=1.2
        from _bentoml_impl.server import serve_http  # type: ignore

        svc.inject_config()
        try:
            serve_http(
                self.config.bento_tag,
                working_dir=self.config.working_dir or ".",
                host=self.config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                port=self.config.port,
                backlog=self.config.backlog,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
            )
        except KeyboardInterrupt:
            logger.info("Stopping BentoML prediction service...")
SSLBentoMLParametersConfig

Bases: BaseModel

BentoML SSL parameters configuration.

Attributes:

Name Type Description
ssl_certfile Optional[str]

SSL certificate file

ssl_keyfile Optional[str]

SSL key file

ssl_keyfile_password Optional[str]

SSL key file password

ssl_version Optional[int]

SSL version

ssl_cert_reqs Optional[int]

SSL certificate requirements

ssl_ca_certs Optional[str]

SSL CA certificates

ssl_ciphers Optional[str]

SSL ciphers
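
For illustration, a hedged sketch of constructing this configuration; the certificate paths and password are placeholders, and all fields are optional.

from zenml.integrations.bentoml.services.bentoml_local_deployment import (
    SSLBentoMLParametersConfig,
)

# Placeholder paths -- point these at real certificate material.
ssl_params = SSLBentoMLParametersConfig(
    ssl_certfile="/etc/ssl/certs/server.crt",
    ssl_keyfile="/etc/ssl/private/server.key",
    ssl_keyfile_password="changeit",
)
# The object is then passed as `ssl_parameters` on a
# BentoMLLocalDeploymentConfig, as consumed by run() above.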

Functions
deployment_type

BentoML Service Deployment Types.

Classes
BentoMLDeploymentType

Bases: Enum

BentoML Service Deployment Types.

steps

Initialization of the BentoML standard interface steps.

Functions
bento_builder_step(model: UnmaterializedArtifact, model_name: str, model_type: str, service: str, version: Optional[str] = None, labels: Optional[Dict[str, str]] = None, description: Optional[str] = None, include: Optional[List[str]] = None, exclude: Optional[List[str]] = None, python: Optional[Dict[str, Any]] = None, docker: Optional[Dict[str, Any]] = None, working_dir: Optional[str] = None) -> bento.Bento

Build a BentoML Model and Bento bundle.

This step takes the model artifact of a trained or loaded ML model from a previous step, saves it with BentoML, and then builds a BentoML bundle.

Parameters:

Name Type Description Default
model UnmaterializedArtifact

the model to be packaged.

required
model_name str

the name of the model to be packaged.

required
model_type str

the type of the model.

required
service str

the name of the BentoML service to be deployed.

required
version Optional[str]

the version of the model if given.

None
labels Optional[Dict[str, str]]

the labels of the model if given.

None
description Optional[str]

the description of the model if given.

None
include Optional[List[str]]

the files to be included in the BentoML bundle.

None
exclude Optional[List[str]]

the files to be excluded from the BentoML bundle.

None
python Optional[Dict[str, Any]]

dictionary for configuring Bento's python dependencies.

None
docker Optional[Dict[str, Any]]

dictionary for configuring Bento's docker image.

None
working_dir Optional[str]

the working directory of the BentoML bundle.

None

Returns:

Type Description
Bento

the BentoML Bento object.

Source code in src/zenml/integrations/bentoml/steps/bento_builder.py
@step
def bento_builder_step(
    model: UnmaterializedArtifact,
    model_name: str,
    model_type: str,
    service: str,
    version: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    description: Optional[str] = None,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    python: Optional[Dict[str, Any]] = None,
    docker: Optional[Dict[str, Any]] = None,
    working_dir: Optional[str] = None,
) -> bento.Bento:
    """Build a BentoML Model and Bento bundle.

    This steps takes a model artifact of a trained or loaded ML model in a
    previous step and save it with BentoML, then build a BentoML bundle.

    Args:
        model: the model to be packaged.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        service: the name of the BentoML service to be deployed.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies,
        docker: dictionary for configuring Bento's docker image.
        working_dir: the working directory of the BentoML bundle.

    Returns:
        the BentoML Bento object.
    """
    context = get_step_context()

    # save the model and bento uri as part of the bento labels
    labels = labels or {}
    labels["model_uri"] = model.uri
    labels["bento_uri"] = os.path.join(
        context.get_output_artifact_uri(), DEFAULT_BENTO_FILENAME
    )

    # Load the model from the model artifact
    model = load_artifact_from_response(model)

    # Save the model to a BentoML model based on the model type
    try:
        module = importlib.import_module(f".{model_type}", "bentoml")
        module.save_model(model_name, model, labels=labels)
    except importlib.metadata.PackageNotFoundError:
        bentoml.picklable_model.save_model(
            model_name,
            model,
        )

    # Build the BentoML bundle
    bento = bentos.build(
        service=service,
        models=[model_name],
        version=version,
        labels=labels,
        description=description,
        include=include,
        exclude=exclude,
        python=python,
        docker=docker,
        build_ctx=working_dir or source_utils.get_source_root(),
    )

    # Return the BentoML Bento bundle
    return bento
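
To place the step in context, a hedged pipeline sketch follows; the trainer step, the model name, and the `service` locator are illustrative, and `picklable_model` is used so the fallback `bentoml.picklable_model.save_model` path shown above also works.

from zenml import pipeline, step
from zenml.integrations.bentoml.steps import bento_builder_step


@step
def trainer() -> dict:
    """Placeholder trainer; any picklable object works for this sketch."""
    return {"weights": [0.1, 0.2]}


@pipeline
def bento_builder_pipeline():
    model = trainer()
    bento_builder_step(
        model=model,
        model_name="iris_classifier",   # placeholder model name
        model_type="picklable_model",   # selects the bentoml module for saving
        service="service.py:svc",       # placeholder BentoML service locator
    )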
bentoml_model_deployer_step(bento: bento.Bento, model_name: str, port: int, deployment_type: BentoMLDeploymentType = BentoMLDeploymentType.LOCAL, deploy_decision: bool = True, workers: Optional[int] = 1, backlog: Optional[int] = 2048, production: bool = False, working_dir: Optional[str] = None, host: Optional[str] = None, image: Optional[str] = None, image_tag: Optional[str] = None, platform: Optional[str] = None, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, ssl_keyfile_password: Optional[str] = None, ssl_version: Optional[str] = None, ssl_cert_reqs: Optional[str] = None, ssl_ca_certs: Optional[str] = None, ssl_ciphers: Optional[str] = None, timeout: int = 30) -> BaseService

Model deployer pipeline step for BentoML.

This step deploys a given Bento to a local BentoML http prediction server.

Parameters:

Name Type Description Default
bento Bento

the bento artifact to deploy

required
model_name str

the name of the model to deploy.

required
port int

the port to use for the prediction service.

required
deployment_type BentoMLDeploymentType

the type of deployment to use. Either "local" or "container".

LOCAL
deploy_decision bool

whether to deploy the model or not

True
workers Optional[int]

number of workers to use for the prediction service

1
backlog Optional[int]

the number of requests to queue up before rejecting requests.

2048
production bool

whether to deploy the service in production mode.

False
working_dir Optional[str]

the working directory to use for the prediction service.

None
host Optional[str]

the host to use for the prediction service.

None
image Optional[str]

the image to use for the prediction service in the container deployment.

None
image_tag Optional[str]

the image tag to use for the prediction service in the container deployment.

None
platform Optional[str]

the platform to use to build the image for the container deployment.

None
ssl_certfile Optional[str]

the path to the ssl cert file.

None
ssl_keyfile Optional[str]

the path to the ssl key file.

None
ssl_keyfile_password Optional[str]

the password for the ssl key file.

None
ssl_version Optional[str]

the ssl version to use.

None
ssl_cert_reqs Optional[str]

the ssl cert requirements.

None
ssl_ca_certs Optional[str]

the path to the ssl ca certs.

None
ssl_ciphers Optional[str]

the ssl ciphers to use.

None
timeout int

the number of seconds to wait for the service to start/stop.

30

Returns:

Type Description
BaseService

BentoML deployment service

Source code in src/zenml/integrations/bentoml/steps/bentoml_deployer.py
@step(enable_cache=True)
def bentoml_model_deployer_step(
    bento: bento.Bento,
    model_name: str,
    port: int,
    deployment_type: BentoMLDeploymentType = BentoMLDeploymentType.LOCAL,
    deploy_decision: bool = True,
    workers: Optional[int] = 1,
    backlog: Optional[int] = 2048,
    production: bool = False,
    working_dir: Optional[str] = None,
    host: Optional[str] = None,
    image: Optional[str] = None,
    image_tag: Optional[str] = None,
    platform: Optional[str] = None,
    ssl_certfile: Optional[str] = None,
    ssl_keyfile: Optional[str] = None,
    ssl_keyfile_password: Optional[str] = None,
    ssl_version: Optional[str] = None,
    ssl_cert_reqs: Optional[str] = None,
    ssl_ca_certs: Optional[str] = None,
    ssl_ciphers: Optional[str] = None,
    timeout: int = 30,
) -> BaseService:
    """Model deployer pipeline step for BentoML.

    This step deploys a given Bento to a local BentoML http prediction server.

    Args:
        bento: the bento artifact to deploy
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        deployment_type: the type of deployment to use. Either "local" or "container".
        deploy_decision: whether to deploy the model or not
        workers: number of workers to use for the prediction service
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
        host: the host to use for the prediction service.
        image: the image to use for the prediction service in the container deployment.
        image_tag: the image tag to use for the prediction service in the container deployment.
        platform: the platform to use to build the image for the container deployment.
        ssl_certfile: the path to the ssl cert file.
        ssl_keyfile: the path to the ssl key file.
        ssl_keyfile_password: the password for the ssl key file.
        ssl_version: the ssl version to use.
        ssl_cert_reqs: the ssl cert requirements.
        ssl_ca_certs: the path to the ssl ca certs.
        ssl_ciphers: the ssl ciphers to use.
        timeout: the number of seconds to wait for the service to start/stop.

    Returns:
        BentoML deployment service
    """
    # get the current active model deployer
    model_deployer = cast(
        BentoMLModelDeployer, BentoMLModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_context = get_step_context()
    pipeline_name = step_context.pipeline.name
    step_name = step_context.step_run.name

    # Return the apis endpoint of the defined service to use in the predict.
    # This is a workaround to get the endpoints of the service defined as functions
    # from the user code in the BentoML service.
    def service_apis(bento_tag: str) -> List[str]:
        # Add working dir in the bentoml load
        service = bentoml.load(
            bento_identifier=bento_tag,
            working_dir=working_dir or source_utils.get_source_root(),
        )
        apis = service.apis
        apis_paths = list(apis.keys())
        return apis_paths

    def create_deployment_config(
        deployment_type: BentoMLDeploymentType,
    ) -> Tuple[ServiceConfig, ServiceType]:
        common_config = {
            "model_name": model_name,
            "bento_tag": str(bento.tag),
            "model_uri": bento.info.labels.get("model_uri"),
            "bento_uri": bento.info.labels.get("bento_uri"),
            "apis": service_apis(str(bento.tag)),
            "host": host,
            "port": port,
            "pipeline_name": pipeline_name,
            "pipeline_step_name": step_name,
            "workers": workers,
            "backlog": backlog,
        }

        if deployment_type == BentoMLDeploymentType.CONTAINER:
            return BentoMLContainerDeploymentConfig(
                **common_config,
                image=image,
                image_tag=image_tag,
                platform=platform,
            ), BentoMLContainerDeploymentService.SERVICE_TYPE
        else:
            return BentoMLLocalDeploymentConfig(
                **common_config,
                working_dir=working_dir or source_utils.get_source_root(),
                ssl_parameters=SSLBentoMLParametersConfig(
                    ssl_certfile=ssl_certfile,
                    ssl_keyfile=ssl_keyfile,
                    ssl_keyfile_password=ssl_keyfile_password,
                    ssl_version=ssl_version,
                    ssl_cert_reqs=ssl_cert_reqs,
                    ssl_ca_certs=ssl_ca_certs,
                    ssl_ciphers=ssl_ciphers,
                ),
                production=production,
            ), BentoMLLocalDeploymentService.SERVICE_TYPE

    predictor_cfg, service_type = create_deployment_config(deployment_type)
    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        config=predictor_cfg.model_dump(),
        service_type=service_type,
    )

    # Creating a new service with inactive state and status by default
    service: Optional[BaseService] = None
    if existing_services:
        if deployment_type == BentoMLDeploymentType.CONTAINER:
            service = cast(
                BentoMLContainerDeploymentService, existing_services[0]
            )
        else:
            service = cast(BentoMLLocalDeploymentService, existing_services[0])

    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{model_name}'..."
        )
        assert service is not None
        if not service.is_running:
            service.start(timeout=timeout)
        return service

    # create a new model deployment and replace an old one if it exists
    new_service: BaseService
    if deployment_type == BentoMLDeploymentType.CONTAINER:
        new_service = cast(
            BentoMLContainerDeploymentService,
            model_deployer.deploy_model(
                replace=True,
                config=cast(BentoMLContainerDeploymentConfig, predictor_cfg),
                timeout=timeout,
                service_type=service_type,
            ),
        )
    else:
        new_service = cast(
            BentoMLLocalDeploymentService,
            model_deployer.deploy_model(
                replace=True,
                config=cast(BentoMLLocalDeploymentConfig, predictor_cfg),
                timeout=timeout,
                service_type=service_type,
            ),
        )

    logger.info(
        f"BentoML deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service
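
Putting both steps together, a hedged end-to-end sketch (names, port, and the bento service locator are placeholders; the deployment type enum is imported from the services module documented on this page):

from zenml import pipeline, step
from zenml.integrations.bentoml.steps import (
    bento_builder_step,
    bentoml_model_deployer_step,
)
from zenml.integrations.bentoml.services.deployment_type import (
    BentoMLDeploymentType,
)


@step
def trainer() -> dict:
    """Placeholder trainer step."""
    return {"weights": [0.1, 0.2]}


@pipeline
def bentoml_deployment_pipeline():
    model = trainer()
    bento = bento_builder_step(
        model=model,
        model_name="iris_classifier",  # placeholder
        model_type="picklable_model",
        service="service.py:svc",      # placeholder service locator
    )
    bentoml_model_deployer_step(
        bento=bento,
        model_name="iris_classifier",
        port=3001,
        deployment_type=BentoMLDeploymentType.LOCAL,
        timeout=120,
    )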
Modules
bento_builder

Implementation of the BentoML bento builder step.

Functions
bento_builder_step(model: UnmaterializedArtifact, model_name: str, model_type: str, service: str, version: Optional[str] = None, labels: Optional[Dict[str, str]] = None, description: Optional[str] = None, include: Optional[List[str]] = None, exclude: Optional[List[str]] = None, python: Optional[Dict[str, Any]] = None, docker: Optional[Dict[str, Any]] = None, working_dir: Optional[str] = None) -> bento.Bento

Build a BentoML Model and Bento bundle.

This step takes the model artifact of a trained or loaded ML model from a previous step, saves it with BentoML, and then builds a BentoML bundle.

Parameters:

Name Type Description Default
model UnmaterializedArtifact

the model to be packaged.

required
model_name str

the name of the model to be packaged.

required
model_type str

the type of the model.

required
service str

the name of the BentoML service to be deployed.

required
version Optional[str]

the version of the model if given.

None
labels Optional[Dict[str, str]]

the labels of the model if given.

None
description Optional[str]

the description of the model if given.

None
include Optional[List[str]]

the files to be included in the BentoML bundle.

None
exclude Optional[List[str]]

the files to be excluded from the BentoML bundle.

None
python Optional[Dict[str, Any]]

dictionary for configuring Bento's python dependencies.

None
docker Optional[Dict[str, Any]]

dictionary for configuring Bento's docker image.

None
working_dir Optional[str]

the working directory of the BentoML bundle.

None

Returns:

Type Description
Bento

the BentoML Bento object.

Source code in src/zenml/integrations/bentoml/steps/bento_builder.py
@step
def bento_builder_step(
    model: UnmaterializedArtifact,
    model_name: str,
    model_type: str,
    service: str,
    version: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    description: Optional[str] = None,
    include: Optional[List[str]] = None,
    exclude: Optional[List[str]] = None,
    python: Optional[Dict[str, Any]] = None,
    docker: Optional[Dict[str, Any]] = None,
    working_dir: Optional[str] = None,
) -> bento.Bento:
    """Build a BentoML Model and Bento bundle.

    This steps takes a model artifact of a trained or loaded ML model in a
    previous step and save it with BentoML, then build a BentoML bundle.

    Args:
        model: the model to be packaged.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        service: the name of the BentoML service to be deployed.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies,
        docker: dictionary for configuring Bento's docker image.
        working_dir: the working directory of the BentoML bundle.

    Returns:
        the BentoML Bento object.
    """
    context = get_step_context()

    # save the model and bento uri as part of the bento labels
    labels = labels or {}
    labels["model_uri"] = model.uri
    labels["bento_uri"] = os.path.join(
        context.get_output_artifact_uri(), DEFAULT_BENTO_FILENAME
    )

    # Load the model from the model artifact
    model = load_artifact_from_response(model)

    # Save the model to a BentoML model based on the model type
    try:
        module = importlib.import_module(f".{model_type}", "bentoml")
        module.save_model(model_name, model, labels=labels)
    except importlib.metadata.PackageNotFoundError:
        bentoml.picklable_model.save_model(
            model_name,
            model,
        )

    # Build the BentoML bundle
    bento = bentos.build(
        service=service,
        models=[model_name],
        version=version,
        labels=labels,
        description=description,
        include=include,
        exclude=exclude,
        python=python,
        docker=docker,
        build_ctx=working_dir or source_utils.get_source_root(),
    )

    # Return the BentoML Bento bundle
    return bento
Modules
bentoml_deployer

Implementation of the BentoML model deployer pipeline step.

Functions
bentoml_model_deployer_step(bento: bento.Bento, model_name: str, port: int, deployment_type: BentoMLDeploymentType = BentoMLDeploymentType.LOCAL, deploy_decision: bool = True, workers: Optional[int] = 1, backlog: Optional[int] = 2048, production: bool = False, working_dir: Optional[str] = None, host: Optional[str] = None, image: Optional[str] = None, image_tag: Optional[str] = None, platform: Optional[str] = None, ssl_certfile: Optional[str] = None, ssl_keyfile: Optional[str] = None, ssl_keyfile_password: Optional[str] = None, ssl_version: Optional[str] = None, ssl_cert_reqs: Optional[str] = None, ssl_ca_certs: Optional[str] = None, ssl_ciphers: Optional[str] = None, timeout: int = 30) -> BaseService

Model deployer pipeline step for BentoML.

This step deploys a given Bento to a local BentoML HTTP prediction server.
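
Before the full parameter listing and source below, a minimal wiring sketch: the deployer step consumes the Bento built by bento_builder_step in the same pipeline. The trainer step and model name reuse the hypothetical stand-ins from the builder sketch above, and the import path is again assumed to be zenml.integrations.bentoml.steps:

from zenml import pipeline
from zenml.integrations.bentoml.steps import (
    bento_builder_step,
    bentoml_model_deployer_step,
)


@pipeline
def bento_deployment_pipeline():
    model = my_trainer_step()  # hypothetical trainer step from the sketch above
    bento = bento_builder_step(
        model=model,
        model_name="my_classifier",  # hypothetical model name
        model_type="sklearn",
        service="service.py:svc",  # hypothetical BentoML service path
    )
    # Deploys locally by default (deployment_type=LOCAL) on the given port.
    bentoml_model_deployer_step(
        bento=bento,
        model_name="my_classifier",
        port=3001,
        workers=2,
        timeout=60,
    )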

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| bento | Bento | the bento artifact to deploy. | required |
| model_name | str | the name of the model to deploy. | required |
| port | int | the port to use for the prediction service. | required |
| deployment_type | BentoMLDeploymentType | the type of deployment to use. Either "local" or "container". | LOCAL |
| deploy_decision | bool | whether to deploy the model or not. | True |
| workers | Optional[int] | number of workers to use for the prediction service. | 1 |
| backlog | Optional[int] | the number of requests to queue up before rejecting requests. | 2048 |
| production | bool | whether to deploy the service in production mode. | False |
| working_dir | Optional[str] | the working directory to use for the prediction service. | None |
| host | Optional[str] | the host to use for the prediction service. | None |
| image | Optional[str] | the image to use for the prediction service in the container deployment. | None |
| image_tag | Optional[str] | the image tag to use for the prediction service in the container deployment. | None |
| platform | Optional[str] | the platform used to build the image for the container deployment. | None |
| ssl_certfile | Optional[str] | the path to the SSL certificate file. | None |
| ssl_keyfile | Optional[str] | the path to the SSL key file. | None |
| ssl_keyfile_password | Optional[str] | the password for the SSL key file. | None |
| ssl_version | Optional[str] | the SSL version to use. | None |
| ssl_cert_reqs | Optional[str] | the SSL certificate requirements. | None |
| ssl_ca_certs | Optional[str] | the path to the SSL CA certificates. | None |
| ssl_ciphers | Optional[str] | the SSL ciphers to use. | None |
| timeout | int | the number of seconds to wait for the service to start/stop. | 30 |

Returns:

| Type | Description |
|------|-------------|
| BaseService | BentoML deployment service. |

Source code in src/zenml/integrations/bentoml/steps/bentoml_deployer.py
@step(enable_cache=True)
def bentoml_model_deployer_step(
    bento: bento.Bento,
    model_name: str,
    port: int,
    deployment_type: BentoMLDeploymentType = BentoMLDeploymentType.LOCAL,
    deploy_decision: bool = True,
    workers: Optional[int] = 1,
    backlog: Optional[int] = 2048,
    production: bool = False,
    working_dir: Optional[str] = None,
    host: Optional[str] = None,
    image: Optional[str] = None,
    image_tag: Optional[str] = None,
    platform: Optional[str] = None,
    ssl_certfile: Optional[str] = None,
    ssl_keyfile: Optional[str] = None,
    ssl_keyfile_password: Optional[str] = None,
    ssl_version: Optional[str] = None,
    ssl_cert_reqs: Optional[str] = None,
    ssl_ca_certs: Optional[str] = None,
    ssl_ciphers: Optional[str] = None,
    timeout: int = 30,
) -> BaseService:
    """Model deployer pipeline step for BentoML.

    This step deploys a given Bento to a local BentoML HTTP prediction server.

    Args:
        bento: the bento artifact to deploy.
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        deployment_type: the type of deployment to use. Either "local" or "container".
        deploy_decision: whether to deploy the model or not.
        workers: number of workers to use for the prediction service.
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
        host: the host to use for the prediction service.
        image: the image to use for the prediction service in the container deployment.
        image_tag: the image tag to use for the prediction service in the container deployment.
        platform: the platform to use to build the image for the container deployment.
        ssl_certfile: the path to the ssl cert file.
        ssl_keyfile: the path to the ssl key file.
        ssl_keyfile_password: the password for the ssl key file.
        ssl_version: the ssl version to use.
        ssl_cert_reqs: the ssl cert requirements.
        ssl_ca_certs: the path to the ssl ca certs.
        ssl_ciphers: the ssl ciphers to use.
        timeout: the number of seconds to wait for the service to start/stop.

    Returns:
        BentoML deployment service
    """
    # get the current active model deployer
    model_deployer = cast(
        BentoMLModelDeployer, BentoMLModelDeployer.get_active_model_deployer()
    )

    # Get the pipeline name and step name from the step context
    step_context = get_step_context()
    pipeline_name = step_context.pipeline.name
    step_name = step_context.step_run.name

    # Return the API endpoints of the given service, to be used for
    # prediction. This is a workaround to recover the endpoint names that
    # the user code defines as functions on the BentoML service.
    def service_apis(bento_tag: str) -> List[str]:
        # Add working dir in the bentoml load
        service = bentoml.load(
            bento_identifier=bento_tag,
            working_dir=working_dir or source_utils.get_source_root(),
        )
        apis = service.apis
        apis_paths = list(apis.keys())
        return apis_paths

    def create_deployment_config(
        deployment_type: BentoMLDeploymentType,
    ) -> Tuple[ServiceConfig, ServiceType]:
        common_config = {
            "model_name": model_name,
            "bento_tag": str(bento.tag),
            "model_uri": bento.info.labels.get("model_uri"),
            "bento_uri": bento.info.labels.get("bento_uri"),
            "apis": service_apis(str(bento.tag)),
            "host": host,
            "port": port,
            "pipeline_name": pipeline_name,
            "pipeline_step_name": step_name,
            "workers": workers,
            "backlog": backlog,
        }

        if deployment_type == BentoMLDeploymentType.CONTAINER:
            return BentoMLContainerDeploymentConfig(
                **common_config,
                image=image,
                image_tag=image_tag,
                platform=platform,
            ), BentoMLContainerDeploymentService.SERVICE_TYPE
        else:
            return BentoMLLocalDeploymentConfig(
                **common_config,
                working_dir=working_dir or source_utils.get_source_root(),
                ssl_parameters=SSLBentoMLParametersConfig(
                    ssl_certfile=ssl_certfile,
                    ssl_keyfile=ssl_keyfile,
                    ssl_keyfile_password=ssl_keyfile_password,
                    ssl_version=ssl_version,
                    ssl_cert_reqs=ssl_cert_reqs,
                    ssl_ca_certs=ssl_ca_certs,
                    ssl_ciphers=ssl_ciphers,
                ),
                production=production,
            ), BentoMLLocalDeploymentService.SERVICE_TYPE

    predictor_cfg, service_type = create_deployment_config(deployment_type)
    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        config=predictor_cfg.model_dump(),
        service_type=service_type,
    )

    # Reuse an existing service for this deployment, if one was found
    service: Optional[BaseService] = None
    if existing_services:
        if deployment_type == BentoMLDeploymentType.CONTAINER:
            service = cast(
                BentoMLContainerDeploymentService, existing_services[0]
            )
        else:
            service = cast(BentoMLLocalDeploymentService, existing_services[0])

    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{model_name}'..."
        )
        assert service is not None
        if not service.is_running:
            service.start(timeout=timeout)
        return service

    # create a new model deployment and replace an old one if it exists
    new_service: BaseService
    if deployment_type == BentoMLDeploymentType.CONTAINER:
        new_service = cast(
            BentoMLContainerDeploymentService,
            model_deployer.deploy_model(
                replace=True,
                config=cast(BentoMLContainerDeploymentConfig, predictor_cfg),
                timeout=timeout,
                service_type=service_type,
            ),
        )
    else:
        new_service = cast(
            BentoMLLocalDeploymentService,
            model_deployer.deploy_model(
                replace=True,
                config=cast(BentoMLLocalDeploymentConfig, predictor_cfg),
                timeout=timeout,
                service_type=service_type,
            ),
        )

    logger.info(
        f"BentoML deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service
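
Once deployed, the logged prediction URL can be exercised with any HTTP client. A minimal client sketch, assuming a local deployment on port 3001 and a user-defined predict API that accepts a JSON list (both the route and the payload shape depend on the user's BentoML service):

import requests

# Hypothetical prediction URL, as logged by the deployer step above.
# The "/predict" route must match an API function defined on the user's
# BentoML service.
response = requests.post(
    "http://127.0.0.1:3001/predict",
    json=[[0.5]],
    timeout=10,
)
print(response.json())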
Modules