Skip to content

Huggingface

zenml.integrations.huggingface special

Initialization of the Huggingface integration.

HuggingfaceIntegration (Integration)

Definition of Huggingface integration for ZenML.

Source code in zenml/integrations/huggingface/__init__.py
class HuggingfaceIntegration(Integration):
    """Definition of Huggingface integration for ZenML."""

    NAME = HUGGINGFACE

    # NOTE(review): judging by the name, these packages are meant to be left
    # installed when the integration is uninstalled -- confirm against the
    # `Integration` base class.
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["fsspec", "pandas"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration.

        The imported names are not used here; the imports are executed only
        so that the `materializers` and `services` submodules get loaded.
        """
        from zenml.integrations.huggingface import materializers  # noqa
        from zenml.integrations.huggingface import services  # noqa

    @classmethod
    def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
        """Defines platform specific requirements for the integration.

        The result is the Hugging Face specific requirement list, a
        `transformers` specifier that depends on the running Python version,
        and the requirements reported by the pandas integration.

        Args:
            target_os: The target operating system. It is only forwarded to
                the pandas integration.

        Returns:
            A list of requirements.
        """
        requirements = [
            "datasets",
            "huggingface_hub>0.19.0",
            "accelerate",
            "bitsandbytes>=0.41.3",
            "peft",
            # temporary fix for CI issue similar to:
            # - https://github.com/huggingface/datasets/issues/6737
            # - https://github.com/huggingface/datasets/issues/6697
            # TODO try relaxing it back going forward
            "fsspec<=2023.12.0",
        ]

        # On Python 3.8, newer transformers versions break other packages, so
        # the version is capped there. The full version tuple is compared
        # (rather than only `minor`) so the check stays correct regardless of
        # the major version.
        if sys.version_info >= (3, 9):
            requirements.append("transformers")
        else:
            requirements.append("transformers<=4.31")

        # Add the pandas integration requirements
        from zenml.integrations.pandas import PandasIntegration

        return requirements + PandasIntegration.get_requirements(
            target_os=target_os
        )

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Huggingface integration.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported inside the method, so the flavor module is only loaded
        # when the flavors are requested.
        from zenml.integrations.huggingface.flavors import (
            HuggingFaceModelDeployerFlavor,
        )

        return [HuggingFaceModelDeployerFlavor]

activate() classmethod

Activates the integration.

Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the integration by loading its submodules.

    Neither imported name is used afterwards; the import statement is run
    only so that both submodules get loaded.
    """
    from zenml.integrations.huggingface import (  # noqa
        materializers,
        services,
    )

flavors() classmethod

Declare the stack component flavors for the Huggingface integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors of the Huggingface integration.

    Returns:
        A single-element list holding the Hugging Face model deployer flavor
        class.
    """
    # The flavor module is imported lazily, i.e. only when this method runs.
    from zenml.integrations.huggingface.flavors import (
        HuggingFaceModelDeployerFlavor,
    )

    integration_flavors: List[Type[Flavor]] = [HuggingFaceModelDeployerFlavor]
    return integration_flavors

get_requirements(target_os=None) classmethod

Defines platform specific requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Defines platform specific requirements for the integration.

    The result is the Hugging Face specific requirement list, a
    `transformers` specifier that depends on the running Python version, and
    the requirements reported by the pandas integration.

    Args:
        target_os: The target operating system. It is only forwarded to the
            pandas integration.

    Returns:
        A list of requirements.
    """
    requirements = [
        "datasets",
        "huggingface_hub>0.19.0",
        "accelerate",
        "bitsandbytes>=0.41.3",
        "peft",
        # temporary fix for CI issue similar to:
        # - https://github.com/huggingface/datasets/issues/6737
        # - https://github.com/huggingface/datasets/issues/6697
        # TODO try relaxing it back going forward
        "fsspec<=2023.12.0",
    ]

    # On Python 3.8, newer transformers versions break other packages, so the
    # version is capped there. The full version tuple is compared (rather
    # than only `minor`) so the check stays correct regardless of the major
    # version.
    if sys.version_info >= (3, 9):
        requirements.append("transformers")
    else:
        requirements.append("transformers<=4.31")

    # Add the pandas integration requirements
    from zenml.integrations.pandas import PandasIntegration

    return requirements + PandasIntegration.get_requirements(
        target_os=target_os
    )

flavors special

Hugging Face integration flavors.

huggingface_model_deployer_flavor

Hugging Face model deployer flavor.

HuggingFaceBaseConfig (BaseModel)

Hugging Face Inference Endpoint configuration.

Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceBaseConfig(BaseModel):
    """Hugging Face Inference Endpoint configuration.

    Every field declares a default value, so none of them has to be passed
    explicitly when the model is created.
    """

    # NOTE(review): the field names look like they mirror the parameters used
    # to create a Hugging Face Inference Endpoint -- confirm against the
    # deployer that consumes this config.
    repository: Optional[str] = None
    framework: Optional[str] = None
    accelerator: Optional[str] = None
    instance_size: Optional[str] = None
    instance_type: Optional[str] = None
    region: Optional[str] = None
    vendor: Optional[str] = None
    account_id: Optional[str] = None
    # Replica bounds default to 0 (min) and 1 (max).
    min_replica: int = 0
    max_replica: int = 1
    revision: Optional[str] = None
    task: Optional[str] = None
    # Free-form dictionary; its expected keys are not defined in this class.
    custom_image: Optional[Dict[str, Any]] = None
    # Defaults to "public"; other accepted values are not visible here.
    endpoint_type: str = "public"
    secret_name: Optional[str] = None
    namespace: Optional[str] = None
HuggingFaceModelDeployerConfig (BaseModelDeployerConfig, HuggingFaceBaseConfig)

Configuration for the Hugging Face model deployer.

Attributes:

Name Type Description
token Optional[str]

Hugging Face token used for authentication

namespace str

Hugging Face namespace used to list endpoints

Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceModelDeployerConfig(
    BaseModelDeployerConfig, HuggingFaceBaseConfig
):
    """Configuration for the Hugging Face model deployer.

    Combines the generic model deployer config with the Hugging Face
    endpoint settings inherited from `HuggingFaceBaseConfig`.

    Attributes:
        token: Hugging Face token used for authentication
        namespace: Hugging Face namespace used to list endpoints
    """

    # Declared through `SecretField` and defaults to `None`.
    # NOTE(review): presumably this makes ZenML treat the value as a secret --
    # confirm against the `SecretField` helper.
    token: Optional[str] = SecretField(default=None)

    # The namespace to list endpoints for. Set to `"*"` to list all endpoints
    # from all namespaces (i.e. personal namespace and all orgs the user belongs to).
    # `HuggingFaceBaseConfig` declares this field as `Optional[str] = None`;
    # it is redeclared here as a plain `str` without a default value.
    namespace: str
HuggingFaceModelDeployerFlavor (BaseModelDeployerFlavor)

Hugging Face Endpoint model deployer flavor.

Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor of the model deployer backed by Hugging Face Endpoints."""

    @property
    def name(self) -> str:
        """The flavor's name.

        Returns:
            The value of `HUGGINGFACE_MODEL_DEPLOYER_FLAVOR`.
        """
        flavor_name: str = HUGGINGFACE_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the documentation page of this flavor.

        Returns:
            The URL produced by `generate_default_docs_url`.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The URL produced by `generate_default_sdk_docs_url`.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Link to the logo that represents the flavor in the dashboard.

        Returns:
            The URL of the flavor logo image.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_registry/huggingface.png"
        return logo

    @property
    def config_class(self) -> Type[HuggingFaceModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `HuggingFaceModelDeployerConfig` class.
        """
        config_cls: Type[HuggingFaceModelDeployerConfig] = (
            HuggingFaceModelDeployerConfig
        )
        return config_cls

    @property
    def implementation_class(self) -> Type["HuggingFaceModelDeployer"]:
        """Stack component class that implements this flavor.

        Returns:
            The `HuggingFaceModelDeployer` class.
        """
        # Imported lazily: the deployer module is only loaded when the
        # implementation class is actually requested.
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer as deployer_cls,
        )

        return deployer_cls
config_class: Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig] property readonly

Returns HuggingFaceModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[HuggingFaceModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[HuggingFaceModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

materializers special

Initialization of Huggingface materializers.

huggingface_datasets_materializer

Implementation of the Huggingface datasets materializer.

HFDatasetMaterializer (BaseMaterializer)

Materializer to read data to and from huggingface datasets.

Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
class HFDatasetMaterializer(BaseMaterializer):
    """Materializer to read data to and from huggingface datasets."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (Dataset, DatasetDict)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = (
        ArtifactType.DATA_ANALYSIS
    )

    def load(
        self, data_type: Union[Type[Dataset], Type[DatasetDict]]
    ) -> Union[Dataset, DatasetDict]:
        """Reads Dataset.

        The stored dataset directory is copied from the artifact URI into a
        fresh local temporary directory and loaded from there.

        Args:
            data_type: The type of the dataset to read. It is not used to
                select the loader; `load_from_disk` decides what is returned.

        Returns:
            The dataset read from the specified dir.
        """
        # NOTE(review): the directory created by `mkdtemp` is never removed
        # here. Presumably the loaded dataset keeps reading from these files
        # -- confirm before adding any cleanup.
        temp_dir = mkdtemp()
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
            temp_dir,
        )
        return load_from_disk(temp_dir)

    def save(self, ds: Union[Dataset, DatasetDict]) -> None:
        """Writes a Dataset to the specified dir.

        The dataset is first written to a local temporary directory and then
        copied to the artifact URI.

        Args:
            ds: The Dataset to write.
        """
        # The context manager removes the temporary directory on success and
        # on failure, replacing the manual removal of a directory that a
        # `TemporaryDirectory` object still owned.
        with TemporaryDirectory() as temp_dir:
            path = os.path.join(temp_dir, DEFAULT_DATASET_DIR)
            ds.save_to_disk(path)
            io_utils.copy_dir(
                path,
                os.path.join(self.uri, DEFAULT_DATASET_DIR),
            )

    def extract_metadata(
        self, ds: Union[Dataset, DatasetDict]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `Dataset` object.

        The metadata is produced by the pandas materializer after converting
        the dataset(s) to pandas. For a `DatasetDict`, each metadata key maps
        to a dictionary keyed by the name of the contained dataset.

        Args:
            ds: The `Dataset` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.

        Raises:
            ValueError: If the given object is not a `Dataset` or `DatasetDict`.
        """
        pandas_materializer = PandasMaterializer(self.uri)
        if isinstance(ds, Dataset):
            return pandas_materializer.extract_metadata(ds.to_pandas())
        elif isinstance(ds, DatasetDict):
            metadata: Dict[str, Dict[str, "MetadataType"]] = defaultdict(dict)
            for dataset_name, dataset in ds.items():
                dataset_metadata = pandas_materializer.extract_metadata(
                    dataset.to_pandas()
                )
                # Regroup from "per dataset" to "per metadata key".
                for key, value in dataset_metadata.items():
                    metadata[key][dataset_name] = value
            return dict(metadata)
        raise ValueError(f"Unsupported type {type(ds)}")

    def save_visualizations(
        self, ds: Union[Dataset, DatasetDict]
    ) -> Dict[str, VisualizationType]:
        """Save visualizations for the dataset.

        For every dataset whose download checksums yield a repository name,
        an HTML file embedding the Hugging Face dataset viewer is written
        next to the artifact. Datasets without download checksums, or whose
        first checksum key yields no repository name, are skipped.

        Args:
            ds: The Dataset or DatasetDict to visualize.

        Returns:
            A dictionary mapping visualization paths to their types.

        Raises:
            ValueError: If the given object is not a `Dataset` or `DatasetDict`.
        """
        visualizations: Dict[str, VisualizationType] = {}

        if isinstance(ds, Dataset):
            datasets = {"default": ds}
        elif isinstance(ds, DatasetDict):
            datasets = ds
        else:
            raise ValueError(f"Unsupported type {type(ds)}")

        for name, dataset in datasets.items():
            # Generate a unique identifier for the dataset
            if dataset.info.download_checksums:
                # Only the first checksum key is inspected; take it directly
                # instead of building a list of all keys first.
                dataset_id = extract_repo_name(
                    next(iter(dataset.info.download_checksums))
                )
                if dataset_id:
                    # Create the iframe HTML
                    html = f"""
                    <iframe
                    src="https://huggingface.co/datasets/{dataset_id}/embed/viewer"
                    frameborder="0"
                    width="100%"
                    height="560px"
                    ></iframe>
                    """

                    # Save the HTML to a file
                    visualization_path = os.path.join(
                        self.uri, f"{name}_viewer.html"
                    )
                    with fileio.open(visualization_path, "w") as f:
                        f.write(html)

                    visualizations[visualization_path] = VisualizationType.HTML

        return visualizations
extract_metadata(self, ds)

Extract metadata from the given Dataset object.

Parameters:

Name Type Description Default
ds Union[datasets.Dataset, datasets.dataset_dict.DatasetDict]

The Dataset object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Exceptions:

Type Description
ValueError

If the given object is not a Dataset or DatasetDict.

Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_metadata(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, "MetadataType"]:
    """Collect metadata for a `Dataset` or `DatasetDict`.

    The work is delegated to the pandas materializer, which receives the
    pandas representation of each dataset. For a `DatasetDict`, the result
    maps every metadata key to a dictionary keyed by the name of the
    contained dataset.

    Args:
        ds: The `Dataset` or `DatasetDict` to describe.

    Returns:
        The extracted metadata as a dictionary.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    delegate = PandasMaterializer(self.uri)

    if isinstance(ds, Dataset):
        return delegate.extract_metadata(ds.to_pandas())

    if not isinstance(ds, DatasetDict):
        raise ValueError(f"Unsupported type {type(ds)}")

    # Regroup the per-dataset metadata so that the metadata key comes first.
    by_key: Dict[str, Dict[str, "MetadataType"]] = defaultdict(dict)
    for entry_name, entry in ds.items():
        entry_metadata = delegate.extract_metadata(entry.to_pandas())
        for meta_key, meta_value in entry_metadata.items():
            by_key[meta_key][entry_name] = meta_value
    return dict(by_key)
load(self, data_type)

Reads Dataset.

Parameters:

Name Type Description Default
data_type Union[Type[datasets.Dataset], Type[datasets.dataset_dict.DatasetDict]]

The type of the dataset to read.

required

Returns:

Type Description
Union[datasets.Dataset, datasets.dataset_dict.DatasetDict]

The dataset read from the specified dir.

Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def load(
    self, data_type: Union[Type[Dataset], Type[DatasetDict]]
) -> Union[Dataset, DatasetDict]:
    """Load a dataset that was stored under the artifact URI.

    The stored dataset directory is copied into a fresh local temporary
    directory and read from there with `load_from_disk`.

    Args:
        data_type: The type of the dataset to read. It is not used to select
            the loader.

    Returns:
        The dataset read from the specified dir.
    """
    # NOTE(review): this directory is never removed here. Presumably the
    # loaded dataset keeps reading from these files -- confirm before adding
    # any cleanup.
    local_dir = mkdtemp()
    stored_dir = os.path.join(self.uri, DEFAULT_DATASET_DIR)
    io_utils.copy_dir(stored_dir, local_dir)
    return load_from_disk(local_dir)
save(self, ds)

Writes a Dataset to the specified dir.

Parameters:

Name Type Description Default
ds Union[datasets.Dataset, datasets.dataset_dict.DatasetDict]

The Dataset to write.

required
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save(self, ds: Union[Dataset, DatasetDict]) -> None:
    """Writes a Dataset to the specified dir.

    The dataset is first written to a local temporary directory and then
    copied to the artifact URI.

    Args:
        ds: The Dataset to write.
    """
    # The context manager removes the temporary directory on success and on
    # failure, replacing the manual removal of a directory that a
    # `TemporaryDirectory` object still owned.
    with TemporaryDirectory() as temp_dir:
        path = os.path.join(temp_dir, DEFAULT_DATASET_DIR)
        ds.save_to_disk(path)
        io_utils.copy_dir(
            path,
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
        )
save_visualizations(self, ds)

Save visualizations for the dataset.

Parameters:

Name Type Description Default
ds Union[datasets.Dataset, datasets.dataset_dict.DatasetDict]

The Dataset or DatasetDict to visualize.

required

Returns:

Type Description
Dict[str, zenml.enums.VisualizationType]

A dictionary mapping visualization paths to their types.

Exceptions:

Type Description
ValueError

If the given object is not a Dataset or DatasetDict.

Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save_visualizations(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, VisualizationType]:
    """Save visualizations for the dataset.

    For every dataset whose download checksums yield a repository name, an
    HTML file embedding the Hugging Face dataset viewer is written next to
    the artifact. Datasets without download checksums, or whose first
    checksum key yields no repository name, are skipped.

    Args:
        ds: The Dataset or DatasetDict to visualize.

    Returns:
        A dictionary mapping visualization paths to their types.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    visualizations: Dict[str, VisualizationType] = {}

    if isinstance(ds, Dataset):
        datasets = {"default": ds}
    elif isinstance(ds, DatasetDict):
        datasets = ds
    else:
        raise ValueError(f"Unsupported type {type(ds)}")

    for name, dataset in datasets.items():
        # Generate a unique identifier for the dataset
        if dataset.info.download_checksums:
            # Only the first checksum key is inspected; take it directly
            # instead of building a list of all keys first.
            dataset_id = extract_repo_name(
                next(iter(dataset.info.download_checksums))
            )
            if dataset_id:
                # Create the iframe HTML
                html = f"""
                <iframe
                src="https://huggingface.co/datasets/{dataset_id}/embed/viewer"
                frameborder="0"
                width="100%"
                height="560px"
                ></iframe>
                """

                # Save the HTML to a file
                visualization_path = os.path.join(
                    self.uri, f"{name}_viewer.html"
                )
                with fileio.open(visualization_path, "w") as f:
                    f.write(html)

                visualizations[visualization_path] = VisualizationType.HTML

    return visualizations
extract_repo_name(checksum_str)

Extracts the repo name from the checksum string.

An example of a checksum_str is: "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet" and the expected output is "nyu-mll/glue".

Parameters:

Name Type Description Default
checksum_str str

The checksum_str to extract the repo name from.

required

Returns:

Type Description
Optional[str]

The extracted repo name, or None if it could not be extracted.

Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_repo_name(checksum_str: str) -> Optional[str]:
    """Extracts the repo name from the checksum string.

    An example of a checksum_str is:
    "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet"
    and the expected output is "nyu-mll/glue".

    A repository without a namespace, e.g.
    "hf://datasets/squad@<revision>/plain_text/train.parquet", yields "squad".

    Args:
        checksum_str: The checksum_str to extract the repo name from.

    Returns:
        The extracted repo name, or `None` if the string is not a `str`, is
        too short, or has an empty namespace or repository component.
    """
    # Extraction is best effort: unusable input gives `None`, never an error.
    if not isinstance(checksum_str, str):
        return None

    # "hf://datasets/<namespace>/<repo>@<rev>/<path>" splits into
    # ["hf:", "", "datasets", "<namespace>", "<repo>@<rev>", ...].
    parts = checksum_str.split("/")
    if len(parts) < 4:
        return None

    first = parts[3]
    if "@" in first:
        # Case: squad -- the revision marker sits on the first component, so
        # the repository has no namespace.
        return first.split("@")[0] or None

    # Index 4 is read below, so at least five components are required.
    if len(parts) < 5:
        return None

    # Case: nyu-mll/glue
    repo = parts[4].split("@")[0]
    if not first or not repo:
        return None
    return f"{first}/{repo}"

huggingface_pt_model_materializer

Implementation of the Huggingface PyTorch model materializer.

HFPTModelMaterializer (BaseMaterializer)

Materializer to read torch model to and from huggingface pretrained model.

Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
class HFPTModelMaterializer(BaseMaterializer):
    """Materializer to read torch model to and from huggingface pretrained model."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (PreTrainedModel,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[PreTrainedModel]) -> PreTrainedModel:
        """Reads HFModel.

        The stored model directory is copied to a local temporary directory.
        The model class is looked up in the `transformers` module by the
        first architecture name found in the stored config.

        Args:
            data_type: The type of the model to read. It is not used to pick
                the model class.

        Returns:
            The model read from the specified dir.
        """
        # The context manager removes the temporary directory explicitly
        # instead of relying on the finalizer of a `TemporaryDirectory`
        # object that was never cleaned up.
        with TemporaryDirectory() as temp_dir:
            io_utils.copy_dir(
                os.path.join(self.uri, DEFAULT_PT_MODEL_DIR), temp_dir
            )

            config = AutoConfig.from_pretrained(temp_dir)
            architecture = config.architectures[0]
            model_cls = getattr(
                importlib.import_module("transformers"), architecture
            )
            return model_cls.from_pretrained(temp_dir)

    def save(self, model: PreTrainedModel) -> None:
        """Writes a Model to the specified dir.

        The model is saved to a local temporary directory, which is then
        copied to the artifact URI.

        Args:
            model: The Torch Model to write.
        """
        # Cleanup happens when the block exits, including on errors.
        with TemporaryDirectory() as temp_dir:
            model.save_pretrained(temp_dir)
            io_utils.copy_dir(
                temp_dir,
                os.path.join(self.uri, DEFAULT_PT_MODEL_DIR),
            )

    def extract_metadata(
        self, model: PreTrainedModel
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `PreTrainedModel` object.

        Args:
            model: The `PreTrainedModel` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary: the entries returned by
            `count_module_params` plus the keys "dtype" and "device".
        """
        from zenml.integrations.pytorch.utils import count_module_params

        module_param_metadata = count_module_params(model)
        return {
            **module_param_metadata,
            "dtype": DType(str(model.dtype)),
            "device": str(model.device),
        }
extract_metadata(self, model)

Extract metadata from the given PreTrainedModel object.

Parameters:

Name Type Description Default
model transformers.PreTrainedModel

The PreTrainedModel object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def extract_metadata(
    self, model: PreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Collect metadata describing a `PreTrainedModel`.

    Args:
        model: The `PreTrainedModel` object to extract metadata from.

    Returns:
        A dictionary with the entries returned by `count_module_params`,
        extended by the keys "dtype" and "device".
    """
    from zenml.integrations.pytorch.utils import count_module_params

    # Start from the parameter counts, then add the model-level entries; the
    # two keys set below win over identically named entries from the helper.
    metadata: Dict[str, "MetadataType"] = dict(count_module_params(model))
    metadata["dtype"] = DType(str(model.dtype))
    metadata["device"] = str(model.device)
    return metadata
load(self, data_type)

Reads HFModel.

Parameters:

Name Type Description Default
data_type Type[transformers.PreTrainedModel]

The type of the model to read.

required

Returns:

Type Description
transformers.PreTrainedModel

The model read from the specified dir.

Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def load(self, data_type: Type[PreTrainedModel]) -> PreTrainedModel:
    """Reads HFModel.

    The stored model directory is copied to a local temporary directory. The
    model class is looked up in the `transformers` module by the first
    architecture name found in the stored config.

    Args:
        data_type: The type of the model to read. It is not used to pick the
            model class.

    Returns:
        The model read from the specified dir.
    """
    # The context manager removes the temporary directory explicitly instead
    # of relying on the finalizer of a `TemporaryDirectory` object that was
    # never cleaned up.
    with TemporaryDirectory() as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(self, model)

Writes a Model to the specified dir.

Parameters:

Name Type Description Default
model transformers.PreTrainedModel

The Torch Model to write.

required
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def save(self, model: PreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    The model is saved to a local temporary directory, which is then copied
    to the artifact URI.

    Args:
        model: The Torch Model to write.
    """
    # The context manager removes the temporary directory when the block
    # exits, including on errors, instead of relying on the finalizer of a
    # `TemporaryDirectory` object that was never cleaned up.
    with TemporaryDirectory() as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR),
        )

huggingface_t5_materializer

Implementation of the Huggingface t5 materializer.

HFT5Materializer (BaseMaterializer)

Base class for huggingface t5 models.

Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
class HFT5Materializer(BaseMaterializer):
    """Base class for huggingface t5 models."""

    SKIP_REGISTRATION: ClassVar[bool] = False
    ASSOCIATED_TYPES = (
        T5ForConditionalGeneration,
        T5Tokenizer,
        T5TokenizerFast,
    )

    def load(
        self, data_type: Type[Any]
    ) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]:
        """Reads a T5ForConditionalGeneration model or T5Tokenizer.

        The files stored under the artifact URI are copied to a local
        temporary directory, from which `data_type.from_pretrained` is
        called.

        Args:
            data_type: A T5ForConditionalGeneration or T5Tokenizer type.

        Returns:
            A T5ForConditionalGeneration or T5Tokenizer object.

        Raises:
            ValueError: Unsupported data type used
        """
        # Validate before copying, so an unsupported type fails without first
        # transferring the whole artifact.
        if data_type not in (
            T5ForConditionalGeneration,
            T5Tokenizer,
            T5TokenizerFast,
        ):
            raise ValueError(f"Unsupported data type: {data_type}")

        filepath = self.uri

        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            # Copy files from artifact store to temporary directory.
            # NOTE: only one level of subdirectories is handled; entries
            # inside a subdirectory are copied as plain files.
            for file in fileio.listdir(filepath):
                src = os.path.join(filepath, file)
                dst = os.path.join(temp_dir, file)
                if fileio.isdir(src):
                    fileio.makedirs(dst)
                    for subfile in fileio.listdir(src):
                        subsrc = os.path.join(src, subfile)
                        subdst = os.path.join(dst, subfile)
                        fileio.copy(subsrc, subdst)
                else:
                    fileio.copy(src, dst)

            # Load the model or tokenizer from the temporary directory
            return data_type.from_pretrained(temp_dir)

    def save(
        self,
        obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast],
    ) -> None:
        """Creates a serialization for a T5ForConditionalGeneration model or T5Tokenizer.

        The object is saved to a local temporary directory, whose content is
        then copied to the artifact URI.

        Args:
            obj: A T5ForConditionalGeneration model or T5Tokenizer.
        """
        # Create a temporary directory
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            # Save the model or tokenizer
            obj.save_pretrained(temp_dir)

            # Copy the directory to the artifact store.
            # NOTE: only one level of subdirectories is handled; entries
            # inside a subdirectory are copied as plain files.
            filepath = self.uri
            fileio.makedirs(filepath)
            for file in os.listdir(temp_dir):
                src = os.path.join(temp_dir, file)
                dst = os.path.join(filepath, file)
                if os.path.isdir(src):
                    fileio.makedirs(dst)
                    for subfile in os.listdir(src):
                        subsrc = os.path.join(src, subfile)
                        subdst = os.path.join(dst, subfile)
                        fileio.copy(subsrc, subdst)
                else:
                    fileio.copy(src, dst)
load(self, data_type)

Reads a T5ForConditionalGeneration model or T5Tokenizer from the files stored in the artifact directory.

Parameters:

Name Type Description Default
data_type Type[Any]

A T5ForConditionalGeneration or T5Tokenizer type.

required

Returns:

Type Description
Union[transformers.T5ForConditionalGeneration, transformers.T5Tokenizer, transformers.T5TokenizerFast]

A T5ForConditionalGeneration or T5Tokenizer object.

Exceptions:

Type Description
ValueError

Unsupported data type used

Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def load(
    self, data_type: Type[Any]
) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]:
    """Reads a T5ForConditionalGeneration model or T5Tokenizer.

    The files stored under the artifact URI are copied to a local temporary
    directory, from which `data_type.from_pretrained` is called.

    Args:
        data_type: A T5ForConditionalGeneration or T5Tokenizer type.

    Returns:
        A T5ForConditionalGeneration or T5Tokenizer object.

    Raises:
        ValueError: Unsupported data type used
    """
    # Validate before copying, so an unsupported type fails without first
    # transferring the whole artifact.
    if data_type not in (
        T5ForConditionalGeneration,
        T5Tokenizer,
        T5TokenizerFast,
    ):
        raise ValueError(f"Unsupported data type: {data_type}")

    filepath = self.uri

    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        # Copy files from artifact store to temporary directory.
        # NOTE: only one level of subdirectories is handled; entries inside
        # a subdirectory are copied as plain files.
        for file in fileio.listdir(filepath):
            src = os.path.join(filepath, file)
            dst = os.path.join(temp_dir, file)
            if fileio.isdir(src):
                fileio.makedirs(dst)
                for subfile in fileio.listdir(src):
                    subsrc = os.path.join(src, subfile)
                    subdst = os.path.join(dst, subfile)
                    fileio.copy(subsrc, subdst)
            else:
                fileio.copy(src, dst)

        # Load the model or tokenizer from the temporary directory
        return data_type.from_pretrained(temp_dir)
save(self, obj)

Creates a serialization for a T5ForConditionalGeneration model or T5Tokenizer.

Parameters:

Name Type Description Default
obj Union[transformers.T5ForConditionalGeneration, transformers.T5Tokenizer, transformers.T5TokenizerFast]

A T5ForConditionalGeneration model or T5Tokenizer.

required
Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def save(
    self,
    obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast],
) -> None:
    """Store a T5ForConditionalGeneration model or T5Tokenizer as an artifact.

    The object is written to a local temporary directory with
    `save_pretrained`; the resulting files are then copied to the artifact
    URI.

    Args:
        obj: A T5ForConditionalGeneration model or T5Tokenizer.
    """
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as staging_dir:
        obj.save_pretrained(staging_dir)

        target_root = self.uri
        fileio.makedirs(target_root)

        for entry in os.listdir(staging_dir):
            local_path = os.path.join(staging_dir, entry)
            remote_path = os.path.join(target_root, entry)

            # Plain files are copied directly.
            if not os.path.isdir(local_path):
                fileio.copy(local_path, remote_path)
                continue

            # Directories are recreated and their direct children copied;
            # deeper nesting is not descended into.
            fileio.makedirs(remote_path)
            for child in os.listdir(local_path):
                fileio.copy(
                    os.path.join(local_path, child),
                    os.path.join(remote_path, child),
                )

huggingface_tf_model_materializer

Implementation of the Huggingface TF model materializer.

HFTFModelMaterializer (BaseMaterializer)

Materializer to read Tensorflow model to and from huggingface pretrained model.

Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
class HFTFModelMaterializer(BaseMaterializer):
    """Materializer to read and write TensorFlow models in the Hugging Face pretrained format."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (TFPreTrainedModel,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel:
        """Reads HFModel.

        The stored model directory is copied to a local temporary directory,
        the concrete model class is resolved from the saved config
        (`"TF" + config.architectures[0]`) and the model is loaded from there.

        Args:
            data_type: The type of the model to read. Not consulted; the
                class is derived from the saved configuration instead.

        Returns:
            The model read from the specified dir.
        """
        # Use the context manager so the scratch directory is removed
        # deterministically (also on errors) instead of relying on the
        # TemporaryDirectory finalizer.
        with TemporaryDirectory() as temp_dir:
            io_utils.copy_dir(
                os.path.join(self.uri, DEFAULT_TF_MODEL_DIR), temp_dir
            )

            config = AutoConfig.from_pretrained(temp_dir)
            architecture = "TF" + config.architectures[0]
            model_cls = getattr(
                importlib.import_module("transformers"), architecture
            )
            return model_cls.from_pretrained(temp_dir)

    def save(self, model: TFPreTrainedModel) -> None:
        """Writes a Model to the specified dir.

        Args:
            model: The TF Model to write.
        """
        # Stage locally, then copy into the artifact store; the context
        # manager guarantees the staging directory is cleaned up.
        with TemporaryDirectory() as temp_dir:
            model.save_pretrained(temp_dir)
            io_utils.copy_dir(
                temp_dir,
                os.path.join(self.uri, DEFAULT_TF_MODEL_DIR),
            )

    def extract_metadata(
        self, model: TFPreTrainedModel
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `PreTrainedModel` object.

        Args:
            model: The `PreTrainedModel` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary with the keys
            `num_layers`, `num_params` and `num_trainable_params`.
        """
        return {
            "num_layers": len(model.layers),
            "num_params": model.num_parameters(only_trainable=False),
            "num_trainable_params": model.num_parameters(only_trainable=True),
        }
extract_metadata(self, model)

Extract metadata from the given PreTrainedModel object.

Parameters:

Name Type Description Default
model transformers.TFPreTrainedModel

The PreTrainedModel object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def extract_metadata(
    self, model: TFPreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Collect size statistics for a `PreTrainedModel` object.

    Args:
        model: The `PreTrainedModel` object to extract metadata from.

    Returns:
        A dictionary holding the layer count (`num_layers`), the total
        parameter count (`num_params`) and the trainable parameter count
        (`num_trainable_params`).
    """
    layer_count = len(model.layers)
    total_params = model.num_parameters(only_trainable=False)
    trainable_params = model.num_parameters(only_trainable=True)

    metadata = {}
    metadata["num_layers"] = layer_count
    metadata["num_params"] = total_params
    metadata["num_trainable_params"] = trainable_params
    return metadata
load(self, data_type)

Reads HFModel.

Parameters:

Name Type Description Default
data_type Type[transformers.TFPreTrainedModel]

The type of the model to read.

required

Returns:

Type Description
transformers.TFPreTrainedModel

The model read from the specified dir.

Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def load(self, data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel:
    """Reads HFModel.

    The stored model directory is copied to a local temporary directory,
    the concrete model class is resolved from the saved config
    (`"TF" + config.architectures[0]`) and the model is loaded from there.

    Args:
        data_type: The type of the model to read. Not consulted; the class
            is derived from the saved configuration instead.

    Returns:
        The model read from the specified dir.
    """
    # Use the context manager so the scratch directory is removed
    # deterministically (also on errors) instead of relying on the
    # TemporaryDirectory finalizer.
    with TemporaryDirectory() as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = "TF" + config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(self, model)

Writes a Model to the specified dir.

Parameters:

Name Type Description Default
model transformers.TFPreTrainedModel

The TF Model to write.

required
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def save(self, model: TFPreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    The model is first saved to a local temporary directory and then copied
    to the `DEFAULT_TF_MODEL_DIR` sub-directory of `self.uri`.

    Args:
        model: The TF Model to write.
    """
    # The context manager guarantees the staging directory is cleaned up,
    # instead of relying on the TemporaryDirectory finalizer.
    with TemporaryDirectory() as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR),
        )

huggingface_tokenizer_materializer

Implementation of the Huggingface tokenizer materializer.

HFTokenizerMaterializer (BaseMaterializer)

Materializer to read and write Hugging Face tokenizers in their pretrained format.

Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
class HFTokenizerMaterializer(BaseMaterializer):
    """Materializer that stores and restores Hugging Face tokenizers."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        PreTrainedTokenizerBase,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
        """Restore a tokenizer from the artifact store.

        Args:
            data_type: The type of the tokenizer to read. Not consulted;
                `AutoTokenizer` is used to load the stored files.

        Returns:
            The tokenizer read from the specified dir.
        """
        with TemporaryDirectory() as local_dir:
            stored_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
            # Pull the stored files next to the process before loading.
            io_utils.copy_dir(stored_dir, local_dir)
            restored = AutoTokenizer.from_pretrained(local_dir)
            return restored

    def save(self, tokenizer: Type[Any]) -> None:
        """Persist a tokenizer to the artifact store.

        Args:
            tokenizer: The HFTokenizer to write.
        """
        with TemporaryDirectory() as local_dir:
            # Serialize locally, then mirror the files to the artifact store.
            tokenizer.save_pretrained(local_dir)
            target_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
            io_utils.copy_dir(local_dir, target_dir)
load(self, data_type)

Reads Tokenizer.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the tokenizer to read.

required

Returns:

Type Description
transformers.tokenization_utils_base.PreTrainedTokenizerBase

The tokenizer read from the specified dir.

Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def load(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
    """Restore a tokenizer from the artifact store.

    Args:
        data_type: The type of the tokenizer to read. Not consulted;
            `AutoTokenizer` is used to load the stored files.

    Returns:
        The tokenizer read from the specified dir.
    """
    with TemporaryDirectory() as local_dir:
        stored_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
        # Pull the stored files next to the process before loading.
        io_utils.copy_dir(stored_dir, local_dir)
        restored = AutoTokenizer.from_pretrained(local_dir)
        return restored
save(self, tokenizer)

Writes a Tokenizer to the specified dir.

Parameters:

Name Type Description Default
tokenizer Type[Any]

The HFTokenizer to write.

required
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def save(self, tokenizer: Type[Any]) -> None:
    """Persist a tokenizer to the artifact store.

    Args:
        tokenizer: The HFTokenizer to write.
    """
    with TemporaryDirectory() as local_dir:
        # Serialize locally, then mirror the files to the artifact store.
        tokenizer.save_pretrained(local_dir)
        target_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
        io_utils.copy_dir(local_dir, target_dir)

model_deployers special

Initialization of the Hugging Face model deployers.

huggingface_model_deployer

Implementation of the Hugging Face Model Deployer.

HuggingFaceModelDeployer (BaseModelDeployer)

Hugging Face endpoint model deployer.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
class HuggingFaceModelDeployer(BaseModelDeployer):
    """Hugging Face endpoint model deployer.

    Creates, starts, stops and deletes `HuggingFaceDeploymentService`
    instances by delegating to the service's own `start`/`stop` methods.
    """

    NAME: ClassVar[str] = "HuggingFace"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = (
        HuggingFaceModelDeployerFlavor
    )

    @property
    def config(self) -> HuggingFaceModelDeployerConfig:
        """Config class for the Hugging Face Model deployer settings class.

        Returns:
            The configuration, cast to `HuggingFaceModelDeployerConfig`.
        """
        return cast(HuggingFaceModelDeployerConfig, self._config)

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that this model deployer is configured
            with either a token or a secret name.
        """

        def _validate_if_secret_or_token_is_present(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            """Check if secret or token is present in the stack.

            Args:
                stack: The stack to validate. Not inspected; the check only
                    reads this deployer's own configuration.

            Returns:
                A tuple with a boolean indicating whether the stack is valid
                and a message describing the validation result.
            """
            # The message is returned unconditionally, also when the check
            # passes.
            return bool(self.config.token or self.config.secret_name), (
                "The Hugging Face model deployer requires either a secret name"
                " or a token to be present in the stack."
            )

        return StackValidator(
            custom_validation_function=_validate_if_secret_or_token_is_present,
        )

    def _create_new_service(
        self, id: UUID, timeout: int, config: HuggingFaceServiceConfig
    ) -> HuggingFaceDeploymentService:
        """Creates and starts a new HuggingFaceDeploymentService.

        Args:
            id: the UUID of the model to be deployed with Hugging Face model deployer.
            timeout: the timeout in seconds to wait for the Hugging Face inference endpoint
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with Hugging Face model deployer.

        Returns:
            The HuggingFaceDeploymentService object that can be used to
            interact with the Hugging Face inference endpoint.
        """
        # create a new service for the new model
        service = HuggingFaceDeploymentService(uuid=id, config=config)

        logger.info(
            f"Creating an artifact {HUGGINGFACE_SERVICE_ARTIFACT} with service instance attached as metadata."
            " If there's an active pipeline and/or model this artifact will be associated with it."
        )
        # Starting the service is what triggers the actual deployment.
        service.start(timeout=timeout)
        return service

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: HuggingFaceDeploymentService,
    ) -> None:
        """Stop existing services.

        Args:
            timeout: the timeout in seconds to wait for the Hugging Face
                deployment to be stopped.
            force: if True, force the service to stop
            existing_service: Existing Hugging Face deployment service
        """
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new Hugging Face deployment service.

        This should serve the supplied model and deployment configuration.
        A `MODEL_DEPLOYED` analytics event is tracked around the deployment.

        Args:
            id: the UUID of the model to be deployed with Hugging Face.
            config: the configuration of the model to be deployed with Hugging Face.
            timeout: the timeout in seconds to wait for the Hugging Face endpoint
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the Hugging Face
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML Hugging Face deployment service object that can be used to
            interact with the remote Hugging Face inference endpoint server.
        """
        with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
            config = cast(HuggingFaceServiceConfig, config)
            # create (and start) a new HuggingFaceDeploymentService instance
            service = self._create_new_service(
                id=id, timeout=timeout, config=config
            )
            # NOTE: this message is logged after the service has already
            # been created and started above.
            logger.info(
                f"Creating a new Hugging Face inference endpoint service: {service}"
            )
            # Attach the flavor of every active stack component and the
            # store type to the analytics event.
            stack = Client().active_stack
            stack_metadata = {
                component_type.value: component.flavor
                for component_type, component in stack.components.items()
            }
            analytics_handler.metadata = {
                "store_type": Client().zen_store.type.value,
                **stack_metadata,
            }

        return service

    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Method to stop a model server.

        Args:
            service: The service to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.

        Returns:
            The stopped service (the same object that was passed in).
        """
        service.stop(timeout=timeout, force=force)
        return service

    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Method to start a model server.

        Args:
            service: The service to start.
            timeout: Timeout in seconds to wait for the service to start.

        Returns:
            The started service (the same object that was passed in).
        """
        service.start(timeout=timeout)
        return service

    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        The visible implementation stops the service via
        `_clean_up_existing_service`.

        Args:
            service: The service to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        service = cast(HuggingFaceDeploymentService, service)
        self._clean_up_existing_service(
            existing_service=service, timeout=timeout, force=force
        )

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "HuggingFaceDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information that might be relevant to the user.

        Args:
            service_instance: Instance of a HuggingFaceDeploymentService

        Returns:
            Model server information: the prediction URL under
            `PREDICTION_URL` and the health check URL under
            `HEALTH_CHECK_URL`.
        """
        return {
            "PREDICTION_URL": service_instance.get_prediction_url(),
            "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        }
config: HuggingFaceModelDeployerConfig property readonly

Config class for the Hugging Face Model deployer settings class.

Returns:

Type Description
HuggingFaceModelDeployerConfig

The configuration.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the model deployer is configured with either a token or a secret name.

FLAVOR (BaseModelDeployerFlavor)

Hugging Face Endpoint model deployer flavor.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
class HuggingFaceModelDeployerFlavor(BaseModelDeployerFlavor):
    """Hugging Face Endpoint model deployer flavor.

    Exposes the flavor name, documentation and logo URLs, and the config and
    implementation classes of the Hugging Face model deployer.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor (`HUGGINGFACE_MODEL_DEPLOYER_FLAVOR`).
        """
        return HUGGINGFACE_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url, as produced by `generate_default_docs_url`.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url, as produced by
            `generate_default_sdk_docs_url`.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        # NOTE(review): the logo path sits under `model_registry/` although
        # this is a model deployer flavor - confirm this is intentional.
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_registry/huggingface.png"

    @property
    def config_class(self) -> Type[HuggingFaceModelDeployerConfig]:
        """Returns `HuggingFaceModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return HuggingFaceModelDeployerConfig

    @property
    def implementation_class(self) -> Type["HuggingFaceModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # Imported on access rather than at module level; presumably to
        # avoid loading the deployer (and its dependencies) eagerly.
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer,
        )

        return HuggingFaceModelDeployer
config_class: Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig] property readonly

Returns HuggingFaceModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[HuggingFaceModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[HuggingFaceModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

get_model_server_info(service_instance) staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance HuggingFaceDeploymentService

Instance of a HuggingFaceDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "HuggingFaceDeploymentService",
) -> Dict[str, Optional[str]]:
    """Collect the user-facing URLs of a deployed service.

    Args:
        service_instance: Instance of a HuggingFaceDeploymentService

    Returns:
        Model server information: the prediction URL under `PREDICTION_URL`
        and the health check URL under `HEALTH_CHECK_URL`.
    """
    prediction_url = service_instance.get_prediction_url()
    healthcheck_url = service_instance.get_healthcheck_url()

    server_info = {}
    server_info["PREDICTION_URL"] = prediction_url
    server_info["HEALTH_CHECK_URL"] = healthcheck_url
    return server_info
perform_delete_model(self, service, timeout=300, force=False)

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
service BaseService

The service to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

300
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a model server by cleaning up its deployment service.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    # Narrow the generic service type for the clean-up helper.
    hf_service = cast(HuggingFaceDeploymentService, service)
    self._clean_up_existing_service(
        timeout=timeout,
        force=force,
        existing_service=hf_service,
    )
perform_deploy_model(self, id, config, timeout=300)

Create a new Hugging Face deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

Parameters:

Name Type Description Default
id UUID

the UUID of the model to be deployed with Hugging Face.

required
config ServiceConfig

the configuration of the model to be deployed with Hugging Face.

required
timeout int

the timeout in seconds to wait for the Hugging Face endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Hugging Face server is provisioned, without waiting for it to fully start.

300

Returns:

Type Description
BaseService

The ZenML Hugging Face deployment service object that can be used to interact with the remote Hugging Face inference endpoint server.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Deploy a model as a new Hugging Face deployment service.

    The service is created through `_create_new_service` inside a
    `MODEL_DEPLOYED` analytics tracking context; the flavors of the active
    stack components and the store type are attached to that event.

    Args:
        id: the UUID of the model to be deployed with Hugging Face.
        config: the configuration of the model to be deployed with Hugging Face.
        timeout: the timeout in seconds to wait for the Hugging Face endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Hugging Face
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Hugging Face deployment service object that can be used to
        interact with the remote Hugging Face inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        hf_config = cast(HuggingFaceServiceConfig, config)
        service = self._create_new_service(
            id=id, timeout=timeout, config=hf_config
        )
        logger.info(
            f"Creating a new Hugging Face inference endpoint service: {service}"
        )

        # Record which flavor backs each component of the active stack.
        component_flavors = {}
        active_components = Client().active_stack.components
        for component_type, component in active_components.items():
            component_flavors[component_type.value] = component.flavor

        # The store type goes first; component flavors follow.
        event_metadata = {"store_type": Client().zen_store.type.value}
        event_metadata.update(component_flavors)
        analytics_handler.metadata = event_metadata

    return service
perform_start_model(self, service, timeout=300)

Method to start a model server.

Parameters:

Name Type Description Default
service BaseService

The service to start.

required
timeout int

Timeout in seconds to wait for the service to start.

300

Returns:

Type Description
BaseService

The started service.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Method to start a model server.

    Delegates to `service.start` and hands the same service object back.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The started service (the same object that was passed in).
    """
    service.start(timeout=timeout)
    return service
perform_stop_model(self, service, timeout=300, force=False)

Method to stop a model server.

Parameters:

Name Type Description Default
service BaseService

The service to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

300
force bool

If True, force the service to stop.

False

Returns:

Type Description
BaseService

The stopped service.

Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Method to stop a model server.

    Delegates to `service.stop` and hands the same service object back.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The stopped service (the same object that was passed in).
    """
    service.stop(timeout=timeout, force=force)
    return service

services special

Initialization of the Hugging Face Service.

huggingface_deployment

Implementation of the Hugging Face Deployment service.

HuggingFaceDeploymentService (BaseDeploymentService)

Hugging Face model deployment service.

Attributes:

Name Type Description
SERVICE_TYPE ClassVar[zenml.services.service_type.ServiceType]

a service type descriptor with information describing the Hugging Face deployment service class

config HuggingFaceServiceConfig

service configuration

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceDeploymentService(BaseDeploymentService):
    """Hugging Face model deployment service.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the Hugging Face deployment service class
        config: service configuration
        status: service status
    """

    SERVICE_TYPE = ServiceType(
        name="huggingface-deployment",
        type="model-serving",
        flavor="huggingface",
        description="Hugging Face inference endpoint prediction service",
    )
    config: HuggingFaceServiceConfig
    status: HuggingFaceServiceStatus = Field(
        default_factory=lambda: HuggingFaceServiceStatus()
    )

    def __init__(self, config: HuggingFaceServiceConfig, **attrs: Any):
        """Initialize the Hugging Face deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        super().__init__(config=config, **attrs)

    def get_token(self) -> str:
        """Get the Hugging Face token.

        The token is read from the `token` value of the configured secret
        if `config.secret_name` is set; otherwise it is taken from the
        configuration of the active stack's Hugging Face model deployer.

        Raises:
            ValueError: If token not found, or if the active model deployer
                is not a `HuggingFaceModelDeployer`.

        Returns:
            Hugging Face token.
        """
        client = Client()
        token = None
        if self.config.secret_name:
            secret = client.get_secret(self.config.secret_name)
            token = secret.secret_values["token"]
        else:
            from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
                HuggingFaceModelDeployer,
            )

            model_deployer = client.active_stack.model_deployer
            if not isinstance(model_deployer, HuggingFaceModelDeployer):
                raise ValueError(
                    "HuggingFaceModelDeployer is not active in the stack."
                )
            token = model_deployer.config.token or None
        if not token:
            raise ValueError("Token not found.")
        return token

    @property
    def hf_endpoint(self) -> InferenceEndpoint:
        """Get the deployed Hugging Face inference endpoint.

        Every access performs a fresh `get_inference_endpoint` lookup.

        Returns:
            Huggingface inference endpoint.
        """
        return get_inference_endpoint(
            name=self._generate_an_endpoint_name(),
            token=self.get_token(),
            namespace=self.config.namespace,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        return self.hf_endpoint.url if self.is_running else None

    @property
    def inference_client(self) -> InferenceClient:
        """Get the Hugging Face InferenceClient from Inference Endpoint.

        Returns:
            Hugging Face inference client.
        """
        return self.hf_endpoint.client

    def provision(self) -> None:
        """Provision or update remote Hugging Face deployment instance.

        Raises:
            Exception: If any unexpected error while creating inference endpoint.
        """
        try:
            # Attempt to create and wait for the inference endpoint
            hf_endpoint = create_inference_endpoint(
                name=self._generate_an_endpoint_name(),
                repository=self.config.repository,
                framework=self.config.framework,
                accelerator=self.config.accelerator,
                instance_size=self.config.instance_size,
                instance_type=self.config.instance_type,
                region=self.config.region,
                vendor=self.config.vendor,
                account_id=self.config.account_id,
                min_replica=self.config.min_replica,
                max_replica=self.config.max_replica,
                revision=self.config.revision,
                task=self.config.task,
                custom_image=self.config.custom_image,
                type=self.config.endpoint_type,
                token=self.get_token(),
                namespace=self.config.namespace,
            ).wait(timeout=POLLING_TIMEOUT)

        except Exception as e:
            self.status.update_state(
                new_state=ServiceState.ERROR, error=str(e)
            )
            # Catch-all for any other unexpected errors; chain the original
            # exception so its traceback is preserved for debugging.
            raise Exception(
                f"An unexpected error occurred while provisioning the Hugging Face inference endpoint: {e}"
            ) from e

        # Check if the endpoint URL is available after provisioning
        if hf_endpoint.url:
            logger.info(
                f"Hugging Face inference endpoint successfully deployed and available. Endpoint URL: {hf_endpoint.url}"
            )
        else:
            logger.error(
                "Failed to start Hugging Face inference endpoint service: No URL available, please check the Hugging Face console for more details."
            )

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the Hugging Face deployment.

        Returns:
            The operational state of the Hugging Face deployment and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        try:
            status = self.hf_endpoint.status
            if status == InferenceEndpointStatus.RUNNING:
                return (ServiceState.ACTIVE, "")

            elif status == InferenceEndpointStatus.SCALED_TO_ZERO:
                return (
                    ServiceState.SCALED_TO_ZERO,
                    "Hugging Face Inference Endpoint is scaled to zero, but still running. It will be started on demand.",
                )

            elif status == InferenceEndpointStatus.FAILED:
                return (
                    ServiceState.ERROR,
                    "Hugging Face Inference Endpoint deployment is inactive or not found",
                )
            # PENDING and every other status are reported as still starting.
            return (ServiceState.PENDING_STARTUP, "")
        except (InferenceEndpointError, HfHubHTTPError):
            return (
                ServiceState.INACTIVE,
                "Hugging Face Inference Endpoint deployment is inactive or not found",
            )

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the remote Hugging Face deployment instance.

        Args:
            force: if True, the remote deployment instance will be
                forcefully deprovisioned. Currently not used by this
                implementation; the endpoint is always deleted the same way.
        """
        try:
            self.hf_endpoint.delete()
        except HfHubHTTPError:
            # Best effort: an already deleted or missing endpoint is only
            # logged, not raised.
            logger.error(
                "Hugging Face Inference Endpoint is deleted or cannot be found."
            )

    def predict(self, data: "Any", max_new_tokens: int) -> "Any":
        """Make a prediction using the service.

        Only endpoints whose task is `text-generation` are supported.

        Args:
            data: input data
            max_new_tokens: Number of new tokens to generate

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            NotImplementedError: if task is not supported.
        """
        if not self.is_running:
            raise Exception(
                "Hugging Face endpoint inference service is not running. "
                "Please start the service before making predictions."
            )
        if (
            self.prediction_url is not None
            and self.hf_endpoint.task == "text-generation"
        ):
            # `InferenceClient` exposes `text_generation`; there is no
            # `task_generation` method.
            return self.inference_client.text_generation(
                data, max_new_tokens=max_new_tokens
            )
        # Raised for every unsupported task (and when no prediction URL is
        # available) instead of falling through to an unbound result.
        # TODO: Add support for all different supported tasks
        raise NotImplementedError(
            "Tasks other than text-generation is not implemented."
        )

    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.

        Logs are not fetched here; the method only logs a hint pointing to
        the Hugging Face UI.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        logger.info(
            "Hugging Face Endpoints provides access to the logs of "
            "your Endpoints through the UI in the “Logs” tab of your Endpoint"
        )
        # NOTE(review): returns None despite the Generator annotation.
        return  # type: ignore

    def _generate_an_endpoint_name(self) -> str:
        """Generate a unique name for the Hugging Face Inference Endpoint.

        The name is the configured service name followed by the first
        `UUID_SLICE_LENGTH` characters of the service UUID.

        Returns:
            A unique name for the Hugging Face Inference Endpoint.
        """
        return (
            f"{self.config.service_name}-{str(self.uuid)[:UUID_SLICE_LENGTH]}"
        )
hf_endpoint: huggingface_hub.InferenceEndpoint property readonly

Get the deployed Hugging Face inference endpoint.

Returns:

Type Description
huggingface_hub.InferenceEndpoint

Huggingface inference endpoint.

inference_client: huggingface_hub.InferenceClient property readonly

Get the Hugging Face InferenceClient from Inference Endpoint.

Returns:

Type Description
huggingface_hub.InferenceClient

Hugging Face inference client.

prediction_url: Optional[str] property readonly

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if the service is not yet ready.

__init__(self, config, **attrs) special

Initialize the Hugging Face deployment service.

Parameters:

Name Type Description Default
config HuggingFaceServiceConfig

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def __init__(self, config: HuggingFaceServiceConfig, **attrs: Any) -> None:
    """Initialize the Hugging Face deployment service.

    Both arguments are forwarded unchanged to the parent class initializer;
    no additional state is set up here.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    super().__init__(config=config, **attrs)
check_status(self)

Check the current operational state of the Hugging Face deployment.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the Hugging Face deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the Hugging Face deployment.

    The endpoint status is mapped to a ZenML service state as follows:
    running -> ACTIVE, scaled to zero -> SCALED_TO_ZERO, failed -> ERROR,
    and any other status (including pending) -> PENDING_STARTUP. If the
    endpoint cannot be looked up, the service is reported as INACTIVE.

    Returns:
        The operational state of the Hugging Face deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    try:
        status = self.hf_endpoint.status
        if status == InferenceEndpointStatus.RUNNING:
            return (ServiceState.ACTIVE, "")

        if status == InferenceEndpointStatus.SCALED_TO_ZERO:
            return (
                ServiceState.SCALED_TO_ZERO,
                "Hugging Face Inference Endpoint is scaled to zero, but still running. It will be started on demand.",
            )

        if status == InferenceEndpointStatus.FAILED:
            # A failed endpoint exists but is broken; do not report it with
            # the "inactive or not found" message used for lookup errors.
            return (
                ServiceState.ERROR,
                "Hugging Face Inference Endpoint deployment failed",
            )

        # Pending, as well as any status not handled above, is treated as
        # a deployment that is still starting up.
        return (ServiceState.PENDING_STARTUP, "")
    except (InferenceEndpointError, HfHubHTTPError):
        return (
            ServiceState.INACTIVE,
            "Hugging Face Inference Endpoint deployment is inactive or not found",
        )
deprovision(self, force=False)

Deprovision the remote Hugging Face deployment instance.

Parameters:

Name Type Description Default
force bool

if True, the remote deployment instance will be forcefully deprovisioned.

False
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Delete the remote Hugging Face inference endpoint.

    A Hugging Face Hub HTTP error raised while looking up or deleting the
    endpoint is logged as an error and not re-raised.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned (not consulted by this
            implementation).
    """
    try:
        endpoint = self.hf_endpoint
        endpoint.delete()
    except HfHubHTTPError:
        logger.error(
            "Hugging Face Inference Endpoint is deleted or cannot be found."
        )
get_logs(self, follow=False, tail=None)

Retrieve the service logs.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Point the user to the Hugging Face UI for the endpoint logs.

    This implementation does not fetch any log lines itself: it only emits
    an informational message and then returns.

    Args:
        follow: if True, the logs will be streamed as they are written
            (not consulted by this implementation).
        tail: only retrieve the last NUM lines of log output (not
            consulted by this implementation).

    Returns:
        Nothing usable: the method returns `None` despite the generator
        return annotation.
    """
    ui_hint = (
        "Hugging Face Endpoints provides access to the logs of "
        "your Endpoints through the UI in the “Logs” tab of your Endpoint"
    )
    logger.info(ui_hint)
    return None  # type: ignore
get_token(self)

Get the Hugging Face token.

Exceptions:

Type Description
ValueError

If token not found.

Returns:

Type Description
str

Hugging Face token.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def get_token(self) -> str:
    """Resolve the Hugging Face API token for this service.

    When the service config names a secret, the token is read from that
    secret's `token` value. Otherwise it is taken from the config of the
    model deployer in the active stack, which must be a
    `HuggingFaceModelDeployer`.

    Raises:
        ValueError: If the active model deployer is not a Hugging Face
            model deployer, or if no token could be resolved.

    Returns:
        Hugging Face token.
    """
    zen_client = Client()
    secret_name = self.config.secret_name

    if secret_name:
        hf_token = zen_client.get_secret(secret_name).secret_values["token"]
    else:
        # Imported lazily, only on the code path that needs it.
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer,
        )

        active_deployer = zen_client.active_stack.model_deployer
        if not isinstance(active_deployer, HuggingFaceModelDeployer):
            raise ValueError(
                "HuggingFaceModelDeployer is not active in the stack."
            )
        hf_token = active_deployer.config.token or None

    if hf_token:
        return hf_token
    raise ValueError("Token not found.")
predict(self, data, max_new_tokens)

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

input data

required
max_new_tokens int

Number of new tokens to generate

required

Returns:

Type Description
Any

The prediction result.

Exceptions:

Type Description
Exception

if the service is not running

NotImplementedError

if task is not supported.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def predict(self, data: "Any", max_new_tokens: int) -> "Any":
    """Make a prediction using the service.

    Only endpoints whose task is `text-generation` are supported; the
    request is delegated to the inference client's `text_generation` call.

    Args:
        data: input data
        max_new_tokens: Number of new tokens to generate

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running or exposes no prediction
            URL.
        NotImplementedError: if task is not supported.
    """
    if not self.is_running:
        raise Exception(
            "Hugging Face endpoint inference service is not running. "
            "Please start the service before making predictions."
        )
    if self.prediction_url is None:
        # Previously this case was misreported as an unsupported task.
        raise Exception(
            "Hugging Face inference endpoint has no prediction URL "
            "available yet."
        )
    if self.hf_endpoint.task == "text-generation":
        return self.inference_client.text_generation(
            data, max_new_tokens=max_new_tokens
        )
    # TODO: Add support for all different supported tasks
    raise NotImplementedError(
        "Tasks other than text-generation is not implemented."
    )
provision(self)

Provision or update remote Hugging Face deployment instance.

Exceptions:

Type Description
Exception

If any unexpected error occurs while creating the inference endpoint.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def provision(self) -> None:
    """Create the remote Hugging Face inference endpoint and wait for it.

    The endpoint is created from the service configuration and the call
    blocks (up to `POLLING_TIMEOUT`) until the endpoint is ready. Whether
    an endpoint URL is available afterwards is reported through the logger.

    Raises:
        Exception: If any error occurs while creating or waiting for the
            inference endpoint. The service status is set to ERROR first.
    """
    try:
        cfg = self.config
        # Keyword arguments for the endpoint creation call, taken from the
        # service configuration.
        endpoint_kwargs = {
            "name": self._generate_an_endpoint_name(),
            "repository": cfg.repository,
            "framework": cfg.framework,
            "accelerator": cfg.accelerator,
            "instance_size": cfg.instance_size,
            "instance_type": cfg.instance_type,
            "region": cfg.region,
            "vendor": cfg.vendor,
            "account_id": cfg.account_id,
            "min_replica": cfg.min_replica,
            "max_replica": cfg.max_replica,
            "revision": cfg.revision,
            "task": cfg.task,
            "custom_image": cfg.custom_image,
            "type": cfg.endpoint_type,
            "token": self.get_token(),
            "namespace": cfg.namespace,
        }
        created_endpoint = create_inference_endpoint(**endpoint_kwargs)
        hf_endpoint = created_endpoint.wait(timeout=POLLING_TIMEOUT)
    except Exception as e:
        # Record the failure on the service status before surfacing it.
        self.status.update_state(
            new_state=ServiceState.ERROR, error=str(e)
        )
        raise Exception(
            f"An unexpected error occurred while provisioning the Hugging Face inference endpoint: {e}"
        )

    # Without a URL the endpoint cannot be reached; only report it.
    if not hf_endpoint.url:
        logger.error(
            "Failed to start Hugging Face inference endpoint service: No URL available, please check the Hugging Face console for more details."
        )
        return

    logger.info(
        f"Hugging Face inference endpoint successfully deployed and available. Endpoint URL: {hf_endpoint.url}"
    )
HuggingFaceServiceConfig (HuggingFaceBaseConfig, ServiceConfig)

Hugging Face service configurations.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceServiceConfig(HuggingFaceBaseConfig, ServiceConfig):
    """Hugging Face service configurations.

    Combines `HuggingFaceBaseConfig` with `ServiceConfig` and declares no
    additional fields of its own.
    """
HuggingFaceServiceStatus (ServiceStatus)

Hugging Face service status.

Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceServiceStatus(ServiceStatus):
    """Hugging Face service status.

    Subclass of `ServiceStatus` that declares no additional fields of its
    own.
    """

steps special

Initialization for Hugging Face model deployer step.

accelerate_runner

Step function to run any ZenML step using Accelerate.

run_with_accelerate(step_function_top_level=None, **accelerate_launch_kwargs)

Run a function with accelerate.

Accelerate package: https://huggingface.co/docs/accelerate/en/index

Examples:

from zenml import step, pipeline
from zenml.integrations.huggingface.steps import run_with_accelerate

@run_with_accelerate(num_processes=4, multi_gpu=True)
@step
def training_step(some_param: int, ...):
    # your training code is below
    ...

@pipeline
def training_pipeline(some_param: int, ...):
    training_step(some_param, ...)

Parameters:

Name Type Description Default
step_function_top_level Optional[zenml.steps.base_step.BaseStep]

The step function to run with accelerate [optional]. Used in functional calls like run_with_accelerate(some_func,foo=bar)().

None
accelerate_launch_kwargs Any

A dictionary of arguments to pass along to the accelerate launch command, including hardware selection, resource allocation, and training paradigm options. Visit https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch for more details.

{}

Returns:

Type Description
Union[Callable[[zenml.steps.base_step.BaseStep], zenml.steps.base_step.BaseStep], zenml.steps.base_step.BaseStep]

The accelerate-enabled version of the step.

Source code in zenml/integrations/huggingface/steps/accelerate_runner.py
def run_with_accelerate(
    step_function_top_level: Optional[BaseStep] = None,
    **accelerate_launch_kwargs: Any,
) -> Union[Callable[[BaseStep], BaseStep], BaseStep]:
    """Run a function with accelerate.

    The step's entrypoint is replaced by a wrapper that turns the step's
    keyword arguments into CLI arguments for a generated script, launches
    that script through `accelerate launch` and returns the unpickled
    result. The original entrypoint stays available on the step as
    `unwrapped_entrypoint`.

    Accelerate package: https://huggingface.co/docs/accelerate/en/index
    Example:
        ```python
        from zenml import step, pipeline
        from zenml.integrations.huggingface.steps import run_with_accelerate

        @run_with_accelerate(num_processes=4, multi_gpu=True)
        @step
        def training_step(some_param: int, ...):
            # your training code is below
            ...

        @pipeline
        def training_pipeline(some_param: int, ...):
            training_step(some_param, ...)
        ```

    Args:
        step_function_top_level: The step function to run with accelerate [optional].
            Used in functional calls like `run_with_accelerate(some_func,foo=bar)()`.
        accelerate_launch_kwargs: A dictionary of arguments to pass along to the
            `accelerate launch` command, including hardware selection, resource
            allocation, and training paradigm options. Visit
            https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch
            for more details.

    Returns:
        The accelerate-enabled version of the step.

    Raises:
        RuntimeError: If the decorator is applied to a step while a pipeline
            context is active (functional usage inside a pipeline).
    """

    def _decorator(step_function: BaseStep) -> BaseStep:
        def _wrapper(
            entrypoint: F, accelerate_launch_kwargs: Dict[str, Any]
        ) -> F:
            @functools.wraps(entrypoint)
            def inner(*args: Any, **kwargs: Any) -> Any:
                # Arguments are forwarded as `--name value` CLI options, so
                # only keyword arguments can be represented.
                if args:
                    raise ValueError(
                        "Accelerated steps do not support positional arguments."
                    )

                with create_cli_wrapped_script(
                    entrypoint, flavor="accelerate"
                ) as (
                    script_path,
                    output_path,
                ):
                    commands = [str(script_path.absolute())]
                    for k, v in kwargs.items():
                        k = _cli_arg_name(k)
                        if isinstance(v, bool):
                            # Booleans become bare flags, present only
                            # when True.
                            if v:
                                commands.append(f"--{k}")
                        elif isinstance(v, (list, tuple, set)):
                            # Collections repeat the option once per item.
                            for each in v:
                                commands += [f"--{k}", f"{each}"]
                        else:
                            commands += [f"--{k}", f"{v}"]
                    logger.debug(commands)

                    parser = launch_command_parser()
                    # Kept separate from `args` so the positional-arguments
                    # parameter is not shadowed.
                    launch_args = parser.parse_args(commands)
                    for k, v in accelerate_launch_kwargs.items():
                        if k in launch_args:
                            setattr(launch_args, k, v)
                        else:
                            logger.warning(
                                f"You passed in `{k}` as an `accelerate launch` argument, but it was not accepted. "
                                "Please check https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch "
                                "to find out more about supported arguments and retry."
                            )
                    try:
                        launch_command(launch_args)
                    except Exception as e:
                        logger.error(
                            "Accelerate training job failed... See error message for details."
                        )
                        raise RuntimeError(
                            "Accelerate training job failed."
                        ) from e
                    else:
                        logger.info(
                            "Accelerate training job finished successfully."
                        )
                        # NOTE(review): the output file is presumably
                        # written by the generated wrapper script; never
                        # point this at untrusted pickle data.
                        with open(output_path, "rb") as output_file:
                            return pickle.load(output_file)

            return cast(F, inner)

        # Decoration is only allowed outside of a pipeline context: a
        # RuntimeError from the lookup means no pipeline is active.
        try:
            get_pipeline_context()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                f"`{run_with_accelerate.__name__}` decorator cannot be used "
                "in a functional way with steps, please apply decoration "
                "directly to a step instead. This behavior will be also "
                "allowed in future, but now it faces technical limitations.\n"
                "Example (allowed):\n"
                f"@{run_with_accelerate.__name__}(...)\n"
                f"def {step_function.name}(...):\n"
                "    ...\n"
                "Example (not allowed):\n"
                "def my_pipeline(...):\n"
                f"    run_with_accelerate({step_function.name},...)(...)\n"
            )

        # Keep the original entrypoint reachable before swapping in the
        # accelerate-launching wrapper.
        setattr(
            step_function, "unwrapped_entrypoint", step_function.entrypoint
        )
        setattr(
            step_function,
            "entrypoint",
            _wrapper(
                step_function.entrypoint,
                accelerate_launch_kwargs=accelerate_launch_kwargs,
            ),
        )

        return step_function

    if step_function_top_level:
        return _decorator(step_function_top_level)
    return _decorator

huggingface_deployer

Implementation of the Hugging Face Deployer step.