
Seldon

zenml.integrations.seldon

Initialization of the Seldon integration.

The Seldon Core integration allows you to use the Seldon Core model serving platform to implement continuous model deployment.

Attributes

SELDON = 'seldon' module-attribute

SELDON_MODEL_DEPLOYER_FLAVOR = 'seldon' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:
    Type[StackComponentConfig]: The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:
    Dict[str, Any]: The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:
    Optional[str]: A flavor docs URL.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:
    Type[StackComponent]: The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:
    Optional[str]: The flavor logo URL.

name: str abstractmethod property

The flavor name.

Returns:
    str: The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:
    Optional[str]: A flavor SDK docs URL.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:
    Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:
    StackComponentType: The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:
    flavor_model (FlavorResponse): The model to load from. Required.

Raises:
    CustomFlavorImportError: If the custom flavor can't be imported.
    ImportError: If the flavor can't be imported.

Returns:
    Flavor: The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
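The import-error message above derives the expected file path from the flavor's dotted source. A minimal sketch of that derivation, where `source_root` stands in for the value returned by `source_utils.get_source_root()`:

```python
import os

def expected_flavor_file(source: str, source_root: str) -> str:
    # Drop the class name from the dotted source path, then map the
    # remaining module path onto the filesystem under the source root.
    flavor_module, _ = source.rsplit(".", maxsplit=1)
    return os.path.join(source_root, flavor_module.replace(".", os.path.sep)) + ".py"
```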
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:
    str: The complete URL to the ZenML documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
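The URL assembly above can be sketched as a standalone function (the `version` default here is purely illustrative):

```python
def default_docs_url(component_type_plural: str, flavor_name: str,
                     is_latest: bool = True, version: str = "0.0.0") -> str:
    # Underscores become dashes in both URL path segments.
    component = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")
    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{version}"
    return f"{base}/stack-components/{component}/{name}"
```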
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:
    str: The complete URL to the ZenML SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
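The integration name is recovered from the flavor's module path. A standalone sketch of the URL construction above:

```python
def default_sdk_docs_url(module: str, component_type_plural: str,
                         version: str) -> str:
    base = f"https://sdkdocs.zenml.io/{version}"
    if "zenml.integrations" in module:
        # The integration name is the first path segment after
        # "zenml.integrations." in the flavor's module path.
        integration = module.split("zenml.integrations.", maxsplit=1)[1].split(".")[0]
        return f"{base}/integration_code_docs/integrations-{integration}/#{module}"
    return f"{base}/core_code_docs/core-{component_type_plural}/#{module}"
```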
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:
    integration (Optional[str]): The integration to use for the model. Default: None.
    is_custom (bool): Whether the flavor is a custom flavor. Default: True.

Returns:
    FlavorRequest: The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:
    bool: True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
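The extras handling above relies on ZenML's internal `parse_requirement` helper. A rough, stdlib-only sketch of that split (the regex is an assumption covering simple PEP 508 forms, not the real implementation):

```python
import re

def split_extras(requirement: str):
    # Split a requirement such as "seldon-core[azure]>=1.15" into its
    # base name and bracketed extras (None when there are no extras).
    match = re.match(r"^([A-Za-z0-9._-]+)(\[[^\]]+\])?", requirement)
    if not match:
        return requirement, None
    return match.group(1), match.group(2)

def extras_list(extras: str):
    # As in the code above, strip the surrounding brackets before
    # splitting the extras list on commas.
    return extras[1:-1].split(",")
```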
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:
    List[Type[Flavor]]: A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:
    target_os (Optional[str]): The target operating system to get the requirements for. Default: None.
    python_version (Optional[str]): The Python version to use for the requirements. Default: None.

Returns:
    List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:
    target_os (Optional[str]): The target operating system to get the requirements for. Default: None.

Returns:
    List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
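The prefix filter above can be sketched as a single comprehension, where `ignored_prefixes` plays the role of `REQUIREMENTS_IGNORED_ON_UNINSTALL`:

```python
def uninstall_requirements(requirements, ignored_prefixes):
    # Keep only the requirements that do not start with any ignored
    # prefix; ignored packages stay installed when the integration is
    # uninstalled.
    return [
        req for req in requirements
        if not any(req.startswith(prefix) for prefix in ignored_prefixes)
    ]
```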
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:
    List[Type[BasePluginFlavor]]: A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

SeldonIntegration

Bases: Integration

Definition of Seldon Core integration for ZenML.

Functions
activate() -> None classmethod

Activate the Seldon Core integration.

Source code in src/zenml/integrations/seldon/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Seldon Core integration."""
    from zenml.integrations.seldon import secret_schemas  # noqa
    from zenml.integrations.seldon import services  # noqa
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Seldon Core.

Returns:
    List[Type[Flavor]]: A list of stack component flavors for this integration.

Source code in src/zenml/integrations/seldon/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Seldon Core.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.seldon.flavors import SeldonModelDeployerFlavor

    return [SeldonModelDeployerFlavor]
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:
    target_os (Optional[str]): The target operating system to get the requirements for. Default: None.
    python_version (Optional[str]): The Python version to use for the requirements. Default: None.

Returns:
    List[str]: A list of requirements.

Source code in src/zenml/integrations/seldon/__init__.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    from zenml.integrations.numpy import NumpyIntegration

    return cls.REQUIREMENTS + NumpyIntegration.get_requirements(
        target_os=target_os, python_version=python_version
    )

Modules

constants

Seldon constants.

custom_deployer

Initialization of ZenML custom deployer.

Classes
Modules
zenml_custom_model

Implements a custom model for the Seldon integration.

Classes
ZenMLCustomModel(model_name: str, model_uri: str, predict_func: str)

Custom model class for ZenML and Seldon.

This class is used to implement a custom model for the Seldon Core integration, which is used as the main entry point for custom code execution.

Attributes:
    name: The name of the model.
    model_uri: The URI of the model.
    predict_func: The predict function of the model.

Initializes a ZenMLCustomModel object.

Parameters:
    model_name (str): The name of the model. Required.
    model_uri (str): The URI of the model. Required.
    predict_func (str): The predict function of the model. Required.
Source code in src/zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Initializes a ZenMLCustomModel object.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """
    self.name = model_name
    self.model_uri = model_uri
    self.predict_func = source_utils.load(predict_func)
    self.model = None
    self.ready = False
Functions
load() -> bool

Load the model.

This function loads the model into memory and sets the ready flag to True. The model is loaded via the materializer: the artifact's information is saved to a file at preparation time and loaded again by the materializer at prediction time.

Returns:
    bool: True if the model was loaded successfully, False otherwise.

Source code in src/zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model.

    This function loads the model into memory and sets the ready flag to True.
    The model is loaded via the materializer: the artifact's information is
    saved to a file at preparation time and loaded again by the materializer
    at prediction time.

    Returns:
        True if the model was loaded successfully, False otherwise.

    """
    try:
        from zenml.artifacts.utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as e:
        logger.error("Failed to load model: {}".format(e))
        return False
    self.ready = True
    return self.ready
predict(X: Array_Like, features_names: Optional[List[str]], **kwargs: Any) -> Array_Like

Predict the given request.

The main predict function of the model. This function is called by the Seldon Core server when a request is received. Then inside this function, the user-defined predict function is called.

Parameters:
    X (Array_Like): The request to predict, as a dictionary. Required.
    features_names (Optional[List[str]]): The names of the features. Required.
    **kwargs (Any): Additional arguments. Default: {}.

Returns:
    Array_Like: The prediction dictionary.

Raises:
    Exception: If the predict function could not be called.
    NotImplementedError: If the model is not ready.
    TypeError: If the request is not a dictionary.

Source code in src/zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def predict(
    self,
    X: Array_Like,
    features_names: Optional[List[str]],
    **kwargs: Any,
) -> Array_Like:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    Seldon Core server when a request is received. Then inside this function,
    the user-defined predict function is called.

    Args:
        X: The request to predict in a dictionary.
        features_names: The names of the features.
        **kwargs: Additional arguments.

    Returns:
        The prediction dictionary.

    Raises:
        Exception: If function could not be called.
        NotImplementedError: If the model is not ready.
        TypeError: If the request is not a dictionary.
    """
    if self.predict_func is not None:
        try:
            prediction = {"predictions": self.predict_func(self.model, X)}
        except Exception as e:
            raise Exception("Failed to predict: {}".format(e))
        if isinstance(prediction, dict):
            return prediction
        else:
            raise TypeError(
                f"Prediction is not a dictionary. Expected dict type but got {type(prediction)}"
            )
    else:
        raise NotImplementedError("Predict function is not implemented")
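Stripped of error handling, the wrapping that `predict` performs can be sketched like this (`scale` is a hypothetical user predict function; the real one is loaded from the `predict_func` source path):

```python
def make_predict(model, predict_func):
    # Call the user-defined function with the loaded model and the raw
    # request, then return the result under a "predictions" key, as the
    # predict method above does.
    def predict(X):
        return {"predictions": predict_func(model, X)}
    return predict

def scale(model, X):
    # Hypothetical user predict function: scale every input by `model`.
    return [model * x for x in X]
```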
Functions
main(model_name: str, model_uri: str, predict_func: str) -> None

Main function for the custom model.

Within the deployment process, the built-in custom deployment step is used to prepare the Seldon Core deployment with an entry point that calls this script, which then starts a subprocess to start the Seldon server and waits for requests.

The following is an example of the entry point:

entrypoint_command = [
    "python",
    "-m",
    "zenml.integrations.seldon.custom_deployer.zenml_custom_model",
    "--model_name",
    config.service_config.model_name,
    "--predict_func",
    config.custom_deploy_parameters.predict_function,
]

Parameters:
    model_name (str): The name of the model. Required.
    model_uri (str): The URI of the model. Required.
    predict_func (str): The path to the predict function. Required.
Source code in src/zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
@click.command()
@click.option(
    "--model_uri",
    default=DEFAULT_LOCAL_MODEL_DIR,
    type=click.STRING,
    help="The directory where the model is stored locally.",
)
@click.option(
    "--model_name",
    default=DEFAULT_MODEL_NAME,
    required=True,
    type=click.STRING,
    help="The name of the model to deploy.",
)
@click.option(
    "--predict_func",
    required=True,
    type=click.STRING,
    help="The path to the custom predict function defined by the user.",
)
def main(model_name: str, model_uri: str, predict_func: str) -> None:
    """Main function for the custom model.

    Within the deployment process, the built-in custom deployment step is used
    to prepare the Seldon Core deployment with an entry point that calls this script,
    which then starts a subprocess to start the Seldon server and waits for requests.

    The following is an example of the entry point:
    ```
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.seldon.custom_deployer.zenml_custom_model",
        "--model_name",
        config.service_config.model_name,
        "--predict_func",
        config.custom_deploy_parameters.predict_function,
    ]
    ```

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The path to the predict function.
    """
    command = [
        "seldon-core-microservice",
        "zenml.integrations.seldon.custom_deployer.zenml_custom_model.ZenMLCustomModel",
        "--service-type",
        "MODEL",
        "--parameters",
        (
            f'[{{"name":"model_uri","value":"{model_uri}","type":"STRING"}},'
            f'{{"name":"model_name","value":"{model_name}","type":"STRING"}},'
            f'{{"name":"predict_func","value":"{predict_func}","type":"STRING"}}]'
        ),
    ]
    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError as ProcessError:
        logger.error(
            f"Failed to start the seldon-core-microservice process. {ProcessError}"
        )
        return
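The `--parameters` payload above is assembled by hand with an f-string. An equivalent sketch using `json.dumps`, which handles the quoting and escaping automatically (the argument values shown are illustrative):

```python
import json

def microservice_parameters(model_uri: str, model_name: str,
                            predict_func: str) -> str:
    # Build the JSON list passed via --parameters to the
    # seldon-core-microservice command.
    return json.dumps([
        {"name": "model_uri", "value": model_uri, "type": "STRING"},
        {"name": "model_name", "value": model_name, "type": "STRING"},
        {"name": "predict_func", "value": predict_func, "type": "STRING"},
    ])
```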
Modules

flavors

Seldon integration flavors.

Classes
SeldonModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig

Config for the Seldon Model Deployer.

Attributes:
    kubernetes_context (Optional[str]): The Kubernetes context to use to contact the remote Seldon Core installation. If not specified, the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is ignored.
    kubernetes_namespace (Optional[str]): The Kubernetes namespace where the Seldon Core deployment servers are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is mandatory.
    base_url (str): The base URL of the Kubernetes ingress used to expose the Seldon Core deployment servers.
    secret (Optional[str]): The name of a ZenML secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store, i.e. the storage backend where models are stored (see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).
    kubernetes_secret_name (Optional[str]): The name of the Kubernetes secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored). Used when the secret is not managed by ZenML and is already present in the Kubernetes cluster.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
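The secret-reference check above comes from ZenML's internal `secret_utils`. A hedged sketch of what matching the `{{secret_name.key}}` form could look like (the regex is an assumption; the real helper may accept more shapes):

```python
import re

# Assumed shape of a ZenML secret reference: {{secret_name.key}}.
_SECRET_REF = re.compile(r"^\{\{\s*\w+\.\w+\s*\}\}$")

def is_secret_reference(value) -> bool:
    # Only strings can be secret references; anything else passes
    # straight through to regular pydantic validation.
    return isinstance(value, str) and bool(_SECRET_REF.match(value))
```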
SeldonModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Seldon Core model deployer flavor.

Attributes
config_class: Type[SeldonModelDeployerConfig] property

Returns SeldonModelDeployerConfig config class.

Returns:
    Type[SeldonModelDeployerConfig]: The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:
    Optional[str]: A flavor docs URL.

implementation_class: Type[SeldonModelDeployer] property

Implementation class for this flavor.

Returns:
    Type[SeldonModelDeployer]: The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:
    str: The flavor logo URL.

name: str property

Name of the flavor.

Returns:
    str: The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:
    Optional[str]: A flavor SDK docs URL.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:
    Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

Modules
seldon_model_deployer_flavor

Seldon model deployer flavor.

Classes
SeldonModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig

Config for the Seldon Model Deployer.

Attributes:

Name Type Description
kubernetes_context Optional[str]

the Kubernetes context to use to contact the remote Seldon Core installation. If not specified, the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is ignored.

kubernetes_namespace Optional[str]

the Kubernetes namespace where the Seldon Core deployment servers are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). If the model deployer stack component is linked to a Kubernetes service connector, this field is mandatory.

base_url str

the base URL of the Kubernetes ingress used to expose the Seldon Core deployment servers.

secret Optional[str]

the name of a ZenML secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored - see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).

kubernetes_secret_name Optional[str]

the name of the Kubernetes secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored) - This is used when the secret is not managed by ZenML and is already present in the Kubernetes cluster.
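As a rough illustration of how the attributes above fit together (this dataclass is a simplified stand-in, not the real pydantic `SeldonModelDeployerConfig`), a deployer configuration might look like this; the field names mirror the attribute list, while the values are placeholders:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SeldonDeployerConfigSketch:
    """Illustrative stand-in for SeldonModelDeployerConfig (not the real class)."""

    base_url: str  # ingress URL exposing the Seldon Core deployment servers
    kubernetes_context: Optional[str] = None    # ignored when a service connector is linked
    kubernetes_namespace: Optional[str] = None  # mandatory with a service connector
    secret: Optional[str] = None                # ZenML secret for the storage initializer
    kubernetes_secret_name: Optional[str] = None  # pre-existing, unmanaged K8s secret


config = SeldonDeployerConfigSketch(
    base_url="http://my-seldon-ingress.example.com",
    kubernetes_namespace="zenml-workloads",
    secret="seldon-init-container-secret",
)
print(config.base_url)
```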

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
SeldonModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Seldon Core model deployer flavor.

Attributes
config_class: Type[SeldonModelDeployerConfig] property

Returns SeldonModelDeployerConfig config class.

Returns:

Type Description
Type[SeldonModelDeployerConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SeldonModelDeployer] property

Implementation class for this flavor.

Returns:

Type Description
Type[SeldonModelDeployer]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

model_deployers

Initialization of the Seldon Model Deployer.

Classes
SeldonModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Seldon Core model deployer stack component implementation.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: SeldonModelDeployerConfig property

Returns the SeldonModelDeployerConfig config.

Returns:

Type Description
SeldonModelDeployerConfig

The configuration.

kubernetes_secret_name: str property

Get the Kubernetes secret name associated with this model deployer.

If a pre-existing Kubernetes secret is configured for this model deployer, that name is returned to be used by all Seldon Core deployments associated with this model deployer.

Otherwise, a Kubernetes secret name is generated based on the ID of the active artifact store. The reason for this is that the same model deployer may be used to deploy models in combination with different artifact stores at the same time, and each artifact store may require different credentials to be accessed.

Returns:

Type Description
str

The name of a Kubernetes secret to be used with Seldon Core deployments.
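The derivation described above can be sketched as follows. The exact naming scheme is internal to ZenML, so the prefix and truncation here are assumptions; the sketch only shows the idea of producing a deterministic, per-artifact-store secret name from the store's ID:

```python
from uuid import UUID


def derive_secret_name(artifact_store_id: UUID) -> str:
    """Hypothetical naming scheme: derive a deterministic Kubernetes secret
    name from the active artifact store's ID so each artifact store gets its
    own credentials secret. The real prefix/format is internal to ZenML."""
    return f"zenml-seldon-core-{str(artifact_store_id).replace('-', '')[:12]}"


name = derive_secret_name(UUID("12345678-1234-5678-1234-567812345678"))
print(name)  # zenml-seldon-core-123456781234
```

Because the name is a pure function of the artifact store ID, every Seldon Core deployment that uses the same artifact store resolves to the same secret.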

seldon_client: SeldonClient property

Get the Seldon Core client associated with this model deployer.

Returns:

Type Description
SeldonClient

The Seldon Core client.

Raises:

Type Description
RuntimeError

If the Kubernetes namespace is not configured when using a service connector to deploy models with Seldon Core.

validator: Optional[StackValidator] property

Ensures there is a container registry and image builder in the stack.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True:
            build = BuildConfiguration(
                key=SELDON_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
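The filtering logic above can be exercised in isolation with a small sketch. Note that the value assigned to `SELDON_CUSTOM_DEPLOYMENT` below is an assumption for illustration; the real constant lives in the Seldon integration's constants module:

```python
from typing import Dict, List

# Hypothetical stand-in for the real constant in the Seldon integration
# (the exact string value is an assumption).
SELDON_CUSTOM_DEPLOYMENT = "seldon_custom_deployment"


def steps_needing_builds(step_extras: Dict[str, Dict[str, object]]) -> List[str]:
    """Mirror of the filtering above: only steps that explicitly set the
    custom-deployment extra to True require a Docker build."""
    return [
        name
        for name, extra in step_extras.items()
        if extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
    ]


print(steps_needing_builds({
    "trainer": {},
    "deployer": {SELDON_CUSTOM_DEPLOYMENT: True},
}))  # ['deployer']
```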
get_model_server_info(service_instance: SeldonDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance SeldonDeploymentService

Instance of a SeldonDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "MODEL_URI": service_instance.config.model_uri,
        "MODEL_NAME": service_instance.config.model_name,
        "SELDON_DEPLOYMENT": service_instance.seldon_deployment_name,
    }
perform_delete_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Delete a Seldon Core model deployment server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
force bool

if True, force the service to stop.

False
Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a Seldon Core model deployment server.

    Args:
        service: The service to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    service = cast(SeldonDeploymentService, service)
    service.stop(timeout=timeout, force=force)

    if service.config.secret_name:
        # delete the Kubernetes secret used to store the authentication
        # information for the Seldon Core model server storage initializer
        # if no other Seldon Core model servers are using it
        self._delete_kubernetes_secret(service.config.secret_name)
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Seldon Core deployment or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new Seldon Core deployment server to reflect the model and other configuration parameters specified in the supplied Seldon deployment config.

  • if replace is True, this method will first attempt to find an existing Seldon Core deployment that is equivalent to the supplied configuration parameters. Two or more Seldon Core deployments are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two Seldon Core deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent Seldon Core deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing Seldon Core deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new Seldon Core deployment server for each new model version. If multiple equivalent Seldon Core deployments are found, the most recently created deployment is selected to be updated and the others are deleted.
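The equivalence rule and the "pick the most recent match" behavior described above can be sketched with plain data structures (this is an illustration of the matching logic only, not the actual implementation):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DeploymentSketch:
    """Illustrative stand-in for a Seldon deployment's config (not the real model)."""

    pipeline_name: str
    pipeline_step_name: str
    model_name: str
    created: datetime


def find_equivalent(
    existing: List[DeploymentSketch], new: DeploymentSketch
) -> Optional[DeploymentSketch]:
    """Return the most recently created deployment equivalent to `new`,
    i.e. one sharing pipeline_name, pipeline_step_name and model_name."""
    matches = [
        d
        for d in existing
        if (d.pipeline_name, d.pipeline_step_name, d.model_name)
        == (new.pipeline_name, new.pipeline_step_name, new.model_name)
    ]
    return max(matches, key=lambda d: d.created) if matches else None


old = DeploymentSketch("train_pipe", "deployer", "churn-model", datetime(2022, 1, 1))
newer = DeploymentSketch("train_pipe", "deployer", "churn-model", datetime(2023, 6, 1))
candidate = DeploymentSketch("train_pipe", "deployer", "churn-model", datetime(2024, 1, 1))
print(find_equivalent([old, newer], candidate).created.year)  # 2023
```

In the real deployer, the selected match is updated in place (preserving its prediction URL) and any other equivalent deployments are deleted.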

Parameters:

Name Type Description Default
id UUID

the UUID of the model server to deploy.

required
config ServiceConfig

the configuration of the model to be deployed with Seldon Core.

required
timeout int

the timeout in seconds to wait for the Seldon Core server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Seldon Core server is provisioned, without waiting for it to fully start.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML Seldon Core deployment service object that can be used to interact with the remote Seldon Core server.

Raises:

Type Description
SeldonClientError

if a Seldon Core client error is encountered while provisioning the Seldon Core deployment server.

RuntimeError

if timeout is set to a positive value that is exceeded while waiting for the Seldon Core deployment server to start, or if an operational failure is encountered before it reaches a ready state.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Seldon Core deployment or update an existing one.

    # noqa: DAR402

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new Seldon
        Core deployment server to reflect the model and other configuration
        parameters specified in the supplied Seldon deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing Seldon Core deployment that is *equivalent* to the supplied
        configuration parameters. Two or more Seldon Core deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. To
        put it differently, two Seldon Core deployments are equivalent if
        they serve versions of the same model deployed by the same pipeline
        step. If an equivalent Seldon Core deployment is found, it will be
        updated in place to reflect the new configuration parameters. This
        allows an existing Seldon Core deployment to retain its prediction
        URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new Seldon Core deployment
    server for each new model version. If multiple equivalent Seldon Core
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        id: the UUID of the model server to deploy.
        config: the configuration of the model to be deployed with Seldon
            Core.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(SeldonDeploymentConfig, config)
        # if a custom Kubernetes secret is not explicitly specified in the
        # SeldonDeploymentConfig, try to create one from the ZenML secret
        # configured for the model deployer
        config.secret_name = (
            config.secret_name
            or self._create_or_update_kubernetes_secret()
        )
        # create a new service
        service = SeldonDeploymentService(uuid=id, config=config)
        logger.info(f"Creating a new Seldon deployment service: {service}")

        # start the service which in turn provisions the Seldon Core
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.is_custom_deployment,
        }

    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Start a Seldon Core model deployment server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Raises:

Type Description
NotImplementedError

since we don't support starting Seldon Core model servers

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Start a Seldon Core model deployment server.

    Args:
        service: The service to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: since we don't support starting Seldon Core
            model servers
    """
    raise NotImplementedError(
        "Starting Seldon Core model servers is not implemented"
    )
perform_stop_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Stop a Seldon Core model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

timeout in seconds to wait for the service to stop.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
force bool

if True, force the service to stop.

False

Raises:

Type Description
NotImplementedError

stopping Seldon Core model servers is not supported.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Stop a Seldon Core model server.

    Args:
        service: The service to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: stopping Seldon Core model servers is not
            supported.
    """
    raise NotImplementedError(
        "Stopping Seldon Core model servers is not implemented. Try "
        "deleting the Seldon Core model server instead."
    )
Modules
seldon_model_deployer

Implementation of the Seldon Model Deployer.

Classes
SeldonModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Seldon Core model deployer stack component implementation.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: SeldonModelDeployerConfig property

Returns the SeldonModelDeployerConfig config.

Returns:

Type Description
SeldonModelDeployerConfig

The configuration.

kubernetes_secret_name: str property

Get the Kubernetes secret name associated with this model deployer.

If a pre-existing Kubernetes secret is configured for this model deployer, that name is returned to be used by all Seldon Core deployments associated with this model deployer.

Otherwise, a Kubernetes secret name is generated based on the ID of the active artifact store. The reason for this is that the same model deployer may be used to deploy models in combination with different artifact stores at the same time, and each artifact store may require different credentials to be accessed.

Returns:

Type Description
str

The name of a Kubernetes secret to be used with Seldon Core deployments.

seldon_client: SeldonClient property

Get the Seldon Core client associated with this model deployer.

Returns:

Type Description
SeldonClient

The Seldon Core client.

Raises:

Type Description
RuntimeError

If the Kubernetes namespace is not configured when using a service connector to deploy models with Seldon Core.

validator: Optional[StackValidator] property

Ensures there is a container registry and image builder in the stack.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True:
            build = BuildConfiguration(
                key=SELDON_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
get_model_server_info(service_instance: SeldonDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance SeldonDeploymentService

Instance of a SeldonDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "MODEL_URI": service_instance.config.model_uri,
        "MODEL_NAME": service_instance.config.model_name,
        "SELDON_DEPLOYMENT": service_instance.seldon_deployment_name,
    }
perform_delete_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Delete a Seldon Core model deployment server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
force bool

if True, force the service to stop.

False
Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a Seldon Core model deployment server.

    Args:
        service: The service to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    service = cast(SeldonDeploymentService, service)
    service.stop(timeout=timeout, force=force)

    if service.config.secret_name:
        # delete the Kubernetes secret used to store the authentication
        # information for the Seldon Core model server storage initializer
        # if no other Seldon Core model servers are using it
        self._delete_kubernetes_secret(service.config.secret_name)
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Seldon Core deployment or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new Seldon Core deployment server to reflect the model and other configuration parameters specified in the supplied Seldon deployment config.

  • if replace is True, this method will first attempt to find an existing Seldon Core deployment that is equivalent to the supplied configuration parameters. Two or more Seldon Core deployments are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two Seldon Core deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent Seldon Core deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing Seldon Core deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new Seldon Core deployment server for each new model version. If multiple equivalent Seldon Core deployments are found, the most recently created deployment is selected to be updated and the others are deleted.

Parameters:

Name Type Description Default
id UUID

the UUID of the model server to deploy.

required
config ServiceConfig

the configuration of the model to be deployed with Seldon Core.

required
timeout int

the timeout in seconds to wait for the Seldon Core server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Seldon Core server is provisioned, without waiting for it to fully start.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML Seldon Core deployment service object that can be used to

BaseService

interact with the remote Seldon Core server.

Raises:

Type Description
SeldonClientError

if a Seldon Core client error is encountered while provisioning the Seldon Core deployment server.

RuntimeError

if timeout is set to a positive value that is exceeded while waiting for the Seldon Core deployment server to start, or if an operational failure is encountered before it reaches a ready state.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Seldon Core deployment or update an existing one.

    # noqa: DAR402

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new Seldon
        Core deployment server to reflect the model and other configuration
        parameters specified in the supplied Seldon deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing Seldon Core deployment that is *equivalent* to the supplied
        configuration parameters. Two or more Seldon Core deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. To
        put it differently, two Seldon Core deployments are equivalent if
        they serve versions of the same model deployed by the same pipeline
        step. If an equivalent Seldon Core deployment is found, it will be
        updated in place to reflect the new configuration parameters. This
        allows an existing Seldon Core deployment to retain its prediction
        URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new Seldon Core deployment
    server for each new model version. If multiple equivalent Seldon Core
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        id: the UUID of the model server to deploy.
        config: the configuration of the model to be deployed with
            Seldon Core.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(SeldonDeploymentConfig, config)
        # if a custom Kubernetes secret is not explicitly specified in the
        # SeldonDeploymentConfig, try to create one from the ZenML secret
        # configured for the model deployer
        config.secret_name = (
            config.secret_name
            or self._create_or_update_kubernetes_secret()
        )
        # create a new service
        service = SeldonDeploymentService(uuid=id, config=config)
        logger.info(f"Creating a new Seldon deployment service: {service}")

        # start the service which in turn provisions the Seldon Core
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.is_custom_deployment,
        }

    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Start a Seldon Core model deployment server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Raises:

Type Description
NotImplementedError

since we don't support starting Seldon Core model servers

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Start a Seldon Core model deployment server.

    Args:
        service: The service to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: since we don't support starting Seldon Core
            model servers
    """
    raise NotImplementedError(
        "Starting Seldon Core model servers is not implemented"
    )
perform_stop_model(service: BaseService, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Stop a Seldon Core model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

timeout in seconds to wait for the service to stop.

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
force bool

if True, force the service to stop.

False

Raises:

Type Description
NotImplementedError

stopping Seldon Core model servers is not supported.

Source code in src/zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Stop a Seldon Core model server.

    Args:
        service: The service to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: stopping Seldon Core model servers is not
            supported.
    """
    raise NotImplementedError(
        "Stopping Seldon Core model servers is not implemented. Try "
        "deleting the Seldon Core model server instead."
    )
Functions

secret_schemas

Initialization for the Seldon secret schemas.

These are secret schemas that can be used to authenticate Seldon to the Artifact Store used to store served ML models.

Classes
SeldonAzureSecretSchema

Bases: BaseSecretSchema

Seldon Azure Blob Storage credentials.

Based on: https://rclone.org/azureblob/

Attributes:

Name Type Description
rclone_config_az_type Literal['azureblob']

the rclone config type. Must be set to "azureblob" for this schema.

rclone_config_az_env_auth bool

read credentials from runtime (environment variables or MSI).

rclone_config_az_account Optional[str]

storage Account Name. Leave blank to use SAS URL or MSI.

rclone_config_az_key Optional[str]

storage Account Key. Leave blank to use SAS URL or MSI.

rclone_config_az_sas_url Optional[str]

SAS URL for container level access only. Leave blank if using account/key or MSI.

rclone_config_az_use_msi bool

use a managed service identity to authenticate (only works in Azure).

rclone_config_az_client_secret Optional[str]

client secret for service principal authentication.

rclone_config_az_client_id Optional[str]

client id for service principal authentication.

rclone_config_az_tenant Optional[str]

tenant id for service principal authentication.

SeldonGSSecretSchema

Bases: BaseSecretSchema

Seldon GCS credentials.

Based on: https://rclone.org/googlecloudstorage/

Attributes:

Name Type Description
rclone_config_gs_type Literal['google cloud storage']

the rclone config type. Must be set to "google cloud storage" for this schema.

rclone_config_gs_client_id Optional[str]

OAuth client id.

rclone_config_gs_client_secret Optional[str]

OAuth client secret.

rclone_config_gs_token Optional[str]

OAuth Access Token as a JSON blob.

rclone_config_gs_project_number Optional[str]

project number.

rclone_config_gs_service_account_credentials Optional[str]

service account credentials JSON blob.

rclone_config_gs_anonymous bool

access public buckets and objects without credentials. Set to True if you just want to download files and don't configure credentials.

rclone_config_gs_auth_url Optional[str]

auth server URL.

SeldonS3SecretSchema

Bases: BaseSecretSchema

Seldon S3 credentials.

Based on: https://rclone.org/s3/#amazon-s3

Attributes:

Name Type Description
rclone_config_s3_type Literal['s3']

the rclone config type. Must be set to "s3" for this schema.

rclone_config_s3_provider str

the S3 provider (e.g. aws, ceph, minio).

rclone_config_s3_env_auth bool

get AWS credentials from EC2/ECS meta data (i.e. with IAM roles configuration). Only applies if access_key_id and secret_access_key are blank.

rclone_config_s3_access_key_id Optional[str]

AWS Access Key ID.

rclone_config_s3_secret_access_key Optional[str]

AWS Secret Access Key.

rclone_config_s3_session_token Optional[str]

AWS Session Token.

rclone_config_s3_region Optional[str]

region to connect to.

rclone_config_s3_endpoint Optional[str]

S3 API endpoint.

Modules
secret_schemas

Implementation for Seldon secret schemas.

Classes
SeldonAzureSecretSchema

Bases: BaseSecretSchema

Seldon Azure Blob Storage credentials.

Based on: https://rclone.org/azureblob/

Attributes:

Name Type Description
rclone_config_az_type Literal['azureblob']

the rclone config type. Must be set to "azureblob" for this schema.

rclone_config_az_env_auth bool

read credentials from runtime (environment variables or MSI).

rclone_config_az_account Optional[str]

storage Account Name. Leave blank to use SAS URL or MSI.

rclone_config_az_key Optional[str]

storage Account Key. Leave blank to use SAS URL or MSI.

rclone_config_az_sas_url Optional[str]

SAS URL for container level access only. Leave blank if using account/key or MSI.

rclone_config_az_use_msi bool

use a managed service identity to authenticate (only works in Azure).

rclone_config_az_client_secret Optional[str]

client secret for service principal authentication.

rclone_config_az_client_id Optional[str]

client id for service principal authentication.

rclone_config_az_tenant Optional[str]

tenant id for service principal authentication.

SeldonGSSecretSchema

Bases: BaseSecretSchema

Seldon GCS credentials.

Based on: https://rclone.org/googlecloudstorage/

Attributes:

Name Type Description
rclone_config_gs_type Literal['google cloud storage']

the rclone config type. Must be set to "google cloud storage" for this schema.

rclone_config_gs_client_id Optional[str]

OAuth client id.

rclone_config_gs_client_secret Optional[str]

OAuth client secret.

rclone_config_gs_token Optional[str]

OAuth Access Token as a JSON blob.

rclone_config_gs_project_number Optional[str]

project number.

rclone_config_gs_service_account_credentials Optional[str]

service account credentials JSON blob.

rclone_config_gs_anonymous bool

access public buckets and objects without credentials. Set to True if you just want to download files and don't configure credentials.

rclone_config_gs_auth_url Optional[str]

auth server URL.

SeldonS3SecretSchema

Bases: BaseSecretSchema

Seldon S3 credentials.

Based on: https://rclone.org/s3/#amazon-s3

Attributes:

Name Type Description
rclone_config_s3_type Literal['s3']

the rclone config type. Must be set to "s3" for this schema.

rclone_config_s3_provider str

the S3 provider (e.g. aws, ceph, minio).

rclone_config_s3_env_auth bool

get AWS credentials from EC2/ECS meta data (i.e. with IAM roles configuration). Only applies if access_key_id and secret_access_key are blank.

rclone_config_s3_access_key_id Optional[str]

AWS Access Key ID.

rclone_config_s3_secret_access_key Optional[str]

AWS Secret Access Key.

rclone_config_s3_session_token Optional[str]

AWS Session Token.

rclone_config_s3_region Optional[str]

region to connect to.

rclone_config_s3_endpoint Optional[str]

S3 API endpoint.

seldon_client

Implementation of the Seldon client for ZenML.

Classes
SeldonClient(context: Optional[str], namespace: Optional[str], kube_client: Optional[k8s_client.ApiClient] = None)

A client for interacting with Seldon Deployments.

Initialize a Seldon Core client.

Parameters:

Name Type Description Default
context Optional[str]

the Kubernetes context to use.

required
namespace Optional[str]

the Kubernetes namespace to use.

required
kube_client Optional[ApiClient]

a Kubernetes client to use.

None
Source code in src/zenml/integrations/seldon/seldon_client.py
def __init__(
    self,
    context: Optional[str],
    namespace: Optional[str],
    kube_client: Optional[k8s_client.ApiClient] = None,
):
    """Initialize a Seldon Core client.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
        kube_client: a Kubernetes client to use.
    """
    self._namespace = namespace
    self._initialize_k8s_clients(context=context, kube_client=kube_client)
Attributes
namespace: str property

Returns the Kubernetes namespace in use by the client.

Returns:

Type Description
str

The Kubernetes namespace in use by the client.

Raises:

Type Description
RuntimeError

if the namespace has not been configured.

Functions
create_deployment(deployment: SeldonDeployment, poll_timeout: int = 0) -> SeldonDeployment

Create a Seldon Core deployment resource.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource to create

required
poll_timeout int

the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised.

0

Returns:

Type Description
SeldonDeployment

the created Seldon Core deployment resource with updated status.

Raises:

Type Description
SeldonDeploymentExistsError

if a deployment with the same name already exists.

SeldonClientError

if an unknown error occurs during the creation of the deployment.

Source code in src/zenml/integrations/seldon/seldon_client.py
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    try:
        logger.debug(f"Creating SeldonDeployment resource: {deployment}")

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        body_deploy = deployment.model_dump(exclude_none=True)
        response = (
            self._custom_objects_api.create_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                body=body_deploy,
                _request_timeout=poll_timeout or None,
            )
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s", str(e)
        )
        if e.status == 409:
            raise SeldonDeploymentExistsError(
                f"A deployment with the name {deployment.name} "
                f"already exists in namespace {self._namespace}"
            )
        raise SeldonClientError(
            "Exception when creating SeldonDeployment resource"
        ) from e

    created_deployment = self.get_deployment(name=deployment.name)

    while poll_timeout > 0 and created_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        created_deployment = self.get_deployment(name=deployment.name)

    return created_deployment
create_or_update_secret(name: str, secret_values: Dict[str, Any]) -> None

Create or update a Kubernetes Secret resource.

Parameters:

Name Type Description Default
name str

the name of the Secret resource to create.

required
secret_values Dict[str, Any]

secret key-values that should be stored in the Secret resource.

required

Raises:

Type Description
SeldonClientError

if an unknown error occurs during the creation of the secret.

ApiException

unexpected error.

Source code in src/zenml/integrations/seldon/seldon_client.py
def create_or_update_secret(
    self,
    name: str,
    secret_values: Dict[str, Any],
) -> None:
    """Create or update a Kubernetes Secret resource.

    Args:
        name: the name of the Secret resource to create.
        secret_values: secret key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
        k8s_client.rest.ApiException: unexpected error.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")

        secret_data = {
            k.upper(): base64.b64encode(str(v).encode("utf-8")).decode(
                "ascii"
            )
            for k, v in secret_values.items()
            if v is not None
        }

        secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=secret_data,
        )

        try:
            # check if the secret is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            response = self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error
                raise
            response = self._core_api.create_namespaced_secret(
                namespace=self._namespace,
                body=secret,
            )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
delete_deployment(name: str, force: bool = False, poll_timeout: int = 0) -> None

Delete a Seldon Core deployment resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment resource to delete.

required
force bool

if True, the deployment deletion will be forced (the graceful period will be set to zero).

False
poll_timeout int

the maximum time to wait for the deployment to be deleted. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment still exists, this method will return and no exception will be raised.

0

Raises:

Type Description
SeldonClientError

if an unknown error occurs during the deployment removal.

Source code in src/zenml/integrations/seldon/seldon_client.py
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Delete a Seldon Core deployment resource managed by ZenML.

    Args:
        name: the name of the Seldon Core deployment resource to delete.
        force: if True, the deployment deletion will be forced (the graceful
            period will be set to zero).
        poll_timeout: the maximum time to wait for the deployment to be
            deleted. If set to 0, the function will return immediately
            without checking the deployment status. If a timeout
            occurs and the deployment still exists, this method will
            return and no exception will be raised.

    Raises:
        SeldonClientError: if an unknown error occurs during the deployment
            removal.
    """
    try:
        logger.debug(f"Deleting SeldonDeployment resource: {name}")

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=name)

        response = (
            self._custom_objects_api.delete_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
                _request_timeout=poll_timeout or None,
                grace_period_seconds=0 if force else None,
            )
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from e

    while poll_timeout > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            return
        time.sleep(5)
        poll_timeout -= 5
delete_secret(name: str) -> None

Delete a Kubernetes Secret resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Kubernetes Secret resource to delete.

required

Raises:

Type Description
SeldonClientError

if an unknown error occurs during the removal of the secret.

Source code in src/zenml/integrations/seldon/seldon_client.py
def delete_secret(
    self,
    name: str,
) -> None:
    """Delete a Kubernetes Secret resource managed by ZenML.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        SeldonClientError: if an unknown error occurs during the removal
            of the secret.
    """
    try:
        logger.debug(f"Deleting Secret resource: {name}")

        response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            # the secret is no longer present, nothing to do
            return
        logger.error(
            "Exception when deleting Secret resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting Secret resource {name}"
        ) from e
find_deployments(name: Optional[str] = None, labels: Optional[Dict[str, str]] = None, fields: Optional[Dict[str, str]] = None) -> List[SeldonDeployment]

Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

Parameters:

Name Type Description Default
name Optional[str]

optional name of the deployment resource to find.

None
fields Optional[Dict[str, str]]

optional selector to restrict the list of returned Seldon deployments by their fields. Defaults to everything.

None
labels Optional[Dict[str, str]]

optional selector to restrict the list of returned Seldon deployments by their labels. Defaults to everything.

None

Returns:

Type Description
List[SeldonDeployment]

List of Seldon Core deployments that match the given criteria.

Raises:

Type Description
SeldonClientError

if an unknown error occurs while fetching the deployments.

Source code in src/zenml/integrations/seldon/seldon_client.py
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    Args:
        name: optional name of the deployment resource to find.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    fields = fields or {}
    labels = labels or {}
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels["app"] = "zenml"
    if name:
        fields = {"metadata.name": name}
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    label_selector = (
        ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
    )
    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
        logger.debug(
            "Seldon Core API returned %s items", len(response["items"])
        )
        deployments = []
        for item in response.get("items") or []:
            try:
                deployments.append(SeldonDeployment(**item))
            except ValidationError as e:
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(item),
                )
        return deployments
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector or "",
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e
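The selector strings passed to the Kubernetes API are built by joining `key=value` pairs with commas. A minimal standalone sketch of that logic (the helper name `build_selector` is illustrative, not part of the ZenML API):

```python
from typing import Dict, Optional


def build_selector(criteria: Dict[str, str]) -> Optional[str]:
    # Join key=value pairs with commas, as find_deployments does;
    # an empty dict yields None so no selector is sent at all.
    return ",".join(f"{k}={v}" for k, v in criteria.items()) or None


print(build_selector({"app": "zenml", "env": "staging"}))  # app=zenml,env=staging
print(build_selector({}))  # None
```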
get_deployment(name: str) -> SeldonDeployment

Get a ZenML managed Seldon Core deployment resource by name.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment resource to fetch.

required

Returns:

Type Description
SeldonDeployment

The Seldon Core deployment resource.

Raises:

Type Description
SeldonDeploymentNotFoundError

if the deployment resource cannot be found or is not managed by ZenML.

SeldonClientError

if an unknown error occurs while fetching the deployment.

Source code in src/zenml/integrations/seldon/seldon_client.py
def get_deployment(self, name: str) -> SeldonDeployment:
    """Get a ZenML managed Seldon Core deployment resource by name.

    Args:
        name: the name of the Seldon Core deployment resource to fetch.

    Returns:
        The Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the deployment resource cannot
            be found or is not managed by ZenML.
        SeldonClientError: if an unknown error occurs while fetching
            the deployment.
    """
    try:
        logger.debug(f"Retrieving SeldonDeployment resource: {name}")

        response = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
        logger.debug("Seldon Core API response: %s", response)
        try:
            deployment = SeldonDeployment(**response)
        except ValidationError as e:
            logger.error(
                "Invalid Seldon Core deployment resource: %s\n%s",
                str(e),
                str(response),
            )
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource {name} could not be parsed"
            )

        # Only Seldon deployments managed by ZenML are returned
        if not deployment.is_managed_by_zenml():
            raise SeldonDeploymentNotFoundError(
                f"Seldon Deployment {name} is not managed by ZenML"
            )
        return deployment

    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from e
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from e
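The error handling above follows a common translation pattern: a low-level Kubernetes `ApiException` with status 404 becomes a domain-specific not-found error, and anything else becomes a generic client error. A self-contained sketch with stand-in exception classes (not the real ZenML or `k8s_client` ones):

```python
class FakeApiException(Exception):
    """Stand-in for k8s_client.rest.ApiException."""

    def __init__(self, status: int):
        self.status = status


class NotFoundError(Exception):
    pass


class ClientError(Exception):
    pass


def fetch_resource(name, api_call):
    # Translate the transport-level error into domain errors,
    # mirroring the except clause in get_deployment above.
    try:
        return api_call(name)
    except FakeApiException as e:
        if e.status == 404:
            raise NotFoundError(f"resource not found: {name}") from e
        raise ClientError(f"unexpected error fetching {name}") from e


def failing_call(name):
    raise FakeApiException(status=404)


try:
    fetch_resource("demo", failing_call)
except NotFoundError as e:
    print(e)  # resource not found: demo
```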
get_deployment_logs(name: str, follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Get the logs of a Seldon Core deployment resource.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment to get logs for.

required
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
None

A generator that can be accessed to get the service logs.

Yields:

Type Description
str

The next log line.

Raises:

Type Description
SeldonClientError

if an unknown error occurs while fetching the logs.

Source code in src/zenml/integrations/seldon/seldon_client.py
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Yields:
        The next log line.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the logs.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        containers = [c.name for c in pod.spec.containers]
        init_containers = [c.name for c in pod.spec.init_containers]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses
        }

        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so we need to filter them out
        if not container_statuses[container]:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e

    try:
        while True:
            line = response.readline().decode("utf-8").rstrip("\n")
            if not line:
                return
            stop = yield line
            if stop:
                return
    finally:
        response.release_conn()
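The generator at the end of the method uses `yield` as a two-way channel: the caller can `send(True)` to stop the log stream early. A standalone sketch of that protocol over an in-memory list:

```python
from typing import Generator, List


def stream_lines(lines: List[str]) -> Generator[str, bool, None]:
    # Yield each line; a truthy value sent back by the consumer
    # ends the stream early, like the loop in get_deployment_logs.
    for line in lines:
        stop = yield line
        if stop:
            return


gen = stream_lines(["line-1", "line-2", "line-3"])
print(next(gen))        # line-1
print(gen.send(False))  # line-2
gen.close()             # or gen.send(True) to stop via the protocol
```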
sanitize_labels(labels: Dict[str, str]) -> None staticmethod

Update the label values to be valid Kubernetes labels.

See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

Parameters:

Name Type Description Default
labels Dict[str, str]

the labels to sanitize.

required
Source code in src/zenml/integrations/seldon/seldon_client.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Update the label values to be valid Kubernetes labels.

    See:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: the labels to sanitize.
    """
    for key, value in labels.items():
        # Kubernetes labels must be alphanumeric, no longer than
        # 63 characters, and must begin and end with an alphanumeric
        # character ([a-z0-9A-Z])
        labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
            "-_."
        )
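The sanitization boils down to a single regex plus truncation. A standalone sketch applying the same expression to one value:

```python
import re


def sanitize_value(value: str) -> str:
    # Same logic as sanitize_labels above: collapse runs of
    # disallowed characters into "_", cap at 63 characters, and
    # strip non-alphanumeric characters from both ends.
    return re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip("-_.")


print(sanitize_value("s3://bucket/model"))  # s3_bucket_model
print(len(sanitize_value("x" * 100)))      # 63
```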
update_deployment(deployment: SeldonDeployment, poll_timeout: int = 0) -> SeldonDeployment

Update a Seldon Core deployment resource.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource to update

required
poll_timeout int

the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised.

0

Returns:

Type Description
SeldonDeployment

the updated Seldon Core deployment resource with updated status.

Raises:

Type Description
SeldonClientError

if an unknown error occurs while updating the deployment.

Source code in src/zenml/integrations/seldon/seldon_client.py
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)

        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.model_dump(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e

    updated_deployment = self.get_deployment(name=deployment.name)

    while poll_timeout > 0 and updated_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)

    return updated_deployment
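The final polling loop is a simple fixed-interval wait. A standalone sketch of it, with the interval made configurable so it can run instantly (the real method sleeps 5 seconds between checks):

```python
import time
from typing import Callable


def poll_until_settled(
    get_state: Callable[[], str],
    poll_timeout: float,
    interval: float = 5.0,
) -> str:
    # Re-check the state until it leaves "pending" or the timeout
    # budget is spent, mirroring update_deployment above. On timeout
    # the last observed state is returned; no exception is raised.
    state = get_state()
    while poll_timeout > 0 and state == "pending":
        time.sleep(interval)
        poll_timeout -= interval
        state = get_state()
    return state


states = iter(["pending", "pending", "available"])
print(poll_until_settled(lambda: next(states), poll_timeout=10, interval=0))
# available
```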
SeldonClientError

Bases: Exception

Base exception class for all exceptions raised by the SeldonClient.

SeldonClientTimeout

Bases: SeldonClientError

Raised when the Seldon client timed out while waiting for a resource to reach the expected status.

SeldonDeployment

Bases: BaseModel

A Seldon Core deployment CRD.

This is a Pydantic representation of some of the fields in the Seldon Core CRD (documented here: https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).

Note that not all fields are represented, only those that are relevant to the ZenML integration. The fields that are not represented are silently ignored when the Seldon Deployment is created or updated from an external SeldonDeployment CRD representation.

Attributes:

Name Type Description
kind Literal['SeldonDeployment']

Kubernetes kind field.

apiVersion Literal['machinelearning.seldon.io/v1']

Kubernetes apiVersion field.

metadata SeldonDeploymentMetadata

Kubernetes metadata field.

spec SeldonDeploymentSpec

Seldon Deployment spec entry.

status Optional[SeldonDeploymentStatus]

Seldon Deployment status.

Attributes
name: str property

Returns the name of this Seldon Deployment.

This is just a shortcut for self.metadata.name.

Returns:

Type Description
str

The name of this Seldon Deployment.

state: SeldonDeploymentStatusState property

The state of the Seldon Deployment.

Returns:

Type Description
SeldonDeploymentStatusState

The state of the Seldon Deployment.

Functions
build(name: Optional[str] = None, model_uri: Optional[str] = None, model_name: Optional[str] = None, implementation: Optional[str] = None, parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None, engineResources: Optional[SeldonResourceRequirements] = None, secret_name: Optional[str] = None, labels: Optional[Dict[str, str]] = None, annotations: Optional[Dict[str, str]] = None, is_custom_deployment: Optional[bool] = False, spec: Optional[Dict[Any, Any]] = None, serviceAccountName: Optional[str] = None) -> SeldonDeployment classmethod

Build a basic Seldon Deployment object.

Parameters:

Name Type Description Default
name Optional[str]

The name of the Seldon Deployment. If not explicitly passed, a unique name is autogenerated.

None
model_uri Optional[str]

The URI of the model.

None
model_name Optional[str]

The name of the model.

None
implementation Optional[str]

The implementation of the model.

None
parameters Optional[List[SeldonDeploymentPredictorParameter]]

The predictor graph parameters.

None
engineResources Optional[SeldonResourceRequirements]

The resources to be allocated to the model.

None
secret_name Optional[str]

The name of the Kubernetes secret containing environment variable values (e.g. with credentials for the artifact store) to use with the deployment service.

None
labels Optional[Dict[str, str]]

A dictionary of labels to apply to the Seldon Deployment.

None
annotations Optional[Dict[str, str]]

A dictionary of annotations to apply to the Seldon Deployment.

None
spec Optional[Dict[Any, Any]]

A Kubernetes pod spec to use for the Seldon Deployment.

None
is_custom_deployment Optional[bool]

Whether the Seldon Deployment is a custom or a built-in one.

False
serviceAccountName Optional[str]

The name of the service account to associate with the predictive unit container.

None

Returns:

Type Description
SeldonDeployment

A minimal SeldonDeployment object built from the provided

SeldonDeployment

parameters.

Source code in src/zenml/integrations/seldon/seldon_client.py
@classmethod
def build(
    cls,
    name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_name: Optional[str] = None,
    implementation: Optional[str] = None,
    parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None,
    engineResources: Optional[SeldonResourceRequirements] = None,
    secret_name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    is_custom_deployment: Optional[bool] = False,
    spec: Optional[Dict[Any, Any]] = None,
    serviceAccountName: Optional[str] = None,
) -> "SeldonDeployment":
    """Build a basic Seldon Deployment object.

    Args:
        name: The name of the Seldon Deployment. If not explicitly passed,
            a unique name is autogenerated.
        model_uri: The URI of the model.
        model_name: The name of the model.
        implementation: The implementation of the model.
        parameters: The predictor graph parameters.
        engineResources: The resources to be allocated to the model.
        secret_name: The name of the Kubernetes secret containing
            environment variable values (e.g. with credentials for the
            artifact store) to use with the deployment service.
        labels: A dictionary of labels to apply to the Seldon Deployment.
        annotations: A dictionary of annotations to apply to the Seldon
            Deployment.
        spec: A Kubernetes pod spec to use for the Seldon Deployment.
        is_custom_deployment: Whether the Seldon Deployment is a custom
            or a built-in one.
        serviceAccountName: The name of the service account to associate
            with the predictive unit container.

    Returns:
        A minimal SeldonDeployment object built from the provided
        parameters.

    """
    if not name:
        name = f"zenml-{time.time()}"

    if labels is None:
        labels = {}
    if annotations is None:
        annotations = {}

    if is_custom_deployment:
        predictors = [
            SeldonDeploymentPredictor(
                name=model_name or "",
                graph=SeldonDeploymentPredictiveUnit(
                    name="classifier",
                    type=SeldonDeploymentPredictiveUnitType.MODEL,
                    parameters=parameters,
                    serviceAccountName=serviceAccountName,
                ),
                engineResources=engineResources,
                componentSpecs=[
                    SeldonDeploymentComponentSpecs(
                        spec=spec
                        # TODO [HIGH]: Add support for other component types (e.g. graph)
                    )
                ],
            )
        ]
    else:
        predictors = [
            SeldonDeploymentPredictor(
                name=model_name or "",
                graph=SeldonDeploymentPredictiveUnit(
                    name="classifier",
                    type=SeldonDeploymentPredictiveUnitType.MODEL,
                    modelUri=model_uri or "",
                    implementation=implementation or "",
                    envSecretRefName=secret_name,
                    parameters=parameters,
                    serviceAccountName=serviceAccountName,
                ),
                engineResources=engineResources,
            )
        ]

    return SeldonDeployment(
        metadata=SeldonDeploymentMetadata(
            name=name, labels=labels, annotations=annotations
        ),
        spec=SeldonDeploymentSpec(name=name, predictors=predictors),
    )
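For orientation, here is a plain-dict sketch of the kind of manifest `build` assembles for the non-custom case. The helper and its field values are illustrative stand-ins; the real method returns typed Pydantic models, not dicts:

```python
import time
from typing import Any, Dict, Optional


def build_manifest(
    name: Optional[str] = None,
    model_name: str = "model",
    model_uri: str = "",
    implementation: str = "",
) -> Dict[str, Any]:
    # Illustrative stand-in for SeldonDeployment.build: same default
    # name scheme, same single-predictor graph shape as shown above.
    name = name or f"zenml-{time.time()}"
    return {
        "apiVersion": "machinelearning.seldon.io/v1",
        "kind": "SeldonDeployment",
        "metadata": {"name": name, "labels": {}, "annotations": {}},
        "spec": {
            "name": name,
            "predictors": [
                {
                    "name": model_name,
                    "graph": {
                        "name": "classifier",
                        "type": "MODEL",
                        "modelUri": model_uri,
                        "implementation": implementation,
                    },
                }
            ],
        },
    }


manifest = build_manifest(
    name="demo", model_uri="gs://bucket/model", implementation="SKLEARN_SERVER"
)
print(manifest["metadata"]["name"])  # demo
```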
get_error() -> Optional[str]

Get a message describing the error, if in an error state.

Returns:

Type Description
Optional[str]

A message describing the error, if in an error state, otherwise

Optional[str]

None.

Source code in src/zenml/integrations/seldon/seldon_client.py
def get_error(self) -> Optional[str]:
    """Get a message describing the error, if in an error state.

    Returns:
        A message describing the error, if in an error state, otherwise
        None.
    """
    if self.status and self.is_failed():
        return self.status.description
    return None
get_pending_message() -> Optional[str]

Get a message describing the pending conditions of the Seldon Deployment.

Returns:

Type Description
Optional[str]

A message describing the pending condition of the Seldon

Optional[str]

Deployment, or None, if no conditions are pending.

Source code in src/zenml/integrations/seldon/seldon_client.py
def get_pending_message(self) -> Optional[str]:
    """Get a message describing the pending conditions of the Seldon Deployment.

    Returns:
        A message describing the pending condition of the Seldon
        Deployment, or None, if no conditions are pending.
    """
    if not self.status or not self.status.conditions:
        return None
    ready_condition_message = [
        c.message
        for c in self.status.conditions
        if c.type == "Ready" and not c.status
    ]
    if not ready_condition_message:
        return None
    return ready_condition_message[0]
is_available() -> bool

Checks if the Seldon Deployment is in an available state.

Returns:

Type Description
bool

True if the Seldon Deployment is available, False otherwise.

Source code in src/zenml/integrations/seldon/seldon_client.py
def is_available(self) -> bool:
    """Checks if the Seldon Deployment is in an available state.

    Returns:
        True if the Seldon Deployment is available, False otherwise.
    """
    return self.state == SeldonDeploymentStatusState.AVAILABLE
is_failed() -> bool

Checks if the Seldon Deployment is in a failed state.

Returns:

Type Description
bool

True if the Seldon Deployment is failed, False otherwise.

Source code in src/zenml/integrations/seldon/seldon_client.py
def is_failed(self) -> bool:
    """Checks if the Seldon Deployment is in a failed state.

    Returns:
        True if the Seldon Deployment is failed, False otherwise.
    """
    return self.state == SeldonDeploymentStatusState.FAILED
is_managed_by_zenml() -> bool

Checks if this Seldon Deployment is managed by ZenML.

The convention used to differentiate between SeldonDeployment instances that are managed by ZenML and those that are not is to set the app label value to zenml.

Returns:

Type Description
bool

True if the Seldon Deployment is managed by ZenML, False

bool

otherwise.

Source code in src/zenml/integrations/seldon/seldon_client.py
def is_managed_by_zenml(self) -> bool:
    """Checks if this Seldon Deployment is managed by ZenML.

    The convention used to differentiate between SeldonDeployment instances
    that are managed by ZenML and those that are not is to set the `app`
    label value to `zenml`.

    Returns:
        True if the Seldon Deployment is managed by ZenML, False
        otherwise.
    """
    return self.metadata.labels.get("app") == "zenml"
is_pending() -> bool

Checks if the Seldon Deployment is in a pending state.

Returns:

Type Description
bool

True if the Seldon Deployment is pending, False otherwise.

Source code in src/zenml/integrations/seldon/seldon_client.py
def is_pending(self) -> bool:
    """Checks if the Seldon Deployment is in a pending state.

    Returns:
        True if the Seldon Deployment is pending, False otherwise.
    """
    return self.state == SeldonDeploymentStatusState.CREATING
mark_as_managed_by_zenml() -> None

Marks this Seldon Deployment as managed by ZenML.

The convention used to differentiate between SeldonDeployment instances that are managed by ZenML and those that are not is to set the app label value to zenml.

Source code in src/zenml/integrations/seldon/seldon_client.py
def mark_as_managed_by_zenml(self) -> None:
    """Marks this Seldon Deployment as managed by ZenML.

    The convention used to differentiate between SeldonDeployment instances
    that are managed by ZenML and those that are not is to set the `app`
    label value to `zenml`.
    """
    self.metadata.labels["app"] = "zenml"
SeldonDeploymentComponentSpecs

Bases: BaseModel

Component specs for a Seldon Deployment.

Attributes:

Name Type Description
spec Optional[Dict[str, Any]]

the component spec.

SeldonDeploymentExistsError

Bases: SeldonClientError

Raised when a SeldonDeployment resource cannot be created because a resource with the same name already exists.

SeldonDeploymentMetadata

Bases: BaseModel

Metadata for a Seldon Deployment.

Attributes:

Name Type Description
name str

the name of the Seldon Deployment.

labels Dict[str, str]

Kubernetes labels for the Seldon Deployment.

annotations Dict[str, str]

Kubernetes annotations for the Seldon Deployment.

creationTimestamp Optional[str]

the creation timestamp of the Seldon Deployment.

SeldonDeploymentNotFoundError

Bases: SeldonClientError

Raised when a particular SeldonDeployment resource is not found or is not managed by ZenML.

SeldonDeploymentPredictiveUnit

Bases: BaseModel

Seldon Deployment predictive unit.

Attributes:

Name Type Description
name str

the name of the predictive unit.

type Optional[SeldonDeploymentPredictiveUnitType]

predictive unit type.

implementation Optional[str]

the Seldon Core implementation used to serve the model.

modelUri Optional[str]

URI of the model (or models) to serve.

serviceAccountName Optional[str]

the name of the service account to associate with the predictive unit container.

envSecretRefName Optional[str]

the name of a Kubernetes secret that contains environment variables (e.g. credentials) to be configured for the predictive unit container.

children Optional[List[SeldonDeploymentPredictiveUnit]]

a list of child predictive units that together make up the model serving graph.

SeldonDeploymentPredictiveUnitType

Bases: StrEnum

Predictive unit types for a Seldon Deployment.

SeldonDeploymentPredictor

Bases: BaseModel

Seldon Deployment predictor.

Attributes:

Name Type Description
name str

the name of the predictor.

replicas int

the number of pod replicas for the predictor.

graph SeldonDeploymentPredictiveUnit

the serving graph composed of one or more predictive units.

SeldonDeploymentPredictorParameter

Bases: BaseModel

Parameter for Seldon Deployment predictor.

Attributes:

Name Type Description
name str

parameter name

type str

parameter type; one of INT, FLOAT, DOUBLE, STRING, BOOL

value str

parameter value

SeldonDeploymentSpec

Bases: BaseModel

Spec for a Seldon Deployment.

Attributes:

Name Type Description
name str

the name of the Seldon Deployment.

protocol Optional[str]

the API protocol used for the Seldon Deployment.

predictors List[SeldonDeploymentPredictor]

a list of predictors that make up the serving graph.

replicas int

the default number of pod replicas used for the predictors.

SeldonDeploymentStatus

Bases: BaseModel

The status of a Seldon Deployment.

Attributes:

Name Type Description
state SeldonDeploymentStatusState

the current state of the Seldon Deployment.

description Optional[str]

a human-readable description of the current state.

replicas Optional[int]

the current number of running pod replicas

address Optional[SeldonDeploymentStatusAddress]

the address where the Seldon Deployment API can be accessed.

conditions List[SeldonDeploymentStatusCondition]

the list of Kubernetes conditions for the Seldon Deployment.

SeldonDeploymentStatusAddress

Bases: BaseModel

The status address for a Seldon Deployment.

Attributes:

Name Type Description
url str

the URL where the Seldon Deployment API can be accessed internally.

SeldonDeploymentStatusCondition

Bases: BaseModel

The Kubernetes status condition entry for a Seldon Deployment.

Attributes:

Name Type Description
type str

Type of runtime condition.

status bool

Status of the condition.

reason Optional[str]

Brief CamelCase string containing reason for the condition's last transition.

message Optional[str]

Human-readable message indicating details about last transition.

SeldonDeploymentStatusState

Bases: StrEnum

Possible state values for a Seldon Deployment.

SeldonResourceRequirements

Bases: BaseModel

Resource requirements for a Seldon deployed model.

Attributes:

Name Type Description
limits Dict[str, str]

an upper limit of resources to be used by the model

requests Dict[str, str]

resources requested by the model

Functions
create_seldon_core_custom_spec(model_uri: Optional[str], custom_docker_image: Optional[str], secret_name: Optional[str], command: Optional[List[str]], container_registry_secret_name: Optional[str] = None) -> k8s_client.V1PodSpec

Create a custom pod spec for the seldon core container.

Parameters:

Name Type Description Default
model_uri Optional[str]

The URI of the model to load.

required
custom_docker_image Optional[str]

The docker image to use.

required
secret_name Optional[str]

The name of the Kubernetes secret to use.

required
command Optional[List[str]]

The command to run in the container.

required
container_registry_secret_name Optional[str]

The name of the secret to use for docker image pull.

None

Returns:

Type Description
V1PodSpec

A pod spec for the seldon core container.

Source code in src/zenml/integrations/seldon/seldon_client.py
def create_seldon_core_custom_spec(
    model_uri: Optional[str],
    custom_docker_image: Optional[str],
    secret_name: Optional[str],
    command: Optional[List[str]],
    container_registry_secret_name: Optional[str] = None,
) -> k8s_client.V1PodSpec:
    """Create a custom pod spec for the seldon core container.

    Args:
        model_uri: The URI of the model to load.
        custom_docker_image: The docker image to use.
        secret_name: The name of the Kubernetes secret to use.
        command: The command to run in the container.
        container_registry_secret_name: The name of the secret to use for docker
            image pull.

    Returns:
        A pod spec for the seldon core container.
    """
    volume = k8s_client.V1Volume(
        name="classifier-provision-location",
        empty_dir={},
    )
    init_container = k8s_client.V1Container(
        name="classifier-model-initializer",
        image="seldonio/rclone-storage-initializer:1.14.0-dev",
        image_pull_policy="IfNotPresent",
        args=[model_uri, "/mnt/models"],
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name="classifier-provision-location", mount_path="/mnt/models"
            )
        ],
    )
    if secret_name:
        init_container.env_from = [
            k8s_client.V1EnvFromSource(
                secret_ref=k8s_client.V1SecretEnvSource(
                    name=secret_name, optional=False
                )
            )
        ]
    container = k8s_client.V1Container(
        name="classifier",
        image=custom_docker_image,
        image_pull_policy="IfNotPresent",
        command=command,
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name="classifier-provision-location",
                mount_path="/mnt/models",
                read_only=True,
            )
        ],
        ports=[
            k8s_client.V1ContainerPort(container_port=5000),
            k8s_client.V1ContainerPort(container_port=9000),
        ],
    )

    if container_registry_secret_name:
        image_pull_secret = k8s_client.V1LocalObjectReference(
            name=container_registry_secret_name
        )
        spec = k8s_client.V1PodSpec(
            volumes=[
                volume,
            ],
            init_containers=[
                init_container,
            ],
            image_pull_secrets=[image_pull_secret],
            containers=[container],
        )
    else:
        spec = k8s_client.V1PodSpec(
            volumes=[
                volume,
            ],
            init_containers=[
                init_container,
            ],
            containers=[container],
        )

    return api.sanitize_for_serialization(spec)
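To make the shape of the resulting pod spec concrete, here is a minimal dict-based sketch of what the function assembles: a shared `emptyDir` volume, an rclone init container that pulls the model into it, and the serving container mounting it read-only. This uses plain dicts instead of the `kubernetes` client objects, and the URI and image names are hypothetical.

```python
from typing import Any, Dict, List, Optional

def sketch_pod_spec(
    model_uri: str,
    custom_docker_image: str,
    command: List[str],
    secret_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Plain-dict approximation of the pod spec the function emits."""
    shared_mount = {"name": "classifier-provision-location", "mountPath": "/mnt/models"}
    init_container = {
        # rclone init container downloads the model into the shared volume
        "name": "classifier-model-initializer",
        "image": "seldonio/rclone-storage-initializer:1.14.0-dev",
        "args": [model_uri, "/mnt/models"],
        "volumeMounts": [shared_mount],
    }
    if secret_name:
        # artifact store credentials come in via a Kubernetes secret
        init_container["envFrom"] = [{"secretRef": {"name": secret_name}}]
    return {
        "volumes": [{"name": "classifier-provision-location", "emptyDir": {}}],
        "initContainers": [init_container],
        "containers": [
            {
                "name": "classifier",
                "image": custom_docker_image,
                "command": command,
                # the serving container only reads the downloaded model
                "volumeMounts": [dict(shared_mount, readOnly=True)],
                "ports": [{"containerPort": 5000}, {"containerPort": 9000}],
            }
        ],
    }

spec = sketch_pod_spec("s3://bucket/model", "my-image:latest", ["python", "-m", "serve"], "creds")
```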

services

Initialization for Seldon services.

Classes
Modules
seldon_deployment

Implementation for the Seldon Deployment step.

Classes
SeldonDeploymentConfig(**data: Any)

Bases: ServiceConfig

Seldon Core deployment service configuration.

Attributes:

Name Type Description
model_uri str

URI of the model (or models) to serve.

model_name str

the name of the model. Multiple versions of the same model should use the same model name.

implementation str

the Seldon Core implementation used to serve the model. The implementation type can be one of the following: TENSORFLOW_SERVER, SKLEARN_SERVER, XGBOOST_SERVER, custom.

replicas int

number of replicas to use for the prediction service.

secret_name Optional[str]

the name of a Kubernetes secret containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store).

model_metadata Dict[str, Any]

optional model metadata information (see https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html).

extra_args Dict[str, Any]

additional arguments to pass to the Seldon Core deployment resource configuration.

is_custom_deployment Optional[bool]

whether the deployment is a custom deployment

spec Optional[Dict[Any, Any]]

custom Kubernetes resource specification for the Seldon Core deployment.

serviceAccountName Optional[str]

The name of the Service Account applied to the deployment.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
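The service-name fallback chain above can be sketched as a standalone function: an explicit `service_name` wins, otherwise the endpoint prefix is applied to `name` or `model_name`. The prefix value below is an assumption for illustration.

```python
from typing import Optional

ZENM_ENDPOINT_PREFIX = "zenml-"  # assumed value of the prefix constant

def derive_service_name(
    name: Optional[str],
    model_name: Optional[str],
    service_name: Optional[str] = None,
) -> str:
    """Mirror the fallback chain in SeldonDeploymentConfig.__init__."""
    if not (name or model_name):
        raise ValueError("Either 'name' or 'model_name' must be set.")
    # explicit service_name wins, else prefix + (name or model_name)
    return service_name or f"{ZENM_ENDPOINT_PREFIX}{name or model_name}"
```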
Functions
create_from_deployment(deployment: SeldonDeployment) -> SeldonDeploymentConfig classmethod

Recreate the configuration of a Seldon Core Service from a deployed instance.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource.

required

Returns:

Type Description
SeldonDeploymentConfig

The Seldon Core service configuration corresponding to the given

SeldonDeploymentConfig

Seldon Core deployment resource.

Raises:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations, or it contains an invalid or incompatible Seldon Core service configuration.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentConfig":
    """Recreate the configuration of a Seldon Core Service from a deployed instance.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service configuration corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations, or it contains an invalid or
            incompatible Seldon Core service configuration.
    """
    config_data = deployment.metadata.annotations.get(
        "zenml.service_config"
    )
    if not config_data:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )
    try:
        service_config = cls.model_validate_json(config_data)
    except ValidationError as e:
        raise ValueError(
            f"The loaded Seldon Core deployment resource contains an "
            f"invalid or incompatible Seldon Core service configuration: "
            f"{config_data}"
        ) from e
    return service_config
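The round trip between a config and the `zenml.service_config` annotation can be illustrated with a tiny stand-in dataclass (the real class is a Pydantic model with more fields):

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class MiniConfig:
    """Tiny stand-in for SeldonDeploymentConfig, just for illustration."""
    model_name: str
    model_uri: str

def to_annotations(cfg: MiniConfig) -> dict:
    # serialize the whole config into the deployment's annotations
    return {"zenml.service_config": json.dumps(asdict(cfg))}

def from_annotations(annotations: dict) -> MiniConfig:
    # reverse direction: fail loudly if the annotation is missing
    raw = annotations.get("zenml.service_config")
    if not raw:
        raise ValueError("missing 'zenml.service_config' annotation")
    return MiniConfig(**json.loads(raw))

restored = from_annotations(to_annotations(MiniConfig("churn", "s3://bucket/model")))
```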
get_seldon_deployment_annotations() -> Dict[str, str]

Generate annotations for the Seldon Core deployment from the service configuration.

The annotations are used to store additional information about the Seldon Core service that is associated with the deployment that is not available in the labels. One annotation particularly important is the serialized Service configuration itself, which is used to recreate the service configuration from a remote Seldon deployment.

Returns:

Type Description
Dict[str, str]

The annotations for the Seldon Core deployment.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_annotations(self) -> Dict[str, str]:
    """Generate annotations for the Seldon Core deployment from the service configuration.

    The annotations are used to store additional information about the
    Seldon Core service that is associated with the deployment that is
    not available in the labels. One annotation particularly important
    is the serialized Service configuration itself, which is used to
    recreate the service configuration from a remote Seldon deployment.

    Returns:
        The annotations for the Seldon Core deployment.
    """
    annotations = {
        "zenml.service_config": self.model_dump_json(),
        "zenml.version": __version__,
    }
    return annotations
get_seldon_deployment_labels() -> Dict[str, str]

Generate labels for the Seldon Core deployment from the service configuration.

These labels are attached to the Seldon Core deployment resource and may be used as label selectors in lookup operations.

Returns:

Type Description
Dict[str, str]

The labels for the Seldon Core deployment.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_labels(self) -> Dict[str, str]:
    """Generate labels for the Seldon Core deployment from the service configuration.

    These labels are attached to the Seldon Core deployment resource
    and may be used as label selectors in lookup operations.

    Returns:
        The labels for the Seldon Core deployment.
    """
    labels = {}
    if self.pipeline_name:
        labels["zenml.pipeline_name"] = self.pipeline_name
    if self.pipeline_step_name:
        labels["zenml.pipeline_step_name"] = self.pipeline_step_name
    if self.model_name:
        labels["zenml.model_name"] = self.model_name
    if self.model_uri:
        labels["zenml.model_uri"] = self.model_uri
    if self.implementation:
        labels["zenml.model_type"] = self.implementation
    if self.extra_args:
        for key, value in self.extra_args.items():
            labels[f"zenml.{key}"] = value
    SeldonClient.sanitize_labels(labels)
    return labels
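Sanitization matters here because values like `model_uri` contain characters that are illegal in Kubernetes label values (at most 63 characters, alphanumerics plus `-`, `_`, `.`, starting and ending alphanumeric). The following is a guess at what `SeldonClient.sanitize_labels` does; the real rules may differ:

```python
import re

def sanitize_labels(labels: dict) -> None:
    """Coerce values into valid Kubernetes label values, in place.

    A plausible sketch, not the actual SeldonClient implementation.
    """
    for key, value in labels.items():
        # replace illegal runs with '_', cap at 63 chars
        value = re.sub(r"[^0-9a-zA-Z._-]+", "_", str(value))[:63]
        # label values must start and end with an alphanumeric
        labels[key] = re.sub(r"^[^0-9a-zA-Z]+|[^0-9a-zA-Z]+$", "", value)

labels = {"zenml.model_uri": "s3://bucket/model"}
sanitize_labels(labels)
```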
SeldonDeploymentService(**attrs: Any)

Bases: BaseDeploymentService

A service that represents a Seldon Core deployment server.

Attributes:

Name Type Description
config SeldonDeploymentConfig

service configuration.

status SeldonDeploymentServiceStatus

service status.

Source code in src/zenml/services/service.py
def __init__(
    self,
    **attrs: Any,
) -> None:
    """Initialize the service instance.

    Args:
        **attrs: keyword arguments.
    """
    super().__init__(**attrs)
    self.config.name = self.config.name or self.__class__.__name__
Attributes
prediction_url: Optional[str] property

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if

Optional[str]

the service is not yet ready.

seldon_deployment_name: str property

Get the name of the Seldon Core deployment.

It should return the one that uniquely corresponds to this service instance.

Returns:

Type Description
str

The name of the Seldon Core deployment.

Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the Seldon Core deployment.

Returns:

Type Description
ServiceState

The operational state of the Seldon Core deployment and a message

str

providing additional information about that state (e.g. a

Tuple[ServiceState, str]

description of the error, if one is encountered).

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the Seldon Core deployment.

    Returns:
        The operational state of the Seldon Core deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    client = self._get_client()
    name = self.seldon_deployment_name
    try:
        deployment = client.get_deployment(name=name)
    except SeldonDeploymentNotFoundError:
        return (ServiceState.INACTIVE, "")

    if deployment.is_available():
        return (
            ServiceState.ACTIVE,
            f"Seldon Core deployment '{name}' is available",
        )

    if deployment.is_failed():
        return (
            ServiceState.ERROR,
            f"Seldon Core deployment '{name}' failed: "
            f"{deployment.get_error()}",
        )

    pending_message = deployment.get_pending_message() or ""
    return (
        ServiceState.PENDING_STARTUP,
        "Seldon Core deployment is being created: " + pending_message,
    )
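The mapping from deployment conditions to `ServiceState` can be distilled into a small decision function. The enum below is a simplified stand-in for ZenML's `ServiceState`:

```python
from enum import Enum
from typing import Tuple

class ServiceState(str, Enum):
    """Simplified stand-in for zenml.services.ServiceState."""
    INACTIVE = "inactive"
    ACTIVE = "active"
    ERROR = "error"
    PENDING_STARTUP = "pending_startup"

def map_deployment_state(
    found: bool, available: bool, failed: bool, error: str = ""
) -> Tuple[ServiceState, str]:
    # not found at all -> never provisioned, or already deleted
    if not found:
        return (ServiceState.INACTIVE, "")
    if available:
        return (ServiceState.ACTIVE, "deployment is available")
    if failed:
        return (ServiceState.ERROR, f"deployment failed: {error}")
    # neither available nor failed -> Kubernetes is still rolling it out
    return (ServiceState.PENDING_STARTUP, "deployment is being created")
```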
create_from_deployment(deployment: SeldonDeployment) -> SeldonDeploymentService classmethod

Recreate a Seldon Core service from a Seldon Core deployment resource.

It should then update its operational status.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource.

required

Returns:

Type Description
SeldonDeploymentService

The Seldon Core service corresponding to the given

SeldonDeploymentService

Seldon Core deployment resource.

Raises:

Type Description
ValueError

if the given deployment resource does not contain the expected service_uuid label.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentService":
    """Recreate a Seldon Core service from a Seldon Core deployment resource.

    It should then update its operational status.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected service_uuid label.
    """
    config = SeldonDeploymentConfig.create_from_deployment(deployment)
    uuid = deployment.metadata.labels.get("zenml.service_uuid")
    if not uuid:
        raise ValueError(
            f"The given deployment resource does not contain a valid "
            f"'zenml.service_uuid' label: {deployment}"
        )
    service = cls(uuid=UUID(uuid), config=config)
    service.update_status()
    return service
deprovision(force: bool = False) -> None

Deprovision the remote Seldon Core deployment instance.

Parameters:

Name Type Description Default
force bool

if True, the remote deployment instance will be forcefully deprovisioned.

False
Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the remote Seldon Core deployment instance.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    client = self._get_client()
    name = self.seldon_deployment_name
    try:
        client.delete_deployment(name=name, force=force)
    except SeldonDeploymentNotFoundError:
        pass
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Get the logs of a Seldon Core model deployment.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
None

A generator that can be accessed to get the service logs.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def get_logs(
    self,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core model deployment.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    return self._get_client().get_deployment_logs(
        self.seldon_deployment_name,
        follow=follow,
        tail=tail,
    )
predict(request: str) -> Any

Make a prediction using the service.

Parameters:

Name Type Description Default
request str

a numpy array representing the request

required

Returns:

Type Description
Any

A numpy array representing the prediction returned by the service.

Raises:

Type Description
Exception

if the service is not yet ready.

ValueError

if the prediction_url is not set.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a numpy array representing the request

    Returns:
        A numpy array representing the prediction returned by the service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set.
    """
    if not self.is_running:
        raise Exception(
            "Seldon prediction service is not running. "
            "Please start the service before making predictions."
        )

    if self.prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")

    if isinstance(request, str):
        request = json.loads(request)
    else:
        raise ValueError("Request must be a json string.")
    response = requests.post(  # nosec
        self.prediction_url,
        json={"data": {"ndarray": request}},
    )
    response.raise_for_status()
    return response.json()
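The request validation and payload wrapping in `predict` can be isolated from the HTTP call, which makes the expected Seldon payload shape easy to see. A minimal sketch:

```python
import json
from typing import Any, Dict

def build_prediction_payload(request: Any) -> Dict[str, Any]:
    """Validate a JSON request string and wrap it in Seldon's ndarray payload."""
    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    # Seldon's v1 protocol expects {"data": {"ndarray": [...]}}
    return {"data": {"ndarray": json.loads(request)}}

payload = build_prediction_payload("[[1.0, 2.0, 3.0]]")
```

The real method then POSTs this payload to `prediction_url` and returns the JSON response.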
provision() -> None

Provision or update remote Seldon Core deployment instance.

This should then match the current configuration.

Source code in src/zenml/integrations/seldon/services/seldon_deployment.py
def provision(self) -> None:
    """Provision or update remote Seldon Core deployment instance.

    This should then match the current configuration.
    """
    client = self._get_client()

    name = self.seldon_deployment_name

    deployment = SeldonDeployment.build(
        name=name,
        model_uri=self.config.model_uri,
        model_name=self.config.model_name,
        implementation=self.config.implementation,
        parameters=self.config.parameters,
        engineResources=self.config.resources,
        secret_name=self.config.secret_name,
        labels=self._get_seldon_deployment_labels(),
        annotations=self.config.get_seldon_deployment_annotations(),
        is_custom_deployment=self.config.is_custom_deployment,
        spec=self.config.spec,
        serviceAccountName=self.config.serviceAccountName,
    )
    deployment.spec.replicas = self.config.replicas
    deployment.spec.predictors[0].replicas = self.config.replicas

    # check if the Seldon deployment already exists
    try:
        client.get_deployment(name=name)
        # update the existing deployment
        client.update_deployment(deployment)
    except SeldonDeploymentNotFoundError:
        # create the deployment
        client.create_deployment(deployment=deployment)
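The create-or-update flow at the end of `provision` is a common upsert pattern; it can be demonstrated with an in-memory stand-in for the client (`FakeClient` is hypothetical, not part of the integration):

```python
class SeldonDeploymentNotFoundError(Exception):
    pass

class FakeClient:
    """Minimal in-memory stand-in for SeldonClient's deployment CRUD."""
    def __init__(self):
        self._deployments = {}
    def get_deployment(self, name):
        if name not in self._deployments:
            raise SeldonDeploymentNotFoundError(name)
        return self._deployments[name]
    def create_deployment(self, name, deployment):
        self._deployments[name] = deployment
    def update_deployment(self, name, deployment):
        self._deployments[name] = deployment

def upsert(client, name, deployment):
    """Mirror provision()'s try/except: update if present, else create."""
    try:
        client.get_deployment(name)
        client.update_deployment(name, deployment)
        return "updated"
    except SeldonDeploymentNotFoundError:
        client.create_deployment(name, deployment)
        return "created"
```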
SeldonDeploymentServiceStatus

Bases: ServiceStatus

Seldon Core deployment service status.

Functions

steps

Initialization for Seldon steps.

Functions
Modules
seldon_deployer

Implementation of the Seldon Deployer step.

Classes Functions
seldon_custom_model_deployer_step(model: UnmaterializedArtifact, predict_function: str, service_config: SeldonDeploymentConfig, deploy_decision: bool = True, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> SeldonDeploymentService

Seldon Core custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with Seldon Core.

Parameters:

Name Type Description Default
model UnmaterializedArtifact

the model artifact to deploy

required
predict_function str

Path to Python file containing predict function.

required
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

required
deploy_decision bool

whether to deploy the model or not

True
timeout int

the timeout in seconds to wait for the deployment to start

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Raises:

Type Description
ValueError

if the custom deployer is not defined

ValueError

if predict function path is not valid

TypeError

if predict function path is not a callable function

DoesNotExistException

if an entity does not exist raise an exception

RuntimeError

if the build is missing for the pipeline run

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service

Source code in src/zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False, extra={SELDON_CUSTOM_DEPLOYMENT: True})
def seldon_custom_model_deployer_step(
    model: UnmaterializedArtifact,
    predict_function: str,
    service_config: SeldonDeploymentConfig,
    deploy_decision: bool = True,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> SeldonDeploymentService:
    """Seldon Core custom model deployer pipeline step.

    This step can be used in a pipeline to implement the process
    required to deploy a custom model with Seldon Core.

    Args:
        model: the model artifact to deploy
        predict_function: Path to Python file containing predict function.
        service_config: Seldon Core deployment service configuration.
        deploy_decision: whether to deploy the model or not
        timeout: the timeout in seconds to wait for the deployment to start

    Raises:
        ValueError: if the custom deployer is not defined
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
        DoesNotExistException: if an entity does not exist raise an exception
        RuntimeError: if the build is missing for the pipeline run

    Returns:
        Seldon Core deployment service
    """
    # Verify that the predict function is valid
    try:
        loaded_predict_function = source_utils.load(predict_function)
    except AttributeError:
        raise ValueError("Predict function can't be found.")
    if not callable(loaded_predict_function):
        raise TypeError("Predict function must be callable.")

    # get the active model deployer
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    context = get_step_context()
    pipeline_name = context.pipeline.name
    step_name = context.step_run.name

    # update the step configuration with the real pipeline runtime information
    service_config.pipeline_name = pipeline_name
    service_config.pipeline_step_name = step_name
    service_config.is_custom_deployment = True

    # fetch existing services with the same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        config=service_config.model_dump()
    )
    # even when the deploy decision is negative if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not"
            f" meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # even when the deployment decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=timeout)
        return service

    # entrypoint for starting Seldon microservice deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.seldon.custom_deployer.zenml_custom_model",
        "--model_name",
        service_config.model_name,
        "--predict_func",
        predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not Client().active_stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    pipeline_run = context.pipeline_run
    if not pipeline_run.build:
        raise RuntimeError(
            f"Missing build for run {pipeline_run.id}. This is probably "
            "because the build was manually deleted."
        )

    image_name = pipeline_run.build.get_image(
        component_key=SELDON_DOCKER_IMAGE_KEY, step=step_name
    )

    # copy the model files to new specific directory for the deployment
    served_model_uri = os.path.join(
        context.get_output_artifact_uri(), "seldon"
    )
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(model)
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment
    service_config = service_config.model_copy()
    service_config.model_uri = served_model_uri

    # create the specification for the custom deployment
    service_config.spec = create_seldon_core_custom_spec(
        model_uri=service_config.model_uri,
        custom_docker_image=image_name,
        secret_name=model_deployer.kubernetes_secret_name,
        command=entrypoint_command,
    ).to_dict()

    # deploy the service
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config,
            replace=True,
            timeout=timeout,
            service_type=SeldonDeploymentService.SERVICE_TYPE,
        ),
    )

    logger.info(
        f"Seldon Core deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
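The deploy-decision handling at the top of the step boils down to a small decision table: even a negative decision keeps a model server available by reusing (and, if needed, restarting) the last deployment for this pipeline/step. A sketch:

```python
def choose_action(deploy_decision: bool, has_existing: bool, is_running: bool) -> str:
    """Distill the step's deploy/reuse logic into one decision function."""
    if not deploy_decision and has_existing:
        # keep serving the previous model; restart its server if it stopped
        return "reuse" if is_running else "restart-and-reuse"
    # either the decision is positive, or there is nothing to fall back on
    return "deploy-new"
```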
seldon_mlflow_registry_deployer_step(service_config: SeldonDeploymentConfig, registry_model_name: Optional[str] = None, registry_model_version: Optional[str] = None, registry_model_stage: Optional[ModelVersionStage] = None, replace_existing: bool = True, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> SeldonDeploymentService

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for an MLflow model with Seldon Core.

Parameters:

Name Type Description Default
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration

required
registry_model_name Optional[str]

name of the model in the model registry

None
registry_model_version Optional[str]

version of the model in the model registry

None
registry_model_stage Optional[ModelVersionStage]

stage of the model in the model registry

None
replace_existing bool

whether to replace an existing deployment of the model with the same name; this is only used when the model is deployed from a model registry stored model.

True
timeout int

the timeout in seconds to wait for the deployment to start

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service

Raises:

Type Description
ValueError

if registry_model_name is not provided

ValueError

if neither registry_model_version nor registry_model_stage is provided

ValueError

if the MLflow experiment tracker is not available in the active stack

LookupError

if no model version is found in the MLflow model registry.

Source code in src/zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False)
def seldon_mlflow_registry_deployer_step(
    service_config: SeldonDeploymentConfig,
    registry_model_name: Optional[str] = None,
    registry_model_version: Optional[str] = None,
    registry_model_stage: Optional[ModelVersionStage] = None,
    replace_existing: bool = True,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> SeldonDeploymentService:
    """Seldon Core model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for an MLflow model with Seldon Core.

    Args:
        service_config: Seldon Core deployment service configuration
        registry_model_name: name of the model in the model registry
        registry_model_version: version of the model in the model registry
        registry_model_stage: stage of the model in the model registry
        replace_existing: whether to replace an existing deployment of the
            model with the same name; this is only used when the model is
            deployed from a model registry stored model.
        timeout: the timeout in seconds to wait for the deployment to start

    Returns:
        Seldon Core deployment service

    Raises:
        ValueError: if registry_model_name is not provided
        ValueError: if neither registry_model_version nor
            registry_model_stage is provided
        ValueError: if the MLflow experiment tracker is not available in the
            active stack
        LookupError: if no model version is found in the MLflow model registry.
    """
    # import here to avoid failing the pipeline if the step is not used

    # check if the MLflow experiment tracker, MLflow model registry and
    # Seldon Core model deployer are available
    if not registry_model_name:
        raise ValueError(
            "registry_model_name must be provided to the MLflow "
            "model registry deployer step."
        )
    elif not registry_model_version and not registry_model_stage:
        raise ValueError(
            "Either registry_model_version or registry_model_stage must "
            "be provided in addition to registry_model_name to the MLflow "
            "model registry deployer step. Since the "
            "mlflow_model_registry_deployer_step is used in conjunction with "
            "the mlflow_model_registry."
        )
    if service_config.implementation != "MLFLOW_SERVER":
        raise ValueError(
            "This step only allows MLFLOW_SERVER implementation with seldon"
        )
    # Get the active model deployer
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # fetch the MLflow model registry
    model_registry = Client().active_stack.model_registry
    assert model_registry is not None

    # fetch the model version
    model_version = None
    if registry_model_version:
        try:
            model_version = model_registry.get_model_version(
                name=registry_model_name,
                version=registry_model_version,
            )
        except KeyError:
            model_version = None
    elif registry_model_stage:
        model_version = model_registry.get_latest_model_version(
            name=registry_model_name,
            stage=registry_model_stage,
        )
    if not model_version:
        raise LookupError(
            f"No model version found for model name "
            f"{registry_model_name} with version "
            f"{registry_model_version} or stage "
            f"{registry_model_stage}"
        )
    if model_version.model_format != MLFLOW_MODEL_FORMAT:
        raise ValueError(
            f"Model version {model_version.version} of model "
            f"{model_version.registered_model.name} is not an MLflow model. "
            f"Only MLflow models can be deployed with Seldon Core using "
            f"this step."
        )
    # Prepare the service configuration
    service_config.extra_args["registry_model_name"] = (
        model_version.registered_model.name
    )
    service_config.extra_args["registry_model_version"] = (
        model_version.version
    )
    service_config.extra_args["registry_model_stage"] = (
        model_version.stage.value
    )
    service_config.model_uri = model_registry.get_model_uri_artifact_store(
        model_version=model_version,
    )
    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = (
        model_deployer.find_model_server(
            model_name=model_version.registered_model.name,
        )
        if replace_existing
        else []
    )
    # if an existing model server was found for this model, reuse it when it
    # serves the same model URI; otherwise stop it so a new server can be
    # deployed in its place
    if existing_services:
        service = cast(SeldonDeploymentService, existing_services[0])
        # restart the previous model server if it is no longer running, to
        # ensure that a model server is available at all times
        if service.config.model_uri == service_config.model_uri:
            if not service.is_running:
                service.start()
            return service
        else:
            # stop the existing service
            service.stop()

    # invoke the Seldon Core model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config,
            replace=True,
            timeout=timeout,
            service_type=SeldonDeploymentService.SERVICE_TYPE,
        ),
    )

    logger.info(
        f"Seldon deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
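The reuse-or-replace logic above (keep an existing server that already serves the same `model_uri`, restarting it if needed; stop it otherwise before deploying a fresh one) can be sketched in isolation. This is a minimal stand-in, not the ZenML API; the `Service` class below is hypothetical:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Service:
    """Hypothetical stand-in for a deployed model server."""

    model_uri: str
    is_running: bool = False

    def start(self) -> None:
        self.is_running = True

    def stop(self) -> None:
        self.is_running = False


def reuse_or_replace(existing: Optional[Service], new_model_uri: str) -> Service:
    """Mirror the decision in the step: reuse a matching server,
    otherwise stop the stale one and deploy a fresh server."""
    if existing is not None:
        if existing.model_uri == new_model_uri:
            # same model: make sure the server is up, then reuse it
            if not existing.is_running:
                existing.start()
            return existing
        # different model: stop the stale server before redeploying
        existing.stop()
    service = Service(model_uri=new_model_uri)
    service.start()
    return service
```

In the real step, the "deploy a fresh server" branch corresponds to `model_deployer.deploy_model(..., replace=True)`, which also handles updating a previously deployed service in place.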
seldon_model_deployer_step(model: UnmaterializedArtifact, service_config: SeldonDeploymentConfig, deploy_decision: bool = True, timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT) -> SeldonDeploymentService

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for a ML model with Seldon Core.

Parameters:

Name Type Description Default
model UnmaterializedArtifact

the model artifact to deploy

required
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

required
deploy_decision bool

whether to deploy the model or not

True
timeout int

the timeout in seconds to wait for the deployment to start

DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service
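Note that a negative `deploy_decision` only skips redeployment when a previous server for this pipeline/step exists and can be reused; otherwise the step deploys anyway, so that a model server is available at all times. A minimal sketch of that gate (the `should_deploy` helper is illustrative, not part of ZenML):

```python
def should_deploy(deploy_decision: bool, has_existing_service: bool) -> str:
    """Mirror the gate in seldon_model_deployer_step: a negative decision
    only skips deployment when an earlier server can be reused."""
    if not deploy_decision and has_existing_service:
        return "reuse-existing"  # restart the previous server if needed
    return "deploy-new"  # model_deployer.deploy_model(..., replace=True)
```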

Source code in src/zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False)
def seldon_model_deployer_step(
    model: UnmaterializedArtifact,
    service_config: SeldonDeploymentConfig,
    deploy_decision: bool = True,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> SeldonDeploymentService:
    """Seldon Core model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for a ML model with Seldon Core.

    Args:
        model: the model artifact to deploy
        service_config: Seldon Core deployment service configuration.
        deploy_decision: whether to deploy the model or not
        timeout: the timeout in seconds to wait for the deployment to start

    Returns:
        Seldon Core deployment service
    """
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    context = get_step_context()
    pipeline_name = context.pipeline.name
    step_name = context.step_run.name

    # update the step configuration with the real pipeline runtime information
    service_config = service_config.model_copy()
    service_config.pipeline_name = pipeline_name
    service_config.pipeline_step_name = step_name

    def prepare_service_config(model_uri: str) -> SeldonDeploymentConfig:
        """Prepare the model files for model serving.

        This creates and returns a Seldon service configuration for the model.

        This function ensures that the model files are in the correct format
        and file structure required by the Seldon Core server implementation
        used for model serving.

        Args:
            model_uri: the URI of the model artifact being served

        Returns:
            The Seldon service configuration updated with the URI of the
            prepared model.

        Raises:
            RuntimeError: if the model files were not found
        """
        served_model_uri = os.path.join(
            context.get_output_artifact_uri(), "seldon"
        )
        fileio.makedirs(served_model_uri)

        # TODO [ENG-773]: determine how to formalize how models are organized into
        #   folders and sub-folders depending on the model type/format and the
        #   Seldon Core protocol used to serve the model.

        # TODO [ENG-791]: auto-detect built-in Seldon server implementation
        #   from the model artifact type

        # TODO [ENG-792]: validate the model artifact type against the
        #   supported built-in Seldon server implementations
        if service_config.implementation == "TENSORFLOW_SERVER":
            # the TensorFlow server expects model artifacts to be
            # stored in numbered subdirectories, each representing a model
            # version
            io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
        elif service_config.implementation == "SKLEARN_SERVER":
            # the sklearn server expects model artifacts to be
            # stored in a file called model.joblib
            model_uri = os.path.join(model.uri, "model")
            if not fileio.exists(model_uri):
                raise RuntimeError(
                    f"Expected sklearn model artifact was not found at "
                    f"{model_uri}"
                )
            fileio.copy(
                model_uri, os.path.join(served_model_uri, "model.joblib")
            )
        else:
            # default treatment for all other server implementations is to
            # simply reuse the model from the artifact store path where it
            # is originally stored
            served_model_uri = model_uri

        service_config.model_uri = served_model_uri
        return service_config

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        config=service_config.model_dump()
    )

    # even when the deploy decision is negative, if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=timeout)
        return service

    # invoke the Seldon Core model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service_config = prepare_service_config(model.uri)
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config,
            replace=True,
            timeout=timeout,
            service_type=SeldonDeploymentService.SERVICE_TYPE,
        ),
    )

    logger.info(
        f"Seldon deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
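The `prepare_service_config` helper above rearranges model files into the layout each built-in Seldon server expects: numbered version subdirectories for the TensorFlow server, a single `model.joblib` file for the sklearn server, and the original artifact location otherwise. A stdlib-only sketch of that layout logic, with local paths standing in for the artifact store and `shutil`/`os` replacing ZenML's `fileio`/`io_utils`:

```python
import os
import shutil
import tempfile


def layout_for_server(model_uri: str, served_root: str, implementation: str) -> str:
    """Return the URI a Seldon server should load from, copying the model
    files into the directory structure that implementation expects."""
    if implementation == "TENSORFLOW_SERVER":
        # TensorFlow Serving reads numbered version subdirectories
        shutil.copytree(model_uri, os.path.join(served_root, "1"))
        return served_root
    if implementation == "SKLEARN_SERVER":
        # the sklearn server reads a single model.joblib file
        src = os.path.join(model_uri, "model")
        if not os.path.exists(src):
            raise RuntimeError(f"Expected sklearn model artifact not found at {src}")
        os.makedirs(served_root, exist_ok=True)
        shutil.copy(src, os.path.join(served_root, "model.joblib"))
        return served_root
    # other implementations load the model from its original location
    return model_uri


# demo with throwaway directories
with tempfile.TemporaryDirectory() as tmp:
    model_dir = os.path.join(tmp, "artifact")
    os.makedirs(model_dir)
    open(os.path.join(model_dir, "model"), "w").close()  # fake sklearn artifact
    served = layout_for_server(model_dir, os.path.join(tmp, "seldon"), "SKLEARN_SERVER")
    print(os.listdir(served))  # prints ['model.joblib']
```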
Modules