Skip to content

Gcp

zenml.integrations.gcp special

Initialization of the GCP ZenML integration.

The GCP integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores and provides an io module to handle file operations on Google Cloud Storage (GCS).

Additionally, the GCP secrets manager integration submodule provides a way to access the GCP secrets manager from within your ZenML Pipeline runs.

The Vertex AI integration submodule provides a way to run ZenML pipelines in a Vertex AI environment.

GcpIntegration (Integration)

Definition of Google Cloud Platform integration for ZenML.

Source code in zenml/integrations/gcp/__init__.py
class GcpIntegration(Integration):
    """Definition of Google Cloud Platform integration for ZenML."""

    NAME = GCP
    REQUIREMENTS = [
        "kfp==1.8.16",
        "gcsfs",
        "google-cloud-secret-manager",
        "google-cloud-aiplatform>=1.11.0",
        "google-cloud-scheduler>=2.7.3",
        "google-cloud-functions>=1.8.3",
        # google-cloud-bigquery 2.34.4 is not compatible with shapely 2.0.0
        # which was released on 2021-12-21. This is a temporary fix until
        # google-cloud-bigquery is updated.
        "shapely<2.0",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the GCP integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.gcp.flavors import (
            GCPArtifactStoreFlavor,
            GCPSecretsManagerFlavor,
            VertexOrchestratorFlavor,
            VertexStepOperatorFlavor,
        )

        return [
            VertexOrchestratorFlavor,
            VertexStepOperatorFlavor,
            GCPSecretsManagerFlavor,
            GCPArtifactStoreFlavor,
        ]

flavors() classmethod

Declare the stack component flavors for the GCP integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/gcp/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the GCP integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.gcp.flavors import (
        GCPArtifactStoreFlavor,
        GCPSecretsManagerFlavor,
        VertexOrchestratorFlavor,
        VertexStepOperatorFlavor,
    )

    return [
        VertexOrchestratorFlavor,
        VertexStepOperatorFlavor,
        GCPSecretsManagerFlavor,
        GCPArtifactStoreFlavor,
    ]

artifact_stores special

Initialization of the GCP Artifact Store.

gcp_artifact_store

Implementation of the GCP Artifact Store.

GCPArtifactStore (BaseArtifactStore, AuthenticationMixin)

Artifact Store for Google Cloud Storage based artifacts.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Google Cloud Storage based artifacts."""

    _filesystem: Optional[gcsfs.GCSFileSystem] = None

    @property
    def config(self) -> GCPArtifactStoreConfig:
        """Returns the `GCPArtifactStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(GCPArtifactStoreConfig, self._config)

    @property
    def filesystem(self) -> gcsfs.GCSFileSystem:
        """The gcsfs filesystem to access this artifact store.

        Returns:
            The gcsfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_authentication_secret(
                expected_schema_type=GCPSecretSchema
            )
            token = secret.get_credential_dict() if secret else None
            self._filesystem = gcsfs.GCSFileSystem(token=token)

        return self._filesystem

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object that can be used to read or write to the file.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )
        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
          in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        return [
            f"{GCP_PATH_PREFIX}{path}"
            for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path of the directory to list.

        Returns:
            A list of paths of files in the directory.
        """
        path_without_prefix = convert_to_str(path)
        if path_without_prefix.startswith(GCP_PATH_PREFIX):
            path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a file info dict returned by GCP.

            Args:
                file_dict: A file info dict returned by the GCP filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path_without_prefix) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
            # gcsfs.listdir also returns the root directory, so we filter
            # it out here
            if _extract_basename(dict_)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path of the directory to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path of the directory to create.
        """
        self.filesystem.makedir(path=path)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path of the file to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: the path to get stat info for.

        Returns:
            A dictionary with the stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
config: GCPArtifactStoreConfig property readonly

Returns the GCPArtifactStoreConfig config.

Returns:

Type Description
GCPArtifactStoreConfig

The configuration.

filesystem: GCSFileSystem property readonly

The gcsfs filesystem to access this artifact store.

Returns:

Type Description
GCSFileSystem

The gcsfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )
    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include: - '' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/*/some_file)

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
      in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [
        f"{GCP_PATH_PREFIX}{path}"
        for path in self.filesystem.glob(path=pattern)
    ]
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths of files in the directory.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path of the directory to list.

    Returns:
        A list of paths of files in the directory.
    """
    path_without_prefix = convert_to_str(path)
    if path_without_prefix.startswith(GCP_PATH_PREFIX):
        path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by GCP.

        Args:
            file_dict: A file info dict returned by the GCP filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path_without_prefix) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
        # gcsfs.listdir also returns the root directory, so we filter
        # it out here
        if _extract_basename(dict_)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to create.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to create.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedir(path=path)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object that can be used to read or write to the file.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object that can be used to read or write to the file.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the file to remove.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

the path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the stat info.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: the path to get stat info for.

    Returns:
        A dictionary with the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files

constants

Constants for the VertexAI integration.

flavors special

GCP integration flavors.

gcp_artifact_store_flavor

GCP artifact store flavor.

GCPArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin) pydantic-model

Configuration for GCP Artifact Store.

Source code in zenml/integrations/gcp/flavors/gcp_artifact_store_flavor.py
class GCPArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration for GCP Artifact Store."""

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {GCP_PATH_PREFIX}
GCPArtifactStoreFlavor (BaseArtifactStoreFlavor)

Flavor of the GCP artifact store.

Source code in zenml/integrations/gcp/flavors/gcp_artifact_store_flavor.py
class GCPArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Flavor of the GCP artifact store."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return GCP_ARTIFACT_STORE_FLAVOR

    @property
    def config_class(self) -> Type[GCPArtifactStoreConfig]:
        """Returns GCPArtifactStoreConfig config class.

        Returns:
                The config class.
        """
        return GCPArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["GCPArtifactStore"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.gcp.artifact_stores import GCPArtifactStore

        return GCPArtifactStore
config_class: Type[zenml.integrations.gcp.flavors.gcp_artifact_store_flavor.GCPArtifactStoreConfig] property readonly

Returns GCPArtifactStoreConfig config class.

Returns:

Type Description
Type[zenml.integrations.gcp.flavors.gcp_artifact_store_flavor.GCPArtifactStoreConfig]

The config class.

implementation_class: Type[GCPArtifactStore] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[GCPArtifactStore]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

gcp_secrets_manager_flavor

GCP secrets manager flavor.

GCPSecretsManagerConfig (BaseSecretsManagerConfig) pydantic-model

Configuration for the GCP Secrets Manager.

Attributes:

Name Type Description
project_id str

This is necessary to access the correct GCP project. The project_id of your GCP project space that contains the Secret Manager.

Source code in zenml/integrations/gcp/flavors/gcp_secrets_manager_flavor.py
class GCPSecretsManagerConfig(BaseSecretsManagerConfig):
    """Configuration for the GCP Secrets Manager.

    Attributes:
        project_id: This is necessary to access the correct GCP project.
            The project_id of your GCP project space that contains the Secret
            Manager.
    """

    SUPPORTS_SCOPING: ClassVar[bool] = True
    project_id: str

    @classmethod
    def _validate_scope(
        cls,
        scope: SecretsManagerScope,
        namespace: Optional[str],
    ) -> None:
        """Validate the scope and namespace value.

        Args:
            scope: Scope value.
            namespace: Optional namespace value.
        """
        if namespace:
            validate_gcp_secret_name_or_namespace(namespace)
GCPSecretsManagerFlavor (BaseSecretsManagerFlavor)

Class for the GCPSecretsManagerFlavor.

Source code in zenml/integrations/gcp/flavors/gcp_secrets_manager_flavor.py
class GCPSecretsManagerFlavor(BaseSecretsManagerFlavor):
    """Class for the `GCPSecretsManagerFlavor`."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return GCP_SECRETS_MANAGER_FLAVOR

    @property
    def config_class(self) -> Type[GCPSecretsManagerConfig]:
        """Returns GCPSecretsManagerConfig config class.

        Returns:
                The config class.
        """
        return GCPSecretsManagerConfig

    @property
    def implementation_class(self) -> Type["GCPSecretsManager"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.gcp.secrets_manager import GCPSecretsManager

        return GCPSecretsManager
config_class: Type[zenml.integrations.gcp.flavors.gcp_secrets_manager_flavor.GCPSecretsManagerConfig] property readonly

Returns GCPSecretsManagerConfig config class.

Returns:

Type Description
Type[zenml.integrations.gcp.flavors.gcp_secrets_manager_flavor.GCPSecretsManagerConfig]

The config class.

implementation_class: Type[GCPSecretsManager] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[GCPSecretsManager]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

validate_gcp_secret_name_or_namespace(name)

Validate a secret name or namespace.

A Google secret ID is a string with a maximum length of 255 characters and can contain uppercase and lowercase letters, numerals, and the hyphen (-) and underscore (_) characters. For scoped secrets, we have to limit the size of the name and namespace even further to allow space for both in the Google secret ID.

Given that we also save secret names and namespaces as labels, we are also limited by the limitation that Google imposes on label values: max 63 characters and must only contain lowercase letters, numerals and the hyphen (-) and underscore (_) characters

Parameters:

Name Type Description Default
name str

the secret name or namespace

required

Exceptions:

Type Description
ValueError

if the secret name or namespace is invalid

Source code in zenml/integrations/gcp/flavors/gcp_secrets_manager_flavor.py
def validate_gcp_secret_name_or_namespace(name: str) -> None:
    """Validate a secret name or namespace.

    A Google secret ID is a string with a maximum length of 255 characters
    and can contain uppercase and lowercase letters, numerals, and the
    hyphen (-) and underscore (_) characters. For scoped secrets, we have to
    limit the size of the name and namespace even further to allow space for
    both in the Google secret ID.

    Given that we also save secret names and namespaces as labels, we are
    also limited by the limitation that Google imposes on label values: max
    63 characters and must only contain lowercase letters, numerals
    and the hyphen (-) and underscore (_) characters

    Args:
        name: the secret name or namespace

    Raises:
        ValueError: if the secret name or namespace is invalid
    """
    if not re.fullmatch(r"[a-z0-9_\-]+", name):
        raise ValueError(
            f"Invalid secret name or namespace '{name}'. Must contain "
            f"only lowercase alphanumeric characters and the hyphen (-) and "
            f"underscore (_) characters."
        )

    if name and len(name) > 63:
        raise ValueError(
            f"Invalid secret name or namespace '{name}'. The length is "
            f"limited to maximum 63 characters."
        )

vertex_orchestrator_flavor

Vertex orchestrator flavor.

VertexOrchestratorConfig (BaseOrchestratorConfig, GoogleCredentialsConfigMixin, VertexOrchestratorSettings) pydantic-model

Configuration for the Vertex orchestrator.

Attributes:

Name Type Description
project Optional[str]

GCP project name. If None, the project will be inferred from the environment.

location str

Name of GCP region where the pipeline job will be executed. Vertex AI Pipelines is available in the following regions: https://cloud.google.com/vertex-ai/docs/general/locations#feature -availability

pipeline_root Optional[str]

a Cloud Storage URI that will be used by the Vertex AI Pipelines. If not provided but the artifact store in the stack used to execute the pipeline is a zenml.integrations.gcp.artifact_stores.GCPArtifactStore, then a subdirectory of the artifact store will be used.

encryption_spec_key_name Optional[str]

The Cloud KMS resource identifier of the customer managed encryption key used to protect the job. Has the form: projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY> . The key needs to be in the same region as where the compute resource is created.

workload_service_account Optional[str]

the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. If not provided, the default service account will be used.

network Optional[str]

the full name of the Compute Engine Network to which the job should be peered. For example, projects/12345/global/networks/myVPC If not provided, the job will not be peered with any network.

cpu_limit Optional[str]

The maximum CPU limit for this operator. This string value can be a number (integer value for number of CPUs) as string, or a number followed by "m", which means 1/1000. You can specify at most 96 CPUs. (see. https://cloud.google.com/vertex-ai/docs/pipelines/machine-types)

memory_limit Optional[str]

The maximum memory limit for this operator. This string value can be a number, or a number followed by "K" (kilobyte), "M" (megabyte), or "G" (gigabyte). At most 624GB is supported.

gpu_limit Optional[int]

The GPU limit (positive number) for the operator. For more information about GPU resources, see: https://cloud.google.com/vertex-ai/docs/training/configure-compute#specifying_gpus

Source code in zenml/integrations/gcp/flavors/vertex_orchestrator_flavor.py
class VertexOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig,
    GoogleCredentialsConfigMixin,
    VertexOrchestratorSettings,
):
    """Configuration for the Vertex orchestrator.

    Attributes:
        project: GCP project name. If `None`, the project will be inferred from
            the environment.
        location: Name of GCP region where the pipeline job will be executed.
            Vertex AI Pipelines is available in the following regions:
            https://cloud.google.com/vertex-ai/docs/general/locations#feature
            -availability
        pipeline_root: a Cloud Storage URI that will be used by the Vertex AI
            Pipelines. If not provided but the artifact store in the stack used
            to execute the pipeline is a
            `zenml.integrations.gcp.artifact_stores.GCPArtifactStore`,
            then a subdirectory of the artifact store will be used.
        encryption_spec_key_name: The Cloud KMS resource identifier of the
            customer managed encryption key used to protect the job. Has the form:
            `projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>`
            . The key needs to be in the same region as where the compute
            resource is created.
        workload_service_account: the service account for workload run-as
            account. Users submitting jobs must have act-as permission on this
            run-as account.
            If not provided, the default service account will be used.
        network: the full name of the Compute Engine Network to which the job
            should be peered. For example, `projects/12345/global/networks/myVPC`
            If not provided, the job will not be peered with any network.
        cpu_limit: The maximum CPU limit for this operator. This string value
            can be a number (integer value for number of CPUs) as string,
            or a number followed by "m", which means 1/1000. You can specify
            at most 96 CPUs.
            (see. https://cloud.google.com/vertex-ai/docs/pipelines/machine-types)
        memory_limit: The maximum memory limit for this operator. This string
            value can be a number, or a number followed by "K" (kilobyte),
            "M" (megabyte), or "G" (gigabyte). At most 624GB is supported.
        gpu_limit: The GPU limit (positive number) for the operator.
            For more information about GPU resources, see:
            https://cloud.google.com/vertex-ai/docs/training/configure-compute#specifying_gpus
    """

    project: Optional[str] = None
    location: str
    pipeline_root: Optional[str] = None
    encryption_spec_key_name: Optional[str] = None
    workload_service_account: Optional[str] = None
    network: Optional[str] = None

    cpu_limit: Optional[str] = None
    memory_limit: Optional[str] = None
    gpu_limit: Optional[int] = None

    _resource_deprecation = deprecation_utils.deprecate_pydantic_attributes(
        "cpu_limit", "memory_limit", "gpu_limit"
    )

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

VertexOrchestratorFlavor (BaseOrchestratorFlavor)

Vertex Orchestrator flavor.

Source code in zenml/integrations/gcp/flavors/vertex_orchestrator_flavor.py
class VertexOrchestratorFlavor(BaseOrchestratorFlavor):
    """Vertex Orchestrator flavor."""

    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        return GCP_VERTEX_ORCHESTRATOR_FLAVOR

    @property
    def config_class(self) -> Type[VertexOrchestratorConfig]:
        """Returns VertexOrchestratorConfig config class.

        Returns:
                The config class.
        """
        return VertexOrchestratorConfig

    @property
    def implementation_class(self) -> Type["VertexOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            Implementation class for this flavor.
        """
        from zenml.integrations.gcp.orchestrators import VertexOrchestrator

        return VertexOrchestrator
config_class: Type[zenml.integrations.gcp.flavors.vertex_orchestrator_flavor.VertexOrchestratorConfig] property readonly

Returns VertexOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.gcp.flavors.vertex_orchestrator_flavor.VertexOrchestratorConfig]

The config class.

implementation_class: Type[VertexOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[VertexOrchestrator]

Implementation class for this flavor.

name: str property readonly

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

VertexOrchestratorSettings (BaseSettings) pydantic-model

Settings for the Vertex orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, running a pipeline using this orchestrator will block until all steps finished running on Vertex AI Pipelines service.

labels Dict[str, str]

Labels to assign to the pipeline job.

node_selector_constraint Optional[Tuple[str, str]]

Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must have each of the constraints appeared as labels. For example a GPU type can be providing by one of the following tuples: - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_A100") - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_K80") - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P4") - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P100") - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4") - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_V100") Hint: the selected region (location) must provide the requested accelerator (see https://cloud.google.com/compute/docs/gpus/gpu-regions-zones).

pod_settings Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings]

Pod settings to apply.

Source code in zenml/integrations/gcp/flavors/vertex_orchestrator_flavor.py
class VertexOrchestratorSettings(BaseSettings):
    """Settings for the Vertex orchestrator.

    Attributes:
        synchronous: If `True`, running a pipeline using this orchestrator will
            block until all steps finished running on Vertex AI Pipelines
            service.
        labels: Labels to assign to the pipeline job.
        node_selector_constraint: Each constraint is a key-value pair label.
            For the container to be eligible to run on a node, the node must have
            each of the constraints appeared as labels.
            For example a GPU type can be providing by one of the following tuples:
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_A100")
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_K80")
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P4")
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P100")
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4")
                - ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_V100")
            Hint: the selected region (location) must provide the requested accelerator
            (see https://cloud.google.com/compute/docs/gpus/gpu-regions-zones).
        pod_settings: Pod settings to apply.
    """

    labels: Dict[str, str] = {}
    synchronous: bool = False
    node_selector_constraint: Optional[Tuple[str, str]] = None
    pod_settings: Optional[KubernetesPodSettings] = None

    _node_selector_deprecation = (
        deprecation_utils.deprecate_pydantic_attributes(
            "node_selector_constraint"
        )
    )

vertex_step_operator_flavor

Vertex step operator flavor.

VertexStepOperatorConfig (BaseStepOperatorConfig, GoogleCredentialsConfigMixin, VertexStepOperatorSettings) pydantic-model

Configuration for the Vertex step operator.

Attributes:

Name Type Description
region str

Region name, e.g., europe-west1.

project Optional[str]

GCP project name. If left None, inferred from the environment.

encryption_spec_key_name Optional[str]

Encryption spec key name.

Source code in zenml/integrations/gcp/flavors/vertex_step_operator_flavor.py
class VertexStepOperatorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseStepOperatorConfig,
    GoogleCredentialsConfigMixin,
    VertexStepOperatorSettings,
):
    """Configuration for the Vertex step operator.

    Attributes:
        region: Region name, e.g., `europe-west1`.
        project: GCP project name. If left None, inferred from the
            environment.
        encryption_spec_key_name: Encryption spec key name.
    """

    region: str
    project: Optional[str] = None

    # customer managed encryption key resource name
    # will be applied to all Vertex AI resources if set
    encryption_spec_key_name: Optional[str] = None

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

VertexStepOperatorFlavor (BaseStepOperatorFlavor)

Vertex Step Operator flavor.

Source code in zenml/integrations/gcp/flavors/vertex_step_operator_flavor.py
class VertexStepOperatorFlavor(BaseStepOperatorFlavor):
    """Vertex Step Operator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return GCP_VERTEX_STEP_OPERATOR_FLAVOR

    @property
    def config_class(self) -> Type[VertexStepOperatorConfig]:
        """Returns `VertexStepOperatorConfig` config class.

        Returns:
                The config class.
        """
        return VertexStepOperatorConfig

    @property
    def implementation_class(self) -> Type["VertexStepOperator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.gcp.step_operators import VertexStepOperator

        return VertexStepOperator
config_class: Type[zenml.integrations.gcp.flavors.vertex_step_operator_flavor.VertexStepOperatorConfig] property readonly

Returns VertexStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.gcp.flavors.vertex_step_operator_flavor.VertexStepOperatorConfig]

The config class.

implementation_class: Type[VertexStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[VertexStepOperator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

VertexStepOperatorSettings (BaseSettings) pydantic-model

Settings for the Vertex step operator.

Attributes:

Name Type Description
accelerator_type Optional[str]

Defines which accelerator (GPU, TPU) is used for the job. Check out out this table to see which accelerator type and count are compatible with your chosen machine type: https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.

accelerator_count int

Defines number of accelerators to be used for the job. Check out out this table to see which accelerator type and count are compatible with your chosen machine type: https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.

machine_type str

Machine type specified here https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types.

Source code in zenml/integrations/gcp/flavors/vertex_step_operator_flavor.py
class VertexStepOperatorSettings(BaseSettings):
    """Settings for the Vertex step operator.

    Attributes:
        accelerator_type: Defines which accelerator (GPU, TPU) is used for the
            job. Check out out this table to see which accelerator
            type and count are compatible with your chosen machine type:
            https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.
        accelerator_count: Defines number of accelerators to be used for the
            job. Check out out this table to see which accelerator
            type and count are compatible with your chosen machine type:
            https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.
        machine_type: Machine type specified here
            https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types.

    """

    accelerator_type: Optional[str] = None
    accelerator_count: int = 0
    machine_type: str = "n1-standard-4"

google_cloud_function

Utils for the Google Cloud Functions API.

create_cloud_function(directory_path, upload_path, project, location, function_name, credentials=None, timeout=1800)

Create google cloud function from specified directory path.

Parameters:

Name Type Description Default
directory_path str

Local path to directory where function code resides.

required
upload_path str

GCS path where to upload the function code.

required
project str

GCP project ID.

required
location str

GCP location name.

required
function_name str

Name of the function to create.

required
credentials Optional[Credentials]

Credentials to use for GCP services.

None
timeout int

Timeout in seconds.

1800

Returns:

Type Description
str

URI of the created cloud function.

Exceptions:

Type Description
TimeoutError

If function times out.

Source code in zenml/integrations/gcp/google_cloud_function.py
def create_cloud_function(
    directory_path: str,
    upload_path: str,
    project: str,
    location: str,
    function_name: str,
    credentials: Optional["Credentials"] = None,
    timeout: int = 1800,
) -> str:
    """Create google cloud function from specified directory path.

    Args:
        directory_path: Local path to directory where function code resides.
        upload_path: GCS path where to upload the function code.
        project: GCP project ID.
        location: GCP location name.
        function_name: Name of the function to create.
        credentials: Credentials to use for GCP services.
        timeout: Timeout in seconds.

    Returns:
        str: URI of the created cloud function.

    Raises:
        TimeoutError: If function times out.
    """
    sanitized_function_name = function_name.replace("_", "-")
    parent = f"projects/{project}/locations/{location}"
    function_full_name = f"{parent}/functions/{sanitized_function_name}"
    logger.info(f"Creating Google Cloud Function: {function_full_name}")

    storage_source = upload_directory(directory_path, upload_path)

    # Make the request
    get_cloud_functions_api(credentials=credentials).create_function(
        request=CreateFunctionRequest(
            parent=parent,
            function_id=sanitized_function_name,
            function=Function(
                name=function_full_name,
                build_config=BuildConfig(
                    entry_point="trigger_vertex_job",
                    runtime="python38",
                    source=Source(storage_source=storage_source),
                ),
            ),
        )
    )

    state = Function.State.DEPLOYING
    logger.info(
        "Creating function... This might take a few minutes. "
        "Please do not exit the program at this point..."
    )

    start_time = time.time()
    while state == Function.State.DEPLOYING:
        response = get_cloud_functions_api(
            credentials=credentials
        ).get_function(request=GetFunctionRequest(name=function_full_name))
        state = response.state
        logger.info("Still creating... sleeping for 5 seconds...")
        time.sleep(5)

        if time.time() - start_time > timeout:
            raise TimeoutError("Timed out waiting for function to deploy!")

    logger.info(f"Done! Function available at {response.service_config.uri}")
    return str(response.service_config.uri)

get_cloud_functions_api(credentials=None)

Gets the cloud functions API resource client.

Parameters:

Name Type Description Default
credentials Optional[Credentials]

Google cloud credentials.

None

Returns:

Type Description
FunctionServiceClient

Cloud Functions V2 Client.

Source code in zenml/integrations/gcp/google_cloud_function.py
def get_cloud_functions_api(
    credentials: Optional["Credentials"] = None,
) -> functions_v2.FunctionServiceClient:
    """Gets the cloud functions API resource client.

    Args:
        credentials: Google cloud credentials.

    Returns:
        Cloud Functions V2 Client.
    """
    return functions_v2.FunctionServiceClient(credentials=credentials)

upload_directory(directory_path, upload_path)

Uploads local directory to remote one.

Parameters:

Name Type Description Default
upload_path str

GCS path where to upload the zipped function code.

required
directory_path str

Local path of directory to upload.

required

Returns:

Type Description
Storage source (https

//cloud.google.com/functions/docs/reference/rest/v2/projects.locations.functions#StorageSource).

Source code in zenml/integrations/gcp/google_cloud_function.py
def upload_directory(
    directory_path: str,
    upload_path: str,
) -> StorageSource:
    """Uploads local directory to remote one.

    Args:
        upload_path: GCS path where to upload the zipped function code.
        directory_path: Local path of directory to upload.

    Returns:
        Storage source (https://cloud.google.com/functions/docs/reference/rest/v2/projects.locations.functions#StorageSource).
    """
    with tempfile.NamedTemporaryFile(delete=False) as f:
        with open(f.name, "wb") as data:
            with zipfile.ZipFile(data, "w", zipfile.ZIP_DEFLATED) as archive:
                zipdir(directory_path, archive)
            data.seek(0)

    # Copy and remove
    fileio.copy(f.name, upload_path, overwrite=True)
    fileio.remove(f.name)

    # Split the path by "/" character
    bucket, object_path = upload_path.replace("gs://", "").split(
        "/", maxsplit=1
    )

    return StorageSource(
        bucket=bucket,
        object_=object_path,
    )

zipdir(path, ziph)

Zips a directory using an Zipfile object.

Parameters:

Name Type Description Default
path str

Path to zip directory to.

required
ziph ZipFile

A zipfile.Zipfile file object.

required
Source code in zenml/integrations/gcp/google_cloud_function.py
def zipdir(path: str, ziph: zipfile.ZipFile) -> None:
    """Zips a directory using an Zipfile object.

    Args:
        path: Path to zip directory to.
        ziph: A `zipfile.Zipfile` file object.
    """
    for root, _, files in os.walk(path):
        for file in files:
            if file != "__init__.py":
                ziph.write(os.path.join(root, file), file)

google_cloud_scheduler

Utils for the Google Cloud Scheduler API.

create_scheduler_job(project, region, http_uri, body, credentials=None, schedule='* * * * *', time_zone='Etc/UTC')

Creates a Google Cloud Scheduler job.

Job periodically sends POST request to the specified HTTP URI on a schedule.

Parameters:

Name Type Description Default
project str

GCP project ID.

required
region str

GCP region.

required
http_uri str

HTTP URI of the cloud function to call.

required
body Dict[str, Union[Dict[str, str], bool, str]]

The body of values to send to the cloud function in the POST call.

required
schedule str

Cron expression of the schedule. Defaults to " * * *".

'* * * * *'
time_zone str

Time zone of the schedule. Defaults to "Etc/UTC".

'Etc/UTC'
credentials Optional[Credentials]

Credentials to use for GCP services.

None
Source code in zenml/integrations/gcp/google_cloud_scheduler.py
def create_scheduler_job(
    project: str,
    region: str,
    http_uri: str,
    body: Dict[str, Union[Dict[str, str], bool, str, None]],
    credentials: Optional["Credentials"] = None,
    schedule: str = "* * * * *",
    time_zone: str = "Etc/UTC",
) -> None:
    """Creates a Google Cloud Scheduler job.

    Job periodically sends POST request to the specified HTTP URI on a schedule.

    Args:
        project: GCP project ID.
        region: GCP region.
        http_uri: HTTP URI of the cloud function to call.
        body: The body of values to send to the cloud function in the POST call.
        schedule: Cron expression of the schedule. Defaults to "* * * * *".
        time_zone: Time zone of the schedule. Defaults to "Etc/UTC".
        credentials: Credentials to use for GCP services.
    """
    # Create a client.
    client = scheduler.CloudSchedulerClient(credentials=credentials)

    # Construct the fully qualified location path.
    parent = f"projects/{project}/locations/{region}"

    # Use the client to send the job creation request.
    job = client.create_job(
        request=CreateJobRequest(
            parent=parent,
            job=Job(
                http_target=HttpTarget(
                    uri=http_uri,
                    body=json.dumps(body).encode(),
                    http_method=HttpMethod.POST,
                    oidc_token=OidcToken(
                        service_account_email=credentials.signer_email
                        if credentials and hasattr(credentials, "signer_email")
                        else None
                    ),
                ),
                schedule=schedule,
                time_zone=time_zone,
            ),
        )
    )

    logging.debug(f"Created scheduler job. Response: {job}")

google_credentials_mixin

Implementation of the Google credentials mixin.

GoogleCredentialsConfigMixin (StackComponentConfig) pydantic-model

Config mixin for Google Cloud Platform credentials.

Attributes:

Name Type Description
service_account_path Optional[str]

path to the service account credentials file to be used for authentication. If not provided, the default credentials will be used.

Source code in zenml/integrations/gcp/google_credentials_mixin.py
class GoogleCredentialsConfigMixin(StackComponentConfig):
    """Config mixin for Google Cloud Platform credentials.

    Attributes:
        service_account_path: path to the service account credentials file to be
            used for authentication. If not provided, the default credentials
            will be used.
    """

    service_account_path: Optional[str] = None

GoogleCredentialsMixin (StackComponent)

StackComponent mixin to get Google Cloud Platform credentials.

Source code in zenml/integrations/gcp/google_credentials_mixin.py
class GoogleCredentialsMixin(StackComponent):
    """StackComponent mixin to get Google Cloud Platform credentials."""

    @property
    def config(self) -> GoogleCredentialsConfigMixin:
        """Returns the `GoogleCredentialsConfigMixin` config.

        Returns:
            The configuration.
        """
        return cast(GoogleCredentialsConfigMixin, self._config)

    def _get_authentication(self) -> Tuple["Credentials", str]:
        """Get GCP credentials and the project ID associated with the credentials.

        If `service_account_path` is provided, then the credentials will be
        loaded from the file at that path. Otherwise, the default credentials
        will be used.

        Returns:
            A tuple containing the credentials and the project ID associated to
            the credentials.
        """
        from google.auth import default, load_credentials_from_file

        if self.config.service_account_path:
            credentials, project_id = load_credentials_from_file(
                self.config.service_account_path
            )
        else:
            credentials, project_id = default()
        return credentials, project_id
config: GoogleCredentialsConfigMixin property readonly

Returns the GoogleCredentialsConfigMixin config.

Returns:

Type Description
GoogleCredentialsConfigMixin

The configuration.

orchestrators special

Initialization for the VertexAI orchestrator.

vertex_orchestrator

Implementation of the VertexAI orchestrator.

VertexOrchestrator (BaseOrchestrator, GoogleCredentialsMixin)

Orchestrator responsible for running pipelines on Vertex AI.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
class VertexOrchestrator(BaseOrchestrator, GoogleCredentialsMixin):
    """Orchestrator responsible for running pipelines on Vertex AI."""

    _pipeline_root: str

    @property
    def config(self) -> VertexOrchestratorConfig:
        """Returns the `VertexOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(VertexOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Vertex orchestrator.

        Returns:
            The settings class.
        """
        return VertexOrchestratorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains a container registry.

        Also validates that the artifact store is not local.

        Returns:
            A StackValidator instance.
        """

        def _validate_stack_requirements(stack: "Stack") -> Tuple[bool, str]:
            """Validates that all the stack components are not local.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple of (is_valid, error_message).
            """
            # Validate that the container registry is not local.
            container_registry = stack.container_registry
            if container_registry and container_registry.config.is_local:
                return False, (
                    f"The Vertex orchestrator does not support local "
                    f"container registries. You should replace the component '"
                    f"{container_registry.name}' "
                    f"{container_registry.type.value} to a remote one."
                )

            # Validate that the rest of the components are not local.
            for stack_comp in stack.components.values():
                # For Forward compatibility a list of components is returned,
                # but only the first item is relevant for now
                # TODO: [server] make sure the ComponentModel actually has
                #  a local_path property or implement similar check
                local_path = stack_comp.local_path
                if not local_path:
                    continue
                return False, (
                    f"The '{stack_comp.name}' {stack_comp.type.value} is a "
                    f"local stack component. The Vertex AI Pipelines "
                    f"orchestrator requires that all the components in the "
                    f"stack used to execute the pipeline have to be not local, "
                    f"because there is no way for Vertex to connect to your "
                    f"local machine. You should use a flavor of "
                    f"{stack_comp.type.value} other than '"
                    f"{stack_comp.flavor}'."
                )

            # If the `pipeline_root` has not been defined in the orchestrator
            # configuration, and the artifact store is not a GCP artifact store,
            # then raise an error.
            if (
                not self.config.pipeline_root
                and stack.artifact_store.flavor != GCP_ARTIFACT_STORE_FLAVOR
            ):
                return False, (
                    f"The attribute `pipeline_root` has not been set and it "
                    f"cannot be generated using the path of the artifact store "
                    f"because it is not a "
                    f"`zenml.integrations.gcp.artifact_store.GCPArtifactStore`."
                    f" To solve this issue, set the `pipeline_root` attribute "
                    f"manually executing the following command: "
                    f"`zenml orchestrator update {stack.orchestrator.name} "
                    f'--pipeline_root="<Cloud Storage URI>"`.'
                )

            return True, ""

        return StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_validate_stack_requirements,
        )

    @property
    def root_directory(self) -> str:
        """Returns path to the root directory for files for this orchestrator.

        Returns:
            The path to the root directory for all files concerning this
            orchestrator.
        """
        return os.path.join(
            get_global_config_directory(), "vertex", str(self.id)
        )

    @property
    def pipeline_directory(self) -> str:
        """Returns path to directory where kubeflow pipelines files are stored.

        Returns:
            Path to the pipeline directory.
        """
        return os.path.join(self.root_directory, "pipelines")

    def _get_authentication(self) -> Tuple["Credentials", str]:
        """Get GCP credentials and the project ID associated with the credentials.

        This function is the same as the super function except that it also checks
        against the value of the `config` class of this orchestrator.

        Returns:
            A tuple containing the credentials and the project ID associated to
            the credentials.
        """
        credentials, project_id = super()._get_authentication()
        if self.config.project and self.config.project != project_id:
            logger.warning(
                "Authenticated with project `%s`, but this orchestrator is "
                "configured to use the project `%s`.",
                project_id,
                self.config.project,
            )

        # If the project was set in the configuration, use it. Otherwise, use
        # the project that was used to authenticate.
        project_id = self.config.project if self.config.project else project_id
        return credentials, project_id

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.

        Raises:
            ValueError: If `cron_expression` is not in passed Schedule.
        """
        if deployment.schedule:
            if (
                deployment.schedule.catchup
                or deployment.schedule.start_time
                or deployment.schedule.end_time
                or deployment.schedule.interval_second
            ):
                logger.warning(
                    "Vertex orchestrator only uses schedules with the "
                    "`cron_expression` property. All other properties "
                    "are ignored."
                )
            if deployment.schedule.cron_expression is None:
                raise ValueError(
                    "Property `cron_expression` must be set when passing "
                    "schedule to a Vertex orchestrator."
                )

        docker_image_builder = PipelineDockerImageBuilder()
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)

    def _configure_container_resources(
        self,
        container_op: dsl.ContainerOp,
        resource_settings: "ResourceSettings",
        node_selector_constraint: Optional[Tuple[str, str]] = None,
    ) -> None:
        """Adds resource requirements to the container.

        Args:
            container_op: The kubeflow container operation to configure.
            resource_settings: The resource settings to use for this
                container.
            node_selector_constraint: Node selector constraint to apply to
                the container.
        """
        # Set optional CPU, RAM and GPU constraints for the pipeline

        cpu_limit = resource_settings.cpu_count or self.config.cpu_limit

        if cpu_limit is not None:
            container_op = container_op.set_cpu_limit(str(cpu_limit))

        memory_limit = (
            resource_settings.memory[:-1]
            if resource_settings.memory
            else self.config.memory_limit
        )
        if memory_limit is not None:
            container_op = container_op.set_memory_limit(memory_limit)

        gpu_limit = (
            resource_settings.gpu_count
            if resource_settings.gpu_count is not None
            else self.config.gpu_limit
        )
        if gpu_limit is not None and gpu_limit > 0:
            container_op = container_op.set_gpu_limit(gpu_limit)

        if node_selector_constraint:
            constraint_label, value = node_selector_constraint
            if not (
                constraint_label
                == GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                and gpu_limit == 0
            ):
                container_op.add_node_selector_constraint(
                    constraint_label, value
                )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Creates a KFP JSON pipeline.

        # noqa: DAR402

        This is an intermediary representation of the pipeline which is then
        deployed to Vertex AI Pipelines service.

        How it works:
        -------------
        Before this method is called the `prepare_pipeline_deployment()` method
        builds a Docker image that contains the code for the pipeline, all steps
        the context around these files.

        Based on this Docker image a callable is created which builds
        container_ops for each step (`_construct_kfp_pipeline`). The function
        `kfp.components.load_component_from_text` is used to create the
        `ContainerOp`, because using the `dsl.ContainerOp` class directly is
        deprecated when using the Kubeflow SDK v2. The step entrypoint command
        with the entrypoint arguments is the command that will be executed by
        the container created using the previously created Docker image.

        This callable is then compiled into a JSON file that is used as the
        intermediary representation of the Kubeflow pipeline.

        This file then is submitted to the Vertex AI Pipelines service for
        execution.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Raises:
            ValueError: If the attribute `pipeline_root` is not set and it
                can be not generated using the path of the artifact store in the
                stack because it is not a
                `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. Also gets
                raised if attempting to schedule pipeline run without using the
                `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
        """
        orchestrator_run_name = get_orchestrator_run_name(
            pipeline_name=deployment.pipeline.name
        )
        # If the `pipeline_root` has not been defined in the orchestrator
        # configuration,
        # try to create it from the artifact store if it is a
        # `GCPArtifactStore`.
        if not self.config.pipeline_root:
            artifact_store = stack.artifact_store
            self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{deployment.pipeline.name}/{orchestrator_run_name}"
            logger.info(
                "The attribute `pipeline_root` has not been set in the "
                "orchestrator configuration. One has been generated "
                "automatically based on the path of the `GCPArtifactStore` "
                "artifact store in the stack used to execute the pipeline. "
                "The generated `pipeline_root` is `%s`.",
                self._pipeline_root,
            )
        else:
            self._pipeline_root = self.config.pipeline_root

        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

        def _construct_kfp_pipeline() -> None:
            """Create a `ContainerOp` for each step.

            This should contain the name of the Docker image and configures the
            entrypoint of the Docker image to run the step.

            Additionally, this gives each `ContainerOp` information about its
            direct downstream steps.

            If this callable is passed to the `compile()` method of
            `KFPV2Compiler` all `dsl.ContainerOp` instances will be
            automatically added to a singular `dsl.Pipeline` instance.
            """
            command = StepEntrypointConfiguration.get_entrypoint_command()
            step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

            for step_name, step in deployment.steps.items():
                arguments = (
                    StepEntrypointConfiguration.get_entrypoint_arguments(
                        step_name=step_name,
                    )
                )

                # Create the `ContainerOp` for the step. Using the
                # `dsl.ContainerOp`
                # class directly is deprecated when using the Kubeflow SDK v2.
                container_op = kfp.components.load_component_from_text(
                    f"""
                    name: {step.config.name}
                    implementation:
                        container:
                            image: {image_name}
                            command: {command + arguments}"""
                )()

                container_op.set_env_variable(
                    name=ENV_ZENML_VERTEX_RUN_ID,
                    value=dslv2.PIPELINE_JOB_NAME_PLACEHOLDER,
                )

                # Set upstream tasks as a dependency of the current step
                for upstream_step_name in step.spec.upstream_steps:
                    upstream_container_op = step_name_to_container_op[
                        upstream_step_name
                    ]
                    container_op.after(upstream_container_op)

                settings = cast(
                    VertexOrchestratorSettings,
                    self.get_settings(step),
                )
                if settings.pod_settings:
                    apply_pod_settings(
                        container_op=container_op,
                        settings=settings.pod_settings,
                    )

                self._configure_container_resources(
                    container_op=container_op,
                    resource_settings=step.config.resource_settings,
                    node_selector_constraint=settings.node_selector_constraint,
                )
                container_op.set_caching_options(enable_caching=False)

                step_name_to_container_op[step.config.name] = container_op

        # Save the generated pipeline to a file.
        fileio.makedirs(self.pipeline_directory)
        pipeline_file_path = os.path.join(
            self.pipeline_directory,
            f"{orchestrator_run_name}.json",
        )

        # Compile the pipeline using the Kubeflow SDK V2 compiler that allows
        # to generate a JSON representation of the pipeline that can be later
        # upload to Vertex AI Pipelines service.
        KFPV2Compiler().compile(
            pipeline_func=_construct_kfp_pipeline,
            package_path=pipeline_file_path,
            pipeline_name=_clean_pipeline_name(deployment.pipeline.name),
        )
        logger.info(
            "Writing Vertex workflow definition to `%s`.", pipeline_file_path
        )

        settings = cast(
            VertexOrchestratorSettings, self.get_settings(deployment)
        )

        if deployment.schedule:
            logger.info(
                "Scheduling job using Google Cloud Scheduler and Google Cloud Functions..."
            )
            self._upload_and_schedule_pipeline(
                pipeline_name=deployment.pipeline.name,
                run_name=orchestrator_run_name,
                stack=stack,
                schedule=deployment.schedule,
                pipeline_file_path=pipeline_file_path,
                settings=settings,
            )

        else:
            logger.info("No schedule detected. Creating one-off vertex job...")
            # Using the Google Cloud AIPlatform client, upload and execute the
            # pipeline
            # on the Vertex AI Pipelines service.
            self._upload_and_run_pipeline(
                pipeline_name=deployment.pipeline.name,
                pipeline_file_path=pipeline_file_path,
                run_name=orchestrator_run_name,
                settings=settings,
            )

    def _upload_and_schedule_pipeline(
        self,
        pipeline_name: str,
        run_name: str,
        stack: "Stack",
        schedule: "Schedule",
        pipeline_file_path: str,
        settings: VertexOrchestratorSettings,
    ) -> None:
        """Uploads and schedules pipeline on GCP.

        Args:
            pipeline_name: Name of the pipeline.
            run_name: Orchestrator run name.
            stack: The stack the pipeline will run on.
            schedule: The schedule the pipeline will run on.
            pipeline_file_path: Path of the JSON file containing the compiled
                Kubeflow pipeline (compiled with Kubeflow SDK v2).
            settings: Pipeline level settings for this orchestrator.

        Raises:
            ValueError: If the attribute `pipeline_root` is not set and it
                can be not generated using the path of the artifact store in the
                stack because it is not a
                `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. Also gets
                raised if attempting to schedule pipeline run without using the
                `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
        """
        # First, do some validation
        artifact_store = stack.artifact_store
        if artifact_store.flavor != GCP_ARTIFACT_STORE_FLAVOR:
            raise ValueError(
                "Currently, the Vertex AI orchestrator only supports scheduled runs "
                f"in combination with an artifact store of flavor: {GCP_ARTIFACT_STORE_FLAVOR}. "
                f"The current stacks artifact store is of flavor: {artifact_store.flavor}. "
                "Please update your stack accordingly."
            )

            # Copy over the scheduled pipeline to the artifact store
        artifact_store_base_uri = f"{artifact_store.path.rstrip('/')}/vertex_scheduled_pipelines/{pipeline_name}/{run_name}"
        artifact_store_pipeline_uri = (
            f"{artifact_store_base_uri}/vertex_pipeline.json"
        )
        fileio.copy(pipeline_file_path, artifact_store_pipeline_uri)
        logger.info(
            "The scheduled pipeline representation has been "
            "automatically copied to this path of the `GCPArtifactStore`: "
            f"{artifact_store_pipeline_uri}",
        )

        # Get the credentials that would be used to create resources.
        credentials, project_id = self._get_authentication()

        # Create cloud function
        function_uri = create_cloud_function(
            directory_path=vertex_scheduler.__path__[0],  # fixed path
            upload_path=f"{artifact_store_base_uri}/code.zip",
            project=project_id,
            location=self.config.location,
            function_name=run_name,
            credentials=credentials,
        )

        # Create the scheduler job
        body = {
            TEMPLATE_PATH: artifact_store_pipeline_uri,
            JOB_ID: _clean_pipeline_name(pipeline_name),
            PIPELINE_ROOT: self._pipeline_root,
            PARAMETER_VALUES: None,
            ENABLE_CACHING: False,
            ENCRYPTION_SPEC_KEY_NAME: self.config.encryption_spec_key_name,
            LABELS: settings.labels,
            PROJECT: project_id,
            LOCATION: self.config.location,
            WORKLOAD_SERVICE_ACCOUNT: self.config.workload_service_account,
            NETWORK: self.config.network,
        }

        create_scheduler_job(
            project=project_id,
            region=self.config.location,
            http_uri=function_uri,
            body=body,
            schedule=str(schedule.cron_expression),
            credentials=credentials,
        )

    def _upload_and_run_pipeline(
        self,
        pipeline_name: str,
        pipeline_file_path: str,
        run_name: str,
        settings: VertexOrchestratorSettings,
    ) -> None:
        """Uploads and run the pipeline on the Vertex AI Pipelines service.

        Args:
            pipeline_name: Name of the pipeline.
            pipeline_file_path: Path of the JSON file containing the compiled
                Kubeflow pipeline (compiled with Kubeflow SDK v2).
            run_name: Orchestrator run name.
            settings: Pipeline level settings for this orchestrator.
        """
        # We have to replace the hyphens in the run name with underscores
        # and lower case the string, because the Vertex AI Pipelines service
        # requires this format.
        job_id = _clean_pipeline_name(run_name)

        # Get the credentials that would be used to create the Vertex AI
        # Pipelines
        # job.
        credentials, project_id = self._get_authentication()

        # Instantiate the Vertex AI Pipelines job
        run = aiplatform.PipelineJob(
            display_name=pipeline_name,
            template_path=pipeline_file_path,
            job_id=job_id,
            pipeline_root=self._pipeline_root,
            parameter_values=None,
            enable_caching=False,
            encryption_spec_key_name=self.config.encryption_spec_key_name,
            labels=settings.labels,
            credentials=credentials,
            project=project_id,
            location=self.config.location,
        )

        logger.info(
            "Submitting pipeline job with job_id `%s` to Vertex AI Pipelines "
            "service.",
            job_id,
        )

        # Submit the job to Vertex AI Pipelines service.
        try:
            if self.config.workload_service_account:
                logger.info(
                    "The Vertex AI Pipelines job workload will be executed "
                    "using the `%s` "
                    "service account.",
                    self.config.workload_service_account,
                )

            if self.config.network:
                logger.info(
                    "The Vertex AI Pipelines job will be peered with the `%s` "
                    "network.",
                    self.config.network,
                )

            run.submit(
                service_account=self.config.workload_service_account,
                network=self.config.network,
            )
            logger.info(
                "View the Vertex AI Pipelines job at %s", run._dashboard_uri()
            )

            if settings.synchronous:
                logger.info(
                    "Waiting for the Vertex AI Pipelines job to finish..."
                )
                run.wait()

        except google_exceptions.ClientError as e:
            logger.warning(
                "Failed to create the Vertex AI Pipelines job: %s", e
            )

        except RuntimeError as e:
            logger.error(
                "The Vertex AI Pipelines job execution has failed: %s", e
            )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_VERTEX_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_VERTEX_RUN_ID}."
            )
config: VertexOrchestratorConfig property readonly

Returns the VertexOrchestratorConfig config.

Returns:

Type Description
VertexOrchestratorConfig

The configuration.

pipeline_directory: str property readonly

Returns path to directory where kubeflow pipelines files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property readonly

Returns path to the root directory for files for this orchestrator.

Returns:

Type Description
str

The path to the root directory for all files concerning this orchestrator.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Vertex orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates that the stack contains a container registry.

Also validates that the artifact store is not local.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_VERTEX_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_VERTEX_RUN_ID}."
        )
prepare_or_run_pipeline(self, deployment, stack)

Creates a KFP JSON pipeline.

noqa: DAR402

This is an intermediary representation of the pipeline which is then deployed to Vertex AI Pipelines service.

How it works:

Before this method is called the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps the context around these files.

Based on this Docker image a callable is created which builds container_ops for each step (_construct_kfp_pipeline). The function kfp.components.load_component_from_text is used to create the ContainerOp, because using the dsl.ContainerOp class directly is deprecated when using the Kubeflow SDK v2. The step entrypoint command with the entrypoint arguments is the command that will be executed by the container created using the previously created Docker image.

This callable is then compiled into a JSON file that is used as the intermediary representation of the Kubeflow pipeline.

This file then is submitted to the Vertex AI Pipelines service for execution.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required

Exceptions:

Type Description
ValueError

If the attribute pipeline_root is not set and it can be not generated using the path of the artifact store in the stack because it is not a zenml.integrations.gcp.artifact_store.GCPArtifactStore. Also gets raised if attempting to schedule pipeline run without using the zenml.integrations.gcp.artifact_store.GCPArtifactStore.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Creates a KFP JSON pipeline.

    # noqa: DAR402

    This is an intermediary representation of the pipeline which is then
    deployed to Vertex AI Pipelines service.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()` method
    builds a Docker image that contains the code for the pipeline, all steps
    the context around these files.

    Based on this Docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`). The function
    `kfp.components.load_component_from_text` is used to create the
    `ContainerOp`, because using the `dsl.ContainerOp` class directly is
    deprecated when using the Kubeflow SDK v2. The step entrypoint command
    with the entrypoint arguments is the command that will be executed by
    the container created using the previously created Docker image.

    This callable is then compiled into a JSON file that is used as the
    intermediary representation of the Kubeflow pipeline.

    This file then is submitted to the Vertex AI Pipelines service for
    execution.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Raises:
        ValueError: If the attribute `pipeline_root` is not set and it
            can be not generated using the path of the artifact store in the
            stack because it is not a
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. Also gets
            raised if attempting to schedule pipeline run without using the
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
    """
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline.name
    )
    # If the `pipeline_root` has not been defined in the orchestrator
    # configuration,
    # try to create it from the artifact store if it is a
    # `GCPArtifactStore`.
    if not self.config.pipeline_root:
        artifact_store = stack.artifact_store
        self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{deployment.pipeline.name}/{orchestrator_run_name}"
        logger.info(
            "The attribute `pipeline_root` has not been set in the "
            "orchestrator configuration. One has been generated "
            "automatically based on the path of the `GCPArtifactStore` "
            "artifact store in the stack used to execute the pipeline. "
            "The generated `pipeline_root` is `%s`.",
            self._pipeline_root,
        )
    else:
        self._pipeline_root = self.config.pipeline_root

    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

    def _construct_kfp_pipeline() -> None:
        """Create a `ContainerOp` for each step.

        This should contain the name of the Docker image and configures the
        entrypoint of the Docker image to run the step.

        Additionally, this gives each `ContainerOp` information about its
        direct downstream steps.

        If this callable is passed to the `compile()` method of
        `KFPV2Compiler` all `dsl.ContainerOp` instances will be
        automatically added to a singular `dsl.Pipeline` instance.
        """
        command = StepEntrypointConfiguration.get_entrypoint_command()
        step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

        for step_name, step in deployment.steps.items():
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                )
            )

            # Create the `ContainerOp` for the step. Using the
            # `dsl.ContainerOp`
            # class directly is deprecated when using the Kubeflow SDK v2.
            container_op = kfp.components.load_component_from_text(
                f"""
                name: {step.config.name}
                implementation:
                    container:
                        image: {image_name}
                        command: {command + arguments}"""
            )()

            container_op.set_env_variable(
                name=ENV_ZENML_VERTEX_RUN_ID,
                value=dslv2.PIPELINE_JOB_NAME_PLACEHOLDER,
            )

            # Set upstream tasks as a dependency of the current step
            for upstream_step_name in step.spec.upstream_steps:
                upstream_container_op = step_name_to_container_op[
                    upstream_step_name
                ]
                container_op.after(upstream_container_op)

            settings = cast(
                VertexOrchestratorSettings,
                self.get_settings(step),
            )
            if settings.pod_settings:
                apply_pod_settings(
                    container_op=container_op,
                    settings=settings.pod_settings,
                )

            self._configure_container_resources(
                container_op=container_op,
                resource_settings=step.config.resource_settings,
                node_selector_constraint=settings.node_selector_constraint,
            )
            container_op.set_caching_options(enable_caching=False)

            step_name_to_container_op[step.config.name] = container_op

    # Save the generated pipeline to a file.
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory,
        f"{orchestrator_run_name}.json",
    )

    # Compile the pipeline using the Kubeflow SDK V2 compiler that allows
    # to generate a JSON representation of the pipeline that can be later
    # upload to Vertex AI Pipelines service.
    KFPV2Compiler().compile(
        pipeline_func=_construct_kfp_pipeline,
        package_path=pipeline_file_path,
        pipeline_name=_clean_pipeline_name(deployment.pipeline.name),
    )
    logger.info(
        "Writing Vertex workflow definition to `%s`.", pipeline_file_path
    )

    settings = cast(
        VertexOrchestratorSettings, self.get_settings(deployment)
    )

    if deployment.schedule:
        logger.info(
            "Scheduling job using Google Cloud Scheduler and Google Cloud Functions..."
        )
        self._upload_and_schedule_pipeline(
            pipeline_name=deployment.pipeline.name,
            run_name=orchestrator_run_name,
            stack=stack,
            schedule=deployment.schedule,
            pipeline_file_path=pipeline_file_path,
            settings=settings,
        )

    else:
        logger.info("No schedule detected. Creating one-off vertex job...")
        # Using the Google Cloud AIPlatform client, upload and execute the
        # pipeline
        # on the Vertex AI Pipelines service.
        self._upload_and_run_pipeline(
            pipeline_name=deployment.pipeline.name,
            pipeline_file_path=pipeline_file_path,
            run_name=orchestrator_run_name,
            settings=settings,
        )
prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required

Exceptions:

Type Description
ValueError

If cron_expression is not in passed Schedule.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        ValueError: If `cron_expression` is not in passed Schedule.
    """
    if deployment.schedule:
        if (
            deployment.schedule.catchup
            or deployment.schedule.start_time
            or deployment.schedule.end_time
            or deployment.schedule.interval_second
        ):
            logger.warning(
                "Vertex orchestrator only uses schedules with the "
                "`cron_expression` property. All other properties "
                "are ignored."
            )
        if deployment.schedule.cron_expression is None:
            raise ValueError(
                "Property `cron_expression` must be set when passing "
                "schedule to a Vertex orchestrator."
            )

    docker_image_builder = PipelineDockerImageBuilder()
    repo_digest = docker_image_builder.build_and_push_docker_image(
        deployment=deployment, stack=stack
    )
    deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)

vertex_scheduler special

Loading the vertex scheduler package.

main

Entrypoint for the scheduled vertex job.

trigger_vertex_job(request)

Processes the incoming HTTP request.

Parameters:

Name Type Description Default
request Request

HTTP request object.

required

Returns:

Type Description
Response

The response text or any set of values that can be turned into a Response.

Source code in zenml/integrations/gcp/orchestrators/vertex_scheduler/main.py
def trigger_vertex_job(request: "Request") -> "Response":
    """Processes the incoming HTTP request.

    Args:
        request: HTTP request object.

    Returns:
        The response text or any set of values that can be turned into a Response.
    """
    # decode http request payload and translate into JSON object
    request_str = request.data.decode("utf-8")
    request_json = json.loads(request_str)

    display_name = f"{request_json[JOB_ID]}-scheduled-{random.Random().getrandbits(32):08x}"

    run = aiplatform.PipelineJob(
        display_name=display_name,
        template_path=request_json[TEMPLATE_PATH],
        job_id=display_name,
        pipeline_root=request_json[PIPELINE_ROOT],
        parameter_values=request_json[PARAMETER_VALUES],
        enable_caching=request_json[ENABLE_CACHING],
        encryption_spec_key_name=request_json[ENCRYPTION_SPEC_KEY_NAME],
        labels=request_json[LABELS],
        project=request_json[PROJECT],
        location=request_json[LOCATION],
    )

    workload_service_account = request_json[WORKLOAD_SERVICE_ACCOUNT]
    network = request_json[NETWORK]

    if workload_service_account:
        logging.info(
            "The Vertex AI Pipelines job workload will be executed "
            "using the `%s` "
            "service account.",
            workload_service_account,
        )

    if network:
        logging.info(
            "The Vertex AI Pipelines job will be peered with the `%s` "
            "network.",
            network,
        )

    run.submit(
        service_account=workload_service_account,
        network=network,
    )
    return f"{display_name} submitted!"

secrets_manager special

ZenML integration for GCP Secrets Manager.

The GCP Secrets Manager allows your pipeline to directly access the GCP secrets manager and use the secrets within during runtime.

gcp_secrets_manager

Implementation of the GCP Secrets Manager.

GCPSecretsManager (BaseSecretsManager)

Class to interact with the GCP secrets manager.

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
class GCPSecretsManager(BaseSecretsManager):
    """Class to interact with the GCP secrets manager."""

    CLIENT: ClassVar[Any] = None

    @property
    def config(self) -> GCPSecretsManagerConfig:
        """Returns the `GCPSecretsManagerConfig` config.

        Returns:
            The configuration.
        """
        return cast(GCPSecretsManagerConfig, self._config)

    @classmethod
    def _ensure_client_connected(cls) -> None:
        if cls.CLIENT is None:
            cls.CLIENT = secretmanager.SecretManagerServiceClient()

    @property
    def parent_name(self) -> str:
        """Construct the GCP parent path to the secret manager.

        Returns:
            The parent path to the secret manager
        """
        return f"projects/{self.config.project_id}"

    def _convert_secret_content(
        self, secret: BaseSecretSchema
    ) -> Dict[str, str]:
        """Convert the secret content into a Google compatible representation.

        This method implements two currently supported modes of adapting between
        the naming schemas used for ZenML secrets and Google secrets:

        * for a scoped Secrets Manager, a Google secret is created for each
        ZenML secret with a name that reflects the ZenML secret name and scope
        and a value that contains all its key-value pairs in JSON format.

        * for an unscoped (i.e. legacy) Secrets Manager, this method creates
        multiple Google secret entries for a single ZenML secret by adding the
        secret name to the key name of each secret key-value pair. This allows
        using the same key across multiple secrets. This is only kept for
        backwards compatibility and will be removed some time in the future.

        Args:
            secret: The ZenML secret

        Returns:
            A dictionary with the Google secret name as key and the secret
            contents as value.
        """
        if self.config.scope == SecretsManagerScope.NONE:
            # legacy per-key secret mapping
            return {f"{secret.name}_{k}": v for k, v in secret.content.items()}

        return {
            self._get_scoped_secret_name(
                secret.name, separator=ZENML_GCP_SECRET_SCOPE_PATH_SEPARATOR
            ): json.dumps(secret_to_dict(secret)),
        }

    def _get_secret_labels(
        self, secret: BaseSecretSchema
    ) -> List[Tuple[str, str]]:
        """Return a list of Google secret label values for a given secret.

        Args:
            secret: the secret object

        Returns:
            A list of Google secret label values
        """
        if self.config.scope == SecretsManagerScope.NONE:
            # legacy per-key secret labels
            return [
                (ZENML_GROUP_KEY, secret.name),
                (ZENML_SCHEMA_NAME, secret.TYPE),
            ]

        metadata = self._get_secret_metadata(secret)
        return list(metadata.items())

    def _get_secret_scope_filters(
        self,
        secret_name: Optional[str] = None,
    ) -> str:
        """Return a Google filter expression for the entire scope or just a scoped secret.

        These filters can be used when querying the Google Secrets Manager
        for all secrets or for a single secret available in the configured
        scope (see https://cloud.google.com/secret-manager/docs/filtering).

        Args:
            secret_name: Optional secret name to include in the scope metadata.

        Returns:
            Google filter expression uniquely identifying all secrets
            or a named secret within the configured scope.
        """
        if self.config.scope == SecretsManagerScope.NONE:
            # legacy per-key secret label filters
            if secret_name:
                return f"labels.{ZENML_GROUP_KEY}={secret_name}"
            else:
                return f"labels.{ZENML_GROUP_KEY}:*"

        metadata = self._get_secret_scope_metadata(secret_name)
        filters = [f"labels.{label}={v}" for (label, v) in metadata.items()]
        if secret_name:
            filters.append(f"name:{secret_name}")

        return " AND ".join(filters)

    def _list_secrets(self, secret_name: Optional[str] = None) -> List[str]:
        """List all secrets matching a name.

        This method lists all the secrets in the current scope without loading
        their contents. An optional secret name can be supplied to filter out
        all but a single secret identified by name.

        Args:
            secret_name: Optional secret name to filter for.

        Returns:
            A list of secret names in the current scope and the optional
            secret name.
        """
        self._ensure_client_connected()

        set_of_secrets = set()

        # List all secrets.
        for secret in self.CLIENT.list_secrets(
            request={
                "parent": self.parent_name,
                "filter": self._get_secret_scope_filters(secret_name),
            }
        ):
            if self.config.scope == SecretsManagerScope.NONE:
                name = secret.labels[ZENML_GROUP_KEY]
            else:
                name = secret.labels[ZENML_SECRET_NAME_LABEL]

            # filter by secret name, if one was given
            if name and (not secret_name or name == secret_name):
                set_of_secrets.add(name)

        return list(set_of_secrets)

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: the secret to register

        Raises:
            SecretExistsError: if the secret already exists
        """
        validate_gcp_secret_name_or_namespace(secret.name)
        self._ensure_client_connected()

        if self._list_secrets(secret.name):
            raise SecretExistsError(
                f"A Secret with the name {secret.name} already exists"
            )

        adjusted_content = self._convert_secret_content(secret)
        for k, v in adjusted_content.items():
            # Create the secret, this only creates an empty secret with the
            #  supplied name.
            gcp_secret = self.CLIENT.create_secret(
                request={
                    "parent": self.parent_name,
                    "secret_id": k,
                    "secret": {
                        "replication": {"automatic": {}},
                        "labels": self._get_secret_labels(secret),
                    },
                }
            )

            logger.debug("Created empty secret: %s", gcp_secret.name)

            self.CLIENT.add_secret_version(
                request={
                    "parent": gcp_secret.name,
                    "payload": {"data": str(v).encode()},
                }
            )

            logger.debug("Added value to secret.")

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Get a secret by its name.

        Args:
            secret_name: the name of the secret to get

        Returns:
            The secret.

        Raises:
            KeyError: if the secret does not exist
        """
        validate_gcp_secret_name_or_namespace(secret_name)
        self._ensure_client_connected()

        zenml_secret: Optional[BaseSecretSchema] = None

        if self.config.scope == SecretsManagerScope.NONE:
            # Legacy secrets are mapped to multiple Google secrets, one for
            # each secret key

            secret_contents = {}
            zenml_schema_name = ""

            # List all secrets.
            for google_secret in self.CLIENT.list_secrets(
                request={
                    "parent": self.parent_name,
                    "filter": self._get_secret_scope_filters(secret_name),
                }
            ):
                secret_version_name = google_secret.name + "/versions/latest"

                response = self.CLIENT.access_secret_version(
                    request={"name": secret_version_name}
                )

                secret_value = response.payload.data.decode("UTF-8")

                secret_key = remove_group_name_from_key(
                    google_secret.name.split("/")[-1], secret_name
                )

                secret_contents[secret_key] = secret_value

                zenml_schema_name = google_secret.labels[ZENML_SCHEMA_NAME]

            if not secret_contents:
                raise KeyError(
                    f"Can't find the specified secret '{secret_name}'"
                )

            secret_contents["name"] = secret_name

            secret_schema = SecretSchemaClassRegistry.get_class(
                secret_schema=zenml_schema_name
            )
            zenml_secret = secret_schema(**secret_contents)

        else:
            # Scoped secrets are mapped 1-to-1 with Google secrets

            google_secret_name = self.CLIENT.secret_path(
                self.config.project_id,
                self._get_scoped_secret_name(
                    secret_name,
                    separator=ZENML_GCP_SECRET_SCOPE_PATH_SEPARATOR,
                ),
            )

            try:
                # fetch the latest secret version
                google_secret = self.CLIENT.get_secret(name=google_secret_name)
            except google_exceptions.NotFound:
                raise KeyError(
                    f"Can't find the specified secret '{secret_name}'"
                )

            # make sure the secret has the correct scope labels to filter out
            # unscoped secrets with similar names
            scope_labels = self._get_secret_scope_metadata(secret_name)
            # all scope labels need to be included in the google secret labels,
            # otherwise the secret does not belong to the current scope
            if not scope_labels.items() <= google_secret.labels.items():
                raise KeyError(
                    f"Can't find the specified secret '{secret_name}'"
                )

            try:
                # fetch the latest secret version
                response = self.CLIENT.access_secret_version(
                    name=f"{google_secret_name}/versions/latest"
                )
            except google_exceptions.NotFound:
                raise KeyError(
                    f"Can't find the specified secret '{secret_name}'"
                )

            secret_value = response.payload.data.decode("UTF-8")
            zenml_secret = secret_from_dict(
                json.loads(secret_value), secret_name=secret_name
            )

        return zenml_secret

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys
        """
        return self._list_secrets()

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret by creating new versions of the existing secrets.

        Args:
            secret: the secret to update

        Raises:
            KeyError: if the secret does not exist
        """
        validate_gcp_secret_name_or_namespace(secret.name)
        self._ensure_client_connected()

        if not self._list_secrets(secret.name):
            raise KeyError(f"Can't find the specified secret '{secret.name}'")

        adjusted_content = self._convert_secret_content(secret)

        for k, v in adjusted_content.items():
            # Create the secret, this only creates an empty secret with the
            #  supplied name.
            google_secret_name = self.CLIENT.secret_path(
                self.config.project_id, k
            )
            payload = {"data": str(v).encode()}

            self.CLIENT.add_secret_version(
                request={"parent": google_secret_name, "payload": payload}
            )

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret by name.

        Args:
            secret_name: the name of the secret to delete

        Raises:
            KeyError: if the secret no longer exists
        """
        validate_gcp_secret_name_or_namespace(secret_name)
        self._ensure_client_connected()

        if not self._list_secrets(secret_name):
            raise KeyError(f"Can't find the specified secret '{secret_name}'")

        # Go through all gcp secrets and delete the ones with the secret_name
        # as label.
        for secret in self.CLIENT.list_secrets(
            request={
                "parent": self.parent_name,
                "filter": self._get_secret_scope_filters(secret_name),
            }
        ):
            self.CLIENT.delete_secret(request={"name": secret.name})

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        self._ensure_client_connected()

        # List all secrets.
        for secret in self.CLIENT.list_secrets(
            request={
                "parent": self.parent_name,
                "filter": self._get_secret_scope_filters(),
            }
        ):
            logger.info(f"Deleting Google secret {secret.name}")
            self.CLIENT.delete_secret(request={"name": secret.name})
config: GCPSecretsManagerConfig property readonly

Returns the GCPSecretsManagerConfig config.

Returns:

Type Description
GCPSecretsManagerConfig

The configuration.

parent_name: str property readonly

Construct the GCP parent path to the secret manager.

Returns:

Type Description
str

The parent path to the secret manager

delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Delete all existing secrets."""
    self._ensure_client_connected()

    # List all secrets.
    for secret in self.CLIENT.list_secrets(
        request={
            "parent": self.parent_name,
            "filter": self._get_secret_scope_filters(),
        }
    ):
        logger.info(f"Deleting Google secret {secret.name}")
        self.CLIENT.delete_secret(request={"name": secret.name})
delete_secret(self, secret_name)

Delete an existing secret by name.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to delete

required

Exceptions:

Type Description
KeyError

if the secret no longer exists

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret by name.

    Args:
        secret_name: the name of the secret to delete

    Raises:
        KeyError: if the secret no longer exists
    """
    validate_gcp_secret_name_or_namespace(secret_name)
    self._ensure_client_connected()

    if not self._list_secrets(secret_name):
        raise KeyError(f"Can't find the specified secret '{secret_name}'")

    # Go through all gcp secrets and delete the ones with the secret_name
    # as label.
    for secret in self.CLIENT.list_secrets(
        request={
            "parent": self.parent_name,
            "filter": self._get_secret_scope_filters(secret_name),
        }
    ):
        self.CLIENT.delete_secret(request={"name": secret.name})
get_all_secret_keys(self)

Get all secret keys.

Returns:

Type Description
List[str]

A list of all secret keys

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """Get all secret keys.

    Returns:
        A list of all secret keys
    """
    return self._list_secrets()
get_secret(self, secret_name)

Get a secret by its name.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to get

required

Returns:

Type Description
BaseSecretSchema

The secret.

Exceptions:

Type Description
KeyError

if the secret does not exist

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Get a secret by its name.

    Args:
        secret_name: the name of the secret to get

    Returns:
        The secret.

    Raises:
        KeyError: if the secret does not exist
    """
    validate_gcp_secret_name_or_namespace(secret_name)
    self._ensure_client_connected()

    zenml_secret: Optional[BaseSecretSchema] = None

    if self.config.scope == SecretsManagerScope.NONE:
        # Legacy secrets are mapped to multiple Google secrets, one for
        # each secret key

        secret_contents = {}
        zenml_schema_name = ""

        # List all secrets.
        for google_secret in self.CLIENT.list_secrets(
            request={
                "parent": self.parent_name,
                "filter": self._get_secret_scope_filters(secret_name),
            }
        ):
            secret_version_name = google_secret.name + "/versions/latest"

            response = self.CLIENT.access_secret_version(
                request={"name": secret_version_name}
            )

            secret_value = response.payload.data.decode("UTF-8")

            secret_key = remove_group_name_from_key(
                google_secret.name.split("/")[-1], secret_name
            )

            secret_contents[secret_key] = secret_value

            zenml_schema_name = google_secret.labels[ZENML_SCHEMA_NAME]

        if not secret_contents:
            raise KeyError(
                f"Can't find the specified secret '{secret_name}'"
            )

        secret_contents["name"] = secret_name

        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        zenml_secret = secret_schema(**secret_contents)

    else:
        # Scoped secrets are mapped 1-to-1 with Google secrets

        google_secret_name = self.CLIENT.secret_path(
            self.config.project_id,
            self._get_scoped_secret_name(
                secret_name,
                separator=ZENML_GCP_SECRET_SCOPE_PATH_SEPARATOR,
            ),
        )

        try:
            # fetch the latest secret version
            google_secret = self.CLIENT.get_secret(name=google_secret_name)
        except google_exceptions.NotFound:
            raise KeyError(
                f"Can't find the specified secret '{secret_name}'"
            )

        # make sure the secret has the correct scope labels to filter out
        # unscoped secrets with similar names
        scope_labels = self._get_secret_scope_metadata(secret_name)
        # all scope labels need to be included in the google secret labels,
        # otherwise the secret does not belong to the current scope
        if not scope_labels.items() <= google_secret.labels.items():
            raise KeyError(
                f"Can't find the specified secret '{secret_name}'"
            )

        try:
            # fetch the latest secret version
            response = self.CLIENT.access_secret_version(
                name=f"{google_secret_name}/versions/latest"
            )
        except google_exceptions.NotFound:
            raise KeyError(
                f"Can't find the specified secret '{secret_name}'"
            )

        secret_value = response.payload.data.decode("UTF-8")
        zenml_secret = secret_from_dict(
            json.loads(secret_value), secret_name=secret_name
        )

    return zenml_secret
register_secret(self, secret)

Registers a new secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to register

required

Exceptions:

Type Description
SecretExistsError

if the secret already exists

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Registers a new secret.

    Args:
        secret: the secret to register

    Raises:
        SecretExistsError: if the secret already exists
    """
    validate_gcp_secret_name_or_namespace(secret.name)
    self._ensure_client_connected()

    if self._list_secrets(secret.name):
        raise SecretExistsError(
            f"A Secret with the name {secret.name} already exists"
        )

    adjusted_content = self._convert_secret_content(secret)
    for k, v in adjusted_content.items():
        # Create the secret, this only creates an empty secret with the
        #  supplied name.
        gcp_secret = self.CLIENT.create_secret(
            request={
                "parent": self.parent_name,
                "secret_id": k,
                "secret": {
                    "replication": {"automatic": {}},
                    "labels": self._get_secret_labels(secret),
                },
            }
        )

        logger.debug("Created empty secret: %s", gcp_secret.name)

        self.CLIENT.add_secret_version(
            request={
                "parent": gcp_secret.name,
                "payload": {"data": str(v).encode()},
            }
        )

        logger.debug("Added value to secret.")
update_secret(self, secret)

Update an existing secret by creating new versions of the existing secrets.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to update

required

Exceptions:

Type Description
KeyError

if the secret does not exist

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Update an existing secret by creating new versions of the existing secrets.

    Args:
        secret: the secret to update

    Raises:
        KeyError: if the secret does not exist
    """
    validate_gcp_secret_name_or_namespace(secret.name)
    self._ensure_client_connected()

    if not self._list_secrets(secret.name):
        raise KeyError(f"Can't find the specified secret '{secret.name}'")

    adjusted_content = self._convert_secret_content(secret)

    for k, v in adjusted_content.items():
        # Create the secret, this only creates an empty secret with the
        #  supplied name.
        google_secret_name = self.CLIENT.secret_path(
            self.config.project_id, k
        )
        payload = {"data": str(v).encode()}

        self.CLIENT.add_secret_version(
            request={"parent": google_secret_name, "payload": payload}
        )
remove_group_name_from_key(combined_key_name, group_name)

Removes the secret group name from the secret key.

Parameters:

Name Type Description Default
combined_key_name str

Full name as it is within the gcp secrets manager

required
group_name str

Group name (the ZenML Secret name)

required

Returns:

Type Description
str

The cleaned key

Exceptions:

Type Description
RuntimeError

If the group name is not found in the key

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
def remove_group_name_from_key(combined_key_name: str, group_name: str) -> str:
    """Removes the secret group name from the secret key.

    Args:
        combined_key_name: Full name as it is within the gcp secrets manager
        group_name: Group name (the ZenML Secret name)

    Returns:
        The cleaned key

    Raises:
        RuntimeError: If the group name is not found in the key
    """
    if combined_key_name.startswith(group_name + "_"):
        return combined_key_name[len(group_name + "_") :]
    else:
        raise RuntimeError(
            f"Key-name `{combined_key_name}` does not have the "
            f"prefix `{group_name}`. Key could not be "
            f"extracted."
        )

step_operators special

Initialization for the VertexAI Step Operator.

vertex_step_operator

Implementation of a VertexAI step operator.

Code heavily inspired by TFX Implementation: https://github.com/tensorflow/tfx/blob/master/tfx/extensions/ google_cloud_ai_platform/training_clients.py

VertexStepOperator (BaseStepOperator, GoogleCredentialsMixin)

Step operator to run a step on Vertex AI.

This class defines code that can set up a Vertex AI environment and run the ZenML entrypoint command in it.

Source code in zenml/integrations/gcp/step_operators/vertex_step_operator.py
class VertexStepOperator(BaseStepOperator, GoogleCredentialsMixin):
    """Step operator to run a step on Vertex AI.

    This class defines code that can set up a Vertex AI environment and run the
    ZenML entrypoint command in it.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes the step operator and validates the accelerator type.

        Args:
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.
        """
        super().__init__(*args, **kwargs)

    @property
    def config(self) -> VertexStepOperatorConfig:
        """Returns the `VertexStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(VertexStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Vertex step operator.

        Returns:
            The settings class.
        """
        return VertexStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote container
            registry and a remote artifact store.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            if stack.artifact_store.config.is_local:
                return False, (
                    "The Vertex step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Vertex "
                    "step operator."
                )

            container_registry = stack.container_registry
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The Vertex step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Vertex step operator."
                )

            return True, ""

        return StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_validate_remote_components,
        )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        steps_to_run = [
            step
            for step in deployment.steps.values()
            if step.config.step_operator == self.name
        ]
        if not steps_to_run:
            return
        docker_image_builder = PipelineDockerImageBuilder()
        image_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment,
            stack=stack,
        )
        for step in steps_to_run:
            step.config.extra[VERTEX_DOCKER_IMAGE_DIGEST_KEY] = image_digest

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on VertexAI.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.

        Raises:
            RuntimeError: If the run fails.
            ConnectionError: If the run fails due to a connection error.
        """
        resource_settings = info.config.resource_settings
        if resource_settings.cpu_count or resource_settings.memory:
            logger.warning(
                "Specifying cpus or memory is not supported for "
                "the Vertex step operator. If you want to run this step "
                "operator on specific resources, you can do so by configuring "
                "a different machine_type type like this: "
                "`zenml step-operator update %s "
                "--machine_type=<MACHINE_TYPE>`",
                self.name,
            )
        settings = cast(VertexStepOperatorSettings, self.get_settings(info))
        validate_accelerator_type(settings.accelerator_type)

        job_labels = {"source": f"zenml-{__version__.replace('.', '_')}"}

        # Step 1: Authenticate with Google
        credentials, project_id = self._get_authentication()
        if self.config.project:
            if self.config.project != project_id:
                logger.warning(
                    "Authenticated with project `%s`, but this orchestrator is "
                    "configured to use the project `%s`.",
                    project_id,
                    self.config.project,
                )
        else:
            self.config.project = project_id

        image_name = info.config.extra[VERTEX_DOCKER_IMAGE_DIGEST_KEY]

        # Step 3: Launch the job
        # The AI Platform services require regional API endpoints.
        client_options = {
            "api_endpoint": self.config.region + VERTEX_ENDPOINT_SUFFIX
        }
        # Initialize client that will be used to create and send requests.
        # This client only needs to be created once, and can be reused for multiple requests.
        client = aiplatform.gapic.JobServiceClient(
            credentials=credentials, client_options=client_options
        )
        accelerator_count = (
            resource_settings.gpu_count or settings.accelerator_count
        )
        custom_job = {
            "display_name": info.run_name,
            "job_spec": {
                "worker_pool_specs": [
                    {
                        "machine_spec": {
                            "machine_type": settings.machine_type,
                            "accelerator_type": settings.accelerator_type,
                            "accelerator_count": accelerator_count
                            if settings.accelerator_type
                            else 0,
                        },
                        "replica_count": 1,
                        "container_spec": {
                            "image_uri": image_name,
                            "command": entrypoint_command,
                            "args": [],
                        },
                    }
                ]
            },
            "labels": job_labels,
            "encryption_spec": {
                "kmsKeyName": self.config.encryption_spec_key_name
            }
            if self.config.encryption_spec_key_name
            else {},
        }
        logger.debug("Vertex AI Job=%s", custom_job)

        parent = (
            f"projects/{self.config.project}/locations/{self.config.region}"
        )
        logger.info(
            "Submitting custom job='%s', path='%s' to Vertex AI Training.",
            custom_job["display_name"],
            parent,
        )
        response = client.create_custom_job(
            parent=parent, custom_job=custom_job
        )
        logger.debug("Vertex AI response:", response)

        # Step 4: Monitor the job

        # Monitors the long-running operation by polling the job state
        # periodically, and retries the polling when a transient connectivity
        # issue is encountered.
        #
        # Long-running operation monitoring:
        #   The possible states of "get job" response can be found at
        #   https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#State
        #   where SUCCEEDED/FAILED/CANCELED are considered to be final states.
        #   The following logic will keep polling the state of the job until
        #   the job enters a final state.
        #
        # During the polling, if a connection error was encountered, the GET
        # request will be retried by recreating the Python API client to
        # refresh the lifecycle of the connection being used. See
        # https://github.com/googleapis/google-api-python-client/issues/218
        # for a detailed description of the problem. If the error persists for
        # _CONNECTION_ERROR_RETRY_LIMIT consecutive attempts, the function
        # will raise ConnectionError.
        retry_count = 0
        job_id = response.name

        while response.state not in VERTEX_JOB_STATES_COMPLETED:
            time.sleep(POLLING_INTERVAL_IN_SECONDS)
            try:
                response = client.get_custom_job(name=job_id)
                retry_count = 0
            # Handle transient connection error.
            except ConnectionError as err:
                if retry_count < CONNECTION_ERROR_RETRY_LIMIT:
                    retry_count += 1
                    logger.warning(
                        "ConnectionError (%s) encountered when polling job: "
                        "%s. Trying to recreate the API client.",
                        err,
                        job_id,
                    )
                    # Recreate the Python API client.
                    client = aiplatform.gapic.JobServiceClient(
                        client_options=client_options
                    )
                else:
                    logger.error(
                        "Request failed after %s retries.",
                        CONNECTION_ERROR_RETRY_LIMIT,
                    )
                    raise

            if response.state in VERTEX_JOB_STATES_FAILED:
                err_msg = (
                    "Job '{}' did not succeed.  Detailed response {}.".format(
                        job_id, response
                    )
                )
                logger.error(err_msg)
                raise RuntimeError(err_msg)

        # Cloud training complete
        logger.info("Job '%s' successful.", job_id)
config: VertexStepOperatorConfig property readonly

Returns the VertexStepOperatorConfig config.

Returns:

Type Description
VertexStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Vertex step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

__init__(self, *args, **kwargs) special

Initializes the step operator and validates the accelerator type.

Parameters:

Name Type Description Default
*args Any

Variable length argument list.

()
**kwargs Any

Arbitrary keyword arguments.

{}
Source code in zenml/integrations/gcp/step_operators/vertex_step_operator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initializes the step operator and validates the accelerator type.

    Args:
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.
    """
    super().__init__(*args, **kwargs)
launch(self, info, entrypoint_command)

Launches a step on VertexAI.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required

Exceptions:

Type Description
RuntimeError

If the run fails.

ConnectionError

If the run fails due to a connection error.

Source code in zenml/integrations/gcp/step_operators/vertex_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
) -> None:
    """Launches a step on VertexAI.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.

    Raises:
        RuntimeError: If the run fails.
        ConnectionError: If the run fails due to a connection error.
    """
    resource_settings = info.config.resource_settings
    if resource_settings.cpu_count or resource_settings.memory:
        logger.warning(
            "Specifying cpus or memory is not supported for "
            "the Vertex step operator. If you want to run this step "
            "operator on specific resources, you can do so by configuring "
            "a different machine_type type like this: "
            "`zenml step-operator update %s "
            "--machine_type=<MACHINE_TYPE>`",
            self.name,
        )
    settings = cast(VertexStepOperatorSettings, self.get_settings(info))
    validate_accelerator_type(settings.accelerator_type)

    job_labels = {"source": f"zenml-{__version__.replace('.', '_')}"}

    # Step 1: Authenticate with Google
    credentials, project_id = self._get_authentication()
    if self.config.project:
        if self.config.project != project_id:
            logger.warning(
                "Authenticated with project `%s`, but this orchestrator is "
                "configured to use the project `%s`.",
                project_id,
                self.config.project,
            )
    else:
        self.config.project = project_id

    image_name = info.config.extra[VERTEX_DOCKER_IMAGE_DIGEST_KEY]

    # Step 3: Launch the job
    # The AI Platform services require regional API endpoints.
    client_options = {
        "api_endpoint": self.config.region + VERTEX_ENDPOINT_SUFFIX
    }
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.JobServiceClient(
        credentials=credentials, client_options=client_options
    )
    accelerator_count = (
        resource_settings.gpu_count or settings.accelerator_count
    )
    custom_job = {
        "display_name": info.run_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": settings.machine_type,
                        "accelerator_type": settings.accelerator_type,
                        "accelerator_count": accelerator_count
                        if settings.accelerator_type
                        else 0,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": image_name,
                        "command": entrypoint_command,
                        "args": [],
                    },
                }
            ]
        },
        "labels": job_labels,
        "encryption_spec": {
            "kmsKeyName": self.config.encryption_spec_key_name
        }
        if self.config.encryption_spec_key_name
        else {},
    }
    logger.debug("Vertex AI Job=%s", custom_job)

    parent = (
        f"projects/{self.config.project}/locations/{self.config.region}"
    )
    logger.info(
        "Submitting custom job='%s', path='%s' to Vertex AI Training.",
        custom_job["display_name"],
        parent,
    )
    response = client.create_custom_job(
        parent=parent, custom_job=custom_job
    )
    logger.debug("Vertex AI response:", response)

    # Step 4: Monitor the job

    # Monitors the long-running operation by polling the job state
    # periodically, and retries the polling when a transient connectivity
    # issue is encountered.
    #
    # Long-running operation monitoring:
    #   The possible states of "get job" response can be found at
    #   https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#State
    #   where SUCCEEDED/FAILED/CANCELED are considered to be final states.
    #   The following logic will keep polling the state of the job until
    #   the job enters a final state.
    #
    # During the polling, if a connection error was encountered, the GET
    # request will be retried by recreating the Python API client to
    # refresh the lifecycle of the connection being used. See
    # https://github.com/googleapis/google-api-python-client/issues/218
    # for a detailed description of the problem. If the error persists for
    # _CONNECTION_ERROR_RETRY_LIMIT consecutive attempts, the function
    # will raise ConnectionError.
    retry_count = 0
    job_id = response.name

    while response.state not in VERTEX_JOB_STATES_COMPLETED:
        time.sleep(POLLING_INTERVAL_IN_SECONDS)
        try:
            response = client.get_custom_job(name=job_id)
            retry_count = 0
        # Handle transient connection error.
        except ConnectionError as err:
            if retry_count < CONNECTION_ERROR_RETRY_LIMIT:
                retry_count += 1
                logger.warning(
                    "ConnectionError (%s) encountered when polling job: "
                    "%s. Trying to recreate the API client.",
                    err,
                    job_id,
                )
                # Recreate the Python API client.
                client = aiplatform.gapic.JobServiceClient(
                    client_options=client_options
                )
            else:
                logger.error(
                    "Request failed after %s retries.",
                    CONNECTION_ERROR_RETRY_LIMIT,
                )
                raise

        if response.state in VERTEX_JOB_STATES_FAILED:
            err_msg = (
                "Job '{}' did not succeed.  Detailed response {}.".format(
                    job_id, response
                )
            )
            logger.error(err_msg)
            raise RuntimeError(err_msg)

    # Cloud training complete
    logger.info("Job '%s' successful.", job_id)
prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/integrations/gcp/step_operators/vertex_step_operator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    steps_to_run = [
        step
        for step in deployment.steps.values()
        if step.config.step_operator == self.name
    ]
    if not steps_to_run:
        return
    docker_image_builder = PipelineDockerImageBuilder()
    image_digest = docker_image_builder.build_and_push_docker_image(
        deployment=deployment,
        stack=stack,
    )
    for step in steps_to_run:
        step.config.extra[VERTEX_DOCKER_IMAGE_DIGEST_KEY] = image_digest
validate_accelerator_type(accelerator_type=None)

Validates that the accelerator type is valid.

Parameters:

Name Type Description Default
accelerator_type Optional[str]

The accelerator type to validate.

None

Exceptions:

Type Description
ValueError

If the accelerator type is not valid.

Source code in zenml/integrations/gcp/step_operators/vertex_step_operator.py
def validate_accelerator_type(accelerator_type: Optional[str] = None) -> None:
    """Validates that the accelerator type is valid.

    Args:
        accelerator_type: The accelerator type to validate.

    Raises:
        ValueError: If the accelerator type is not valid.
    """
    accepted_vals = list(aiplatform.gapic.AcceleratorType.__members__.keys())
    if accelerator_type and accelerator_type.upper() not in accepted_vals:
        raise ValueError(
            f"Accelerator must be one of the following: {accepted_vals}"
        )