Huggingface
zenml.integrations.huggingface
special
Initialization of the Huggingface integration.
HuggingfaceIntegration (Integration)
Definition of Huggingface integration for ZenML.
Source code in zenml/integrations/huggingface/__init__.py
class HuggingfaceIntegration(Integration):
    """Definition of Huggingface integration for ZenML."""

    NAME = HUGGINGFACE
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["fsspec", "pandas"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration.

        Imports the `materializers` and `services` submodules of the
        integration. The imported names are not used directly here.
        """
        from zenml.integrations.huggingface import materializers  # noqa
        from zenml.integrations.huggingface import services  # noqa

    @classmethod
    def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
        """Defines platform specific requirements for the integration.

        The Huggingface requirements are extended with the requirements of
        the pandas integration.

        Args:
            target_os: The target operating system.

        Returns:
            A list of requirements.
        """
        requirements = [
            "datasets",
            "huggingface_hub>0.19.0",
            "accelerate",
            "bitsandbytes>=0.41.3",
            "peft",
            # temporary fix for CI issue similar to:
            # - https://github.com/huggingface/datasets/issues/6737
            # - https://github.com/huggingface/datasets/issues/6697
            # TODO try relaxing it back going forward
            "fsspec<=2023.12.0",
        ]

        # In python 3.8 higher transformers version lead to other packages
        # breaking. The full version tuple is compared (instead of only
        # `.minor`) so the check does not depend on the major version
        # staying at 3.
        if sys.version_info >= (3, 9):
            requirements.append("transformers")
        else:
            requirements.append("transformers<=4.31")

        # Add the pandas integration requirements
        from zenml.integrations.pandas import PandasIntegration

        return requirements + PandasIntegration.get_requirements(
            target_os=target_os
        )

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Huggingface integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.huggingface.flavors import (
            HuggingFaceModelDeployerFlavor,
        )

        return [HuggingFaceModelDeployerFlavor]
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Huggingface integration.

    The method only imports the `materializers` and `services` submodules;
    neither name is used afterwards.
    """
    from zenml.integrations.huggingface import (  # noqa
        materializers,
        services,
    )
flavors()
classmethod
Declare the stack component flavors for the Huggingface integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Return the stack component flavors shipped with this integration.

    Returns:
        A single-element list holding the Hugging Face model deployer
        flavor class.
    """
    # The flavor class is imported inside the method rather than at module
    # level, as in the original implementation.
    from zenml.integrations.huggingface.flavors import (
        HuggingFaceModelDeployerFlavor as deployer_flavor,
    )

    component_flavors: List[Type[Flavor]] = [deployer_flavor]
    return component_flavors
get_requirements(target_os=None)
classmethod
Defines platform specific requirements for the integration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_os |
Optional[str] |
The target operating system. |
None |
Returns:
Type | Description |
---|---|
List[str] |
A list of requirements. |
Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Defines platform specific requirements for the integration.

    The Huggingface requirements are extended with the requirements of the
    pandas integration.

    Args:
        target_os: The target operating system.

    Returns:
        A list of requirements.
    """
    requirements = [
        "datasets",
        "huggingface_hub>0.19.0",
        "accelerate",
        "bitsandbytes>=0.41.3",
        "peft",
        # temporary fix for CI issue similar to:
        # - https://github.com/huggingface/datasets/issues/6737
        # - https://github.com/huggingface/datasets/issues/6697
        # TODO try relaxing it back going forward
        "fsspec<=2023.12.0",
    ]

    # In python 3.8 higher transformers version lead to other packages
    # breaking. The full version tuple is compared (instead of only `.minor`)
    # so the check does not depend on the major version staying at 3.
    if sys.version_info >= (3, 9):
        requirements.append("transformers")
    else:
        requirements.append("transformers<=4.31")

    # Add the pandas integration requirements
    from zenml.integrations.pandas import PandasIntegration

    return requirements + PandasIntegration.get_requirements(
        target_os=target_os
    )
flavors
special
Hugging Face integration flavors.
huggingface_model_deployer_flavor
Hugging Face model deployer flavor.
HuggingFaceBaseConfig (BaseModel)
Hugging Face Inference Endpoint configuration.
Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceBaseConfig(BaseModel):
    """Hugging Face Inference Endpoint configuration.

    All fields have defaults: `min_replica` is 0, `max_replica` is 1,
    `endpoint_type` is "public" and every other field is `None`.
    """

    repository: Optional[str] = None
    framework: Optional[str] = None
    accelerator: Optional[str] = None
    instance_size: Optional[str] = None
    instance_type: Optional[str] = None
    region: Optional[str] = None
    vendor: Optional[str] = None
    account_id: Optional[str] = None
    # NOTE(review): presumably the lower/upper bound on the number of
    # endpoint replicas - confirm against the deployer implementation.
    min_replica: int = 0
    max_replica: int = 1
    revision: Optional[str] = None
    task: Optional[str] = None
    custom_image: Optional[Dict[str, Any]] = None
    endpoint_type: str = "public"
    secret_name: Optional[str] = None
    # Optional here; `HuggingFaceModelDeployerConfig` redeclares this field
    # as a required `str`.
    namespace: Optional[str] = None
HuggingFaceModelDeployerConfig (BaseModelDeployerConfig, HuggingFaceBaseConfig)
Configuration for the Hugging Face model deployer.
Attributes:
Name | Type | Description |
---|---|---|
token |
Optional[str] |
Hugging Face token used for authentication |
namespace |
str |
Hugging Face namespace used to list endpoints |
Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceModelDeployerConfig(
    BaseModelDeployerConfig, HuggingFaceBaseConfig
):
    """Configuration for the Hugging Face model deployer.

    Inherits from both `BaseModelDeployerConfig` and `HuggingFaceBaseConfig`.

    Attributes:
        token: Hugging Face token used for authentication
        namespace: Hugging Face namespace used to list endpoints
    """

    # Declared through `SecretField`; defaults to `None`.
    token: Optional[str] = SecretField(default=None)

    # The namespace to list endpoints for. Set to `"*"` to list all endpoints
    # from all namespaces (i.e. personal namespace and all orgs the user belongs to).
    # Declared without a default here, unlike the optional field of the same
    # name on `HuggingFaceBaseConfig`.
    namespace: str
HuggingFaceModelDeployerFlavor (BaseModelDeployerFlavor)
Hugging Face Endpoint model deployer flavor.
Source code in zenml/integrations/huggingface/flavors/huggingface_model_deployer_flavor.py
class HuggingFaceModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Hugging Face Endpoint model deployer."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The value of `HUGGINGFACE_MODEL_DEPLOYER_FLAVOR`.
        """
        flavor_name: str = HUGGINGFACE_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Documentation URL for this flavor.

        Returns:
            The result of `generate_default_docs_url()`.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """SDK documentation URL for this flavor.

        Returns:
            The result of `generate_default_sdk_docs_url()`.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """URL of the logo that represents the flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        # Adjacent literals are concatenated at compile time into one URL.
        return (
            "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/"
            "model_registry/huggingface.png"
        )

    @property
    def config_class(self) -> Type[HuggingFaceModelDeployerConfig]:
        """Config class used by this flavor.

        Returns:
            The `HuggingFaceModelDeployerConfig` class.
        """
        config_cls: Type[HuggingFaceModelDeployerConfig] = (
            HuggingFaceModelDeployerConfig
        )
        return config_cls

    @property
    def implementation_class(self) -> Type["HuggingFaceModelDeployer"]:
        """Model deployer class implementing this flavor.

        Returns:
            The `HuggingFaceModelDeployer` class.
        """
        # Imported on access instead of at module level, as in the original.
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer as deployer_cls,
        )

        return deployer_cls
config_class: Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig]
property
readonly
Returns `HuggingFaceModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[HuggingFaceModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[HuggingFaceModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
materializers
special
Initialization of Huggingface materializers.
huggingface_datasets_materializer
Implementation of the Huggingface datasets materializer.
HFDatasetMaterializer (BaseMaterializer)
Materializer to read data to and from huggingface datasets.
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
class HFDatasetMaterializer(BaseMaterializer):
    """Materializer to read data to and from huggingface datasets."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (Dataset, DatasetDict)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = (
        ArtifactType.DATA_ANALYSIS
    )

    def load(
        self, data_type: Union[Type[Dataset], Type[DatasetDict]]
    ) -> Union[Dataset, DatasetDict]:
        """Reads Dataset.

        The stored dataset directory is copied into a fresh local temporary
        directory and loaded from there with `load_from_disk`.

        Args:
            data_type: The type of the dataset to read.

        Returns:
            The dataset read from the specified dir.
        """
        # NOTE(review): the directory created by `mkdtemp` is not removed
        # here; presumably the loaded dataset still reads from it - confirm
        # before adding any cleanup.
        temp_dir = mkdtemp()
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
            temp_dir,
        )
        return load_from_disk(temp_dir)

    def save(self, ds: Union[Dataset, DatasetDict]) -> None:
        """Writes a Dataset to the specified dir.

        The dataset is first saved into a local temporary directory and then
        copied below `self.uri`. The temporary directory is removed when the
        `with` block exits, whether or not an error occurred.

        Args:
            ds: The Dataset to write.
        """
        with TemporaryDirectory() as temp_dir:
            path = os.path.join(temp_dir, DEFAULT_DATASET_DIR)
            ds.save_to_disk(path)
            io_utils.copy_dir(
                path,
                os.path.join(self.uri, DEFAULT_DATASET_DIR),
            )

    def extract_metadata(
        self, ds: Union[Dataset, DatasetDict]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `Dataset` object.

        Metadata extraction is delegated to `PandasMaterializer` on the
        pandas representation of the data. For a `DatasetDict`, each metadata
        key maps to a dictionary keyed by the contained dataset's name.

        Args:
            ds: The `Dataset` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.

        Raises:
            ValueError: If the given object is not a `Dataset` or `DatasetDict`.
        """
        pandas_materializer = PandasMaterializer(self.uri)
        if isinstance(ds, Dataset):
            return pandas_materializer.extract_metadata(ds.to_pandas())
        elif isinstance(ds, DatasetDict):
            metadata: Dict[str, Dict[str, "MetadataType"]] = defaultdict(dict)
            for dataset_name, dataset in ds.items():
                dataset_metadata = pandas_materializer.extract_metadata(
                    dataset.to_pandas()
                )
                for key, value in dataset_metadata.items():
                    metadata[key][dataset_name] = value
            return dict(metadata)
        raise ValueError(f"Unsupported type {type(ds)}")

    def save_visualizations(
        self, ds: Union[Dataset, DatasetDict]
    ) -> Dict[str, VisualizationType]:
        """Save visualizations for the dataset.

        For every dataset whose `info.download_checksums` yields a repo name
        (via `extract_repo_name`), an HTML file embedding the Hugging Face
        dataset viewer is written to `<uri>/<name>_viewer.html`. A plain
        `Dataset` is handled under the name "default".

        Args:
            ds: The Dataset or DatasetDict to visualize.

        Returns:
            A dictionary mapping visualization paths to their types.

        Raises:
            ValueError: If the given object is not a `Dataset` or `DatasetDict`.
        """
        visualizations: Dict[str, VisualizationType] = {}

        if isinstance(ds, Dataset):
            datasets = {"default": ds}
        elif isinstance(ds, DatasetDict):
            datasets = ds
        else:
            raise ValueError(f"Unsupported type {type(ds)}")

        for name, dataset in datasets.items():
            checksums = dataset.info.download_checksums
            if not checksums:
                continue

            # Only the first checksum key is inspected to derive the repo
            # name; `dataset_id` is never read unless it was assigned in
            # this iteration.
            dataset_id = extract_repo_name(next(iter(checksums.keys())))
            if not dataset_id:
                continue

            # Create the iframe HTML
            html = (
                "\n"
                "<iframe\n"
                f'src="https://huggingface.co/datasets/{dataset_id}/embed/viewer"\n'
                'frameborder="0"\n'
                'width="100%"\n'
                'height="560px"\n'
                "></iframe>\n"
            )

            # Save the HTML to a file
            visualization_path = os.path.join(self.uri, f"{name}_viewer.html")
            with fileio.open(visualization_path, "w") as f:
                f.write(html)
            visualizations[visualization_path] = VisualizationType.HTML

        return visualizations
extract_metadata(self, ds)
Extract metadata from the given `Dataset` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds |
Union[datasets.Dataset, datasets.dataset_dict.DatasetDict] |
The `Dataset` object to extract metadata from. |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
The extracted metadata as a dictionary. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the given object is not a `Dataset` or `DatasetDict`. |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_metadata(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, "MetadataType"]:
    """Collect metadata for a `Dataset` or `DatasetDict`.

    The work is delegated to a `PandasMaterializer` operating on the pandas
    representation of the data. For a `DatasetDict`, every metadata key maps
    to a dictionary keyed by the name of the contained dataset.

    Args:
        ds: The `Dataset` or `DatasetDict` to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    delegate = PandasMaterializer(self.uri)

    if isinstance(ds, Dataset):
        return delegate.extract_metadata(ds.to_pandas())

    if not isinstance(ds, DatasetDict):
        raise ValueError(f"Unsupported type {type(ds)}")

    # Regroup the per-dataset metadata as key -> {dataset name -> value}.
    grouped: Dict[str, Dict[str, "MetadataType"]] = {}
    for member_name, member in ds.items():
        member_metadata = delegate.extract_metadata(member.to_pandas())
        for key, value in member_metadata.items():
            grouped.setdefault(key, {})[member_name] = value
    return grouped
load(self, data_type)
Reads Dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Union[Type[datasets.Dataset], Type[datasets.dataset_dict.DatasetDict]] |
The type of the dataset to read. |
required |
Returns:
Type | Description |
---|---|
Union[datasets.Dataset, datasets.dataset_dict.DatasetDict] |
The dataset read from the specified dir. |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def load(
    self, data_type: Union[Type[Dataset], Type[DatasetDict]]
) -> Union[Dataset, DatasetDict]:
    """Load a dataset from the artifact location.

    The stored dataset directory is copied into a fresh local directory
    created with `mkdtemp` and loaded from there with `load_from_disk`.

    Args:
        data_type: The type of the dataset to read.

    Returns:
        The dataset read from the copied directory.
    """
    local_dir = mkdtemp()
    stored_dir = os.path.join(self.uri, DEFAULT_DATASET_DIR)
    io_utils.copy_dir(stored_dir, local_dir)
    dataset = load_from_disk(local_dir)
    return dataset
save(self, ds)
Writes a Dataset to the specified dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds |
Union[datasets.Dataset, datasets.dataset_dict.DatasetDict] |
The Dataset to write. |
required |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save(self, ds: Union[Dataset, DatasetDict]) -> None:
    """Writes a Dataset to the specified dir.

    The dataset is first saved into a local temporary directory and then
    copied below `self.uri`. The temporary directory is removed when the
    `with` block exits, whether or not an error occurred.

    Args:
        ds: The Dataset to write.
    """
    with TemporaryDirectory() as temp_dir:
        path = os.path.join(temp_dir, DEFAULT_DATASET_DIR)
        ds.save_to_disk(path)
        io_utils.copy_dir(
            path,
            os.path.join(self.uri, DEFAULT_DATASET_DIR),
        )
save_visualizations(self, ds)
Save visualizations for the dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ds |
Union[datasets.Dataset, datasets.dataset_dict.DatasetDict] |
The Dataset or DatasetDict to visualize. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.enums.VisualizationType] |
A dictionary mapping visualization paths to their types. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the given object is not a `Dataset` or `DatasetDict`. |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def save_visualizations(
    self, ds: Union[Dataset, DatasetDict]
) -> Dict[str, VisualizationType]:
    """Save visualizations for the dataset.

    For every dataset whose `info.download_checksums` yields a repo name
    (via `extract_repo_name`), an HTML file embedding the Hugging Face
    dataset viewer is written to `<uri>/<name>_viewer.html`. A plain
    `Dataset` is handled under the name "default".

    Args:
        ds: The Dataset or DatasetDict to visualize.

    Returns:
        A dictionary mapping visualization paths to their types.

    Raises:
        ValueError: If the given object is not a `Dataset` or `DatasetDict`.
    """
    visualizations: Dict[str, VisualizationType] = {}

    if isinstance(ds, Dataset):
        datasets = {"default": ds}
    elif isinstance(ds, DatasetDict):
        datasets = ds
    else:
        raise ValueError(f"Unsupported type {type(ds)}")

    for name, dataset in datasets.items():
        checksums = dataset.info.download_checksums
        if not checksums:
            continue

        # Only the first checksum key is inspected to derive the repo name;
        # `dataset_id` is never read unless it was assigned in this
        # iteration.
        dataset_id = extract_repo_name(next(iter(checksums.keys())))
        if not dataset_id:
            continue

        # Create the iframe HTML
        html = (
            "\n"
            "<iframe\n"
            f'src="https://huggingface.co/datasets/{dataset_id}/embed/viewer"\n'
            'frameborder="0"\n'
            'width="100%"\n'
            'height="560px"\n'
            "></iframe>\n"
        )

        # Save the HTML to a file
        visualization_path = os.path.join(self.uri, f"{name}_viewer.html")
        with fileio.open(visualization_path, "w") as f:
            f.write(html)
        visualizations[visualization_path] = VisualizationType.HTML

    return visualizations
extract_repo_name(checksum_str)
Extracts the repo name from the checksum string.
An example of a checksum_str is: "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet" and the expected output is "nyu-mll/glue".
Parameters:
Name | Type | Description | Default |
---|---|---|---|
checksum_str |
str |
The checksum_str to extract the repo name from. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
The extracted repo name, or None if it cannot be extracted. |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def extract_repo_name(checksum_str: str) -> Optional[str]:
    """Extracts the repo name from the checksum string.

    An example of a checksum_str is:
    "hf://datasets/nyu-mll/glue@bcdcba79d07bc864c1c254ccfcedcce55bcc9a8c/mrpc/train-00000-of-00001.parquet"
    and the expected output is "nyu-mll/glue".

    Args:
        checksum_str: The checksum_str to extract the repo name from.

    Returns:
        The extracted repo name, or `None` if the input is not a string or
        has fewer than five "/"-separated segments.
    """
    # Best-effort helper: a non-string input yields no repo name instead of
    # raising.
    if not isinstance(checksum_str, str):
        return None

    parts = checksum_str.split("/")
    # Segments 3 and 4 are both read below, so at least five are required.
    if len(parts) < 5:
        return None

    # Case: nyu-mll/glue - anything after "@" in the repo segment is dropped.
    owner = parts[3]
    repo = parts[4].split("@")[0]
    return f"{owner}/{repo}"
huggingface_pt_model_materializer
Implementation of the Huggingface PyTorch model materializer.
HFPTModelMaterializer (BaseMaterializer)
Materializer to read torch model to and from huggingface pretrained model.
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
class HFPTModelMaterializer(BaseMaterializer):
    """Materializer to read torch model to and from huggingface pretrained model."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (PreTrainedModel,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[PreTrainedModel]) -> PreTrainedModel:
        """Reads HFModel.

        The stored model directory is copied into a local temporary
        directory. The model class is looked up in the `transformers` module
        by the first entry of the saved config's `architectures`. The
        temporary directory is removed when the `with` block exits.

        Args:
            data_type: The type of the model to read.

        Returns:
            The model read from the specified dir.
        """
        with TemporaryDirectory() as temp_dir:
            io_utils.copy_dir(
                os.path.join(self.uri, DEFAULT_PT_MODEL_DIR), temp_dir
            )

            config = AutoConfig.from_pretrained(temp_dir)
            architecture = config.architectures[0]
            model_cls = getattr(
                importlib.import_module("transformers"), architecture
            )
            return model_cls.from_pretrained(temp_dir)

    def save(self, model: PreTrainedModel) -> None:
        """Writes a Model to the specified dir.

        The model is saved into a local temporary directory which is then
        copied below `self.uri`. The temporary directory is removed when the
        `with` block exits.

        Args:
            model: The Torch Model to write.
        """
        with TemporaryDirectory() as temp_dir:
            model.save_pretrained(temp_dir)
            io_utils.copy_dir(
                temp_dir,
                os.path.join(self.uri, DEFAULT_PT_MODEL_DIR),
            )

    def extract_metadata(
        self, model: PreTrainedModel
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `PreTrainedModel` object.

        Args:
            model: The `PreTrainedModel` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary: the output of
            `count_module_params` plus the "dtype" and "device" entries.
        """
        from zenml.integrations.pytorch.utils import count_module_params

        module_param_metadata = count_module_params(model)
        return {
            **module_param_metadata,
            "dtype": DType(str(model.dtype)),
            "device": str(model.device),
        }
extract_metadata(self, model)
Extract metadata from the given `PreTrainedModel` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
transformers.PreTrainedModel |
The `PreTrainedModel` object to extract metadata from. |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
The extracted metadata as a dictionary. |
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def extract_metadata(
    self, model: PreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Collect metadata describing a `PreTrainedModel`.

    Args:
        model: The `PreTrainedModel` object to extract metadata from.

    Returns:
        The output of `count_module_params` extended with the "dtype" and
        "device" entries.
    """
    from zenml.integrations.pytorch.utils import count_module_params

    # Start from the parameter counts, then add the model-level entries.
    metadata: Dict[str, "MetadataType"] = dict(count_module_params(model))
    metadata["dtype"] = DType(str(model.dtype))
    metadata["device"] = str(model.device)
    return metadata
load(self, data_type)
Reads HFModel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[transformers.PreTrainedModel] |
The type of the model to read. |
required |
Returns:
Type | Description |
---|---|
transformers.PreTrainedModel |
The model read from the specified dir. |
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def load(self, data_type: Type[PreTrainedModel]) -> PreTrainedModel:
    """Reads HFModel.

    The stored model directory is copied into a local temporary directory.
    The model class is looked up in the `transformers` module by the first
    entry of the saved config's `architectures`. The temporary directory is
    removed when the `with` block exits.

    Args:
        data_type: The type of the model to read.

    Returns:
        The model read from the specified dir.
    """
    with TemporaryDirectory() as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(self, model)
Writes a Model to the specified dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
transformers.PreTrainedModel |
The Torch Model to write. |
required |
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def save(self, model: PreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    The model is saved into a local temporary directory which is then copied
    below `self.uri`. The temporary directory is removed when the `with`
    block exits.

    Args:
        model: The Torch Model to write.
    """
    with TemporaryDirectory() as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_PT_MODEL_DIR),
        )
huggingface_t5_materializer
Implementation of the Huggingface t5 materializer.
HFT5Materializer (BaseMaterializer)
Base class for huggingface t5 models.
Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
class HFT5Materializer(BaseMaterializer):
    """Materializer for Hugging Face T5 models and tokenizers."""

    SKIP_REGISTRATION: ClassVar[bool] = False
    ASSOCIATED_TYPES = (
        T5ForConditionalGeneration,
        T5Tokenizer,
        T5TokenizerFast,
    )

    def load(
        self, data_type: Type[Any]
    ) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]:
        """Load a T5 model or tokenizer from the artifact location.

        The entries below `self.uri` (files, plus the files of first-level
        sub-directories) are copied into a local temporary directory, from
        which `data_type.from_pretrained` is called.

        Args:
            data_type: A T5ForConditionalGeneration or T5Tokenizer type.

        Returns:
            A T5ForConditionalGeneration or T5Tokenizer object.

        Raises:
            ValueError: Unsupported data type used
        """
        supported = (T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast)
        remote_root = self.uri

        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as local_root:
            # Mirror the artifact store content into the temporary directory.
            for entry in fileio.listdir(remote_root):
                remote_path = os.path.join(remote_root, entry)
                local_path = os.path.join(local_root, entry)

                if not fileio.isdir(remote_path):
                    fileio.copy(remote_path, local_path)
                    continue

                fileio.makedirs(local_path)
                for child in fileio.listdir(remote_path):
                    fileio.copy(
                        os.path.join(remote_path, child),
                        os.path.join(local_path, child),
                    )

            # The type check runs only after the copy, as before.
            if data_type not in supported:
                raise ValueError(f"Unsupported data type: {data_type}")
            return data_type.from_pretrained(local_root)

    def save(
        self,
        obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast],
    ) -> None:
        """Store a T5 model or tokenizer at the artifact location.

        The object is written with `save_pretrained` into a local temporary
        directory, whose entries (files, plus the files of first-level
        sub-directories) are then copied below `self.uri`.

        Args:
            obj: A T5ForConditionalGeneration model or T5Tokenizer.
        """
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as local_root:
            obj.save_pretrained(local_root)

            remote_root = self.uri
            fileio.makedirs(remote_root)

            # Mirror the temporary directory content into the artifact store.
            for entry in os.listdir(local_root):
                local_path = os.path.join(local_root, entry)
                remote_path = os.path.join(remote_root, entry)

                if not os.path.isdir(local_path):
                    fileio.copy(local_path, remote_path)
                    continue

                fileio.makedirs(remote_path)
                for child in os.listdir(local_path):
                    fileio.copy(
                        os.path.join(local_path, child),
                        os.path.join(remote_path, child),
                    )
load(self, data_type)
Reads a T5ForConditionalGeneration model or T5Tokenizer from a serialized zip file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
A T5ForConditionalGeneration or T5Tokenizer type. |
required |
Returns:
Type | Description |
---|---|
Union[transformers.T5ForConditionalGeneration, transformers.T5Tokenizer, transformers.T5TokenizerFast] |
A T5ForConditionalGeneration or T5Tokenizer object. |
Exceptions:
Type | Description |
---|---|
ValueError |
Unsupported data type used |
Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def load(
    self, data_type: Type[Any]
) -> Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast]:
    """Load a T5 model or tokenizer from the artifact location.

    The entries below `self.uri` (files, plus the files of first-level
    sub-directories) are copied into a local temporary directory, from which
    `data_type.from_pretrained` is called.

    Args:
        data_type: A T5ForConditionalGeneration or T5Tokenizer type.

    Returns:
        A T5ForConditionalGeneration or T5Tokenizer object.

    Raises:
        ValueError: Unsupported data type used
    """
    supported = (T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast)
    remote_root = self.uri

    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as local_root:
        # Mirror the artifact store content into the temporary directory.
        for entry in fileio.listdir(remote_root):
            remote_path = os.path.join(remote_root, entry)
            local_path = os.path.join(local_root, entry)

            if not fileio.isdir(remote_path):
                fileio.copy(remote_path, local_path)
                continue

            fileio.makedirs(local_path)
            for child in fileio.listdir(remote_path):
                fileio.copy(
                    os.path.join(remote_path, child),
                    os.path.join(local_path, child),
                )

        # The type check runs only after the copy, as before.
        if data_type not in supported:
            raise ValueError(f"Unsupported data type: {data_type}")
        return data_type.from_pretrained(local_root)
save(self, obj)
Creates a serialization for a T5ForConditionalGeneration model or T5Tokenizer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
Union[transformers.T5ForConditionalGeneration, transformers.T5Tokenizer, transformers.T5TokenizerFast] |
A T5ForConditionalGeneration model or T5Tokenizer. |
required |
Source code in zenml/integrations/huggingface/materializers/huggingface_t5_materializer.py
def save(
    self,
    obj: Union[T5ForConditionalGeneration, T5Tokenizer, T5TokenizerFast],
) -> None:
    """Store a T5 model or tokenizer at the artifact location.

    The object is written with `save_pretrained` into a local temporary
    directory, whose entries (files, plus the files of first-level
    sub-directories) are then copied below `self.uri`.

    Args:
        obj: A T5ForConditionalGeneration model or T5Tokenizer.
    """
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as local_root:
        obj.save_pretrained(local_root)

        remote_root = self.uri
        fileio.makedirs(remote_root)

        # Mirror the temporary directory content into the artifact store.
        for entry in os.listdir(local_root):
            local_path = os.path.join(local_root, entry)
            remote_path = os.path.join(remote_root, entry)

            if not os.path.isdir(local_path):
                fileio.copy(local_path, remote_path)
                continue

            fileio.makedirs(remote_path)
            for child in os.listdir(local_path):
                fileio.copy(
                    os.path.join(local_path, child),
                    os.path.join(remote_path, child),
                )
huggingface_tf_model_materializer
Implementation of the Huggingface TF model materializer.
HFTFModelMaterializer (BaseMaterializer)
Materializer to read Tensorflow model to and from huggingface pretrained model.
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
class HFTFModelMaterializer(BaseMaterializer):
    """Materializer to read Tensorflow model to and from huggingface pretrained model."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (TFPreTrainedModel,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel:
        """Reads HFModel.

        The stored model directory is copied into a local temporary
        directory. The model class is looked up in the `transformers` module
        under the name "TF" + the first entry of the saved config's
        `architectures`. The temporary directory is removed when the `with`
        block exits.

        Args:
            data_type: The type of the model to read.

        Returns:
            The model read from the specified dir.
        """
        with TemporaryDirectory() as temp_dir:
            io_utils.copy_dir(
                os.path.join(self.uri, DEFAULT_TF_MODEL_DIR), temp_dir
            )

            config = AutoConfig.from_pretrained(temp_dir)
            architecture = "TF" + config.architectures[0]
            model_cls = getattr(
                importlib.import_module("transformers"), architecture
            )
            return model_cls.from_pretrained(temp_dir)

    def save(self, model: TFPreTrainedModel) -> None:
        """Writes a Model to the specified dir.

        The model is saved into a local temporary directory which is then
        copied below `self.uri`. The temporary directory is removed when the
        `with` block exits.

        Args:
            model: The TF Model to write.
        """
        with TemporaryDirectory() as temp_dir:
            model.save_pretrained(temp_dir)
            io_utils.copy_dir(
                temp_dir,
                os.path.join(self.uri, DEFAULT_TF_MODEL_DIR),
            )

    def extract_metadata(
        self, model: TFPreTrainedModel
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `PreTrainedModel` object.

        Args:
            model: The `PreTrainedModel` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary with the keys
            "num_layers", "num_params" and "num_trainable_params".
        """
        return {
            "num_layers": len(model.layers),
            "num_params": model.num_parameters(only_trainable=False),
            "num_trainable_params": model.num_parameters(only_trainable=True),
        }
extract_metadata(self, model)
Extract metadata from the given `PreTrainedModel` object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
transformers.TFPreTrainedModel |
The `PreTrainedModel` object to extract metadata from. |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
The extracted metadata as a dictionary. |
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def extract_metadata(
    self, model: TFPreTrainedModel
) -> Dict[str, "MetadataType"]:
    """Collect layer and parameter counts for a `TFPreTrainedModel`.

    Args:
        model: The model object to extract metadata from.

    Returns:
        A dictionary with the keys "num_layers", "num_params" and
        "num_trainable_params".
    """
    layer_count = len(model.layers)
    total_params = model.num_parameters(only_trainable=False)
    trainable_params = model.num_parameters(only_trainable=True)

    return {
        "num_layers": layer_count,
        "num_params": total_params,
        "num_trainable_params": trainable_params,
    }
load(self, data_type)
Reads HFModel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[transformers.TFPreTrainedModel] |
The type of the model to read. |
required |
Returns:
Type | Description |
---|---|
transformers.TFPreTrainedModel |
The model read from the specified dir. |
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def load(self, data_type: Type[TFPreTrainedModel]) -> TFPreTrainedModel:
    """Reads HFModel.

    The stored model directory is copied into a local temporary directory.
    The model class is looked up in the `transformers` module under the name
    "TF" + the first entry of the saved config's `architectures`. The
    temporary directory is removed when the `with` block exits.

    Args:
        data_type: The type of the model to read.

    Returns:
        The model read from the specified dir.
    """
    with TemporaryDirectory() as temp_dir:
        io_utils.copy_dir(
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR), temp_dir
        )

        config = AutoConfig.from_pretrained(temp_dir)
        architecture = "TF" + config.architectures[0]
        model_cls = getattr(
            importlib.import_module("transformers"), architecture
        )
        return model_cls.from_pretrained(temp_dir)
save(self, model)
Writes a Model to the specified dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
transformers.TFPreTrainedModel |
The TF Model to write. |
required |
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def save(self, model: TFPreTrainedModel) -> None:
    """Writes a Model to the specified dir.

    The model is saved into a local temporary directory which is then copied
    below `self.uri`. The temporary directory is removed when the `with`
    block exits.

    Args:
        model: The TF Model to write.
    """
    with TemporaryDirectory() as temp_dir:
        model.save_pretrained(temp_dir)
        io_utils.copy_dir(
            temp_dir,
            os.path.join(self.uri, DEFAULT_TF_MODEL_DIR),
        )
huggingface_tokenizer_materializer
Implementation of the Huggingface tokenizer materializer.
HFTokenizerMaterializer (BaseMaterializer)
Materializer to read tokenizer to and from huggingface tokenizer.
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
class HFTokenizerMaterializer(BaseMaterializer):
    """Materializer that stores and restores Hugging Face tokenizers."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        PreTrainedTokenizerBase,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
        """Load a tokenizer from the artifact location.

        The stored tokenizer directory is copied into a local temporary
        directory and loaded from there with `AutoTokenizer`.

        Args:
            data_type: The type of the tokenizer to read.

        Returns:
            The tokenizer read from the copied directory.
        """
        with TemporaryDirectory() as local_dir:
            stored_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
            io_utils.copy_dir(stored_dir, local_dir)
            tokenizer = AutoTokenizer.from_pretrained(local_dir)
            return tokenizer

    def save(self, tokenizer: Type[Any]) -> None:
        """Store a tokenizer at the artifact location.

        The tokenizer is written with `save_pretrained` into a local
        temporary directory which is then copied below `self.uri`.

        Args:
            tokenizer: The HFTokenizer to write.
        """
        with TemporaryDirectory() as local_dir:
            tokenizer.save_pretrained(local_dir)
            target_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
            io_utils.copy_dir(local_dir, target_dir)
load(self, data_type)
Reads Tokenizer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the tokenizer to read. |
required |
Returns:
Type | Description |
---|---|
transformers.tokenization_utils_base.PreTrainedTokenizerBase |
The tokenizer read from the specified dir. |
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def load(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
    """Read a tokenizer back from the artifact store.

    Args:
        data_type: The type of the tokenizer to read.

    Returns:
        The tokenizer loaded from a local copy of the stored directory.
    """
    stored_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
    with TemporaryDirectory() as local_dir:
        # Work on a local copy of the stored tokenizer files.
        io_utils.copy_dir(stored_dir, local_dir)
        return AutoTokenizer.from_pretrained(local_dir)
save(self, tokenizer)
Writes a Tokenizer to the specified dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tokenizer |
Type[Any] |
The HFTokenizer to write. |
required |
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def save(self, tokenizer: Type[Any]) -> None:
    """Write a tokenizer to the artifact store.

    Args:
        tokenizer: The HFTokenizer to write.
    """
    stored_dir = os.path.join(self.uri, DEFAULT_TOKENIZER_DIR)
    with TemporaryDirectory() as local_dir:
        # Serialize locally first, then copy the result to the store.
        tokenizer.save_pretrained(local_dir)
        io_utils.copy_dir(local_dir, stored_dir)
model_deployers
special
Initialization of the Hugging Face model deployers.
huggingface_model_deployer
Implementation of the Hugging Face Model Deployer.
HuggingFaceModelDeployer (BaseModelDeployer)
Hugging Face endpoint model deployer.
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
class HuggingFaceModelDeployer(BaseModelDeployer):
    """Hugging Face endpoint model deployer."""

    NAME: ClassVar[str] = "HuggingFace"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = (
        HuggingFaceModelDeployerFlavor
    )

    @property
    def config(self) -> HuggingFaceModelDeployerConfig:
        """Config class for the Hugging Face Model deployer settings class.

        Returns:
            The configuration.
        """
        return cast(HuggingFaceModelDeployerConfig, self._config)

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that this model deployer is configured
            with either a token or a secret name.
        """

        def _validate_if_secret_or_token_is_present(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            """Check if secret or token is present in the stack.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple with a boolean indicating whether the stack is valid
                and a message describing the validation result.
            """
            # Only this deployer's own config is inspected; `stack` is part
            # of the validation callback signature but is not read here.
            return bool(self.config.token or self.config.secret_name), (
                "The Hugging Face model deployer requires either a secret name"
                " or a token to be present in the stack."
            )

        return StackValidator(
            custom_validation_function=_validate_if_secret_or_token_is_present,
        )

    def _create_new_service(
        self, id: UUID, timeout: int, config: HuggingFaceServiceConfig
    ) -> HuggingFaceDeploymentService:
        """Creates and starts a new HuggingFaceDeploymentService.

        Args:
            id: the UUID of the model to be deployed with Hugging Face model deployer.
            timeout: the timeout in seconds to wait for the Hugging Face inference endpoint
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with Hugging Face model deployer.

        Returns:
            The HuggingFaceDeploymentService object that can be used to
            interact with the Hugging Face inference endpoint.
        """
        # create a new service for the new model
        service = HuggingFaceDeploymentService(uuid=id, config=config)

        logger.info(
            f"Creating an artifact {HUGGINGFACE_SERVICE_ARTIFACT} with service instance attached as metadata."
            " If there's an active pipeline and/or model this artifact will be associated with it."
        )

        service.start(timeout=timeout)
        return service

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: HuggingFaceDeploymentService,
    ) -> None:
        """Stop existing services.

        Args:
            timeout: the timeout in seconds to wait for the Hugging Face
                deployment to be stopped.
            force: if True, force the service to stop
            existing_service: Existing Hugging Face deployment service
        """
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

    def perform_deploy_model(
        self,
        id: UUID,
        config: ServiceConfig,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new Hugging Face deployment service or update an existing one.

        This should serve the supplied model and deployment configuration.

        Args:
            id: the UUID of the model to be deployed with Hugging Face.
            config: the configuration of the model to be deployed with Hugging Face.
            timeout: the timeout in seconds to wait for the Hugging Face endpoint
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the Hugging Face
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML Hugging Face deployment service object that can be used to
            interact with the remote Hugging Face inference endpoint server.
        """
        with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
            config = cast(HuggingFaceServiceConfig, config)
            # create a new HuggingFaceDeploymentService instance
            service = self._create_new_service(
                id=id, timeout=timeout, config=config
            )
            logger.info(
                f"Creating a new Hugging Face inference endpoint service: {service}"
            )
            # Add telemetry with metadata that gets the stack metadata and
            # differentiates between pure model and custom code deployments
            stack = Client().active_stack
            stack_metadata = {
                component_type.value: component.flavor
                for component_type, component in stack.components.items()
            }
            analytics_handler.metadata = {
                "store_type": Client().zen_store.type.value,
                **stack_metadata,
            }

        return service

    def perform_stop_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> BaseService:
        """Method to stop a model server.

        Args:
            service: The service to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.

        Returns:
            The stopped service.
        """
        service.stop(timeout=timeout, force=force)
        return service

    def perform_start_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Method to start a model server.

        Args:
            service: The service to start.
            timeout: Timeout in seconds to wait for the service to start.

        Returns:
            The started service.
        """
        service.start(timeout=timeout)
        return service

    def perform_delete_model(
        self,
        service: BaseService,
        timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        Args:
            service: The service to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        service = cast(HuggingFaceDeploymentService, service)
        self._clean_up_existing_service(
            existing_service=service, timeout=timeout, force=force
        )

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "HuggingFaceDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information that might be relevant to the user.

        Args:
            service_instance: Instance of a HuggingFaceDeploymentService

        Returns:
            Model server information.
        """
        return {
            "PREDICTION_URL": service_instance.get_prediction_url(),
            "HEALTH_CHECK_URL": service_instance.get_healthcheck_url(),
        }
config: HuggingFaceModelDeployerConfig
property
readonly
Config class for the Hugging Face Model deployer settings class.
Returns:
Type | Description |
---|---|
HuggingFaceModelDeployerConfig |
The configuration. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A validator that checks that the model deployer is configured with either a token or a secret name. |
FLAVOR (BaseModelDeployerFlavor)
Hugging Face Endpoint model deployer flavor.
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
class HuggingFaceModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Hugging Face Endpoint model deployer."""

    @property
    def name(self) -> str:
        """Flavor name.

        Returns:
            The name of the flavor.
        """
        return HUGGINGFACE_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """URL of the documentation page for this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """URL of the SDK documentation page for this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """URL of the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_registry/huggingface.png"

    @property
    def config_class(self) -> Type[HuggingFaceModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `HuggingFaceModelDeployerConfig` class.
        """
        return HuggingFaceModelDeployerConfig

    @property
    def implementation_class(self) -> Type["HuggingFaceModelDeployer"]:
        """Class implementing this flavor.

        Returns:
            The implementation class.
        """
        # Imported inside the property rather than at module level.
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer,
        )

        return HuggingFaceModelDeployer
config_class: Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig]
property
readonly
Returns HuggingFaceModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.huggingface.flavors.huggingface_model_deployer_flavor.HuggingFaceModelDeployerConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[HuggingFaceModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[HuggingFaceModelDeployer] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
get_model_server_info(service_instance)
staticmethod
Return implementation specific information that might be relevant to the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
HuggingFaceDeploymentService |
Instance of a HuggingFaceDeploymentService |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
Model server information. |
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "HuggingFaceDeploymentService",
) -> Dict[str, Optional[str]]:
    """Collect the URLs a user needs to reach the deployed model server.

    Args:
        service_instance: Instance of a HuggingFaceDeploymentService

    Returns:
        A dictionary with the prediction URL and the health check URL
        reported by the service instance.
    """
    prediction_url = service_instance.get_prediction_url()
    healthcheck_url = service_instance.get_healthcheck_url()
    return {
        "PREDICTION_URL": prediction_url,
        "HEALTH_CHECK_URL": healthcheck_url,
    }
perform_delete_model(self, service, timeout=300, force=False)
Method to delete all configuration of a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to delete. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
300 |
force |
bool |
If True, force the service to stop. |
False |
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_delete_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a model server by cleaning up its existing service.

    Args:
        service: The service to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    hf_service = cast(HuggingFaceDeploymentService, service)
    self._clean_up_existing_service(
        timeout=timeout,
        force=force,
        existing_service=hf_service,
    )
perform_deploy_model(self, id, config, timeout=300)
Create a new Hugging Face deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id |
UUID |
the UUID of the model to be deployed with Hugging Face. |
required |
config |
ServiceConfig |
the configuration of the model to be deployed with Hugging Face. |
required |
timeout |
int |
the timeout in seconds to wait for the Hugging Face endpoint to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Hugging Face server is provisioned, without waiting for it to fully start. |
300 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML Hugging Face deployment service object that can be used to interact with the remote Hugging Face inference endpoint server. |
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_deploy_model(
    self,
    id: UUID,
    config: ServiceConfig,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Deploy a model by creating and starting a Hugging Face service.

    The deployment is wrapped in an analytics handler whose metadata is
    filled with the store type and the flavor of every component of the
    active stack.

    Args:
        id: the UUID of the model to be deployed with Hugging Face.
        config: the configuration of the model to be deployed with Hugging Face.
        timeout: the timeout in seconds to wait for the Hugging Face endpoint
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Hugging Face
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Hugging Face deployment service object that can be used to
        interact with the remote Hugging Face inference endpoint server.
    """
    with track_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        hf_config = cast(HuggingFaceServiceConfig, config)
        service = self._create_new_service(
            id=id, timeout=timeout, config=hf_config
        )
        logger.info(
            f"Creating a new Hugging Face inference endpoint service: {service}"
        )

        # Map every stack component type to the flavor used for it.
        flavor_by_component = {}
        for component_type, component in Client().active_stack.components.items():
            flavor_by_component[component_type.value] = component.flavor

        # The store type goes first; stack entries are merged on top of it.
        telemetry = {"store_type": Client().zen_store.type.value}
        telemetry.update(flavor_by_component)
        analytics_handler.metadata = telemetry

    return service
perform_start_model(self, service, timeout=300)
Method to start a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to start. |
required |
timeout |
int |
Timeout in seconds to wait for the service to start. |
300 |
Returns:
Type | Description |
---|---|
BaseService |
The started service. |
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_start_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Start the given model server.

    Args:
        service: The service to start.
        timeout: Timeout in seconds to wait for the service to start.

    Returns:
        The same service object, after `start` has been called on it.
    """
    started = service
    started.start(timeout=timeout)
    return started
perform_stop_model(self, service, timeout=300, force=False)
Method to stop a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
The service to stop. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
300 |
force |
bool |
If True, force the service to stop. |
False |
Returns:
Type | Description |
---|---|
BaseService |
The stopped service. |
Source code in zenml/integrations/huggingface/model_deployers/huggingface_model_deployer.py
def perform_stop_model(
    self,
    service: BaseService,
    timeout: int = DEFAULT_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> BaseService:
    """Stop the given model server.

    Args:
        service: The service to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.

    Returns:
        The same service object, after `stop` has been called on it.
    """
    stopped = service
    stopped.stop(timeout=timeout, force=force)
    return stopped
services
special
Initialization of the Hugging Face Service.
huggingface_deployment
Implementation of the Hugging Face Deployment service.
HuggingFaceDeploymentService (BaseDeploymentService)
Hugging Face model deployment service.
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE |
ClassVar[zenml.services.service_type.ServiceType] |
a service type descriptor with information describing the Hugging Face deployment service class |
config |
HuggingFaceServiceConfig |
service configuration |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceDeploymentService(BaseDeploymentService):
    """Hugging Face model deployment service.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the Hugging Face deployment service class
        config: service configuration
    """

    SERVICE_TYPE = ServiceType(
        name="huggingface-deployment",
        type="model-serving",
        flavor="huggingface",
        description="Hugging Face inference endpoint prediction service",
    )
    config: HuggingFaceServiceConfig
    status: HuggingFaceServiceStatus = Field(
        default_factory=lambda: HuggingFaceServiceStatus()
    )

    def __init__(self, config: HuggingFaceServiceConfig, **attrs: Any):
        """Initialize the Hugging Face deployment service.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        super().__init__(config=config, **attrs)

    def get_token(self) -> str:
        """Get the Hugging Face token.

        The token is read from the configured secret when a secret name is
        set, otherwise from the active Hugging Face model deployer's config.

        Raises:
            ValueError: If token not found.

        Returns:
            Hugging Face token.
        """
        client = Client()
        token = None
        if self.config.secret_name:
            secret = client.get_secret(self.config.secret_name)
            # A secret without a "token" key falls through to the
            # ValueError below instead of surfacing as a bare KeyError.
            token = secret.secret_values.get("token")
        else:
            from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
                HuggingFaceModelDeployer,
            )

            model_deployer = client.active_stack.model_deployer
            if not isinstance(model_deployer, HuggingFaceModelDeployer):
                raise ValueError(
                    "HuggingFaceModelDeployer is not active in the stack."
                )
            token = model_deployer.config.token or None
        if not token:
            raise ValueError("Token not found.")
        return token

    @property
    def hf_endpoint(self) -> InferenceEndpoint:
        """Get the deployed Hugging Face inference endpoint.

        Returns:
            Huggingface inference endpoint.
        """
        return get_inference_endpoint(
            name=self._generate_an_endpoint_name(),
            token=self.get_token(),
            namespace=self.config.namespace,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        return self.hf_endpoint.url if self.is_running else None

    @property
    def inference_client(self) -> InferenceClient:
        """Get the Hugging Face InferenceClient from Inference Endpoint.

        Returns:
            Hugging Face inference client.
        """
        return self.hf_endpoint.client

    def provision(self) -> None:
        """Provision or update remote Hugging Face deployment instance.

        Raises:
            Exception: If any unexpected error while creating inference endpoint.
        """
        try:
            # Attempt to create and wait for the inference endpoint
            hf_endpoint = create_inference_endpoint(
                name=self._generate_an_endpoint_name(),
                repository=self.config.repository,
                framework=self.config.framework,
                accelerator=self.config.accelerator,
                instance_size=self.config.instance_size,
                instance_type=self.config.instance_type,
                region=self.config.region,
                vendor=self.config.vendor,
                account_id=self.config.account_id,
                min_replica=self.config.min_replica,
                max_replica=self.config.max_replica,
                revision=self.config.revision,
                task=self.config.task,
                custom_image=self.config.custom_image,
                type=self.config.endpoint_type,
                token=self.get_token(),
                namespace=self.config.namespace,
            ).wait(timeout=POLLING_TIMEOUT)

        except Exception as e:
            self.status.update_state(
                new_state=ServiceState.ERROR, error=str(e)
            )
            # Catch-all for any other unexpected errors; chain the original
            # exception so its traceback is preserved.
            raise Exception(
                f"An unexpected error occurred while provisioning the Hugging Face inference endpoint: {e}"
            ) from e

        # Check if the endpoint URL is available after provisioning
        if hf_endpoint.url:
            logger.info(
                f"Hugging Face inference endpoint successfully deployed and available. Endpoint URL: {hf_endpoint.url}"
            )
        else:
            logger.error(
                "Failed to start Hugging Face inference endpoint service: No URL available, please check the Hugging Face console for more details."
            )

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the Hugging Face deployment.

        Returns:
            The operational state of the Hugging Face deployment and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        try:
            status = self.hf_endpoint.status
        except (InferenceEndpointError, HfHubHTTPError):
            return (
                ServiceState.INACTIVE,
                "Hugging Face Inference Endpoint deployment is inactive or not found",
            )

        if status == InferenceEndpointStatus.RUNNING:
            return (ServiceState.ACTIVE, "")
        if status == InferenceEndpointStatus.SCALED_TO_ZERO:
            return (
                ServiceState.SCALED_TO_ZERO,
                "Hugging Face Inference Endpoint is scaled to zero, but still running. It will be started on demand.",
            )
        if status == InferenceEndpointStatus.FAILED:
            return (
                ServiceState.ERROR,
                "Hugging Face Inference Endpoint deployment failed, please check the Hugging Face console for more details.",
            )
        # PENDING and every other status are reported as still starting up.
        return (ServiceState.PENDING_STARTUP, "")

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the remote Hugging Face deployment instance.

        Args:
            force: if True, the remote deployment instance will be
                forcefully deprovisioned. Not read by this implementation.
        """
        try:
            self.hf_endpoint.delete()
        except HfHubHTTPError:
            logger.error(
                "Hugging Face Inference Endpoint is deleted or cannot be found."
            )

    def predict(self, data: "Any", max_new_tokens: int) -> "Any":
        """Make a prediction using the service.

        Args:
            data: input data
            max_new_tokens: Number of new tokens to generate

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running or exposes no
                prediction URL.
            NotImplementedError: if task is not supported.
        """
        if not self.is_running:
            raise Exception(
                "Hugging Face endpoint inference service is not running. "
                "Please start the service before making predictions."
            )
        if self.prediction_url is None:
            # Previously this path fell through to an unbound `result`.
            raise Exception(
                "Hugging Face endpoint inference service has no prediction "
                "URL available."
            )
        if self.hf_endpoint.task != "text-generation":
            # TODO: Add support for all different supported tasks
            raise NotImplementedError(
                "Tasks other than text-generation is not implemented."
            )
        # `text_generation` is the InferenceClient method for this task.
        return self.inference_client.text_generation(
            data, max_new_tokens=max_new_tokens
        )

    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.

        This implementation only logs a hint pointing to the Hugging Face UI
        and returns nothing.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        logger.info(
            "Hugging Face Endpoints provides access to the logs of "
            "your Endpoints through the UI in the “Logs” tab of your Endpoint"
        )
        return  # type: ignore

    def _generate_an_endpoint_name(self) -> str:
        """Generate a unique name for the Hugging Face Inference Endpoint.

        Returns:
            The configured service name followed by a dash and the leading
            characters of this service's UUID.
        """
        return (
            f"{self.config.service_name}-{str(self.uuid)[:UUID_SLICE_LENGTH]}"
        )
hf_endpoint: huggingface_hub.InferenceEndpoint
property
readonly
Get the deployed Hugging Face inference endpoint.
Returns:
Type | Description |
---|---|
huggingface_hub.InferenceEndpoint |
Huggingface inference endpoint. |
inference_client: huggingface_hub.InferenceClient
property
readonly
Get the Hugging Face InferenceClient from Inference Endpoint.
Returns:
Type | Description |
---|---|
huggingface_hub.InferenceClient |
Hugging Face inference client. |
prediction_url: Optional[str]
property
readonly
The prediction URI exposed by the prediction service.
Returns:
Type | Description |
---|---|
Optional[str] |
The prediction URI exposed by the prediction service, or None if the service is not yet ready. |
__init__(self, config, **attrs)
special
Initialize the Hugging Face deployment service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
HuggingFaceServiceConfig |
service configuration |
required |
attrs |
Any |
additional attributes to set on the service |
{} |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def __init__(self, config: HuggingFaceServiceConfig, **attrs: Any):
    """Set up the service by delegating to the parent initializer.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # Both the config and any extra attributes are forwarded unchanged.
    super().__init__(config=config, **attrs)
check_status(self)
Check the current operational state of the Hugging Face deployment.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] |
The operational state of the Hugging Face deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered). |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the Hugging Face deployment.

    Returns:
        The operational state of the Hugging Face deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    # Only fetching the endpoint can raise the handled errors, so the try
    # body is limited to that lookup.
    try:
        status = self.hf_endpoint.status
    except (InferenceEndpointError, HfHubHTTPError):
        return (
            ServiceState.INACTIVE,
            "Hugging Face Inference Endpoint deployment is inactive or not found",
        )

    if status == InferenceEndpointStatus.RUNNING:
        return (ServiceState.ACTIVE, "")
    if status == InferenceEndpointStatus.SCALED_TO_ZERO:
        return (
            ServiceState.SCALED_TO_ZERO,
            "Hugging Face Inference Endpoint is scaled to zero, but still running. It will be started on demand.",
        )
    if status == InferenceEndpointStatus.FAILED:
        # A failed endpoint was found, so it gets its own message instead
        # of the "inactive or not found" text used for lookup errors.
        return (
            ServiceState.ERROR,
            "Hugging Face Inference Endpoint deployment failed, please check the Hugging Face console for more details.",
        )
    # PENDING and every other status are reported as still starting up.
    return (ServiceState.PENDING_STARTUP, "")
deprovision(self, force=False)
Deprovision the remote Hugging Face deployment instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
if True, the remote deployment instance will be forcefully deprovisioned. |
False |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Delete the remote Hugging Face inference endpoint.

    An `HfHubHTTPError` raised while looking up or deleting the endpoint is
    logged as an error and not propagated.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    try:
        endpoint = self.hf_endpoint
        endpoint.delete()
    except HfHubHTTPError:
        logger.error(
            "Hugging Face Inference Endpoint is deleted or cannot be found."
        )
get_logs(self, follow=False, tail=None)
Retrieve the service logs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Point the user to where the service logs can be found.

    No logs are fetched here: a hint about the Hugging Face UI is logged
    and the method returns without a value.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    hint = (
        "Hugging Face Endpoints provides access to the logs of "
        "your Endpoints through the UI in the “Logs” tab of your Endpoint"
    )
    logger.info(hint)
    return  # type: ignore
get_token(self)
Get the Hugging Face token.
Exceptions:
Type | Description |
---|---|
ValueError |
If token not found. |
Returns:
Type | Description |
---|---|
str |
Hugging Face token. |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def get_token(self) -> str:
    """Get the Hugging Face token.

    The token is read from the configured secret when a secret name is set,
    otherwise from the active Hugging Face model deployer's config.

    Raises:
        ValueError: If token not found.

    Returns:
        Hugging Face token.
    """
    client = Client()
    token = None
    if self.config.secret_name:
        secret = client.get_secret(self.config.secret_name)
        # A secret without a "token" key falls through to the ValueError
        # below instead of surfacing as a bare KeyError.
        token = secret.secret_values.get("token")
    else:
        from zenml.integrations.huggingface.model_deployers.huggingface_model_deployer import (
            HuggingFaceModelDeployer,
        )

        model_deployer = client.active_stack.model_deployer
        if not isinstance(model_deployer, HuggingFaceModelDeployer):
            raise ValueError(
                "HuggingFaceModelDeployer is not active in the stack."
            )
        token = model_deployer.config.token or None
    if not token:
        raise ValueError("Token not found.")
    return token
predict(self, data, max_new_tokens)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
input data |
required |
max_new_tokens |
int |
Number of new tokens to generate |
required |
Returns:
Type | Description |
---|---|
Any |
The prediction result. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not running |
NotImplementedError |
if task is not supported. |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def predict(self, data: "Any", max_new_tokens: int) -> "Any":
    """Make a prediction using the service.

    Args:
        data: input data
        max_new_tokens: Number of new tokens to generate

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running or exposes no
            prediction URL.
        NotImplementedError: if task is not supported.
    """
    if not self.is_running:
        raise Exception(
            "Hugging Face endpoint inference service is not running. "
            "Please start the service before making predictions."
        )
    if self.prediction_url is None:
        # Previously this path fell through to an unbound `result`.
        raise Exception(
            "Hugging Face endpoint inference service has no prediction "
            "URL available."
        )
    if self.hf_endpoint.task != "text-generation":
        # TODO: Add support for all different supported tasks
        raise NotImplementedError(
            "Tasks other than text-generation is not implemented."
        )
    # `text_generation` is the InferenceClient method for this task.
    return self.inference_client.text_generation(
        data, max_new_tokens=max_new_tokens
    )
provision(self)
Provision or update remote Hugging Face deployment instance.
Exceptions:
Type | Description |
---|---|
Exception |
If an unexpected error occurs while creating the inference endpoint. |
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
def provision(self) -> None:
    """Provision or update remote Hugging Face deployment instance.

    Creates the inference endpoint from the service configuration and
    waits up to `POLLING_TIMEOUT` for it. On failure the service status
    is set to `ServiceState.ERROR` before the error is re-raised.

    Raises:
        Exception: If any unexpected error occurs while creating the
            inference endpoint.
    """
    try:
        # Attempt to create and wait for the inference endpoint
        hf_endpoint = create_inference_endpoint(
            name=self._generate_an_endpoint_name(),
            repository=self.config.repository,
            framework=self.config.framework,
            accelerator=self.config.accelerator,
            instance_size=self.config.instance_size,
            instance_type=self.config.instance_type,
            region=self.config.region,
            vendor=self.config.vendor,
            account_id=self.config.account_id,
            min_replica=self.config.min_replica,
            max_replica=self.config.max_replica,
            revision=self.config.revision,
            task=self.config.task,
            custom_image=self.config.custom_image,
            type=self.config.endpoint_type,
            token=self.get_token(),
            namespace=self.config.namespace,
        ).wait(timeout=POLLING_TIMEOUT)
    except Exception as e:
        # Record the failure on the service status before re-raising.
        self.status.update_state(
            new_state=ServiceState.ERROR, error=str(e)
        )
        # Catch-all for any other unexpected errors; chain the original
        # exception explicitly so the root cause stays in the traceback.
        raise Exception(
            f"An unexpected error occurred while provisioning the Hugging Face inference endpoint: {e}"
        ) from e

    # Check if the endpoint URL is available after provisioning
    if hf_endpoint.url:
        logger.info(
            f"Hugging Face inference endpoint successfully deployed and available. Endpoint URL: {hf_endpoint.url}"
        )
    else:
        # A missing URL is only logged, not raised.
        logger.error(
            "Failed to start Hugging Face inference endpoint service: No URL available, please check the Hugging Face console for more details."
        )
HuggingFaceServiceConfig (HuggingFaceBaseConfig, ServiceConfig)
Hugging Face service configurations.
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceServiceConfig(HuggingFaceBaseConfig, ServiceConfig):
    """Hugging Face service configurations.

    Combines `HuggingFaceBaseConfig` and `ServiceConfig`; this class
    declares no additional fields of its own.
    """
HuggingFaceServiceStatus (ServiceStatus)
Hugging Face service status.
Source code in zenml/integrations/huggingface/services/huggingface_deployment.py
class HuggingFaceServiceStatus(ServiceStatus):
    """Hugging Face service status.

    Subclass of `ServiceStatus` that declares no additional fields of
    its own.
    """
steps
special
Initialization for Hugging Face model deployer step.
accelerate_runner
Step function to run any ZenML step using Accelerate.
run_with_accelerate(step_function_top_level=None, **accelerate_launch_kwargs)
Run a function with accelerate.
Accelerate package: https://huggingface.co/docs/accelerate/en/index
Examples:
from zenml import step, pipeline
from zenml.integrations.huggingface.steps import run_with_accelerate
@run_with_accelerate(num_processes=4, multi_gpu=True)
@step
def training_step(some_param: int, ...):
# your training code is below
...
@pipeline
def training_pipeline(some_param: int, ...):
training_step(some_param, ...)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_function_top_level |
Optional[zenml.steps.base_step.BaseStep] |
The step function to run with accelerate [optional].
Used in functional calls like `run_with_accelerate(some_func,foo=bar)()`. |
None |
accelerate_launch_kwargs |
Any |
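A dictionary of arguments to pass along to the `accelerate launch` command, including hardware selection, resource allocation, and training paradigm options. Visit https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch for more details.
|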
{} |
Returns:
Type | Description |
---|---|
Union[Callable[[zenml.steps.base_step.BaseStep], zenml.steps.base_step.BaseStep], zenml.steps.base_step.BaseStep] |
The accelerate-enabled version of the step. |
Source code in zenml/integrations/huggingface/steps/accelerate_runner.py
def run_with_accelerate(
    step_function_top_level: Optional[BaseStep] = None,
    **accelerate_launch_kwargs: Any,
) -> Union[Callable[[BaseStep], BaseStep], BaseStep]:
    """Run a function with accelerate.

    Accelerate package: https://huggingface.co/docs/accelerate/en/index

    Example:
        ```python
        from zenml import step, pipeline
        from zenml.integrations.huggingface.steps import run_with_accelerate

        @run_with_accelerate(num_processes=4, multi_gpu=True)
        @step
        def training_step(some_param: int, ...):
            # your training code is below
            ...

        @pipeline
        def training_pipeline(some_param: int, ...):
            training_step(some_param, ...)
        ```

    Args:
        step_function_top_level: The step function to run with accelerate [optional].
            Used in functional calls like `run_with_accelerate(some_func,foo=bar)()`.
        accelerate_launch_kwargs: A dictionary of arguments to pass along to the
            `accelerate launch` command, including hardware selection, resource
            allocation, and training paradigm options. Visit
            https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch
            for more details.

    Returns:
        The accelerate-enabled version of the step.
    """

    def _decorator(step_function: BaseStep) -> BaseStep:
        def _wrapper(
            entrypoint: F, accelerate_launch_kwargs: Dict[str, Any]
        ) -> F:
            @functools.wraps(entrypoint)
            def inner(*args: Any, **kwargs: Any) -> Any:
                # Step inputs are forwarded as `--name value` CLI options,
                # so positional arguments cannot be represented.
                if args:
                    raise ValueError(
                        "Accelerated steps do not support positional arguments."
                    )

                with create_cli_wrapped_script(
                    entrypoint, flavor="accelerate"
                ) as (
                    script_path,
                    output_path,
                ):
                    # Build the script command line from the step kwargs.
                    commands = [str(script_path.absolute())]
                    for k, v in kwargs.items():
                        k = _cli_arg_name(k)
                        if isinstance(v, bool):
                            # Booleans become bare flags; False is omitted.
                            if v:
                                commands.append(f"--{k}")
                        elif isinstance(v, (list, tuple, set)):
                            # Collections repeat the option once per item.
                            for each in v:
                                commands += [f"--{k}", f"{each}"]
                        else:
                            commands += [f"--{k}", f"{v}"]
                    logger.debug(commands)

                    parser = launch_command_parser()
                    # Named `launch_args` so it does not shadow `*args`.
                    launch_args = parser.parse_args(commands)
                    for k, v in accelerate_launch_kwargs.items():
                        # Only override options the launch parser knows.
                        if k in launch_args:
                            setattr(launch_args, k, v)
                        else:
                            logger.warning(
                                f"You passed in `{k}` as an `accelerate launch` argument, but it was not accepted. "
                                "Please check https://huggingface.co/docs/accelerate/en/package_reference/cli#accelerate-launch "
                                "to find out more about supported arguments and retry."
                            )
                    try:
                        launch_command(launch_args)
                    except Exception as e:
                        logger.error(
                            "Accelerate training job failed... See error message for details."
                        )
                        raise RuntimeError(
                            "Accelerate training job failed."
                        ) from e
                    else:
                        logger.info(
                            "Accelerate training job finished successfully."
                        )
                        # NOTE(review): the result is unpickled from the
                        # output path of the wrapped script; keep that
                        # location trusted, as unpickling untrusted data
                        # is unsafe.
                        with open(output_path, "rb") as output_file:
                            return pickle.load(output_file)

            return cast(F, inner)

        # `get_pipeline_context()` raising RuntimeError is treated as
        # "not inside a pipeline", which is the only supported place to
        # apply this decorator.
        try:
            get_pipeline_context()
        except RuntimeError:
            pass
        else:
            raise RuntimeError(
                f"`{run_with_accelerate.__name__}` decorator cannot be used "
                "in a functional way with steps, please apply decoration "
                "directly to a step instead. This behavior will be also "
                "allowed in future, but now it faces technical limitations.\n"
                "Example (allowed):\n"
                f"@{run_with_accelerate.__name__}(...)\n"
                f"def {step_function.name}(...):\n"
                "    ...\n"
                "Example (not allowed):\n"
                "def my_pipeline(...):\n"
                f"    run_with_accelerate({step_function.name},...)(...)\n"
            )

        # Keep the original entrypoint reachable, then swap in the
        # accelerate-launching wrapper.
        setattr(
            step_function, "unwrapped_entrypoint", step_function.entrypoint
        )
        setattr(
            step_function,
            "entrypoint",
            _wrapper(
                step_function.entrypoint,
                accelerate_launch_kwargs=accelerate_launch_kwargs,
            ),
        )

        return step_function

    # Called directly with a step: decorate it right away; otherwise
    # return the decorator for `@run_with_accelerate(...)` usage.
    if step_function_top_level is not None:
        return _decorator(step_function_top_level)
    return _decorator
huggingface_deployer
Implementation of the Hugging Face Deployer step.