Huggingface

zenml.integrations.huggingface

Initialization of the Huggingface integration.

Attributes

HUGGINGFACE = 'huggingface' module-attribute

HUGGINGFACE_MODEL_DEPLOYER_FLAVOR = 'huggingface' module-attribute

HUGGINGFACE_SERVICE_ARTIFACT = 'hf_deployment_service' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type[StackComponentConfig]: The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Dict[str, Any]: The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type[StackComponent]: The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Optional[str]: The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

str: The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for this flavor.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Optional[ServiceConnectorRequirements]: Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

StackComponentType: The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

flavor_model (FlavorResponse): The model to load from. Required.

Raises:

CustomFlavorImportError: If the custom flavor can't be imported.
ImportError: If the flavor can't be imported.

Returns:

Flavor: The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
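
A minimal usage sketch, assuming flavor_model is a FlavorResponse already fetched from the ZenML server:

from zenml.stack.flavor import Flavor

# `flavor_model` is assumed to be a FlavorResponse fetched beforehand.
flavor = Flavor.from_model(flavor_model)
print(flavor.name, flavor.type)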
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

str: The complete url to the zenml documentation.

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

str: The complete url to the zenml SDK docs.

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

integration (Optional[str]): The integration to use for the model. Default: None.
is_custom (bool): Whether the flavor is a custom flavor. Default: True.

Returns:

FlavorRequest: The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
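
A hedged sketch of converting a flavor to a request model, using the Hugging Face flavor documented below:

from zenml.integrations.huggingface.flavors import (
    HuggingFaceModelDeployerFlavor,
)

# Build the request model that registration would send to the server.
request = HuggingFaceModelDeployerFlavor().to_model(
    integration="huggingface", is_custom=False
)
print(request.name, request.source)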

HuggingfaceIntegration

Bases: Integration

Definition of Huggingface integration for ZenML.

Functions
activate() -> None classmethod

Activates the integration.

Source code in src/zenml/integrations/huggingface/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.huggingface import materializers  # noqa
    from zenml.integrations.huggingface import services
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the Huggingface integration.

Returns:

List[Type[Flavor]]: List of stack component flavors for this integration.

Source code in src/zenml/integrations/huggingface/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Huggingface integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.huggingface.flavors import (
        HuggingFaceModelDeployerFlavor,
    )

    return [HuggingFaceModelDeployerFlavor]
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Defines platform specific requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system. Default: None.
python_version (Optional[str]): The Python version to use for the requirements. Default: None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/huggingface/__init__.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Defines platform specific requirements for the integration.

    Args:
        target_os: The target operating system.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    requirements = [
        "datasets>=2.16.0",
        "huggingface_hub>0.19.0",
        "accelerate",
        "bitsandbytes>=0.41.3",
        "peft",
        "transformers",
    ]

    # Add the pandas integration requirements
    from zenml.integrations.pandas import PandasIntegration

    return requirements + PandasIntegration.get_requirements(
        target_os=target_os, python_version=python_version
    )
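
A quick sanity check, assuming the integration's packages are installed in the current environment:

from zenml.integrations.huggingface import HuggingfaceIntegration

# True only if every requirement (and its dependencies) is installed.
if HuggingfaceIntegration.check_installation():
    print(HuggingfaceIntegration.get_requirements(target_os="linux"))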

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

bool: True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

List[Type[Flavor]]: A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Default: None.
python_version (Optional[str]): The Python version to use for the requirements. Default: None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

target_os (Optional[str]): The target operating system to get the requirements for. Default: None.

Returns:

List[str]: A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

List[Type[BasePluginFlavor]]: A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

flavors

Hugging Face integration flavors.

Classes
HuggingFaceBaseConfig

Bases: BaseModel

Hugging Face Inference Endpoint configuration.

HuggingFaceModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig, HuggingFaceBaseConfig

Configuration for the Hugging Face model deployer.

Attributes:

token (Optional[str]): Hugging Face token used for authentication.
namespace (str): Hugging Face namespace used to list endpoints.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
HuggingFaceModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Hugging Face Endpoint model deployer flavor.

Attributes
config_class: Type[HuggingFaceModelDeployerConfig] property

Returns HuggingFaceModelDeployerConfig config class.

Returns:

Type[HuggingFaceModelDeployerConfig]: The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Optional[str]: A flavor docs url.

implementation_class: Type[HuggingFaceModelDeployer] property

Implementation class for this flavor.

Returns:

Type[HuggingFaceModelDeployer]: The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

str: The flavor logo.

name: str property

Name of the flavor.

Returns:

str: The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Optional[str]: A flavor SDK docs url.

Modules
huggingface_model_deployer_flavor

Hugging Face model deployer flavor.

Classes
HuggingFaceBaseConfig

Bases: BaseModel

Hugging Face Inference Endpoint configuration.

HuggingFaceModelDeployerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseModelDeployerConfig, HuggingFaceBaseConfig

Configuration for the Hugging Face model deployer.

HuggingFaceModelDeployerFlavor

Bases: BaseModelDeployerFlavor

Hugging Face Endpoint model deployer flavor.

materializers

Initialization of Huggingface materializers.

Modules
huggingface_datasets_materializer

Implementation of the Huggingface datasets materializer.

Classes
HFDatasetMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write Hugging Face datasets.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
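
A minimal step sketch whose Dataset output is handled by this materializer, assuming the huggingface integration is installed so the materializer is registered; the dataset name is illustrative:

from datasets import Dataset, load_dataset
from zenml import step

@step
def ingest() -> Dataset:
    # The returned Dataset is serialized via HFDatasetMaterializer.
    return load_dataset("imdb", split="train")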
Functions
extract_metadata(ds: Union[Dataset, DatasetDict]) -> Dict[str, MetadataType]

Extract metadata from the given Dataset object.

Parameters:

ds (Union[Dataset, DatasetDict]): The Dataset object to extract metadata from. Required.

Returns:

Dict[str, MetadataType]: The extracted metadata as a dictionary.

Raises:

ValueError: If the given object is not a Dataset or DatasetDict.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_metadata(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `Dataset` object.

    Args:
        ds: The `Dataset` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    pandas_materializer = PandasMaterializer(self.uri)
    if isinstance(ds, Dataset):
        return pandas_materializer.extract_metadata(ds.to_pandas())
    elif isinstance(ds, DatasetDict):
        metadata: Dict[str, Dict[str, "MetadataType"]] = defaultdict(dict)
        for dataset_name, dataset in ds.items():
            dataset_metadata = pandas_materializer.extract_metadata(
                dataset.to_pandas()
            )
            for key, value in dataset_metadata.items():
                metadata[key][dataset_name] = value
        return dict(metadata)
    raise ValueError(f"Unsupported type {type(ds)}")
load(data_type: Union[Type[Dataset], Type[DatasetDict]]) -> Union[Dataset, DatasetDict]

Reads Dataset.

Parameters:

data_type (Union[Type[Dataset], Type[DatasetDict]]): The type of the dataset to read. Required.

Returns:

Union[Dataset, DatasetDict]: The dataset read from the specified dir.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def load(
    self, data_type: Union[Type[Dataset], Type[DatasetDict]]
) -> Union[Dataset, DatasetDict]:
    """Reads Dataset.

    Args:
        data_type: The type of the dataset to read.

    Returns:
        The dataset read from the specified dir.
    """
    with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
            temp_dir,
        )
        return load_from_disk(temp_dir)
save(ds: Union[Dataset, DatasetDict]) -> None

Writes a Dataset to the specified dir.

Parameters:

ds (Union[Dataset, DatasetDict]): The Dataset to write. Required.
Source code in src/zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save(self, ds: Union[Dataset, DatasetDict]) -> None:
    """Writes a Dataset to the specified dir.

    Args:
        ds: The Dataset to write.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        path = os.path.join(temp_dir, DEFAULT_DATASET_DIR)
        ds.save_to_disk(path)
        io_utils.copy_dir(
            path,
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
        )
save_visualizations(ds: Union[Dataset, DatasetDict]) -> Dict[str, VisualizationType]

Save visualizations for the dataset.

Parameters:

ds (Union[Dataset, DatasetDict]): The Dataset or DatasetDict to visualize. Required.

Returns:

Dict[str, VisualizationType]: A dictionary mapping visualization paths to their types.

Raises:

ValueError: If the given object is not a Dataset or DatasetDict.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save_visualizations(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, VisualizationType]:
    """Save visualizations for the dataset.

    Args:
        ds: The Dataset or DatasetDict to visualize.

    Returns:
        A dictionary mapping visualization paths to their types.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    visualizations = {}

    if isinstance(ds, Dataset):
        datasets = {"default": ds}
    elif isinstance(ds, DatasetDict):
        datasets = ds
    else:
        raise ValueError(f"Unsupported type {type(ds)}")

    for name, dataset in datasets.items():
        # Generate a unique identifier for the dataset
        if dataset.info.download_checksums:
            dataset_id = extract_repo_name(
                [x for x in dataset.info.download_checksums.keys()][0]
            )
            if dataset_id:
                # Create the iframe HTML
                html = f"""
                <iframe
                src="https://huggingface.co/datasets/{dataset_id}/embed/viewer"
                frameborder="0"
                width="100%"
                height="560px"
                ></iframe>
                """

                # Save the HTML to a file
                visualization_path = os.path.join(
                    self.uri, f"{name}_viewer.html"
                )
                with fileio.open(visualization_path, "w") as f:
                    f.write(html)

                visualizations[visualization_path] = VisualizationType.HTML

    return visualizations
Functions
extract_repo_name(checksum_str: str) -> Optional[str]

Extracts the repo name from the checksum string.

An example of a checksum_str is: "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet" and the expected output is "nyu-mll/glue".

Parameters:

checksum_str (str): The checksum_str to extract the repo name from. Required.

Returns:

Optional[str]: The extracted repo name.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_repo_name(checksum_str: str) -> Optional[str]:
    """Extracts the repo name from the checksum string.

    An example of a checksum_str is:
    "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet"
    and the expected output is "nyu-mll/glue".

    Args:
        checksum_str: The checksum_str to extract the repo name from.

    Returns:
        The extracted repo name, or None if it cannot be extracted.
    """
    dataset = None
    try:
        parts = checksum_str.split("/")
        if len(parts) >= 5:
            # Case: nyu-mll/glue
            dataset = f"{parts[3]}/{parts[4].split('@')[0]}"
    except Exception:  # pylint: disable=broad-except
        pass

    return dataset
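
Example, using the checksum string from the docstring:

from zenml.integrations.huggingface.materializers.huggingface_datasets_materializer import (
    extract_repo_name,
)

checksum = (
    "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c"
    "/mrpc/train-00000-of-00001.parquet"
)
assert extract_repo_name(checksum) == "nyu-mll/glue"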
Modules
huggingface_pt_model_materializer

Implementation of the Huggingface PyTorch model materializer.

Classes
HFPTModelMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write Hugging Face PyTorch pretrained models.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
Functions
extract_metadata(model: PreTrainedModel) -> Dict[str, MetadataType]

Extract metadata from the given PreTrainedModel object.

Parameters:

model (PreTrainedModel): The PreTrainedModel object to extract metadata from. Required.

Returns:

Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def extract_metadata(
    self, model: PreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `PreTrainedModel` object.

    Args:
        model: The `PreTrainedModel` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    from zenml.integrations.pytorch.utils import count_module_params

    module_param_metadata = count_module_params(model)
    return {
        **module_param_metadata,
        "dtype": DType(str(model.dtype)),
        "device": str(model.device),
    }
load(data_type: Type[PreTrainedModel]) -> PreTrainedModel

Reads HFModel.

Parameters:

data_type (Type[PreTrainedModel]): The type of the model to read. Required.

Returns:

PreTrainedModel: The model read from the specified dir.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def load(self, data_type: Type[PreTrainedModel]) -> PreTrainedModel:
    """Reads HFModel.

    Args:
        data_type: The type of the model to read.

    Returns:
        The model read from the specified dir.
    """
    with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(model: PreTrainedModel) -> None

Writes a Model to the specified dir.

Parameters:

model (PreTrainedModel): The Torch Model to write. Required.
Source code in src/zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def save(self, model: PreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    Args:
        model: The Torch Model to write.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR),
        )
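
A minimal step sketch whose PreTrainedModel output is saved by this materializer; the checkpoint name is illustrative:

from transformers import AutoModelForSequenceClassification, PreTrainedModel
from zenml import step

@step
def load_model() -> PreTrainedModel:
    # Saved via HFPTModelMaterializer when the step completes.
    return AutoModelForSequenceClassification.from_pretrained(
        "distilbert-base-uncased"
    )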
Modules
huggingface_t5_materializer

Implementation of the Huggingface t5 materializer.

Classes
HFT5Materializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write Hugging Face T5 models and tokenizers.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
Functions
load(data_type: Type[Any]) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]

Reads a T5ForConditionalGeneration model or T5Tokenizer from a serialized zip file.

Parameters:

data_type (Type[Any]): A T5ForConditionalGeneration or T5Tokenizer type. Required.

Returns:

Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]: A T5ForConditionalGeneration or T5Tokenizer object.

Raises:

ValueError: Unsupported data type used.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def load(
    self, data_type: Type[Any]
) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]:
    """Reads a T5ForConditionalGeneration model or T5Tokenizer from a serialized zip file.

    Args:
        data_type: A T5ForConditionalGeneration or T5Tokenizer type.

    Returns:
        A T5ForConditionalGeneration or T5Tokenizer object.

    Raises:
        ValueError: Unsupported data type used
    """
    filepath = self.uri
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        # Copy files from artifact store to temporary directory
        for file in fileio.listdir(filepath):
            src = os.path.join(filepath, file)
            dst = os.path.join(temp_dir, file)
            if fileio.isdir(src):
                fileio.makedirs(dst)
                for subfile in fileio.listdir(src):
                    subsrc = os.path.join(src, subfile)
                    subdst = os.path.join(dst, subfile)
                    fileio.copy(subsrc, subdst)
            else:
                fileio.copy(src, dst)

        # Load the model or tokenizer from the temporary directory
        if data_type in [
            T5ForConditionalGeneration,
            T5Tokenizer,
            T5TokenizerFast,
        ]:
            return data_type.from_pretrained(temp_dir)
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
save(obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]) -> None

Creates a serialization for a T5ForConditionalGeneration model or T5Tokenizer.

Parameters:

obj (Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]): A T5ForConditionalGeneration model or T5Tokenizer. Required.
Source code in src/zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def save(
    self,
    obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast],
) -> None:
    """Creates a serialization for a T5ForConditionalGeneration model or T5Tokenizer.

    Args:
        obj: A T5ForConditionalGeneration model or T5Tokenizer.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        # Save the model or tokenizer
        obj.save_pretrained(temp_dir)

        # Copy the directory to the artifact store
        filepath = self.uri
        fileio.makedirs(filepath)
        for file in os.listdir(temp_dir):
            src = os.path.join(temp_dir, file)
            dst = os.path.join(filepath, file)
            if os.path.isdir(src):
                fileio.makedirs(dst)
                for subfile in os.listdir(src):
                    subsrc = os.path.join(src, subfile)
                    subdst = os.path.join(dst, subfile)
                    fileio.copy(subsrc, subdst)
            else:
                fileio.copy(src, dst)
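
A minimal step sketch returning a T5 model handled by this materializer; "t5-small" is an illustrative checkpoint:

from transformers import T5ForConditionalGeneration
from zenml import step

@step
def load_t5() -> T5ForConditionalGeneration:
    # Serialized by HFT5Materializer on step completion.
    return T5ForConditionalGeneration.from_pretrained("t5-small")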
Modules
huggingface_tf_model_materializer

Implementation of the Huggingface TF model materializer.

Classes
HFTFModelMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write Hugging Face TensorFlow pretrained models.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
Functions
extract_metadata(model: TFPreTrainedModel) -> Dict[str, MetadataType]

Extract metadata from the given PreTrainedModel object.

Parameters:

model (TFPreTrainedModel): The PreTrainedModel object to extract metadata from. Required.

Returns:

Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def extract_metadata(
    self, model: TFPreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `PreTrainedModel` object.

    Args:
        model: The `PreTrainedModel` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return {
        "num_layers": len(model.layers),
        "num_params": model.num_parameters(only_trainable=False),
        "num_trainable_params": model.num_parameters(only_trainable=True),
    }
load(data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel

Reads HFModel.

Parameters:

data_type (Type[TFPreTrainedModel]): The type of the model to read. Required.

Returns:

TFPreTrainedModel: The model read from the specified dir.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def load(self, data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel:
    """Reads HFModel.

    Args:
        data_type: The type of the model to read.

    Returns:
        The model read from the specified dir.
    """
    with self.get_temporary_directory(delete_at_exit=False) as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = "TF" + config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(model: TFPreTrainedModel) -> None

Writes a Model to the specified dir.

Parameters:

model (TFPreTrainedModel): The TF Model to write. Required.
Source code in src/zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def save(self, model: TFPreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    Args:
        model: The TF Model to write.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR),
        )
Modules
huggingface_tokenizer_materializer

Implementation of the Huggingface tokenizer materializer.

Classes
HFTokenizerMaterializer(uri: str, artifact_store: Optional[BaseArtifactStore] = None)

Bases: BaseMaterializer

Materializer to read and write Hugging Face tokenizers.

Source code in src/zenml/materializers/base_materializer.py
def __init__(
    self, uri: str, artifact_store: Optional[BaseArtifactStore] = None
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data will be stored.
        artifact_store: The artifact store used to store this artifact.
    """
    self.uri = uri
    self._artifact_store = artifact_store
Functions
load(data_type: Type[Any]) -> PreTrainedTokenizerBase

Reads Tokenizer.

Parameters:

data_type (Type[Any]): The type of the tokenizer to read. Required.

Returns:

PreTrainedTokenizerBase: The tokenizer read from the specified dir.

Source code in src/zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def load(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
    """Reads Tokenizer.

    Args:
        data_type: The type of the tokenizer to read.

    Returns:
        The tokenizer read from the specified dir.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_TOKENIZER_DIR), temp_dir
        )
        return AutoTokenizer.from_pretrained(temp_dir)
save(tokenizer: Type[Any]) -> None

Writes a Tokenizer to the specified dir.

Parameters:

tokenizer (Type[Any]): The HFTokenizer to write. Required.
Source code in src/zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def save(self, tokenizer: Type[Any]) -> None:
    """Writes a Tokenizer to the specified dir.

    Args:
        tokenizer: The HFTokenizer to write.
    """
    with self.get_temporary_directory(delete_at_exit=True) as temp_dir:
        tokenizer.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_TOKENIZER_DIR),
        )
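
A minimal step sketch whose tokenizer output is handled by this materializer; the checkpoint name is illustrative:

from transformers import AutoTokenizer, PreTrainedTokenizerBase
from zenml import step

@step
def load_tokenizer() -> PreTrainedTokenizerBase:
    # Saved via HFTokenizerMaterializer on step completion.
    return AutoTokenizer.from_pretrained("distilbert-base-uncased")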
Modules

model_deployers

Initialization of the Hugging Face model deployers.

Classes
HuggingFaceModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Hugging Face endpoint model deployer.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: HuggingFaceModelDeployerConfig property

Configuration of the Hugging Face model deployer.

Returns:

HuggingFaceModelDeployerConfig: The configuration.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Optional[StackValidator]: A validator that checks that the stack contains a remote artifact store.

Functions
get_model_server_info(service_instance: HuggingFaceDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

service_instance (HuggingFaceDeploymentService): Instance of a HuggingFaceDeploymentService. Required.

Returns:

Dict[str, Optional[str]]: Model server information.

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "HuggingFaceDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a HuggingFaceDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
    }
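
A hedged sketch, assuming service is a running HuggingFaceDeploymentService (for example, the object returned by perform_deploy_model):

info = HuggingFaceModelDeployer.get_model_server_info(service)
print(info["PREDICTION_URL"], info["HEALTH_CHECK_URL"])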
perform_delete_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

service (BaseService): The service to delete. Required.
timeout (int): Timeout in seconds to wait for the service to stop. Default: DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT.
force (bool): If True, force the service to stop. Default: False.
Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    service = cast(HuggingFaceDeploymentService, service)
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Hugging Face deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

Parameters:

id (UUID): the UUID of the model to be deployed with Hugging Face. Required.
config (ServiceConfig): the configuration of the model to be deployed with Hugging Face. Required.
timeout (int): the timeout in seconds to wait for the Hugging Face endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Hugging Face server is provisioned, without waiting for it to fully start. Default: DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT.

Returns:

BaseService: The ZenML Hugging Face deployment service object that can be used to interact with the remote Hugging Face inference endpoint server.

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Hugging Face deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    Args:
        id: the UUID of the model to be deployed with Hugging Face.
        config: the configuration of the model to be deployed with Hugging Face.
        timeout: the timeout in seconds to wait for the Hugging Face endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Hugging Face
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Hugging Face deployment service object that can be used to
        interact with the remote Hugging Face inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(HuggingFaceServiceConfig, config)
        # create a new HuggingFaceDeploymentService instance
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(
            f"Creating a new Hugging Face inference endpoint service: {service}"
        )
        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
        }

    return service
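
Example (illustrative): a minimal, hedged sketch of calling this method directly. It assumes an active stack whose model deployer is a `HuggingFaceModelDeployer`, and every endpoint setting below (repository, instance sizing, region, vendor) is a placeholder that must match a real Hugging Face Inference Endpoints account. In a pipeline you would normally go through the deployer step documented further below rather than calling `perform_deploy_model` yourself.

```python
from uuid import uuid4

from zenml.client import Client
from zenml.integrations.huggingface.services.huggingface_deployment import (
    HuggingFaceServiceConfig,
)

# Placeholder endpoint settings; adapt all of these to a real
# Hugging Face Inference Endpoints setup.
config = HuggingFaceServiceConfig(
    model_name="distilbert-sentiment",
    repository="distilbert-base-uncased-finetuned-sst-2-english",
    framework="pytorch",
    task="text-classification",
    accelerator="cpu",
    instance_size="small",
    instance_type="c6i",
    region="us-east-1",
    vendor="aws",
    endpoint_type="protected",
)

model_deployer = Client().active_stack.model_deployer
service = model_deployer.perform_deploy_model(
    id=uuid4(), config=config, timeout=1200
)
print(service.prediction_url)
```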
perform_start_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service` | `BaseService` | The service to start. | *required* |
| `timeout` | `int` | Timeout in seconds to wait for the service to start. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |

Returns:

| Type | Description |
|------|-------------|
| `BaseService` | The started service. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service` | `BaseService` | The service to stop. | *required* |
| `timeout` | `int` | Timeout in seconds to wait for the service to stop. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |
| `force` | `bool` | If True, force the service to stop. | `False` |

Returns:

| Type | Description |
|------|-------------|
| `BaseService` | The stopped service. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
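
Example (illustrative): a hedged sketch of the stop/start/delete lifecycle. `find_model_server` (also used by the deployer step further below) locates existing deployment services; the `config` filter keys are assumptions that mirror the service config fields.

```python
from zenml.client import Client

model_deployer = Client().active_stack.model_deployer

# Filter existing Hugging Face deployment services.
services = model_deployer.find_model_server(
    config={"model_name": "distilbert-sentiment"}
)
if services:
    service = services[0]
    model_deployer.perform_stop_model(service, timeout=300)
    model_deployer.perform_start_model(service, timeout=300)
    # Tear the endpoint down and remove its configuration entirely:
    model_deployer.perform_delete_model(service, timeout=300, force=False)
```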
Modules
huggingface_model_deployer

Implementation of the Hugging Face Model Deployer.

Classes
HuggingFaceModelDeployer(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseModelDeployer

Hugging Face endpoint model deployer.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: HuggingFaceModelDeployerConfig property

Config class for the Hugging Face Model deployer settings class.

Returns:

| Type | Description |
|------|-------------|
| `HuggingFaceModelDeployerConfig` | The configuration. |

validator: Optional[StackValidator] property

Validates the stack.

Returns:

| Type | Description |
|------|-------------|
| `Optional[StackValidator]` | A validator that checks that the stack contains a remote artifact store. |

Functions
get_model_server_info(service_instance: HuggingFaceDeploymentService) -> Dict[str, Optional[str]] staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service_instance` | `HuggingFaceDeploymentService` | Instance of a HuggingFaceDeploymentService | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Optional[str]]` | Model server information. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "HuggingFaceDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information that might be relevant to the user.

    Args:
        service_instance: Instance of a HuggingFaceDeploymentService

    Returns:
        Model server information.
    """
    return {
        "PREDICTION_URL": service_instance.get_prediction_url(),
        "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
    }
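
Example (illustrative): a short usage sketch, assuming at least one Hugging Face deployment service already exists in the active stack.

```python
from zenml.client import Client
from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
    HuggingFaceModelDeployer,
)

# Assumes at least one deployment service is registered.
service = Client().active_stack.model_deployer.find_model_server()[0]
info = HuggingFaceModelDeployer.get_model_server_info(service)
print(info["PREDICTION_URL"])    # inference URL of the endpoint
print(info["HEALTH_CHECK_URL"])  # health check URL of the endpoint
```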
perform_delete_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> None

Method to delete all configuration of a model server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service` | `BaseService` | The service to delete. | *required* |
| `timeout` | `int` | Timeout in seconds to wait for the service to stop. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |
| `force` | `bool` | If True, force the service to stop. | `False` |
Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    service = cast(HuggingFaceDeploymentService, service)
    self._clean_up_existing_service(
        existing_service=service, timeout=timeout, force=force
    )
perform_deploy_model(id: UUID, config: ServiceConfig, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Create a new Hugging Face deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `id` | `UUID` | the UUID of the model to be deployed with Hugging Face. | *required* |
| `config` | `ServiceConfig` | the configuration of the model to be deployed with Hugging Face. | *required* |
| `timeout` | `int` | the timeout in seconds to wait for the Hugging Face endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Hugging Face server is provisioned, without waiting for it to fully start. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |

Returns:

| Type | Description |
|------|-------------|
| `BaseService` | The ZenML Hugging Face deployment service object that can be used to interact with the remote Hugging Face inference endpoint server. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Hugging Face deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    Args:
        id: the UUID of the model to be deployed with Hugging Face.
        config: the configuration of the model to be deployed with Hugging Face.
        timeout: the timeout in seconds to wait for the Hugging Face endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Hugging Face
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Hugging Face deployment service object that can be used to
        interact with the remote Hugging Face inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(HuggingFaceServiceConfig, config)
        # create a new HuggingFaceDeploymentService instance
        service = self._create_new_service(
            id=id, timeout=timeout, config=config
        )
        logger.info(
            f"Creating a new Hugging Face inference endpoint service: {service}"
        )
        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
        }

    return service
perform_start_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> BaseService

Method to start a model server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service` | `BaseService` | The service to start. | *required* |
| `timeout` | `int` | Timeout in seconds to wait for the service to start. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |

Returns:

| Type | Description |
|------|-------------|
| `BaseService` | The started service. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service.
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(service: BaseService, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT, force: bool = False) -> BaseService

Method to stop a model server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service` | `BaseService` | The service to stop. | *required* |
| `timeout` | `int` | Timeout in seconds to wait for the service to stop. | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |
| `force` | `bool` | If True, force the service to stop. | `False` |

Returns:

| Type | Description |
|------|-------------|
| `BaseService` | The stopped service. |

Source code in src/zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service.
    """
    service.stop(timeout=timeout, force=force)
    return service
Functions

services

Initialization of the Hugging Face Service.

Classes
Modules
huggingface_deployment

Implementation of the Hugging Face Deployment service.

Classes
HuggingFaceDeploymentService(config: HuggingFaceServiceConfig, **attrs: Any)

Bases: BaseDeploymentService

Hugging Face model deployment service.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `SERVICE_TYPE` | | a service type descriptor with information describing the Hugging Face deployment service class |
| `config` | `HuggingFaceServiceConfig` | service configuration |

Initialize the Hugging Face deployment service.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config` | `HuggingFaceServiceConfig` | service configuration | *required* |
| `attrs` | `Any` | additional attributes to set on the service | `{}` |
Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def __init__(self, config: HuggingFaceServiceConfig, **attrs: Any):
    """Initialize the Hugging Face deployment service.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    super().__init__(config=config, **attrs)
Attributes
hf_endpoint: InferenceEndpoint property

Get the deployed Hugging Face inference endpoint.

Returns:

| Type | Description |
|------|-------------|
| `InferenceEndpoint` | Huggingface inference endpoint. |

inference_client: InferenceClient property

Get the Hugging Face InferenceClient from Inference Endpoint.

Returns:

| Type | Description |
|------|-------------|
| `InferenceClient` | Hugging Face inference client. |

prediction_url: Optional[str] property

The prediction URI exposed by the prediction service.

Returns:

| Type | Description |
|------|-------------|
| `Optional[str]` | The prediction URI exposed by the prediction service, or None if the service is not yet ready. |

Functions
check_status() -> Tuple[ServiceState, str]

Check the current operational state of the Hugging Face deployment.

Returns:

| Type | Description |
|------|-------------|
| `Tuple[ServiceState, str]` | The operational state of the Hugging Face deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered). |

Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the Hugging Face deployment.

    Returns:
        The operational state of the Hugging Face deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    try:
        status = self.hf_endpoint.status
        if status == InferenceEndpointStatus.RUNNING:
            return ServiceState.ACTIVE, ""

        elif status == InferenceEndpointStatus.SCALED_TO_ZERO:
            return (
                ServiceState.SCALED_TO_ZERO,
                "Hugging Face Inference Endpoint is scaled to zero, but "
                "still running. It will be started on demand.",
            )

        elif status == InferenceEndpointStatus.FAILED:
            return (
                ServiceState.ERROR,
                "Hugging Face Inference Endpoint deployment is inactive "
                "or not found",
            )
        elif status == InferenceEndpointStatus.PENDING:
            return ServiceState.PENDING_STARTUP, ""
        return ServiceState.PENDING_STARTUP, ""
    except (InferenceEndpointError, HfHubHTTPError):
        return (
            ServiceState.INACTIVE,
            "Hugging Face Inference Endpoint deployment is inactive or "
            "not found",
        )
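
Example (illustrative): a minimal polling loop built on `check_status`. The `from zenml.services import ServiceState` import path is an assumption, and the snippet assumes an existing Hugging Face deployment service in the active stack.

```python
import time

from zenml.client import Client
from zenml.services import ServiceState  # assumed import path

# Assumes an existing Hugging Face deployment service.
service = Client().active_stack.model_deployer.find_model_server()[0]

state, message = service.check_status()
while state == ServiceState.PENDING_STARTUP:
    time.sleep(10)  # endpoint still provisioning or initializing
    state, message = service.check_status()

if state in (ServiceState.ACTIVE, ServiceState.SCALED_TO_ZERO):
    print("Endpoint ready (scaled-to-zero endpoints start on demand).")
else:
    print(f"Endpoint not available: {state} {message}")
```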
deprovision(force: bool = False) -> None

Deprovision the remote Hugging Face deployment instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `force` | `bool` | if True, the remote deployment instance will be forcefully deprovisioned. | `False` |
Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the remote Hugging Face deployment instance.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    try:
        self.hf_endpoint.delete()
    except HfHubHTTPError:
        logger.error(
            "Hugging Face Inference Endpoint is deleted or cannot be found."
        )
get_logs(follow: bool = False, tail: Optional[int] = None) -> Generator[str, bool, None]

Retrieve the service logs.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `follow` | `bool` | if True, the logs will be streamed as they are written | `False` |
| `tail` | `Optional[int]` | only retrieve the last NUM lines of log output. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `None` | A generator that can be accessed to get the service logs. |

Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    logger.info(
        "Hugging Face Endpoints provides access to the logs of "
        "your Endpoints through the UI in the “Logs” tab of your Endpoint"
    )
    return  # type: ignore
get_token() -> str

Get the Hugging Face token.

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If token not found. |

Returns:

| Type | Description |
|------|-------------|
| `str` | Hugging Face token. |

Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def get_token(self) -> str:
    """Get the Hugging Face token.

    Raises:
        ValueError: If token not found.

    Returns:
        Hugging Face token.
    """
    client = Client()
    token = None
    if self.config.secret_name:
        secret = client.get_secret(self.config.secret_name)
        token = secret.secret_values["token"]
    else:
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer,
        )

        model_deployer = client.active_stack.model_deployer
        if not isinstance(model_deployer, HuggingFaceModelDeployer):
            raise ValueError(
                "HuggingFaceModelDeployer is not active in the stack."
            )
        token = model_deployer.config.token or None
    if not token:
        raise ValueError("Token not found.")
    return token
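
Per the logic above, the token comes either from a ZenML secret holding a `"token"` key, or from the deployer's own config. A hedged sketch of the secret route follows; the secret name is arbitrary and the token value is a placeholder.

```python
from zenml.client import Client

# Register a secret whose "token" key get_token() will read.
Client().create_secret(name="hf_token", values={"token": "hf_..."})

# Then point the service config at it (other fields omitted for brevity):
# HuggingFaceServiceConfig(secret_name="hf_token", ...)
```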
predict(data: Any, max_new_tokens: int) -> Any

Make a prediction using the service.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `Any` | input data | *required* |
| `max_new_tokens` | `int` | Number of new tokens to generate | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | The prediction result. |

Raises:

| Type | Description |
|------|-------------|
| `Exception` | if the service is not running |
| `NotImplementedError` | if task is not supported. |

Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def predict(self, data: "Any", max_new_tokens: int) -> "Any":
    """Make a prediction using the service.

    Args:
        data: input data
        max_new_tokens: Number of new tokens to generate

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        NotImplementedError: if task is not supported.
    """
    if not self.is_running:
        raise Exception(
            "Hugging Face endpoint inference service is not running. "
            "Please start the service before making predictions."
        )
    if self.prediction_url is not None:
        if self.hf_endpoint.task == "text-generation":
            return self.inference_client.text_generation(
                data, max_new_tokens=max_new_tokens
            )
    # TODO: Add support for all different supported tasks
    raise NotImplementedError(
        "Tasks other than text-generation are not implemented."
    )
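
Example (illustrative): a usage sketch that assumes a running text-generation endpoint deployed by this integration.

```python
from zenml.client import Client

# Assumes a running text-generation endpoint.
service = Client().active_stack.model_deployer.find_model_server()[0]
completion = service.predict(
    "Write a haiku about model deployment.", max_new_tokens=64
)
print(completion)
```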
provision() -> None

Provision or update remote Hugging Face deployment instance.

Raises:

| Type | Description |
|------|-------------|
| `Exception` | If any unexpected error while creating inference endpoint. |

Source code in src/zenml/integrations/huggingface/services/huggingface_deployment.py
def provision(self) -> None:
    """Provision or update remote Hugging Face deployment instance.

    Raises:
        Exception: If any unexpected error while creating inference
            endpoint.
    """
    try:
        validated_config = self._validate_endpoint_configuration()

        hf_endpoint = create_inference_endpoint(
            name=self._generate_an_endpoint_name(),
            repository=validated_config["repository"],
            framework=validated_config["framework"],
            accelerator=validated_config["accelerator"],
            instance_size=validated_config["instance_size"],
            instance_type=validated_config["instance_type"],
            region=validated_config["region"],
            vendor=validated_config["vendor"],
            account_id=self.config.account_id,
            min_replica=self.config.min_replica,
            max_replica=self.config.max_replica,
            revision=self.config.revision,
            task=self.config.task,
            custom_image=self.config.custom_image,
            type=InferenceEndpointType(validated_config["endpoint_type"]),
            token=self.get_token(),
            namespace=self.config.namespace,
        ).wait(timeout=POLLING_TIMEOUT)

    except Exception as e:
        self.status.update_state(
            new_state=ServiceState.ERROR, error=str(e)
        )
        # Catch-all for any other unexpected errors
        raise Exception(
            "An unexpected error occurred while provisioning the "
            f"Hugging Face inference endpoint: {e}"
        )

    # Check if the endpoint URL is available after provisioning
    if hf_endpoint.url:
        logger.info(
            "Hugging Face inference endpoint successfully deployed "
            f"and available. Endpoint URL: {hf_endpoint.url}"
        )
    else:
        logger.error(
            "Failed to start Hugging Face inference endpoint "
            "service: No URL available, please check the Hugging "
            "Face console for more details."
        )
HuggingFaceServiceConfig(**data: Any)

Bases: HuggingFaceBaseConfig, ServiceConfig

Hugging Face service configurations.

Source code in src/zenml/services/service.py
def __init__(self, **data: Any):
    """Initialize the service configuration.

    Args:
        **data: keyword arguments.

    Raises:
        ValueError: if neither 'name' nor 'model_name' is set.
    """
    super().__init__(**data)
    if self.name or self.model_name:
        self.service_name = data.get(
            "service_name",
            f"{ZENM_ENDPOINT_PREFIX}{self.name or self.model_name}",
        )
    else:
        raise ValueError("Either 'name' or 'model_name' must be set.")
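
To make the naming rule concrete, a hedged sketch: the exact prefix comes from `ZENM_ENDPOINT_PREFIX`, and the remaining endpoint config fields are omitted here.

```python
from zenml.integrations.huggingface.services.huggingface_deployment import (
    HuggingFaceServiceConfig,
)

# `service_name` defaults to the ZenML endpoint prefix plus the name.
config = HuggingFaceServiceConfig(model_name="distilbert-sentiment")
print(config.service_name)  # e.g. "<ZENM_ENDPOINT_PREFIX>distilbert-sentiment"

try:
    HuggingFaceServiceConfig()  # neither 'name' nor 'model_name' set
except ValueError as err:
    print(err)  # "Either 'name' or 'model_name' must be set."
```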
HuggingFaceServiceStatus

Bases: ServiceStatus

Hugging Face service status.

Functions

steps

Initialization for Hugging Face model deployer step.

Functions
Modules
accelerate_runner

Step function to run any ZenML step using Accelerate.

Classes

Functions
run_with_accelerate(step_function_top_level: Optional[BaseStep] = None, **accelerate_launch_kwargs: Any) -> Union[Callable[[BaseStep], BaseStep], BaseStep]

Run a function with accelerate.

Accelerate package: https://huggingface.co/docs/accelerate/en/index

Example:

```python
from zenml import step, pipeline
from zenml.integrations.huggingface.steps import run_with_accelerate

@run_with_accelerate(num_processes=4, multi_gpu=True)
@step
def training_step(some_param: int, ...):
    # your training code is below
    ...

@pipeline
def training_pipeline(some_param: int, ...):
    training_step(some_param, ...)
```

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step_function_top_level` | `Optional[BaseStep]` | The step function to run with accelerate [optional]. Used in functional calls like `run_with_accelerate(some_func,foo=bar)()`. | `None` |
| `accelerate_launch_kwargs` | `Any` | A dictionary of arguments to pass along to the `accelerate launch` command, including hardware selection, resource allocation, and training paradigm options. Visit https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch for more details. | `{}` |

Returns:

| Type | Description |
|------|-------------|
| `Union[Callable[[BaseStep], BaseStep], BaseStep]` | The accelerate-enabled version of the step. |

Source code in src/zenml/integrations/huggingface/steps/accelerate_runner.py
def run_with_accelerate(
    step_function_top_level: Optional[BaseStep] = None,
    **accelerate_launch_kwargs: Any,
) -> Union[Callable[[BaseStep], BaseStep], BaseStep]:
    """Run a function with accelerate.

    Accelerate package: https://huggingface.co/docs/accelerate/en/index
    Example:
        ```python
        from zenml import step, pipeline
        from zenml.integrations.huggingface.steps import run_with_accelerate

        @run_with_accelerate(num_processes=4, multi_gpu=True)
        @step
        def training_step(some_param: int, ...):
            # your training code is below
            ...

        @pipeline
        def training_pipeline(some_param: int, ...):
            training_step(some_param, ...)
        ```

    Args:
        step_function_top_level: The step function to run with accelerate [optional].
            Used in functional calls like `run_with_accelerate(some_func,foo=bar)()`.
        accelerate_launch_kwargs: A dictionary of arguments to pass along to the
            `accelerate launch` command, including hardware selection, resource
            allocation, and training paradigm options. Visit
            https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch
            for more details.

    Returns:
        The accelerate-enabled version of the step.
    """

    def _decorator(step_function: BaseStep) -> BaseStep:
        def _wrapper(
            entrypoint: F, accelerate_launch_kwargs: Dict[str, Any]
        ) -> F:
            @functools.wraps(entrypoint)
            def inner(*args: Any, **kwargs: Any) -> Any:
                if args:
                    raise ValueError(
                        "Accelerated steps do not support positional arguments."
                    )

                with create_cli_wrapped_script(
                    entrypoint, flavor="accelerate"
                ) as (
                    script_path,
                    output_path,
                ):
                    commands = [str(script_path.absolute())]
                    for k, v in kwargs.items():
                        k = _cli_arg_name(k)
                        if isinstance(v, bool):
                            if v:
                                commands.append(f"--{k}")
                        elif type(v) in (list, tuple, set):
                            for each in v:
                                commands += [f"--{k}", f"{each}"]
                        else:
                            commands += [f"--{k}", f"{v}"]
                    logger.debug(commands)

                    parser = launch_command_parser()
                    args = parser.parse_args(commands)
                    for k, v in accelerate_launch_kwargs.items():
                        if k in args:
                            setattr(args, k, v)
                        else:
                            logger.warning(
                                f"You passed in `{k}` as an `accelerate launch` argument, but it was not accepted. "
                                "Please check https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch "
                                "to find out more about supported arguments and retry."
                            )
                    try:
                        launch_command(args)
                    except Exception as e:
                        logger.error(
                            "Accelerate training job failed... See error message for details."
                        )
                        raise RuntimeError(
                            "Accelerate training job failed."
                        ) from e
                    else:
                        logger.info(
                            "Accelerate training job finished successfully."
                        )
                        return pickle.load(open(output_path, "rb"))

            return cast(F, inner)

        try:
            get_pipeline_context()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                f"`{run_with_accelerate.__name__}` decorator cannot be used "
                "in a functional way with steps, please apply decoration "
                "directly to a step instead. This behavior will be also "
                "allowed in future, but now it faces technical limitations.\n"
                "Example (allowed):\n"
                f"@{run_with_accelerate.__name__}(...)\n"
                f"def {step_function.name}(...):\n"
                "    ...\n"
                "Example (not allowed):\n"
                "def my_pipeline(...):\n"
                f"    run_with_accelerate({step_function.name},...)(...)\n"
            )

        setattr(
            step_function, "unwrapped_entrypoint", step_function.entrypoint
        )
        setattr(
            step_function,
            "entrypoint",
            _wrapper(
                step_function.entrypoint,
                accelerate_launch_kwargs=accelerate_launch_kwargs,
            ),
        )

        return step_function

    if step_function_top_level:
        return _decorator(step_function_top_level)
    return _decorator
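
The wrapper above serializes the step's keyword arguments into CLI flags for the generated script. A standalone sketch of that mapping follows; the real flag spelling additionally passes through the internal `_cli_arg_name` helper, which is not reproduced here.

```python
from typing import Any, Dict, List


def to_cli_args(kwargs: Dict[str, Any]) -> List[str]:
    """Mirror of the kwarg-to-CLI mapping in `inner` above (sketch)."""
    commands: List[str] = []
    for k, v in kwargs.items():
        if isinstance(v, bool):
            if v:  # True booleans become bare flags, False ones are dropped
                commands.append(f"--{k}")
        elif type(v) in (list, tuple, set):
            for each in v:  # collections repeat the flag per element
                commands += [f"--{k}", f"{each}"]
        else:
            commands += [f"--{k}", f"{v}"]
    return commands


print(to_cli_args({"use_fp16": True, "gpu_ids": [0, 1], "lr": 3e-4}))
# ['--use_fp16', '--gpu_ids', '0', '--gpu_ids', '1', '--lr', '0.0003']
```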
huggingface_deployer

Implementation of the Hugging Face Deployer step.

Classes

Functions
huggingface_model_deployer_step(service_config: HuggingFaceServiceConfig, deploy_decision: bool = True, timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT) -> HuggingFaceDeploymentService

Hugging Face model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment with Hugging Face Inference Endpoint.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `service_config` | `HuggingFaceServiceConfig` | Hugging Face deployment service configuration. | *required* |
| `deploy_decision` | `bool` | whether to deploy the model or not | `True` |
| `timeout` | `int` | the timeout in seconds to wait for the deployment to start | `DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT` |

Returns:

| Type | Description |
|------|-------------|
| `HuggingFaceDeploymentService` | Huggingface deployment service |

Source code in src/zenml/integrations/huggingface/steps/huggingface_deployer.py
@step(enable_cache=False)
def huggingface_model_deployer_step(
    service_config: HuggingFaceServiceConfig,
    deploy_decision: bool = True,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> HuggingFaceDeploymentService:
    """Hugging Face model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment with Hugging Face Inference Endpoint.

    Args:
        service_config: Hugging Face deployment service configuration.
        deploy_decision: whether to deploy the model or not
        timeout: the timeout in seconds to wait for the deployment to start

    Returns:
        Huggingface deployment service
    """
    model_deployer = cast(
        HuggingFaceModelDeployer,
        HuggingFaceModelDeployer.get_active_model_deployer(),
    )

    # get pipeline name, step name and run id
    context = get_step_context()
    pipeline_name = context.pipeline.name
    step_name = context.step_run.name

    # update the step configuration with the real pipeline runtime information
    service_config = service_config.model_copy()
    service_config.pipeline_name = pipeline_name
    service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        config=service_config.model_dump()
    )

    # even when the deploy decision is negative, if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{service_config.model_name}'..."
        )
        service = cast(HuggingFaceDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=timeout)
        return service

    # invoke the Hugging Face model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service = cast(
        HuggingFaceDeploymentService,
        model_deployer.deploy_model(
            service_config,
            replace=True,
            timeout=timeout,
            service_type=HuggingFaceDeploymentService.SERVICE_TYPE,
        ),
    )

    logger.info(
        f"Hugging Face deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
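
Example (illustrative): a hedged sketch of wiring this step into a pipeline. The step's import path is assumed, and the endpoint settings are placeholders to adapt, as in the deploy example earlier.

```python
from zenml import pipeline
from zenml.integrations.huggingface.services.huggingface_deployment import (
    HuggingFaceServiceConfig,
)
from zenml.integrations.huggingface.steps import (  # assumed export
    huggingface_model_deployer_step,
)


@pipeline
def deployment_pipeline():
    # Placeholder endpoint settings.
    service_config = HuggingFaceServiceConfig(
        model_name="distilbert-sentiment",
        repository="distilbert-base-uncased-finetuned-sst-2-english",
        framework="pytorch",
        task="text-classification",
        accelerator="cpu",
        instance_size="small",
        instance_type="c6i",
        region="us-east-1",
        vendor="aws",
        endpoint_type="protected",
    )
    huggingface_model_deployer_step(
        service_config=service_config, timeout=1200
    )
```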