Skip to content

Tensorflow

zenml.integrations.tensorflow special

Initialization for TensorFlow integration.

TensorflowIntegration (Integration)

Definition of Tensorflow integration for ZenML.

Source code in zenml/integrations/tensorflow/__init__.py
class TensorflowIntegration(Integration):
    """Definition of Tensorflow integration for ZenML."""

    NAME = TENSORFLOW
    REQUIREMENTS = []

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        # need to import this explicitly to load the Tensorflow file IO support
        # for S3 and other file systems
        if (
            not platform.system() == "Darwin"
            or not platform.machine() == "arm64"
        ):
            import tensorflow_io  # type: ignore [import]

        from zenml.integrations.tensorflow import materializers  # noqa

    @classmethod
    def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
        """Defines platform specific requirements for the integration.

        Args:
            target_os: The target operating system.

        Returns:
            A list of requirements.
        """
        if sys.version_info > (3, 11):
            tf_version = "2.13"
        else:
            # Capping tensorflow to 2.11 for Python 3.10 and below because it
            # is not compatible with Pytorch
            # (see https://github.com/pytorch/pytorch/issues/99637).
            tf_version = "2.11"
        target_os = target_os or platform.system()
        if target_os == "Darwin" and platform.machine() == "arm64":
            requirements = [
                f"tensorflow-macos=={tf_version}",
            ]
        else:
            requirements = [
                f"tensorflow=={tf_version}",
                "tensorflow_io>=0.24.0",
                "protobuf>=3.6.0,<4.0.0",
            ]
        return requirements

activate() classmethod

Activates the integration.

Source code in zenml/integrations/tensorflow/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    # need to import this explicitly to load the Tensorflow file IO support
    # for S3 and other file systems
    if (
        not platform.system() == "Darwin"
        or not platform.machine() == "arm64"
    ):
        import tensorflow_io  # type: ignore [import]

    from zenml.integrations.tensorflow import materializers  # noqa

get_requirements(target_os=None) classmethod

Defines platform specific requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in zenml/integrations/tensorflow/__init__.py
@classmethod
def get_requirements(cls, target_os: Optional[str] = None) -> List[str]:
    """Defines platform specific requirements for the integration.

    Args:
        target_os: The target operating system.

    Returns:
        A list of requirements.
    """
    if sys.version_info > (3, 11):
        tf_version = "2.13"
    else:
        # Capping tensorflow to 2.11 for Python 3.10 and below because it
        # is not compatible with Pytorch
        # (see https://github.com/pytorch/pytorch/issues/99637).
        tf_version = "2.11"
    target_os = target_os or platform.system()
    if target_os == "Darwin" and platform.machine() == "arm64":
        requirements = [
            f"tensorflow-macos=={tf_version}",
        ]
    else:
        requirements = [
            f"tensorflow=={tf_version}",
            "tensorflow_io>=0.24.0",
            "protobuf>=3.6.0,<4.0.0",
        ]
    return requirements

materializers special

Initialization for the TensorFlow materializers.

keras_materializer

Implementation of the TensorFlow Keras materializer.

KerasMaterializer (BaseMaterializer)

Materializer to read/write Keras models.

Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
class KerasMaterializer(BaseMaterializer):
    """Materializer to read/write Keras models."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (keras.Model,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.MODEL

    def load(self, data_type: Type[Any]) -> keras.Model:
        """Reads and returns a Keras model after copying it to temporary path.

        Args:
            data_type: The type of the data to read.

        Returns:
            A tf.keras.Model model.
        """
        # Create a temporary directory to store the model
        temp_dir = tempfile.TemporaryDirectory()

        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.uri, temp_dir.name)

        # Load the model from the temporary directory
        model = keras.models.load_model(temp_dir.name)

        # Cleanup and return
        fileio.rmtree(temp_dir.name)

        return model

    def save(self, model: keras.Model) -> None:
        """Writes a keras model to the artifact store.

        Args:
            model: A tf.keras.Model model.
        """
        # Create a temporary directory to store the model
        temp_dir = tempfile.TemporaryDirectory()
        model.save(temp_dir.name)
        io_utils.copy_dir(temp_dir.name, self.uri)

        # Remove the temporary directory
        fileio.rmtree(temp_dir.name)

    def extract_metadata(
        self, model: keras.Model
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `Model` object.

        Args:
            model: The `Model` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        return {
            "num_layers": len(model.layers),
            "num_params": count_params(model.weights),
            "num_trainable_params": count_params(model.trainable_weights),
        }
extract_metadata(self, model)

Extract metadata from the given Model object.

Parameters:

Name Type Description Default
model Model

The Model object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
def extract_metadata(
    self, model: keras.Model
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `Model` object.

    Args:
        model: The `Model` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return {
        "num_layers": len(model.layers),
        "num_params": count_params(model.weights),
        "num_trainable_params": count_params(model.trainable_weights),
    }
load(self, data_type)

Reads and returns a Keras model after copying it to temporary path.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Model

A tf.keras.Model model.

Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
def load(self, data_type: Type[Any]) -> keras.Model:
    """Reads and returns a Keras model after copying it to temporary path.

    Args:
        data_type: The type of the data to read.

    Returns:
        A tf.keras.Model model.
    """
    # Create a temporary directory to store the model
    temp_dir = tempfile.TemporaryDirectory()

    # Copy from artifact store to temporary directory
    io_utils.copy_dir(self.uri, temp_dir.name)

    # Load the model from the temporary directory
    model = keras.models.load_model(temp_dir.name)

    # Cleanup and return
    fileio.rmtree(temp_dir.name)

    return model
save(self, model)

Writes a keras model to the artifact store.

Parameters:

Name Type Description Default
model Model

A tf.keras.Model model.

required
Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
def save(self, model: keras.Model) -> None:
    """Writes a keras model to the artifact store.

    Args:
        model: A tf.keras.Model model.
    """
    # Create a temporary directory to store the model
    temp_dir = tempfile.TemporaryDirectory()
    model.save(temp_dir.name)
    io_utils.copy_dir(temp_dir.name, self.uri)

    # Remove the temporary directory
    fileio.rmtree(temp_dir.name)

tf_dataset_materializer

Implementation of the TensorFlow dataset materializer.

TensorflowDatasetMaterializer (BaseMaterializer)

Materializer to read data to and from tf.data.Dataset.

Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
class TensorflowDatasetMaterializer(BaseMaterializer):
    """Materializer to read data to and from tf.data.Dataset."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (tf.data.Dataset,)
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> Any:
        """Reads data into tf.data.Dataset.

        Args:
            data_type: The type of the data to read.

        Returns:
            A tf.data.Dataset object.
        """
        temp_dir = tempfile.mkdtemp()
        io_utils.copy_dir(self.uri, temp_dir)
        path = os.path.join(temp_dir, DEFAULT_FILENAME)
        dataset = tf.data.experimental.load(path)
        # Don't delete the temporary directory here as the dataset is lazily
        # loaded and needs to read it when the object gets used
        return dataset

    def save(self, dataset: tf.data.Dataset) -> None:
        """Persists a tf.data.Dataset object.

        Args:
            dataset: The dataset to persist.
        """
        temp_dir = tempfile.TemporaryDirectory()
        path = os.path.join(temp_dir.name, DEFAULT_FILENAME)
        try:
            tf.data.experimental.save(
                dataset, path, compression=None, shard_func=None
            )
            io_utils.copy_dir(temp_dir.name, self.uri)
        finally:
            fileio.rmtree(temp_dir.name)

    def extract_metadata(
        self, dataset: tf.data.Dataset
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given `Dataset` object.

        Args:
            dataset: The `Dataset` object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        return {"length": len(dataset)}
extract_metadata(self, dataset)

Extract metadata from the given Dataset object.

Parameters:

Name Type Description Default
dataset DatasetV2

The Dataset object to extract metadata from.

required

Returns:

Type Description
Dict[str, MetadataType]

The extracted metadata as a dictionary.

Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
def extract_metadata(
    self, dataset: tf.data.Dataset
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given `Dataset` object.

    Args:
        dataset: The `Dataset` object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return {"length": len(dataset)}
load(self, data_type)

Reads data into tf.data.Dataset.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

A tf.data.Dataset object.

Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads data into tf.data.Dataset.

    Args:
        data_type: The type of the data to read.

    Returns:
        A tf.data.Dataset object.
    """
    temp_dir = tempfile.mkdtemp()
    io_utils.copy_dir(self.uri, temp_dir)
    path = os.path.join(temp_dir, DEFAULT_FILENAME)
    dataset = tf.data.experimental.load(path)
    # Don't delete the temporary directory here as the dataset is lazily
    # loaded and needs to read it when the object gets used
    return dataset
save(self, dataset)

Persists a tf.data.Dataset object.

Parameters:

Name Type Description Default
dataset DatasetV2

The dataset to persist.

required
Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
def save(self, dataset: tf.data.Dataset) -> None:
    """Persists a tf.data.Dataset object.

    Args:
        dataset: The dataset to persist.
    """
    temp_dir = tempfile.TemporaryDirectory()
    path = os.path.join(temp_dir.name, DEFAULT_FILENAME)
    try:
        tf.data.experimental.save(
            dataset, path, compression=None, shard_func=None
        )
        io_utils.copy_dir(temp_dir.name, self.uri)
    finally:
        fileio.rmtree(temp_dir.name)