
Databricks

zenml.integrations.databricks

Initialization of the Databricks integration for ZenML.

Attributes

DATABRICKS = 'databricks' module-attribute

DATABRICKS_MODEL_DEPLOYER_FLAVOR = 'databricks' module-attribute

DATABRICKS_ORCHESTRATOR_FLAVOR = 'databricks' module-attribute

DATABRICKS_SERVICE_ARTIFACT = 'databricks_deployment_service' module-attribute
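
These constants can be imported directly from the integration module. A minimal sketch, with the values taken from the attribute list above:

from zenml.integrations.databricks import (
    DATABRICKS,
    DATABRICKS_MODEL_DEPLOYER_FLAVOR,
    DATABRICKS_ORCHESTRATOR_FLAVOR,
    DATABRICKS_SERVICE_ARTIFACT,
)

print(DATABRICKS)                        # 'databricks'
print(DATABRICKS_MODEL_DEPLOYER_FLAVOR)  # 'databricks'
print(DATABRICKS_ORCHESTRATOR_FLAVOR)    # 'databricks'
print(DATABRICKS_SERVICE_ARTIFACT)       # 'databricks_deployment_service'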

Classes

DatabricksIntegration

Bases: Integration

Definition of Databricks Integration for ZenML.

Functions
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Databricks integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/databricks/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Databricks integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.databricks.flavors import (
        DatabricksOrchestratorFlavor,
        DatabricksModelDeployerFlavor,
    )

    return [
        DatabricksOrchestratorFlavor,
        DatabricksModelDeployerFlavor,
    ]
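
As a hedged usage sketch, the declared flavors can be inspected at runtime once the integration's requirements are installed; the printed values below are illustrative:

from zenml.integrations.databricks import DatabricksIntegration

for flavor_class in DatabricksIntegration.flavors():
    flavor = flavor_class()
    # Expected to print 'databricks' together with the component type,
    # i.e. one orchestrator flavor and one model deployer flavor.
    print(flavor.name, flavor.type)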
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/databricks/__init__.py
@classmethod
def get_requirements(
    cls, target_os: Optional[str] = None, python_version: Optional[str] = None
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    from zenml.integrations.numpy import NumpyIntegration
    from zenml.integrations.pandas import PandasIntegration

    return cls.REQUIREMENTS + \
        NumpyIntegration.get_requirements(target_os=target_os, python_version=python_version) + \
        PandasIntegration.get_requirements(target_os=target_os, python_version=python_version)
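
For illustration, a minimal sketch of querying the combined requirement list (the Databricks requirements plus the numpy and pandas requirements assembled above); the exact package pins depend on your ZenML version:

from zenml.integrations.databricks import DatabricksIntegration

requirements = DatabricksIntegration.get_requirements()
# Contains the Databricks requirements plus those of the numpy and
# pandas integrations, as assembled in the method above.
print(requirements)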

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for this flavor.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
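
A minimal sketch of using this check before relying on Databricks-specific imports:

from zenml.integrations.databricks import DatabricksIntegration

if DatabricksIntegration.check_installation():
    print("Databricks integration requirements are installed.")
else:
    # Usually resolved with: zenml integration install databricks
    print("Missing requirements for the Databricks integration.")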
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

flavors

Databricks integration flavors.

Classes
DatabricksModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig

Configuration for the Databricks model deployer.

Attributes:

Name Type Description
host str

Databricks host.

secret_name Optional[str]

Secret name to use for authentication.

client_id Optional[str]

Databricks client id.

client_secret Optional[str]

Databricks client secret.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
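
To make the secret-reference behavior concrete, here is a hedged sketch of constructing this config with references of the form {{secret_name.key}} instead of plain-text credentials; the databricks_secret name and the host value are examples only:

from zenml.integrations.databricks.flavors import DatabricksModelDeployerConfig

config = DatabricksModelDeployerConfig(
    host="https://<your-workspace>.cloud.databricks.com",
    client_id="{{databricks_secret.client_id}}",
    client_secret="{{databricks_secret.client_secret}}",
)
# The references are kept as-is here and resolved by ZenML at runtime;
# passing plain-text values instead would trigger the warning described above.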
DatabricksModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Databricks Endpoint model deployer flavor.

Attributes
config_class: Type[DatabricksModelDeployerConfig] property

Returns DatabricksModelDeployerConfig config class.

Returns:

Type Description
Type[DatabricksModelDeployerConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[DatabricksModelDeployer] property

Implementation class for this flavor.

Returns:

Type Description
Type[DatabricksModelDeployer]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

DatabricksOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, DatabricksOrchestratorSettings

Databricks orchestrator base config.

Attributes:

Name Type Description
host str

Databricks host.

client_id str

Databricks client id.

client_secret str

Databricks client secret.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.
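
A hedged sketch of this config with the attributes documented above; the host and credential values are placeholders, and in practice the component is usually registered through the ZenML CLI with credentials stored as secrets:

from zenml.integrations.databricks.flavors import DatabricksOrchestratorConfig

config = DatabricksOrchestratorConfig(
    host="https://<your-workspace>.cloud.databricks.com",
    client_id="{{databricks_secret.client_id}}",
    client_secret="{{databricks_secret.client_secret}}",
)
# Expected to report a remote orchestrator, per the properties documented
# above (the exact values depend on the ZenML release you are running).
print(config.is_remote, config.is_schedulable)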

DatabricksOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Databricks orchestrator flavor.

Attributes
config_class: Type[DatabricksOrchestratorConfig] property

Returns DatabricksOrchestratorConfig config class.

Returns:

Type Description
Type[DatabricksOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[DatabricksOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[DatabricksOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

Modules
databricks_model_deployer_flavor

Databricks model deployer flavor.

Classes
DatabricksBaseConfig

Bases: BaseModel

Databricks Inference Endpoint configuration.

DatabricksModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig

Configuration for the Databricks model deployer.

Attributes:

Name Type Description
host str

Databricks host.

secret_name Optional[str]

Secret name to use for authentication.

client_id Optional[str]

Databricks client id.

client_secret Optional[str]

Databricks client secret.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
DatabricksModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Databricks Endpoint model deployer flavor.

Attributes
config_class: Type[DatabricksModelDeployerConfig] property

Returns DatabricksModelDeployerConfig config class.

Returns:

Type Description
Type[DatabricksModelDeployerConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[DatabricksModelDeployer] property

Implementation class for this flavor.

Returns:

Type Description
Type[DatabricksModelDeployer]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

Functions
databricks_orchestrator_flavor

Databricks orchestrator base config and settings.

Classes
DatabricksAvailabilityType

Bases: StrEnum

Databricks availability type.

DatabricksOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, DatabricksOrchestratorSettings

Databricks orchestrator base config.

Attributes:

Name Type Description
host str

Databricks host.

client_id str

Databricks client id.

client_secret str

Databricks client secret.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_local: bool property

Checks if this stack component is running locally.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property

Checks if this stack component is running remotely.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

DatabricksOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Databricks orchestrator flavor.

Attributes
config_class: Type[DatabricksOrchestratorConfig] property

Returns DatabricksOrchestratorConfig config class.

Returns:

Type Description
Type[DatabricksOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[DatabricksOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[DatabricksOrchestrator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

DatabricksOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Databricks orchestrator base settings.

Attributes:

Name Type Description
spark_version Optional[str]

Spark version.

num_workers Optional[int]

Number of workers.

node_type_id Optional[str]

Node type id.

policy_id Optional[str]

Policy id.

autotermination_minutes Optional[int]

Autotermination minutes.

autoscale Tuple[int, int]

Autoscale.

single_user_name Optional[str]

Single user name.

spark_conf Optional[Dict[str, str]]

Spark configuration.

spark_env_vars Optional[Dict[str, str]]

Spark environment variables.

schedule_timezone Optional[str]

Schedule timezone.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
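
As an illustration of how these settings are typically consumed, here is a hedged sketch of attaching them to a pipeline through ZenML's settings mechanism; the "orchestrator.databricks" key follows ZenML's <component_type>.<flavor> settings convention, and all field values are examples only:

from zenml import pipeline
from zenml.integrations.databricks.flavors import DatabricksOrchestratorSettings

databricks_settings = DatabricksOrchestratorSettings(
    spark_version="15.3.x-scala2.12",
    num_workers=2,
    node_type_id="Standard_D4s_v5",
    autotermination_minutes=30,
    spark_env_vars={"ZENML_LOGGING_VERBOSITY": "INFO"},
)

@pipeline(settings={"orchestrator.databricks": databricks_settings})
def my_pipeline() -> None:
    ...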
Functions

model_deployers

Initialization of the Databricks model deployers.

Classes
DatabricksModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Databricks endpoint model deployer.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: DatabricksModelDeployerConfig property

Configuration class for the Databricks model deployer.

Returns:

Type Description
DatabricksModelDeployerConfig

The configuration.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote artifact store.

Functions
get_model_server_info(service_instance: DatabricksDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance DatabricksDeploymentService

Instance of a DatabricksDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "DatabricksDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a DatabricksDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
    }
perform_delete_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False
Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    service = cast(DatabricksDeploymentService, service)
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Databricks deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

Parameters:

Name Type Description Default
id UUID

the UUID of the model to be deployed with Databricks.

required
config ServiceConfig

the configuration of the model to be deployed with Databricks.

required
timeout int

the timeout in seconds to wait for the Databricks endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Databricks server is provisioned, without waiting for it to fully start.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML Databricks deployment service object that can be used to interact with the remote Databricks inference endpoint server.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Databricks deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    Args:
        id: the UUID of the model to be deployed with Databricks.
        config: the configuration of the model to be deployed with Databricks.
        timeout: the timeout in seconds to wait for the Databricks endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Databricks
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Databricks deployment service object that can be used to
        interact with the remote Databricks inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(DatabricksDeploymentConfig, config)
        # create a new DatabricksDeploymentService instance
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(
            f"Creating a new Databricks inference endpoint service: {service}"
        )
        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
        }

    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The started service.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
Modules
databricks_model_deployer

Implementation of the Databricks Model Deployer.

Classes
DatabricksModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Databricks endpoint model deployer.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: DatabricksModelDeployerConfig property

Configuration class for the Databricks model deployer.

Returns:

Type Description
DatabricksModelDeployerConfig

The configuration.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote artifact store.

Functions
get_model_server_info(service_instance: DatabricksDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance DatabricksDeploymentService

Instance of a DatabricksDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "DatabricksDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a DatabricksDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
    }
perform_delete_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False
Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    service = cast(DatabricksDeploymentService, service)
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Databricks deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

Parameters:

Name Type Description Default
id UUID

the UUID of the model to be deployed with Databricks.

required
config ServiceConfig

the configuration of the model to be deployed with Databricks.

required
timeout int

the timeout in seconds to wait for the Databricks endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Databricks server is provisioned, without waiting for it to fully start.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The ZenML Databricks deployment service object that can be used to interact with the remote Databricks inference endpoint server.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Databricks deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    Args:
        id: the UUID of the model to be deployed with Databricks.
        config: the configuration of the model to be deployed with Databricks.
        timeout: the timeout in seconds to wait for the Databricks endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Databricks
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Databricks deployment service object that can be used to
        interact with the remote Databricks inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(DatabricksDeploymentConfig, config)
        # create a new DatabricksDeploymentService instance
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(
            f"Creating a new Databricks inference endpoint service: {service}"
        )
        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
        }

    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT

Returns:

Type Description
BaseService

The started service.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in src/zenml/integrations/databricks/model_deployers/databricks_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
Functions

orchestrators

Initialization of the Databricks ZenML orchestrator.

Classes
DatabricksOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: WheeledOrchestrator

Base class for Orchestrator responsible for running pipelines remotely in a VM.

This orchestrator does not support running on a schedule.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: DatabricksOrchestratorConfig property

Returns the DatabricksOrchestratorConfig config.

Returns:

Type Description
DatabricksOrchestratorConfig

The configuration.

pipeline_directory: str property

Returns the path to a directory in which the Databricks pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property

Path to the root directory for all files concerning this orchestrator.

Returns:

Type Description
str

Path to the root directory.

settings_class: Type[DatabricksOrchestratorSettings] property

Settings class for the Databricks orchestrator.

Returns:

Type Description
Type[DatabricksOrchestratorSettings]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

In the remote case, checks that the stack contains a container registry, image builder and only remote components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If no run id exists, i.e. this method is called while the orchestrator is not running a pipeline, or if the run id cannot be read from the environment.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If no run id exists, i.e. this method is called
            while the orchestrator is not running a pipeline, or if the
            run id cannot be read from the environment.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_DATABRICKS_ORCHESTRATOR_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_DATABRICKS_ORCHESTRATOR_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    run_url = f"{self.config.host}/jobs/{self.get_orchestrator_run_id()}"
    return {
        METADATA_ORCHESTRATOR_URL: Uri(run_url),
    }
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Any

Creates a wheel and uploads the pipeline to Databricks.

This functions as an intermediary representation of the pipeline which is then deployed to Databricks.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this Docker image, a callable is created which builds a task for each step (_construct_databricks_pipeline). To do this, the entrypoint of the Docker image is configured to run the correct step within the Docker image. The dependencies between these tasks are then also configured on each task by pointing at the downstream steps.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
ValueError

If the schedule is not set or if the cron expression is not set.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Any:
    """Creates a wheel and uploads the pipeline to Databricks.

    This functions as an intermediary representation of the pipeline which
    is then deployed to Databricks.

    How it works:
    -------------
    Before this method is called, the `prepare_pipeline_deployment()`
    method builds a Docker image that contains the code for the
    pipeline, all steps, and the context around these files.

    Based on this Docker image, a callable is created which builds
    a task for each step (`_construct_databricks_pipeline`).
    To do this, the entrypoint of the Docker image is configured to
    run the correct step within the Docker image. The dependencies
    between these tasks are then also configured on each
    task by pointing at the downstream steps.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        ValueError: If the schedule is not set or if the cron expression
            is not set.
    """
    settings = cast(
        DatabricksOrchestratorSettings, self.get_settings(deployment)
    )
    if deployment.schedule:
        if (
            deployment.schedule.catchup
            or deployment.schedule.interval_second
        ):
            logger.warning(
                "Databricks orchestrator only uses schedules with the "
                "`cron_expression` property, with optional `start_time` and/or `end_time`. "
                "All other properties are ignored."
            )
        if deployment.schedule.cron_expression is None:
            raise ValueError(
                "Property `cron_expression` must be set when passing "
                "schedule to a Databricks orchestrator."
            )
        if (
            deployment.schedule.cron_expression
            and settings.schedule_timezone is None
        ):
            raise ValueError(
                "Property `schedule_timezone` must be set when passing "
                "`cron_expression` to a Databricks orchestrator."
                "Databricks orchestrator requires a Java Timezone ID to run the pipeline on schedule."
                "Please refer to https://docs.oracle.com/middleware/1221/wcs/tag-ref/MISC/TimeZones.html for more information."
            )

    # Get deployment id
    deployment_id = deployment.id

    # Create a callable for future compilation into a dsl.Pipeline.
    def _construct_databricks_pipeline(
        zenml_project_wheel: str, job_cluster_key: str
    ) -> List[DatabricksTask]:
        """Create a databrcks task for each step.

        This should contain the name of the step or task and configures the
        entrypoint of the task to run the step.

        Additionally, this gives each task information about its
        direct downstream steps.

        Args:
            zenml_project_wheel: The wheel package containing the ZenML
                project.
            job_cluster_key: The ID of the Databricks job cluster.

        Returns:
            A list of Databricks tasks.
        """
        tasks = []
        for step_name, step in deployment.step_configurations.items():
            # The arguments are passed to configure the entrypoint of the
            # docker container when the step is called.
            arguments = DatabricksEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
                deployment_id=deployment_id,
                wheel_package=self.package_name,
                databricks_job_id=DATABRICKS_JOB_ID_PARAMETER_REFERENCE,
            )

            # Find the upstream container ops of the current step and
            # configure the current container op to run after them
            upstream_steps = [
                f"{deployment_id}_{upstream_step_name}"
                for upstream_step_name in step.spec.upstream_steps
            ]

            docker_settings = step.config.docker_settings
            docker_image_builder = PipelineDockerImageBuilder()
            # Gather the requirements files
            requirements_files = (
                docker_image_builder.gather_requirements_files(
                    docker_settings=docker_settings,
                    stack=Client().active_stack,
                    log=False,
                )
            )

            # Extract and clean the requirements
            requirements = list(
                itertools.chain.from_iterable(
                    r[1].strip().split("\n") for r in requirements_files
                )
            )

            # Remove empty items and duplicates
            requirements = sorted(set(filter(None, requirements)))

            task = convert_step_to_task(
                f"{deployment_id}_{step_name}",
                ZENML_STEP_DEFAULT_ENTRYPOINT_COMMAND,
                arguments,
                clean_requirements(requirements),
                depends_on=upstream_steps,
                zenml_project_wheel=zenml_project_wheel,
                job_cluster_key=job_cluster_key,
            )
            tasks.append(task)
        return tasks

    # Get the orchestrator run name
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    # Get a filepath to use to save the finished yaml to
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory, f"{orchestrator_run_name}.yaml"
    )

    # Copy the repository to a temporary directory and add a setup.py file
    repository_temp_dir = (
        self.copy_repository_to_temp_dir_and_add_setup_py()
    )

    # Create a wheel for the package in the temporary directory
    wheel_path = self.create_wheel(temp_dir=repository_temp_dir)

    databricks_client = self._get_databricks_client()

    # Create an empty folder in a volume.
    deployment_name = (
        deployment.pipeline.name if deployment.pipeline else "default"
    )
    databricks_directory = f"{DATABRICKS_WHEELS_DIRECTORY_PREFIX}/{deployment_name}/{orchestrator_run_name}"
    databricks_wheel_path = (
        f"{databricks_directory}/{wheel_path.rsplit('/', 1)[-1]}"
    )

    databricks_client.dbutils.fs.mkdirs(databricks_directory)
    databricks_client.dbutils.fs.cp(
        f"{DATABRICKS_LOCAL_FILESYSTEM_PREFIX}/{wheel_path}",
        databricks_wheel_path,
    )

    # Construct the env variables for the pipeline
    env_vars = environment.copy()
    spark_env_vars = settings.spark_env_vars
    if spark_env_vars:
        for key, value in spark_env_vars.items():
            env_vars[key] = value
    env_vars[ENV_ZENML_CUSTOM_SOURCE_ROOT] = (
        DATABRICKS_ZENML_DEFAULT_CUSTOM_REPOSITORY_PATH
    )

    fileio.rmtree(repository_temp_dir)

    logger.info(
        "Writing Databricks workflow definition to `%s`.",
        pipeline_file_path,
    )

    # using the databricks client uploads the pipeline to databricks
    job_cluster_key = self.sanitize_name(f"{deployment_id}")
    self._upload_and_run_pipeline(
        pipeline_name=orchestrator_run_name,
        settings=settings,
        tasks=_construct_databricks_pipeline(
            databricks_wheel_path, job_cluster_key
        ),
        env_vars=env_vars,
        job_cluster_key=job_cluster_key,
        schedule=deployment.schedule,
    )
setup_credentials() -> None

Set up credentials for the orchestrator.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator.py
def setup_credentials(self) -> None:
    """Set up credentials for the orchestrator."""
    connector = self.get_connector()
    assert connector is not None
    connector.configure_local_client()
Modules
databricks_orchestrator

Implementation of the Databricks orchestrator.

Classes
Functions

Modules
databricks_orchestrator_entrypoint_config

Entrypoint configuration for ZenML Databricks pipeline steps.

Classes
DatabricksEntrypointConfiguration(arguments: List[str])

Bases: StepEntrypointConfiguration

Entrypoint configuration for ZenML Databricks pipeline steps.

The only purpose of this entrypoint configuration is to reconstruct the environment variables that exceed the maximum length of 256 characters allowed for Databricks Processor steps from their individual components.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Initializes the entrypoint configuration.

    Args:
        arguments: Command line arguments to configure this object.
    """
    self.entrypoint_args = self._parse_arguments(arguments)
Functions
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

The argument list should be something that argparse.ArgumentParser.parse_args(...) can handle (e.g. ["--some_option", "some_value"] or ["--some_option=some_value"]). It needs to provide values for all options returned by the get_entrypoint_options() method of this class.

Parameters:

Name Type Description Default
**kwargs Any

Kwargs, must include the step name.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for the wheel package.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_arguments(
    cls,
    **kwargs: Any,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    The argument list should be something that
    `argparse.ArgumentParser.parse_args(...)` can handle (e.g.
    `["--some_option", "some_value"]` or `["--some_option=some_value"]`).
    It needs to provide values for all options returned by the
    `get_entrypoint_options()` method of this class.

    Args:
        **kwargs: Kwargs, must include the step name.

    Returns:
        The superclass arguments as well as arguments for the wheel package.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{WHEEL_PACKAGE_OPTION}",
        kwargs[WHEEL_PACKAGE_OPTION],
        f"--{DATABRICKS_JOB_ID_OPTION}",
        kwargs[DATABRICKS_JOB_ID_OPTION],
    ]
get_entrypoint_options() -> Set[str] classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the wheel package.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the wheel package.
    """
    return (
        super().get_entrypoint_options()
        | {WHEEL_PACKAGE_OPTION}
        | {DATABRICKS_JOB_ID_OPTION}
    )
run() -> None

Runs the step.

Source code in src/zenml/integrations/databricks/orchestrators/databricks_orchestrator_entrypoint_config.py
def run(self) -> None:
    """Runs the step."""
    # Get the wheel package and add it to the sys path
    wheel_package = self.entrypoint_args[WHEEL_PACKAGE_OPTION]
    distribution = pkg_resources.get_distribution(wheel_package)
    project_root = os.path.join(distribution.location, wheel_package)
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
        sys.path.insert(-1, project_root)

    # Get the job id and add it to the environment
    databricks_job_id = self.entrypoint_args[DATABRICKS_JOB_ID_OPTION]
    os.environ[ENV_ZENML_DATABRICKS_ORCHESTRATOR_RUN_ID] = (
        databricks_job_id
    )

    # Run the step
    super().run()

services

Initialization of the Databricks Service.

Classes
Modules
databricks_deployment

Implementation of the Databricks Deployment service.

Classes
DatabricksDeploymentConfig(**data: Any)

Bases: DatabricksBaseConfig, ServiceConfig

Databricks service configurations.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
Functions
get_databricks_deployment_labels() -> Dict[str, str]

Generate labels for the Databricks deployment from the service configuration.

These labels are attached to the Databricks deployment resource and may be used as label selectors in lookup operations.

Returns:

Type Description
Dict[str, str]

The labels for the Databricks deployment.

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def get_databricks_deployment_labels(self) -> Dict[str, str]:
    """Generate labels for the Databricks deployment from the service configuration.

    These labels are attached to the Databricks deployment resource
    and may be used as label selectors in lookup operations.

    Returns:
        The labels for the Databricks deployment.
    """
    labels = {}
    if self.pipeline_name:
        labels["zenml_pipeline_name"] = self.pipeline_name
    if self.pipeline_step_name:
        labels["zenml_pipeline_step_name"] = self.pipeline_step_name
    if self.model_name:
        labels["zenml_model_name"] = self.model_name
    if self.model_uri:
        labels["zenml_model_uri"] = self.model_uri
    sanitize_labels(labels)
    return labels
DatabricksDeploymentService(config: DatabricksDeploymentConfig, **attrs: Any)

Bases: BaseDeploymentService

Databricks model deployment service.

Attributes:

Name Type Description
SERVICE_TYPE

a service type descriptor with information describing the Databricks deployment service class

config DatabricksDeploymentConfig

service configuration

Initialize the Databricks deployment service.

Parameters:

Name Type Description Default
config DatabricksDeploymentConfig

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def __init__(self, config: DatabricksDeploymentConfig, **attrs: Any):
    """Initialize the Databricks deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    super().__init__(config=config, **attrs)
Attributes
databricks_client: DatabricksClient property

Get the Databricks workspace client used to manage the inference endpoint.

Returns:

Type Description
WorkspaceClient

The Databricks workspace client.

databricks_endpoint: ServingEndpointDetailed property

Get the deployed Databricks inference endpoint.

Returns:

Type Description
ServingEndpointDetailed

Databricks inference endpoint.

prediction_url: Optional[str] property

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if the service is not yet ready.

Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the Databricks deployment.

Returns:

Type Description
Tuple[ServiceState, str]

The operational state of the Databricks deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the the current operational state of the Databricks deployment.

    Returns:
        The operational state of the Databricks deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    try:
        status = self.databricks_endpoint.state or None
        if (
            status
            and status.ready
            and status.ready == EndpointStateReady.READY
        ):
            return (ServiceState.ACTIVE, "")
        elif (
            status
            and status.config_update
            and status.config_update
            == EndpointStateConfigUpdate.UPDATE_FAILED
        ):
            return (
                ServiceState.ERROR,
                "Databricks Inference Endpoint deployment update failed",
            )
        elif (
            status
            and status.config_update
            and status.config_update
            == EndpointStateConfigUpdate.IN_PROGRESS
        ):
            return (ServiceState.PENDING_STARTUP, "")
        return (ServiceState.PENDING_STARTUP, "")
    except Exception as e:
        return (
            ServiceState.INACTIVE,
            f"Databricks Inference Endpoint deployment is inactive or not found: {e}",
        )
deprovision(force: bool = False) -> None

Deprovision the remote Databricks deployment instance.

Parameters:

Name Type Description Default
force bool

if True, the remote deployment instance will be forcefully deprovisioned.

False
Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the remote Databricks deployment instance.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    try:
        self.databricks_client.serving_endpoints.delete(
            name=self._generate_an_endpoint_name()
        )
    except Exception:
        logger.error(
            "Databricks Inference Endpoint is deleted or cannot be found."
        )
get_client_id_and_secret() -> Tuple[str, str, str]

Get the Databricks host, client id and secret.

Raises:

Type Description
ValueError

If client id and secret are not found.

Returns:

Type Description
Tuple[str, str, str]

Databricks host, client id and secret.

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def get_client_id_and_secret(self) -> Tuple[str, str, str]:
    """Get the Databricks client id and secret.

    Raises:
        ValueError: If client id and secret are not found.

    Returns:
        Databricks host, client id and secret.
    """
    client = Client()
    client_id = None
    client_secret = None
    host = None
    from zenml.integrations.databricks.model_deployers.databricks_model_deployer import (
        DatabricksModelDeployer,
    )

    model_deployer = client.active_stack.model_deployer
    if not isinstance(model_deployer, DatabricksModelDeployer):
        raise ValueError(
            "DatabricksModelDeployer is not active in the stack."
        )
    host = model_deployer.config.host
    self.config.host = host
    if model_deployer.config.secret_name:
        secret = client.get_secret(model_deployer.config.secret_name)
        client_id = secret.secret_values["client_id"]
        client_secret = secret.secret_values["client_secret"]

    else:
        client_id = model_deployer.config.client_id
        client_secret = model_deployer.config.client_secret
    if not client_id:
        raise ValueError("Client id not found.")
    if not client_secret:
        raise ValueError("Client secret not found.")
    if not host:
        raise ValueError("Host not found.")
    return host, client_id, client_secret
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Yields:

Type Description
str

A generator that can be accessed to get the service logs.

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    logger.info(
        "Databricks Endpoints provides access to the logs of your Endpoints through the UI in the `Logs` tab of your Endpoint"
    )

    def log_generator() -> Generator[str, bool, None]:
        last_log_count = 0
        while True:
            logs = self.databricks_client.serving_endpoints.logs(
                name=self._generate_an_endpoint_name(),
                served_model_name=self.config.model_name,
            )

            log_lines = logs.logs.split("\n")

            # Apply tail if specified and it's the first iteration
            if tail is not None and last_log_count == 0:
                log_lines = log_lines[-tail:]

            # Yield only new lines
            for line in log_lines[last_log_count:]:
                yield line

            last_log_count = len(log_lines)

            if not follow:
                break

            # Add a small delay to avoid excessive API calls
            time.sleep(1)

    yield from log_generator()
predict(request: Union[NDArray[Any], pd.DataFrame]) -> NDArray[Any]

Make a prediction using the service.

Parameters:

Name Type Description Default
request Union[NDArray[Any], DataFrame]

The input data for the prediction.

required

Returns:

Type Description
NDArray[Any]

The prediction result.

Raises:

Type Description
Exception

if the service is not running

ValueError

if the endpoint secret name is not provided.

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def predict(
    self, request: Union["NDArray[Any]", pd.DataFrame]
) -> "NDArray[Any]":
    """Make a prediction using the service.

    Args:
        request: The input data for the prediction.

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the endpoint secret name is not provided.
    """
    if not self.is_running:
        raise Exception(
            "Databricks endpoint inference service is not running. "
            "Please start the service before making predictions."
        )
    if self.prediction_url is not None:
        if not self.config.endpoint_secret_name:
            raise ValueError(
                "No endpoint secret name is provided for prediction."
            )
        databricks_token = Client().get_secret(
            self.config.endpoint_secret_name
        )
        if not databricks_token.secret_values["token"]:
            raise ValueError("No databricks token found.")
        headers = {
            "Authorization": f"Bearer {databricks_token.secret_values['token']}",
            "Content-Type": "application/json",
        }
        if isinstance(request, pd.DataFrame):
            response = requests.post(  # nosec
                self.prediction_url,
                json={"instances": request.to_dict("records")},
                headers=headers,
            )
        else:
            response = requests.post(  # nosec
                self.prediction_url,
                json={"instances": request.tolist()},
                headers=headers,
            )
    else:
        raise ValueError("No endpoint known for prediction.")
    response.raise_for_status()

    return np.array(response.json()["predictions"])
provision() -> None

Provision or update remote Databricks deployment instance.

Source code in src/zenml/integrations/databricks/services/databricks_deployment.py
def provision(self) -> None:
    """Provision or update remote Databricks deployment instance."""
    from databricks.sdk.service.serving import (
        ServedModelInputWorkloadSize,
        ServedModelInputWorkloadType,
    )

    tags = []
    for key, value in self._get_databricks_deployment_labels().items():
        tags.append(EndpointTag(key=key, value=value))
    # Attempt to create and wait for the inference endpoint
    served_model = ServedModelInput(
        model_name=self.config.model_name,
        model_version=self.config.model_version,
        scale_to_zero_enabled=self.config.scale_to_zero_enabled,
        workload_type=ServedModelInputWorkloadType(
            self.config.workload_type
        ),
        workload_size=ServedModelInputWorkloadSize(
            self.config.workload_size
        ),
    )

    databricks_endpoint = (
        self.databricks_client.serving_endpoints.create_and_wait(
            name=self._generate_an_endpoint_name(),
            config=EndpointCoreConfigInput(
                served_models=[served_model],
            ),
            tags=tags,
        )
    )
    # Check if the endpoint URL is available after provisioning
    if databricks_endpoint.endpoint_url:
        logger.info(
            f"Databricks inference endpoint successfully deployed and available. Endpoint URL: {databricks_endpoint.endpoint_url}"
        )
    else:
        logger.error(
            "Failed to start Databricks inference endpoint service: No URL available, please check the Databricks console for more details."
        )
DatabricksServiceStatus

Bases: ServiceStatus

Databricks service status.

Functions

utils

Utilities for Databricks integration.

Modules
databricks_utils

Databricks utilities.

Functions
convert_step_to_task(task_name: str, command: str, arguments: List[str], libraries: Optional[List[str]] = None, depends_on: Optional[List[str]] = None, zenml_project_wheel: Optional[str] = None, job_cluster_key: Optional[str] = None) -> DatabricksTask

Convert a ZenML step to a Databricks task.

Parameters:

Name Type Description Default
task_name str

Name of the task.

required
command str

Command to run.

required
arguments List[str]

Arguments to pass to the command.

required
libraries Optional[List[str]]

List of libraries to install.

None
depends_on Optional[List[str]]

List of tasks to depend on.

None
zenml_project_wheel Optional[str]

Path to the ZenML project wheel.

None
job_cluster_key Optional[str]

ID of the Databricks job_cluster_key.

None

Returns:

Type Description
Task

Databricks task.

Source code in src/zenml/integrations/databricks/utils/databricks_utils.py
def convert_step_to_task(
    task_name: str,
    command: str,
    arguments: List[str],
    libraries: Optional[List[str]] = None,
    depends_on: Optional[List[str]] = None,
    zenml_project_wheel: Optional[str] = None,
    job_cluster_key: Optional[str] = None,
) -> DatabricksTask:
    """Convert a ZenML step to a Databricks task.

    Args:
        task_name: Name of the task.
        command: Command to run.
        arguments: Arguments to pass to the command.
        libraries: List of libraries to install.
        depends_on: List of tasks to depend on.
        zenml_project_wheel: Path to the ZenML project wheel.
        job_cluster_key: ID of the Databricks job_cluster_key.

    Returns:
        Databricks task.
    """
    db_libraries = []
    if libraries:
        for library in libraries:
            db_libraries.append(Library(pypi=PythonPyPiLibrary(library)))
    db_libraries.append(Library(whl=zenml_project_wheel))
    db_libraries.append(
        Library(pypi=PythonPyPiLibrary(f"zenml=={__version__}"))
    )
    return DatabricksTask(
        task_key=task_name,
        job_cluster_key=job_cluster_key,
        libraries=db_libraries,
        python_wheel_task=PythonWheelTask(
            package_name="zenml",
            entry_point=command,
            parameters=arguments,
        ),
        depends_on=[TaskDependency(task) for task in depends_on]
        if depends_on
        else None,
    )
sanitize_labels(labels: Dict[str, str]) -> None

Update the label values to be valid Kubernetes labels.

See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

Parameters:

Name Type Description Default
labels Dict[str, str]

the labels to sanitize.

required
Source code in src/zenml/integrations/databricks/utils/databricks_utils.py
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Update the label values to be valid Kubernetes labels.

    See:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: the labels to sanitize.
    """
    for key, value in labels.items():
        # Kubernetes labels must be alphanumeric, no longer than
        # 63 characters, and must begin and end with an alphanumeric
        # character ([a-z0-9A-Z])
        labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
            "-_."
        )