
Gcp

zenml.integrations.gcp

Initialization of the GCP ZenML integration.

The GCP integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores and provides an io module to handle file operations on Google Cloud Storage (GCS).

The Vertex AI integration submodule provides a way to run ZenML pipelines in a Vertex AI environment.

Attributes

GCP = 'gcp' module-attribute

GCP_ARTIFACT_STORE_FLAVOR = 'gcp' module-attribute

GCP_CONNECTOR_TYPE = 'gcp' module-attribute

GCP_IMAGE_BUILDER_FLAVOR = 'gcp' module-attribute

GCP_RESOURCE_TYPE = 'gcp-generic' module-attribute

GCP_VERTEX_EXPERIMENT_TRACKER_FLAVOR = 'vertex' module-attribute

GCP_VERTEX_ORCHESTRATOR_FLAVOR = 'vertex' module-attribute

GCP_VERTEX_STEP_OPERATOR_FLAVOR = 'vertex' module-attribute

GCS_RESOURCE_TYPE = 'gcs-bucket' module-attribute

Classes

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
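When a custom flavor cannot be imported, `from_model` tells the user where it expected the flavor's module file to be. The sketch below isolates only that path computation. `expected_flavor_file` and the example source string are hypothetical, and `source_root` stands in for whatever `source_utils.get_source_root()` returns.

```python
import os


def expected_flavor_file(source: str, source_root: str) -> str:
    """Hypothetical helper mirroring the path logic in `Flavor.from_model`.

    `source` is an import path such as "my_flavors.custom.MyFlavor": the last
    dotted component is the class name, everything before it is the module.
    """
    # Drop the class name, keep the dotted module path.
    flavor_module, _ = source.rsplit(".", maxsplit=1)
    # Turn the dotted module path into a filesystem path under the source root.
    module_path = os.path.join(source_root, flavor_module.replace(".", os.path.sep))
    return f"{module_path}.py"


print(expected_flavor_file("my_flavors.custom.MyFlavor", "/repo"))
```

On a POSIX system this points at `/repo/my_flavors/custom.py`, which is the file the error message asks the user to check.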
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
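The URL scheme above can be reproduced as a pure function, which makes the slug rules easy to check. This is a sketch under assumptions: `default_docs_url` is a hypothetical name, and the version string and the `is_latest` flag are passed in explicitly instead of coming from `zenml.__version__` and `is_latest_zenml_version()`. The plural component type (`"artifact_stores"`) is assumed to match what `StackComponentType.plural` returns.

```python
def default_docs_url(
    component_type_plural: str, flavor_name: str, version: str, is_latest: bool
) -> str:
    """Hypothetical standalone version of `Flavor.generate_default_docs_url`."""
    # Both path segments are slugified by swapping underscores for hyphens.
    component_type = component_type_plural.replace("_", "-")
    name = flavor_name.replace("_", "-")
    if is_latest:
        base = "https://docs.zenml.io"
    else:
        # Older releases are served from the versioned legacy documentation.
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{version}"
    return f"{base}/stack-components/{component_type}/{name}"


print(default_docs_url("artifact_stores", "gcp", "0.60.0", True))
# https://docs.zenml.io/stack-components/artifact-stores/gcp
```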
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this: "zenml.integrations.<integration>...."
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
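The branch on `self.__module__` is the interesting part: integration flavors are routed to a per-integration page, and everything else goes to a core page keyed by component type. The sketch below reproduces that routing with the module path, plural component type, and version passed in explicitly. `default_sdk_docs_url` and the example module paths are hypothetical.

```python
def default_sdk_docs_url(module: str, component_type_plural: str, version: str) -> str:
    """Hypothetical standalone version of `Flavor.generate_default_sdk_docs_url`."""
    base = f"https://sdkdocs.zenml.io/{version}"
    if "zenml.integrations" in module:
        # "zenml.integrations.<integration>.<...>" -> "<integration>"
        integration = module.split("zenml.integrations.", maxsplit=1)[1].split(".")[0]
        return f"{base}/integration_code_docs/integrations-{integration}/#{module}"
    # Core flavors: unlike the user docs URL, underscores are kept here.
    return f"{base}/core_code_docs/core-{component_type_plural}/#{module}"


print(default_sdk_docs_url("zenml.integrations.gcp.flavors.vertex_flavor", "orchestrators", "0.60.0"))
```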
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
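`to_model` copies three fields out of the optional service connector requirements and falls back to `None` for each when the flavor declares none. The sketch below collapses the three conditional expressions into a single early return. `Requirements` is a hypothetical dataclass standing in for `ServiceConnectorRequirements`; the example reuses the `GCS_RESOURCE_TYPE` value (`'gcs-bucket'`) listed in the module attributes, with an assumed `resource_id_attr` of `"path"`.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Requirements:
    """Hypothetical stand-in for ZenML's ServiceConnectorRequirements."""

    connector_type: Optional[str] = None
    resource_type: str = ""
    resource_id_attr: Optional[str] = None


def connector_fields(
    reqs: Optional[Requirements],
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    # Without requirements, every connector field on the request model is None.
    if reqs is None:
        return None, None, None
    return reqs.connector_type, reqs.resource_type, reqs.resource_id_attr


print(connector_fields(Requirements(resource_type="gcs-bucket", resource_id_attr="path")))
```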

GcpIntegration

Bases: Integration

Definition of Google Cloud Platform integration for ZenML.

Functions
activate() -> None classmethod

Activate the GCP integration.

Source code in src/zenml/integrations/gcp/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the GCP integration."""
    from zenml.integrations.gcp import service_connectors  # noqa
flavors() -> List[Type[Flavor]] classmethod

Declare the stack component flavors for the GCP integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/gcp/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the GCP integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.gcp.flavors import (
        GCPArtifactStoreFlavor,
        GCPImageBuilderFlavor,
        VertexExperimentTrackerFlavor,
        VertexOrchestratorFlavor,
        VertexStepOperatorFlavor,
    )

    return [
        GCPArtifactStoreFlavor,
        GCPImageBuilderFlavor,
        VertexExperimentTrackerFlavor,
        VertexOrchestratorFlavor,
        VertexStepOperatorFlavor,
    ]

Integration

Base class for integrations in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
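Two string manipulations in `check_installation` are easy to miss: the extras are recovered by slicing off the brackets and splitting on commas, and each dependency string has its `; extra == "..."` marker removed before being looked up. The sketch below reproduces both with the standard library only. `split_extras` is a hypothetical regex-based stand-in for ZenML's `parse_requirement` helper, not its actual implementation, and the requirement strings are made up.

```python
import re
from typing import List, Optional, Tuple


def split_extras(requirement: str) -> Tuple[str, Optional[str]]:
    """Hypothetical stand-in for `parse_requirement`.

    Returns the package name and the bracketed extras (e.g. "[a,b]"), if any.
    """
    match = re.match(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)\s*(\[[^\]]*\])?", requirement)
    if match is None:
        raise ValueError(f"Cannot parse requirement: {requirement!r}")
    return match.group(1), match.group(2)


def extra_names(requirement: str) -> List[str]:
    # Same slicing as check_installation: strip the brackets, split on commas.
    _, extras = split_extras(requirement)
    if not extras:
        return []
    return extras[1:-1].split(",")


def strip_extra_marker(requirement: str) -> str:
    # Same regex as check_installation: drop a `; extra == "name"` marker.
    return re.sub(r"; extra == \"\w+\"", "", requirement)


print(extra_names("kfp[kubernetes,gcp]>=2.0"))
print(strip_extra_marker('protobuf>=3.20; extra == "gcs"'))
```

Because `\w+` only matches letters, digits, and underscores, a marker for an extra whose name contains a hyphen is left in place by this regex.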
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
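The inner loop and `is_ignored` flag above implement a plain prefix filter. Since `str.startswith` accepts a tuple of prefixes, the same filter fits in one comprehension, as in the sketch below. `uninstall_requirements` is a hypothetical name and the requirement strings are illustrative only.

```python
from typing import Iterable, List


def uninstall_requirements(
    requirements: Iterable[str], ignored_prefixes: Iterable[str]
) -> List[str]:
    prefixes = tuple(ignored_prefixes)
    # str.startswith(tuple) is True if any prefix matches; an empty tuple never matches.
    return [req for req in requirements if not req.startswith(prefixes)]


print(uninstall_requirements(["kfp>=2.6.0", "gcsfs", "google-cloud-storage>=2.9.0"], ["kfp"]))
```

As in the original, the match is on the raw string prefix, so an ignored prefix of `"kfp"` would also skip any package whose name merely starts with `kfp`.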
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

artifact_stores

Initialization of the GCP Artifact Store.

Classes
GCPArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for Google Cloud Storage based artifacts.

Source code in src/zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initiate the Pydantic object and register the corresponding filesystem.

    Args:
        *args: The positional arguments to pass to the Pydantic object.
        **kwargs: The keyword arguments to pass to the Pydantic object.
    """
    super(BaseArtifactStore, self).__init__(*args, **kwargs)

    # If running in a ZenML server environment, we don't register
    # the filesystems. We always use the artifact stores directly.
    if ENV_ZENML_SERVER not in os.environ:
        self._register()
Attributes
config: GCPArtifactStoreConfig property

Returns the GCPArtifactStoreConfig config.

Returns:

Type Description
GCPArtifactStoreConfig

The configuration.

filesystem: gcsfs.GCSFileSystem property

The gcsfs filesystem to access this artifact store.

Returns:

Type Description
GCSFileSystem

The gcsfs filesystem to access this artifact store.

Functions
copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None

Copy a file.

Parameters:

Name Type Description Default
src PathType

The path to copy from.

required
dst PathType

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )
    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
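The `overwrite` guard is the only logic `copyfile` adds on top of the filesystem call. The sketch below shows the same contract on the local filesystem, using `pathlib` and `shutil` in place of gcsfs so it runs without a bucket. `guarded_copy` and `demo` are hypothetical names, not part of the artifact store.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Tuple


def guarded_copy(src: Path, dst: Path, overwrite: bool = False) -> None:
    """Copy src to dst, refusing to replace an existing dst unless overwrite=True."""
    if not overwrite and dst.exists():
        raise FileExistsError(
            f"Unable to copy to destination '{dst}', file already exists. "
            "Set `overwrite=True` to copy anyway."
        )
    shutil.copyfile(src, dst)


def demo() -> Tuple[bool, str]:
    with tempfile.TemporaryDirectory() as tmp:
        src, dst = Path(tmp) / "a.txt", Path(tmp) / "b.txt"
        src.write_text("v1")
        guarded_copy(src, dst)  # dst does not exist yet, so the copy succeeds
        src.write_text("v2")
        refused = False
        try:
            guarded_copy(src, dst)  # dst exists and overwrite=False
        except FileExistsError:
            refused = True
        guarded_copy(src, dst, overwrite=True)  # explicit overwrite
        return refused, dst.read_text()


print(demo())  # (True, 'v2')
```

The existence check and the copy are two separate operations, so, as in the original, a concurrent writer could still create the destination in between.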
exists(path: PathType) -> bool

Check whether a path exists.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials() -> Optional[Union[Dict[str, Any], gcp_credentials.Credentials]]

Returns the credentials for the GCP Artifact Store if configured.

Returns:

Type Description
Optional[Union[Dict[str, Any], Credentials]]

The credentials.

Raises:

Type Description
RuntimeError

If the linked connector returns the wrong type of client.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def get_credentials(
    self,
) -> Optional[Union[Dict[str, Any], gcp_credentials.Credentials]]:
    """Returns the credentials for the GCP Artifact Store if configured.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the linked connector returns the wrong type of
            client.
    """
    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, storage.Client):
            raise RuntimeError(
                f"Expected a google.cloud.storage.Client while trying to "
                f"use the linked connector, but got {type(client)}."
            )
        return client._credentials

    secret = self.get_typed_authentication_secret(
        expected_schema_type=GCPSecretSchema
    )
    return secret.get_credential_dict() if secret else None
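`get_credentials` resolves credentials in a fixed order: a linked service connector wins, otherwise the authentication secret is used, and if neither is configured the result is `None`. The sketch below captures that precedence with plain callables. `StorageClientStub` is a hypothetical stand-in for `google.cloud.storage.Client`, and `resolve_credentials` is a hypothetical name; the secret is modelled as an already-extracted credential dict.

```python
from typing import Any, Callable, Dict, Optional


class StorageClientStub:
    """Hypothetical stand-in for google.cloud.storage.Client."""

    def __init__(self, credentials: Any) -> None:
        self._credentials = credentials


def resolve_credentials(
    connect: Optional[Callable[[], Any]],
    secret_credentials: Optional[Dict[str, Any]],
) -> Optional[Any]:
    # 1. A linked service connector takes precedence.
    if connect is not None:
        client = connect()
        if not isinstance(client, StorageClientStub):
            raise RuntimeError(
                f"Expected a storage client from the connector, got {type(client)}."
            )
        return client._credentials
    # 2. Otherwise fall back to the authentication secret (which may be None).
    return secret_credentials


print(resolve_credentials(None, {"project": "my-project"}))
```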
glob(pattern: PathType) -> List[PathType]

Return all paths that match the given glob pattern.

The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as a full path component to search subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern PathType

The glob pattern to match, see details above.

required

Returns:

Type Description
List[PathType]

A list of paths that match the given glob pattern.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as a full path component to search subdirectories of any
      depth (e.g. '/some_dir/**/some_file')

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [
        f"{GCP_PATH_PREFIX}{path}"
        for path in self.filesystem.glob(path=pattern)
    ]
isdir(path: PathType) -> bool

Check whether a path is a directory.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(path: PathType) -> List[PathType]

Return a list of files in a directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to list.

required

Returns:

Type Description
List[PathType]

A list of paths of files in the directory.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path of the directory to list.

    Returns:
        A list of paths of files in the directory.
    """
    path_without_prefix = convert_to_str(path)
    if path_without_prefix.startswith(GCP_PATH_PREFIX):
        path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by GCP.

        Args:
            file_dict: A file info dict returned by the GCP filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path_without_prefix) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
        # gcsfs.listdir also returns the root directory, so we filter
        # it out here
        if _extract_basename(dict_)
    ]
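The basename extraction in `listdir` relies on gcsfs reporting entry names without the `gs://` scheme, and on the listed directory itself showing up as an entry whose basename is empty. The sketch below reproduces that logic on hand-written entry dicts. It assumes `GCP_PATH_PREFIX` is `"gs://"`; `list_basenames` and the example entries are hypothetical.

```python
from typing import Any, Dict, List

GCS_PREFIX = "gs://"  # assumed value of GCP_PATH_PREFIX


def list_basenames(path: str, entries: List[Dict[str, Any]]) -> List[str]:
    # Entry names come back without the scheme, so strip it from the input too.
    if path.startswith(GCS_PREFIX):
        path = path[len(GCS_PREFIX):]

    def basename(entry: Dict[str, Any]) -> str:
        # Remove the directory part, then any leading slash.
        return str(entry["name"])[len(path):].lstrip("/")

    # Empty basenames correspond to the listed directory itself; skip them.
    return [name for name in (basename(e) for e in entries) if name]


entries = [
    {"name": "my-bucket/artifacts/"},
    {"name": "my-bucket/artifacts/model.pkl"},
    {"name": "my-bucket/artifacts/data"},
]
print(list_basenames("gs://my-bucket/artifacts", entries))  # ['model.pkl', 'data']
```

Unlike the original comprehension, which calls `_extract_basename` twice per entry, this version computes each basename once.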
makedirs(path: PathType) -> None

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path PathType

The path of the directory to create.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(path: PathType) -> None

Create a directory at the given path.

Parameters:

Name Type Description Default
path PathType

The path of the directory to create.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedir(path=path)
open(path: PathType, mode: str = 'r') -> Any

Open a file at the given path.

Parameters:

Name Type Description Default
path PathType

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object that can be used to read or write to the file.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object that can be used to read or write to the file.
    """
    if mode in ("a", "ab"):
        logger.warning(
            "GCS Filesystem is immutable, so append mode will overwrite existing files."
        )
    return self.filesystem.open(path=path, mode=mode)
remove(path: PathType) -> None

Remove the file at the given path.

Parameters:

Name Type Description Default
path PathType

The path of the file to remove.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(src: PathType, dst: PathType, overwrite: bool = False) -> None

Rename source file to destination file.

Parameters:

Name Type Description Default
src PathType

The path of the file to rename.

required
dst PathType

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(path: PathType) -> None

Remove the given directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to remove.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(path: PathType) -> int

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(path: PathType) -> Dict[str, Any]

Return stat info for the given path.

Parameters:

Name Type Description Default
path PathType

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the stat info.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: the path to get stat info for.

    Returns:
        A dictionary with the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(top: PathType, topdown: bool = True, onerror: Optional[Callable[..., None]] = None) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top PathType

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., None]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[PathType, List[PathType], List[PathType]]]

An iterable of tuples, each of which contains the path of the current directory, a list of directories inside the current directory, and a list of files inside the current directory.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An iterable of tuples, each containing the path of the current
        directory, a list of the directories inside it, and a list of the
        files inside it.
    """
    # TODO [ENG-153]: Additional params
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
Modules
gcp_artifact_store

Implementation of the GCP Artifact Store.

Classes
GCPArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for Google Cloud Storage based artifacts.

Source code in src/zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initiate the Pydantic object and register the corresponding filesystem.

    Args:
        *args: The positional arguments to pass to the Pydantic object.
        **kwargs: The keyword arguments to pass to the Pydantic object.
    """
    super(BaseArtifactStore, self).__init__(*args, **kwargs)

    # If running in a ZenML server environment, we don't register
    # the filesystems. We always use the artifact stores directly.
    if ENV_ZENML_SERVER not in os.environ:
        self._register()
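The constructor only registers the filesystem when it is not running inside a ZenML server process. A minimal sketch of that gate, assuming the environment variable is literally named `ZENML_SERVER` (the real name lives in the `ENV_ZENML_SERVER` constant and is not shown here); `SketchStore` and its `registered` flag are hypothetical stand-ins for the real registration side effect.

```python
import os

# Assumed name of the variable behind ENV_ZENML_SERVER; check zenml.constants.
ENV_ZENML_SERVER = "ZENML_SERVER"


class SketchStore:
    """Hypothetical stand-in that records whether registration happened."""

    def __init__(self) -> None:
        self.registered = False
        # Client-side processes register the filesystem; a ZenML server
        # process skips it and uses artifact stores directly.
        if ENV_ZENML_SERVER not in os.environ:
            self._register()

    def _register(self) -> None:
        self.registered = True


os.environ.pop(ENV_ZENML_SERVER, None)
print(SketchStore().registered)
```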
Attributes
config: GCPArtifactStoreConfig property

Returns the GCPArtifactStoreConfig config.

Returns:

Type Description
GCPArtifactStoreConfig

The configuration.

filesystem: gcsfs.GCSFileSystem property

The gcsfs filesystem to access this artifact store.

Returns:

Type Description
GCSFileSystem

The gcsfs filesystem to access this artifact store.

Functions
copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None

Copy a file.

Parameters:

Name Type Description Default
src PathType

The path to copy from.

required
dst PathType

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )
    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
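The overwrite guard in `copyfile` (and the identical one in `rename`) can be exercised without GCS. The sketch below swaps `gcsfs.GCSFileSystem` for a hypothetical dict-backed `InMemoryBucket` that implements only the `exists` and `copy` calls the method relies on.

```python
from typing import Dict


class InMemoryBucket:
    """Hypothetical stand-in for the two gcsfs calls copyfile() uses."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def exists(self, path: str) -> bool:
        return path in self.objects

    def copy(self, path1: str, path2: str) -> None:
        self.objects[path2] = self.objects[path1]


def copyfile(fs: InMemoryBucket, src: str, dst: str, overwrite: bool = False) -> None:
    # Same guard as GCPArtifactStore.copyfile: refuse to clobber unless asked.
    if not overwrite and fs.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{dst}', file already exists. "
            "Set `overwrite=True` to copy anyway."
        )
    fs.copy(path1=src, path2=dst)


bucket = InMemoryBucket()
bucket.objects["gs://bucket/a.txt"] = b"v1"
bucket.objects["gs://bucket/b.txt"] = b"v0"
copyfile(bucket, "gs://bucket/a.txt", "gs://bucket/c.txt")  # new destination
try:
    copyfile(bucket, "gs://bucket/a.txt", "gs://bucket/b.txt")  # already exists
except FileExistsError as err:
    print(err)
copyfile(bucket, "gs://bucket/a.txt", "gs://bucket/b.txt", overwrite=True)
```

Note that the guard is a check followed by an action, not an atomic operation: another writer could create `dst` between the `exists()` call and the copy.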
exists(path: PathType) -> bool

Check whether a path exists.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials() -> Optional[Union[Dict[str, Any], gcp_credentials.Credentials]]

Returns the credentials for the GCP Artifact Store if configured.

Returns:

Type Description
Optional[Union[Dict[str, Any], Credentials]]

The credentials.

Raises:

Type Description
RuntimeError

If the linked connector returns the wrong type of client.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def get_credentials(
    self,
) -> Optional[Union[Dict[str, Any], gcp_credentials.Credentials]]:
    """Returns the credentials for the GCP Artifact Store if configured.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the linked connector returns the wrong type of
            client.
    """
    connector = self.get_connector()
    if connector:
        client = connector.connect()
        if not isinstance(client, storage.Client):
            raise RuntimeError(
                f"Expected a google.cloud.storage.Client while trying to "
                f"use the linked connector, but got {type(client)}."
            )
        return client._credentials

    secret = self.get_typed_authentication_secret(
        expected_schema_type=GCPSecretSchema
    )
    return secret.get_credential_dict() if secret else None
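`get_credentials` resolves credentials in a fixed order: a linked service connector first (which must yield a storage client), then the authentication secret, and otherwise `None`. A minimal sketch of that order, where `FakeStorageClient` is a hypothetical stand-in for `google.cloud.storage.Client` and `resolve_credentials` is an illustrative helper rather than a ZenML API:

```python
from typing import Any, Callable, Dict, Optional


class FakeStorageClient:
    """Hypothetical stand-in for google.cloud.storage.Client."""

    def __init__(self, credentials: Any) -> None:
        self._credentials = credentials


def resolve_credentials(
    connect: Optional[Callable[[], Any]],
    secret_dict: Optional[Dict[str, Any]],
) -> Any:
    """Mirror the lookup order of GCPArtifactStore.get_credentials()."""
    # 1. A linked connector takes precedence; it must hand back a storage
    #    client, otherwise the configuration is rejected.
    if connect is not None:
        client = connect()
        if not isinstance(client, FakeStorageClient):
            raise RuntimeError(
                f"Expected a storage client from the connector, got {type(client)}."
            )
        return client._credentials
    # 2. Otherwise use the credential dict from the authentication secret,
    #    or None when no secret is configured either.
    return secret_dict


print(resolve_credentials(lambda: FakeStorageClient("connector-creds"), None))
print(resolve_credentials(None, {"token": "from-secret"}))
print(resolve_credentials(None, None))
```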
glob(pattern: PathType) -> List[PathType]

Return all paths that match the given glob pattern.

The glob pattern may include:

- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern PathType

The glob pattern to match, see details above.

required

Returns:

Type Description
List[PathType]

A list of paths that match the given glob pattern.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to search
      in subdirectories of any depth (e.g. '/some_dir/**/some_file')

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [
        f"{GCP_PATH_PREFIX}{path}"
        for path in self.filesystem.glob(path=pattern)
    ]
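`glob` re-attaches the `gs://` scheme to every match. This sketch assumes `GCP_PATH_PREFIX` is `"gs://"` and that gcsfs reports matches as `bucket/key` without the scheme, which is why the prefix is added back. The raw match list is hypothetical.

```python
from typing import List

GCP_PATH_PREFIX = "gs://"  # assumed value of the constant used above


def add_prefix(raw_matches: List[str]) -> List[str]:
    # gcsfs returns "bucket/key" without the scheme; re-attaching it keeps
    # the results usable as input to the other artifact store methods.
    return [f"{GCP_PATH_PREFIX}{path}" for path in raw_matches]


# Hypothetical result of filesystem.glob("gs://my-bucket/runs/*/model.pkl")
raw = ["my-bucket/runs/1/model.pkl", "my-bucket/runs/2/model.pkl"]
print(add_prefix(raw))
```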
isdir(path: PathType) -> bool

Check whether a path is a directory.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(path: PathType) -> List[PathType]

Return a list of files in a directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to list.

required

Returns:

Type Description
List[PathType]

A list of paths of files in the directory.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path of the directory to list.

    Returns:
        A list of paths of files in the directory.
    """
    path_without_prefix = convert_to_str(path)
    if path_without_prefix.startswith(GCP_PATH_PREFIX):
        path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by GCP.

        Args:
            file_dict: A file info dict returned by the GCP filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path_without_prefix) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
        # gcsfs.listdir also returns the root directory, so we filter
        # it out here
        if _extract_basename(dict_)
    ]
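`listdir` returns names relative to the listed directory (subdirectories included), not full paths. It also drops the entry for the directory itself. The sketch below replays that logic on a hypothetical gcsfs-style listing and uses only the `"name"` key. It assumes `GCP_PATH_PREFIX` is `"gs://"`.

```python
from typing import Any, Dict, List

GCP_PATH_PREFIX = "gs://"  # assumed value of the constant


def listdir_names(path: str, entries: List[Dict[str, Any]]) -> List[str]:
    # Strip the scheme so the path lines up with the "name" fields gcsfs returns.
    path_without_prefix = path
    if path_without_prefix.startswith(GCP_PATH_PREFIX):
        path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX):]

    def extract_basename(entry: Dict[str, Any]) -> str:
        base_name = entry["name"][len(path_without_prefix):]
        return base_name.lstrip("/")

    # An empty basename means the entry is the listed directory itself.
    return [extract_basename(e) for e in entries if extract_basename(e)]


# Hypothetical gcsfs.listdir() output.
entries = [
    {"name": "my-bucket/runs", "type": "directory"},
    {"name": "my-bucket/runs/1", "type": "directory"},
    {"name": "my-bucket/runs/config.yaml", "type": "file"},
]
print(listdir_names("gs://my-bucket/runs", entries))  # ['1', 'config.yaml']
```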
makedirs(path: PathType) -> None

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path PathType

The path of the directory to create.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(path: PathType) -> None

Create a directory at the given path.

Parameters:

Name Type Description Default
path PathType

The path of the directory to create.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedir(path=path)
open(path: PathType, mode: str = 'r') -> Any

Open a file at the given path.

Parameters:

Name Type Description Default
path PathType

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object that can be used to read or write to the file.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object that can be used to read or write to the file.
    """
    if mode in ("a", "ab"):
        logger.warning(
            "GCS Filesystem is immutable, so append mode will overwrite existing files."
        )
    return self.filesystem.open(path=path, mode=mode)
remove(path: PathType) -> None

Remove the file at the given path.

Parameters:

Name Type Description Default
path PathType

The path of the file to remove.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(src: PathType, dst: PathType, overwrite: bool = False) -> None

Rename source file to destination file.

Parameters:

Name Type Description Default
src PathType

The path of the file to rename.

required
dst PathType

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(path: PathType) -> None

Remove the given directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to remove.

required
Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(path: PathType) -> int

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(path: PathType) -> Dict[str, Any]

Return stat info for the given path.

Parameters:

Name Type Description Default
path PathType

the path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the stat info.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: the path to get stat info for.

    Returns:
        A dictionary with the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(top: PathType, topdown: bool = True, onerror: Optional[Callable[..., None]] = None) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top PathType

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., None]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[PathType, List[PathType], List[PathType]]]

An iterable of tuples, each containing the path of the current directory, a list of the directories inside it, and a list of the files inside it.

Source code in src/zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An iterable of tuples, each containing the path of the current
        directory, a list of the directories inside it, and a list of the
        files inside it.
    """
    # TODO [ENG-153]: Additional params
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
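In `walk`, only the directory component of each tuple gets the `gs://` scheme back. The subdirectory and file lists pass through unchanged, and `topdown` and `onerror` are ignored. A sketch over a hypothetical raw walk, assuming `GCP_PATH_PREFIX` is `"gs://"` and that gcsfs yields bare names in the two lists:

```python
from typing import Iterable, Iterator, List, Tuple

GCP_PATH_PREFIX = "gs://"  # assumed value of the constant


def prefixed_walk(
    raw_walk: Iterable[Tuple[str, List[str], List[str]]],
) -> Iterator[Tuple[str, List[str], List[str]]]:
    # Only the directory path is re-prefixed; entries stay bare names.
    for directory, subdirectories, files in raw_walk:
        yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files


# Hypothetical output of filesystem.walk(path="gs://my-bucket/runs")
raw = [
    ("my-bucket/runs", ["1"], ["config.yaml"]),
    ("my-bucket/runs/1", [], ["model.pkl"]),
]
for entry in prefixed_walk(raw):
    print(entry)
```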
Functions

constants

Constants for the VertexAI integration.

experiment_trackers

Initialization for the VertexAI experiment tracker.

Classes
VertexExperimentTracker(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseExperimentTracker, GoogleCredentialsMixin

Track experiments using VertexAI.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: VertexExperimentTrackerConfig property

Returns the VertexExperimentTrackerConfig config.

Returns:

Type Description
VertexExperimentTrackerConfig

The configuration.

settings_class: Type[VertexExperimentTrackerSettings] property

Returns the BaseSettings settings class.

Returns:

Type Description
Type[VertexExperimentTrackerSettings]

The settings class.

Functions
cleanup_step_run(info: StepRunInfo, step_failed: bool) -> None

Stops the VertexAI run.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required
step_failed bool

Whether the step failed or not.

required
Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Stops the VertexAI run.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed or not.
    """
    state = (
        execution.Execution.State.FAILED
        if step_failed
        else execution.Execution.State.COMPLETE
    )
    aiplatform.end_run(state=state)
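`cleanup_step_run` closes the Vertex AI run with a state derived from the step outcome. The sketch below isolates that mapping. `State` is a hypothetical stand-in for `execution.Execution.State`, and only the two members used above are modeled.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for execution.Execution.State."""

    COMPLETE = "COMPLETE"
    FAILED = "FAILED"


def final_run_state(step_failed: bool) -> State:
    # A failed step ends the Vertex run as FAILED, anything else as COMPLETE.
    return State.FAILED if step_failed else State.COMPLETE


print(final_run_state(step_failed=True))
```

In the tracker, the resulting value is passed to `aiplatform.end_run(state=...)`.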
get_step_run_metadata(info: StepRunInfo) -> Dict[str, MetadataType]

Get component- and step-specific metadata after a step ran.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata.
    """
    experiment_name = self._get_experiment_name(info=info)
    run_name = self._get_run_name(info=info)
    tensorboard_resource_name = self._get_tensorboard_resource_name(
        experiment=experiment_name
    )
    dashboard_url = self._get_dashboard_url(experiment=experiment_name)
    return {
        METADATA_EXPERIMENT_TRACKER_URL: Uri(dashboard_url),
        "tensorboard_resource_name": tensorboard_resource_name or "",
        "vertex_run_name": run_name,
    }
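The metadata dictionary always has three keys. A missing TensorBoard resource name is reported as an empty string rather than `None`. This sketch builds the same shape from plain strings. It assumes `METADATA_EXPERIMENT_TRACKER_URL` has the value `"experiment_tracker_url"` and uses plain `str` in place of ZenML's `Uri` type. The URL and run name are hypothetical.

```python
from typing import Dict, Optional

# Assumed value of ZenML's METADATA_EXPERIMENT_TRACKER_URL constant.
METADATA_EXPERIMENT_TRACKER_URL = "experiment_tracker_url"


def build_metadata(
    dashboard_url: str, tensorboard_resource_name: Optional[str], run_name: str
) -> Dict[str, str]:
    return {
        METADATA_EXPERIMENT_TRACKER_URL: dashboard_url,
        # No TensorBoard instance found -> empty string, never None.
        "tensorboard_resource_name": tensorboard_resource_name or "",
        "vertex_run_name": run_name,
    }


print(build_metadata("https://console.cloud.google.com/vertex-ai/experiments", None, "run-1"))
```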
prepare_step_run(info: StepRunInfo) -> None

Configures a VertexAI run.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that will be executed.

required
Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Configures a VertexAI run.

    Args:
        info: Info about the step that will be executed.
    """
    self._initialize_vertex(info=info)
    self.experiment_name = self._get_experiment_name(info=info)
    self.run_name = self._get_run_name(info=info)
Modules
vertex_experiment_tracker

Implementation of the VertexAI experiment tracker for ZenML.

Classes
VertexExperimentTracker(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseExperimentTracker, GoogleCredentialsMixin

Track experiments using VertexAI.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: VertexExperimentTrackerConfig property

Returns the VertexExperimentTrackerConfig config.

Returns:

Type Description
VertexExperimentTrackerConfig

The configuration.

settings_class: Type[VertexExperimentTrackerSettings] property

Returns the BaseSettings settings class.

Returns:

Type Description
Type[VertexExperimentTrackerSettings]

The settings class.

Functions
cleanup_step_run(info: StepRunInfo, step_failed: bool) -> None

Stops the VertexAI run.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required
step_failed bool

Whether the step failed or not.

required
Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def cleanup_step_run(self, info: "StepRunInfo", step_failed: bool) -> None:
    """Stops the VertexAI run.

    Args:
        info: Info about the step that was executed.
        step_failed: Whether the step failed or not.
    """
    state = (
        execution.Execution.State.FAILED
        if step_failed
        else execution.Execution.State.COMPLETE
    )
    aiplatform.end_run(state=state)
get_step_run_metadata(info: StepRunInfo) -> Dict[str, MetadataType]

Get component- and step-specific metadata after a step ran.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that was executed.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def get_step_run_metadata(
    self, info: "StepRunInfo"
) -> Dict[str, "MetadataType"]:
    """Get component- and step-specific metadata after a step ran.

    Args:
        info: Info about the step that was executed.

    Returns:
        A dictionary of metadata.
    """
    experiment_name = self._get_experiment_name(info=info)
    run_name = self._get_run_name(info=info)
    tensorboard_resource_name = self._get_tensorboard_resource_name(
        experiment=experiment_name
    )
    dashboard_url = self._get_dashboard_url(experiment=experiment_name)
    return {
        METADATA_EXPERIMENT_TRACKER_URL: Uri(dashboard_url),
        "tensorboard_resource_name": tensorboard_resource_name or "",
        "vertex_run_name": run_name,
    }
prepare_step_run(info: StepRunInfo) -> None

Configures a VertexAI run.

Parameters:

Name Type Description Default
info StepRunInfo

Info about the step that will be executed.

required
Source code in src/zenml/integrations/gcp/experiment_trackers/vertex_experiment_tracker.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
    """Configures a VertexAI run.

    Args:
        info: Info about the step that will be executed.
    """
    self._initialize_vertex(info=info)
    self.experiment_name = self._get_experiment_name(info=info)
    self.run_name = self._get_run_name(info=info)
Functions

flavors

GCP integration flavors.

Classes
GCPArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration for GCP Artifact Store.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
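The guard above can be sketched as a standalone function. This is a minimal sketch, not ZenML's implementation: the regular expression is a hypothetical stand-in for `secret_utils.is_secret_reference`, and `check_kwargs` with its field-name sets is a hypothetical helper that replaces the pydantic field lookups (`is_secret_field`, `has_validators`) with plain Python sets.

```python
import re
from typing import Any, Dict, List, Set

# Hypothetical pattern for references of the form {{secret_name.key}};
# ZenML's own secret_utils may parse these differently.
_SECRET_REF = re.compile(r"^\{\{\s*([^.\s{}]+)\.([^\s{}]+)\s*\}\}$")


def is_secret_reference(value: Any) -> bool:
    """Return True if `value` is a string shaped like {{name.key}}."""
    return isinstance(value, str) and _SECRET_REF.match(value) is not None


def check_kwargs(
    kwargs: Dict[str, Any],
    known_fields: Set[str],
    secret_fields: Set[str],
    validated_fields: Set[str],
    warn_about_plain_text_secrets: bool = False,
) -> List[str]:
    """Mirror the __init__ guard: return warnings, raise on validator clashes."""
    warnings: List[str] = []
    for key, value in kwargs.items():
        if key not in known_fields or value is None:
            # Unknown keys are left to the model's own validation.
            continue
        if not is_secret_reference(value):
            if key in secret_fields and warn_about_plain_text_secrets:
                warnings.append(f"plain-text value for sensitive attribute `{key}`")
            continue
        if key in validated_fields:
            # A validator would run on the literal "{{...}}" string, so refuse it.
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is not allowed."
            )
    return warnings
```

For example, `check_kwargs({"token": "abc"}, {"token"}, {"token"}, set(), True)` returns one warning, while passing `"{{creds.project}}"` for a field listed in `validated_fields` raises `ValueError`.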
GCPArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Flavor of the GCP artifact store.

Attributes
config_class: Type[GCPArtifactStoreConfig] property

Returns GCPArtifactStoreConfig config class.

Returns:

Type Description
Type[GCPArtifactStoreConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GCPArtifactStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[GCPArtifactStore]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.
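Each flavor ties a name to a config class and an implementation class. The sketch below illustrates that contract with the standard `abc` module. `StackComponentConfig` and `StackComponent` here are local stand-ins, and `ToyGCSConfig`, `ToyGCSStore`, and `ToyGCSFlavor` are hypothetical classes, not ZenML's real ones. Only the flavor name `"gcp"` comes from this page (`GCP_ARTIFACT_STORE_FLAVOR`).

```python
from abc import ABC, abstractmethod
from typing import Optional, Type


class StackComponentConfig:  # local stand-in for ZenML's config base class
    pass


class StackComponent:  # local stand-in for ZenML's component base class
    pass


class Flavor(ABC):
    """Minimal sketch of the flavor contract described above."""

    @property
    @abstractmethod
    def name(self) -> str: ...

    @property
    @abstractmethod
    def config_class(self) -> Type[StackComponentConfig]: ...

    @property
    @abstractmethod
    def implementation_class(self) -> Type[StackComponent]: ...

    @property
    def docs_url(self) -> Optional[str]:
        # Optional metadata properties can fall back to None.
        return None


class ToyGCSConfig(StackComponentConfig):  # hypothetical config
    pass


class ToyGCSStore(StackComponent):  # hypothetical implementation
    pass


class ToyGCSFlavor(Flavor):
    @property
    def name(self) -> str:
        return "gcp"  # same value as GCP_ARTIFACT_STORE_FLAVOR

    @property
    def config_class(self) -> Type[StackComponentConfig]:
        return ToyGCSConfig

    @property
    def implementation_class(self) -> Type[StackComponent]:
        return ToyGCSStore
```

Because the three core properties are abstract, instantiating `Flavor` directly fails. A concrete flavor must supply all three before it can be used.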

GCPImageBuilderConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseImageBuilderConfig, GoogleCredentialsConfigMixin

Google Cloud Builder image builder configuration.

Attributes:

Name Type Description
cloud_builder_image str

The name of the Docker image to use for the build steps. Defaults to gcr.io/cloud-builders/docker.

network str

The network name to which the build container will be attached while building the Docker image. More information about this: https://cloud.google.com/build/docs/build-config-file-schema#network. Defaults to cloudbuild.

build_timeout PositiveInt

The timeout of the build in seconds. More information about this parameter: https://cloud.google.com/build/docs/build-config-file-schema#timeout_2. Defaults to 3600.
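The three attributes above map onto a Cloud Build request. This sketch assembles such a request as a plain dictionary, using the documented defaults. `ImageBuilderSettings` and `build_config` are hypothetical helpers. Passing `network` through `docker build --network` is an assumption about how the setting is applied, not a description of ZenML's `GCPImageBuilder`. The `"<seconds>s"` duration string follows Cloud Build's build config schema.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ImageBuilderSettings:
    """Hypothetical stand-in carrying the three documented fields and defaults."""

    cloud_builder_image: str = "gcr.io/cloud-builders/docker"
    network: str = "cloudbuild"
    build_timeout: int = 3600  # seconds; a PositiveInt in the real config

    def __post_init__(self) -> None:
        if self.build_timeout <= 0:
            raise ValueError("build_timeout must be a positive integer")


def build_config(settings: ImageBuilderSettings, image: str) -> Dict[str, Any]:
    """Assemble a Cloud Build request body as a plain dict (illustrative only)."""
    return {
        "steps": [
            {
                "name": settings.cloud_builder_image,
                # Assumption: the docker build is attached to the named network.
                "args": ["build", "--network", settings.network, "-t", image, "."],
            }
        ],
        "images": [image],
        # Cloud Build expresses durations as a string with an "s" suffix.
        "timeout": f"{settings.build_timeout}s",
    }
```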

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
GCPImageBuilderFlavor

Bases: BaseImageBuilderFlavor

Google Cloud Builder image builder flavor.

Attributes
config_class: Type[BaseImageBuilderConfig] property

The config class.

Returns:

Type Description
Type[BaseImageBuilderConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GCPImageBuilder] property

Implementation class.

Returns:

Type Description
Type[GCPImageBuilder]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

The flavor name.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

VertexExperimentTrackerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseExperimentTrackerConfig, GoogleCredentialsConfigMixin, VertexExperimentTrackerSettings

Config for the VertexAI experiment tracker.

Attributes:

Name Type Description
location Optional[str]

Optional. The default location to use when making API calls. If not set, defaults to us-central1.

staging_bucket Optional[str]

Optional. The default staging bucket to use to stage artifacts when making API calls. In the form gs://...

network Optional[str]

Optional. The full name of the Compute Engine network to which jobs and resources should be peered. E.g. "projects/12345/global/networks/myVPC". Private services access must already be configured for the network. If specified, all eligible jobs and resources created will be peered with this VPC.

encryption_spec_key_name Optional[str]

Optional. The Cloud KMS resource identifier of the customer managed encryption key used to protect a resource. Has the form: projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key. The key needs to be in the same region as where the compute resource is created.

api_endpoint str

Optional. The desired API endpoint, e.g., us-central1-aiplatform.googleapis.com.

api_key str

Optional. The API key to use for service calls. NOTE: Not all services support API keys.

api_transport str

Optional. The transport method which is either 'grpc' or 'rest'. NOTE: "rest" transport functionality is currently in a beta state (preview).

request_metadata Optional[Dict[str, Any]]

Optional. Additional gRPC metadata to send with every client request.
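The sketch below shows one way to turn these optional settings into keyword arguments, applying the documented defaults and constraints. `tracker_init_kwargs` is a hypothetical helper. The field names appear to mirror keyword arguments of `google.cloud.aiplatform.init`, but confirm that against the Vertex AI SDK version you use before relying on it.

```python
from typing import Any, Dict, Optional

_TRANSPORTS = {"grpc", "rest"}


def tracker_init_kwargs(
    location: Optional[str] = None,
    staging_bucket: Optional[str] = None,
    api_transport: Optional[str] = None,
    **extra: Any,
) -> Dict[str, Any]:
    """Collect the non-empty settings, applying the documented defaults."""
    if staging_bucket is not None and not staging_bucket.startswith("gs://"):
        raise ValueError("staging_bucket must be a gs:// URI")
    if api_transport is not None and api_transport not in _TRANSPORTS:
        raise ValueError("api_transport must be 'grpc' or 'rest'")
    kwargs = {
        "location": location or "us-central1",  # documented default region
        "staging_bucket": staging_bucket,
        "api_transport": api_transport,
        **extra,
    }
    # Drop unset values so the SDK's own defaults still apply.
    return {k: v for k, v in kwargs.items() if v is not None}
```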

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
VertexExperimentTrackerFlavor

Bases: BaseExperimentTrackerFlavor

Flavor for the VertexAI experiment tracker.

Attributes
config_class: Type[VertexExperimentTrackerConfig] property

Returns VertexExperimentTrackerConfig config class.

Returns:

Type Description
Type[VertexExperimentTrackerConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexExperimentTracker] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexExperimentTracker]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

VertexOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, GoogleCredentialsConfigMixin, VertexOrchestratorSettings

Configuration for the Vertex orchestrator.

Attributes:

Name Type Description
location str

Name of GCP region where the pipeline job will be executed. Vertex AI Pipelines is available in the following regions: https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability

pipeline_root Optional[str]

A Cloud Storage URI that will be used by Vertex AI Pipelines. If not provided but the artifact store in the stack used to execute the pipeline is a zenml.integrations.gcp.artifact_stores.GCPArtifactStore, then a subdirectory of the artifact store will be used.

encryption_spec_key_name Optional[str]

The Cloud KMS resource identifier of the customer managed encryption key used to protect the job. Has the form: projects/<PROJECT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY> . The key needs to be in the same region as where the compute resource is created.

workload_service_account Optional[str]

The service account used as the workload's run-as account. Users submitting jobs must have act-as permission on this run-as account. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

function_service_account Optional[str]

The service account used as the Cloud Function's run-as account for scheduled pipelines. This service account must have act-as permission on the workload_service_account. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

scheduler_service_account Optional[str]

The service account used by Google Cloud Scheduler to trigger and authenticate to the pipeline Cloud Function on a schedule. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

network Optional[str]

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC. If not provided, the job will not be peered with any network.

cpu_limit Optional[str]

The maximum CPU limit for this operator. The value is a string containing either an integer number of CPUs, or a number followed by "m", which means thousandths of a CPU. You can specify at most 96 CPUs (see https://cloud.google.com/vertex-ai/docs/pipelines/machine-types).

memory_limit Optional[str]

The maximum memory limit for this operator. This string value can be a number, or a number followed by "K" (kilobyte), "M" (megabyte), or "G" (gigabyte). At most 624GB is supported.

gpu_limit Optional[int]

The GPU limit (positive number) for the operator. For more information about GPU resources, see: https://cloud.google.com/vertex-ai/docs/training/configure-compute#specifying_gpus
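The `cpu_limit` and `memory_limit` string formats described above can be checked before a job is submitted. This is a minimal sketch with hypothetical helpers `parse_cpu_limit` and `parse_memory_limit`. Treating K, M, and G as decimal units (powers of 1000) is an assumption, since the description does not state whether binary or decimal units are meant.

```python
import re

# Assumption: decimal units; the backend may interpret suffixes differently.
_MEMORY_UNITS = {"": 1, "K": 10**3, "M": 10**6, "G": 10**9}


def parse_cpu_limit(value: str) -> float:
    """Return the CPU count for strings like "4" or "500m" (= 0.5 CPU)."""
    match = re.fullmatch(r"(\d+)(m?)", value)
    if match is None:
        raise ValueError(f"invalid cpu_limit: {value!r}")
    amount = int(match.group(1))
    # A trailing "m" means thousandths of a CPU.
    cpus = amount / 1000 if match.group(2) else float(amount)
    if cpus > 96:
        raise ValueError("at most 96 CPUs can be requested")
    return cpus


def parse_memory_limit(value: str) -> int:
    """Return bytes for strings like "512M" or "16G"."""
    match = re.fullmatch(r"(\d+)([KMG]?)", value)
    if match is None:
        raise ValueError(f"invalid memory_limit: {value!r}")
    num_bytes = int(match.group(1)) * _MEMORY_UNITS[match.group(2)]
    if num_bytes > 624 * 10**9:
        raise ValueError("at most 624GB of memory is supported")
    return num_bytes
```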

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.
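A client could use the three properties above to decide whether a run can start and how to behave afterwards. The sketch below is hypothetical: `OrchestratorTraits` and `plan_run` are illustrative names, and ZenML's actual pre-run checks may differ in detail.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OrchestratorTraits:
    """Hypothetical stand-in for the three properties documented above."""

    is_remote: bool
    is_schedulable: bool
    is_synchronous: bool


def plan_run(traits: OrchestratorTraits, server_is_remote: bool, scheduled: bool) -> str:
    """Return how the client should behave, or raise if the run cannot start."""
    if traits.is_remote and not server_is_remote:
        # A remotely running component cannot reach a local-only ZenML database.
        raise RuntimeError("this orchestrator requires a remote ZenML server")
    if scheduled and not traits.is_schedulable:
        raise RuntimeError("this orchestrator does not support schedules")
    # A synchronous orchestrator keeps the client attached until the run ends.
    return "wait for completion" if traits.is_synchronous else "return immediately"
```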

VertexOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Vertex Orchestrator flavor.

Attributes
config_class: Type[VertexOrchestratorConfig] property

Returns VertexOrchestratorConfig config class.

Returns:

Type Description
Type[VertexOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexOrchestrator]

Implementation class for this flavor.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

VertexStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, GoogleCredentialsConfigMixin, VertexStepOperatorSettings

Configuration for the Vertex step operator.

Attributes:

Name Type Description
region str

Region name, e.g., europe-west1.

encryption_spec_key_name Optional[str]

Encryption spec key name.

network Optional[str]

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC.

reserved_ip_ranges Optional[str]

A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, the job is deployed within the provided IP ranges; otherwise, the job may be deployed to any IP range under the provided VPC network.

service_account Optional[str]

Specifies the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account.
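The network and KMS key settings follow fixed resource-name patterns, so malformed values can be caught before a job is submitted. In this sketch, `check_vertex_job_settings` is a hypothetical helper. The network pattern follows the `projects/12345/global/networks/myVPC` example above. The key pattern and same-region rule come from the Vertex orchestrator and experiment tracker descriptions, which state the key must be in the same region as the compute resource. That rule is assumed to apply to step operator jobs too.

```python
import re
from typing import Optional

_NETWORK = re.compile(r"projects/(\d+)/global/networks/([^/]+)")
_KMS_KEY = re.compile(
    r"projects/([^/]+)/locations/([^/]+)/keyRings/([^/]+)/cryptoKeys/([^/]+)"
)


def check_vertex_job_settings(
    region: str,
    network: Optional[str] = None,
    encryption_spec_key_name: Optional[str] = None,
) -> None:
    """Raise ValueError if the optional network or KMS settings look malformed."""
    if network is not None and _NETWORK.fullmatch(network) is None:
        raise ValueError(
            "network must look like projects/<number>/global/networks/<name>"
        )
    if encryption_spec_key_name is not None:
        match = _KMS_KEY.fullmatch(encryption_spec_key_name)
        if match is None:
            raise ValueError("malformed Cloud KMS key name")
        # The key's location segment must match the job's region.
        if match.group(2) != region:
            raise ValueError("the KMS key must be in the same region as the job")
```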

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

VertexStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Vertex Step Operator flavor.

Attributes
config_class: Type[VertexStepOperatorConfig] property

Returns VertexStepOperatorConfig config class.

Returns:

Type Description
Type[VertexStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

Modules
gcp_artifact_store_flavor

GCP artifact store flavor.

Classes
GCPArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration for GCP Artifact Store.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
GCPArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Flavor of the GCP artifact store.

Attributes
config_class: Type[GCPArtifactStoreConfig] property

Returns GCPArtifactStoreConfig config class.

Returns:

Type Description
Type[GCPArtifactStoreConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GCPArtifactStore] property

Implementation class for this flavor.

Returns:

Type Description
Type[GCPArtifactStore]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.
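The filtering described above can be pictured with a small, self-contained sketch. The `ConnectorRequirements` and `ConnectorType` classes and the `compatible_connector_types` function are hypothetical stand-ins, not ZenML's `ServiceConnectorRequirements` API. The resource type strings reuse the `GCS_RESOURCE_TYPE` (`gcs-bucket`) and `GCP_RESOURCE_TYPE` (`gcp-generic`) values listed for this module. The assumption that the GCP artifact store asks for a `gcs-bucket` resource is made only for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class ConnectorRequirements:
    """Hypothetical stand-in for the requirements a flavor declares."""

    resource_type: str
    connector_type: Optional[str] = None  # None means any connector type


@dataclass
class ConnectorType:
    """Hypothetical description of an available service connector type."""

    name: str
    resource_types: Set[str] = field(default_factory=set)


def compatible_connector_types(
    requirements: Optional[ConnectorRequirements],
    available: List[ConnectorType],
) -> List[ConnectorType]:
    """Keep only the connector types that can serve the required resource."""
    if requirements is None:
        # The flavor does not need a service connector, so nothing to offer.
        return []
    return [
        ct
        for ct in available
        if requirements.resource_type in ct.resource_types
        and requirements.connector_type in (None, ct.name)
    ]


# Usage: an artifact store flavor assumed to require a "gcs-bucket" resource.
available = [
    ConnectorType("gcp", {"gcp-generic", "gcs-bucket"}),
    ConnectorType("aws", {"s3-bucket"}),
]
requirements = ConnectorRequirements(resource_type="gcs-bucket")
print([ct.name for ct in compatible_connector_types(requirements, available)])
```

Returning an empty list when the requirements are `None` reflects the `Optional` return type: a flavor that returns `None` does not need a connector at all.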

gcp_image_builder_flavor

Google Cloud image builder flavor.

Classes
GCPImageBuilderConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseImageBuilderConfig, GoogleCredentialsConfigMixin

Google Cloud Builder image builder configuration.

Attributes:

Name Type Description
cloud_builder_image str

The name of the Docker image to use for the build steps. Defaults to gcr.io/cloud-builders/docker.

network str

The name of the network to which the build container is attached while building the Docker image. For more information, see https://cloud.google.com/build/docs/build-config-file-schema#network. Defaults to cloudbuild.

build_timeout PositiveInt

The timeout of the build in seconds. For more information, see https://cloud.google.com/build/docs/build-config-file-schema#timeout_2. Defaults to 3600.
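To show how the three attributes could end up in a Cloud Build request, here is a hypothetical helper, `build_cloud_build_config`. It is not ZenML's `GCPImageBuilder` code, which may assemble its request differently. The `docker build --network=...` step, the `images` list and the `"3600s"` duration string follow the general Cloud Build build-config shape. The image name in the usage note is a placeholder.

```python
from typing import Any, Dict, List


def build_cloud_build_config(
    image_name: str,
    cloud_builder_image: str = "gcr.io/cloud-builders/docker",
    network: str = "cloudbuild",
    build_timeout: int = 3600,
) -> Dict[str, Any]:
    """Assemble a Cloud Build request body from the image builder settings.

    Hypothetical helper for illustration only.
    """
    if build_timeout <= 0:
        # Mirrors the PositiveInt constraint on build_timeout.
        raise ValueError("build_timeout must be a positive number of seconds")
    docker_args: List[str] = [
        "build",
        f"--network={network}",  # attach the build container to this network
        "-t",
        image_name,
        ".",
    ]
    return {
        # The build step runs inside the configured builder image.
        "steps": [{"name": cloud_builder_image, "args": docker_args}],
        # Images listed here are pushed once all steps succeed.
        "images": [image_name],
        # Cloud Build expects the timeout as a duration string in seconds.
        "timeout": f"{build_timeout}s",
    }
```

For example, `build_cloud_build_config("europe-docker.pkg.dev/my-project/my-repo/my-image:latest")` produces a single Docker build step on the `cloudbuild` network with a `"3600s"` timeout.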

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
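The decision order in this `__init__` can be reduced to a dependency-free sketch. The regular expression, the `FieldInfo` stand-in and the `check_kwargs` function are hypothetical simplifications of ZenML's `secret_utils` and `pydantic_utils` helpers. Only the `{{secret_name.key}}` syntax and the order of the checks come from the docstring and code above.

```python
import logging
import re
from dataclasses import dataclass
from typing import Any, Dict

logger = logging.getLogger(__name__)

# Hypothetical pattern for references of the form {{secret_name.key}}.
SECRET_REFERENCE = re.compile(r"^\{\{\s*([^.{}\s]+)\.([^.{}\s]+)\s*\}\}$")


def is_secret_reference(value: Any) -> bool:
    """Return True if `value` is a string of the form {{name.key}}."""
    return isinstance(value, str) and SECRET_REFERENCE.match(value) is not None


@dataclass
class FieldInfo:
    """Hypothetical per-field metadata that the check relies on."""

    is_secret: bool = False  # sensitive field, e.g. a token
    has_validators: bool = False  # field is checked by a pydantic validator


def check_kwargs(
    fields: Dict[str, FieldInfo],
    kwargs: Dict[str, Any],
    warn_about_plain_text_secrets: bool = False,
) -> None:
    """Apply the same decision order as the __init__ shown above."""
    for key, value in kwargs.items():
        field = fields.get(key)
        if field is None or value is None:
            # Unknown fields are left for pydantic to reject later.
            continue
        if not is_secret_reference(value):
            if field.is_secret and warn_about_plain_text_secrets:
                logger.warning("Plain-text value for sensitive attribute `%s`.", key)
            continue
        if field.has_validators:
            # A validator would run on the reference string, not the secret.
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is not allowed."
            )
```

With `fields = {"token": FieldInfo(is_secret=True), "region": FieldInfo(has_validators=True)}`, passing `{"token": "{{gcp.token}}"}` succeeds. Passing `{"region": "{{gcp.region}}"}` raises `ValueError`, because the validator for `region` would otherwise see the literal reference.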
GCPImageBuilderFlavor

Bases: BaseImageBuilderFlavor

Google Cloud Builder image builder flavor.

Attributes
config_class: Type[BaseImageBuilderConfig] property

The config class.

Returns:

Type Description
Type[BaseImageBuilderConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[GCPImageBuilder] property

Implementation class.

Returns:

Type Description
Type[GCPImageBuilder]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

The flavor name.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

vertex_experiment_tracker_flavor

Vertex experiment tracker flavor.

Classes
VertexExperimentTrackerConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseExperimentTrackerConfig, GoogleCredentialsConfigMixin, VertexExperimentTrackerSettings

Config for the VertexAI experiment tracker.

Attributes:

Name Type Description
location Optional[str]

Optional. The default location to use when making API calls. If not set, defaults to us-central1.

staging_bucket Optional[str]

Optional. The default staging bucket used to stage artifacts when making API calls, in the form gs://...

network Optional[str]

Optional. The full name of the Compute Engine network to which jobs and resources should be peered. E.g. "projects/12345/global/networks/myVPC". Private services access must already be configured for the network. If specified, all eligible jobs and resources created will be peered with this VPC.

encryption_spec_key_name Optional[str]

Optional. The Cloud KMS resource identifier of the customer-managed encryption key used to protect a resource. It has the form projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key. The key needs to be in the same region as the one where the compute resource is created.

api_endpoint str

Optional. The desired API endpoint, e.g., us-central1-aiplatform.googleapis.com.

api_key str

Optional. The API key to use for service calls. NOTE: Not all services support API keys.

api_transport str

Optional. The transport method which is either 'grpc' or 'rest'. NOTE: "rest" transport functionality is currently in a beta state (preview).

request_metadata Optional[Dict[str, Any]]

Optional. Additional gRPC metadata to send with every client request.
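The key format and the same-region requirement of `encryption_spec_key_name` can be checked locally. `parse_kms_key_name` below is a hypothetical helper and is not part of ZenML or the Vertex AI SDK. It only checks the shape of the resource name and that its location matches the compute region (for example the default `us-central1`). It does not verify that the key exists or that you may use it.

```python
import re
from typing import Dict

# projects/<PROJECT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>
KMS_KEY_PATTERN = re.compile(
    r"^projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)"
    r"/keyRings/(?P<key_ring>[^/]+)/cryptoKeys/(?P<key>[^/]+)$"
)


def parse_kms_key_name(name: str, expected_location: str) -> Dict[str, str]:
    """Split a KMS key resource name and check it matches the compute region."""
    match = KMS_KEY_PATTERN.match(name)
    if match is None:
        raise ValueError(f"Not a Cloud KMS key resource name: {name!r}")
    parts = match.groupdict()
    if parts["location"] != expected_location:
        # The key must live in the region where the resource is created.
        raise ValueError(
            f"Key is in {parts['location']!r} but resources are created in "
            f"{expected_location!r}."
        )
    return parts
```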

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
VertexExperimentTrackerFlavor

Bases: BaseExperimentTrackerFlavor

Flavor for the VertexAI experiment tracker.

Attributes
config_class: Type[VertexExperimentTrackerConfig] property

Returns VertexExperimentTrackerConfig config class.

Returns:

Type Description
Type[VertexExperimentTrackerConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexExperimentTracker] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexExperimentTracker]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

VertexExperimentTrackerSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the VertexAI experiment tracker.

Attributes:

Name Type Description
experiment Optional[str]

The VertexAI experiment name.

experiment_tensorboard Optional[Union[str, bool]]

The VertexAI experiment tensorboard.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Functions
vertex_orchestrator_flavor

Vertex orchestrator flavor.

Classes
VertexOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, GoogleCredentialsConfigMixin, VertexOrchestratorSettings

Configuration for the Vertex orchestrator.

Attributes:

Name Type Description
location str

Name of GCP region where the pipeline job will be executed. Vertex AI Pipelines is available in the following regions: https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability

pipeline_root Optional[str]

A Cloud Storage URI used by Vertex AI Pipelines. If it is not provided, but the artifact store in the stack used to execute the pipeline is a zenml.integrations.gcp.artifact_stores.GCPArtifactStore, a subdirectory of the artifact store is used instead.

encryption_spec_key_name Optional[str]

The Cloud KMS resource identifier of the customer-managed encryption key used to protect the job. It has the form projects/<PROJECT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>. The key needs to be in the same region as the one where the compute resource is created.

workload_service_account Optional[str]

The service account used as the workload run-as account. Users submitting jobs must have act-as permission on this run-as account. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

function_service_account Optional[str]

The service account used as the Cloud Function run-as account for scheduled pipelines. This service account must have act-as permission on the workload_service_account. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

scheduler_service_account Optional[str]

The service account used by Google Cloud Scheduler to trigger, and authenticate to, the pipeline Cloud Function on a schedule. If not provided, the Compute Engine default service account for the GCP project in which the pipeline is running is used.

network Optional[str]

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC. If not provided, the job is not peered with any network.

cpu_limit Optional[str]

The maximum CPU limit for this operator. The value is a string containing either an integer number of CPUs or a number followed by "m", which denotes thousandths of a CPU (e.g. "500m" is half a CPU). You can specify at most 96 CPUs (see https://cloud.google.com/vertex-ai/docs/pipelines/machine-types).

memory_limit Optional[str]

The maximum memory limit for this operator. This string value can be a number, or a number followed by "K" (kilobyte), "M" (megabyte), or "G" (gigabyte). At most 624GB is supported.

gpu_limit Optional[int]

The GPU limit (positive number) for the operator. For more information about GPU resources, see: https://cloud.google.com/vertex-ai/docs/training/configure-compute#specifying_gpus
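The string formats accepted by `cpu_limit` and `memory_limit` can be checked locally before a pipeline is submitted. The two parsers below are hypothetical; they do not reproduce how ZenML or Vertex AI parse these values. They assume integer quantities and decimal units (1 K = 1000 bytes), and they only enforce the 96-CPU and 624 GB ceilings quoted above.

```python
import re

MAX_CPUS = 96
MAX_MEMORY_BYTES = 624 * 10**9  # 624 GB, assuming decimal gigabytes

# Assumed decimal multipliers for the optional K/M/G suffix.
_MEMORY_UNITS = {"": 1, "K": 10**3, "M": 10**6, "G": 10**9}


def parse_cpu_limit(value: str) -> float:
    """Convert '4' or '500m' (thousandths of a CPU) into a number of CPUs."""
    match = re.fullmatch(r"(\d+)(m?)", value)
    if match is None:
        raise ValueError(f"Invalid cpu_limit: {value!r}")
    cpus = int(match.group(1)) / (1000 if match.group(2) else 1)
    if cpus > MAX_CPUS:
        raise ValueError(f"cpu_limit {value!r} exceeds {MAX_CPUS} CPUs")
    return cpus


def parse_memory_limit(value: str) -> int:
    """Convert '512M' or '16G' into a number of bytes."""
    match = re.fullmatch(r"(\d+)([KMG]?)", value)
    if match is None:
        raise ValueError(f"Invalid memory_limit: {value!r}")
    size = int(match.group(1)) * _MEMORY_UNITS[match.group(2)]
    if size > MAX_MEMORY_BYTES:
        raise ValueError(f"memory_limit {value!r} exceeds 624G")
    return size
```

For example, `parse_cpu_limit("500m")` returns `0.5` and `parse_memory_limit("16G")` returns sixteen billion bytes.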

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

VertexOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Vertex Orchestrator flavor.

Attributes
config_class: Type[VertexOrchestratorConfig] property

Returns VertexOrchestratorConfig config class.

Returns:

Type Description
Type[VertexOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexOrchestrator] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexOrchestrator]

Implementation class for this flavor.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the orchestrator flavor.

Returns:

Type Description
str

Name of the orchestrator flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

VertexOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the Vertex orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, the client running a pipeline using this orchestrator waits until all steps finish running. If False, the client returns immediately and the pipeline is executed asynchronously. Defaults to True.

labels Dict[str, str]

Labels to assign to the pipeline job.

node_selector_constraint Optional[Tuple[str, str]]

Each constraint is a key-value pair label. For the container to be eligible to run on a node, the node must carry each of the constraints as a label. For example, a GPU type can be requested with one of the following tuples:
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_A100")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_K80")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P4")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_P100")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4")
- ("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_V100")

Hint: the selected region (location) must provide the requested accelerator (see https://cloud.google.com/compute/docs/gpus/gpu-regions-zones).

pod_settings Optional[KubernetesPodSettings]

Pod settings to apply.
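The eligibility rule in the `node_selector_constraint` description is a subset check on node labels. The helper below is a hypothetical illustration of that rule, not code from ZenML or Vertex AI. The setting itself holds a single tuple, while the helper accepts any number of constraints.

```python
from typing import Dict, Iterable, Tuple

# A constraint is a (label key, label value) pair, as in node_selector_constraint.
Constraint = Tuple[str, str]


def node_is_eligible(
    node_labels: Dict[str, str], constraints: Iterable[Constraint]
) -> bool:
    """Return True if every (key, value) constraint appears among the node's labels."""
    return all(node_labels.get(key) == value for key, value in constraints)
```

A node labelled `{"cloud.google.com/gke-accelerator": "NVIDIA_TESLA_T4"}` satisfies the constraint `("cloud.google.com/gke-accelerator", "NVIDIA_TESLA_T4")`, but a node with a K80 label does not.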

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
Modules
vertex_step_operator_flavor

Vertex step operator flavor.

Classes
VertexStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, GoogleCredentialsConfigMixin, VertexStepOperatorSettings

Configuration for the Vertex step operator.

Attributes:

Name Type Description
region str

Region name, e.g., europe-west1.

encryption_spec_key_name Optional[str]

Encryption spec key name.

network Optional[str]

The full name of the Compute Engine network to which the job should be peered, for example projects/12345/global/networks/myVPC.

reserved_ip_ranges Optional[str]

A list of names for the reserved IP ranges under the VPC network that can be used for this job. If set, the job is deployed within the provided IP ranges; otherwise, it may be deployed to any IP range under the provided VPC network.

service_account Optional[str]

The service account used as the workload run-as account. Users submitting jobs must have act-as permission on this run-as account.
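The `network` value is a full resource name, not a bare network name. A hypothetical helper such as `vpc_network_name` can assemble it. The example in the description uses a numeric project identifier (`12345`), so the helper assumes you already know the project number rather than the project ID.

```python
def vpc_network_name(project_number: int, network: str) -> str:
    """Build projects/<PROJECT_NUMBER>/global/networks/<NETWORK>.

    Hypothetical helper; it only formats the string and does not check
    that the network exists or is peered.
    """
    if project_number <= 0:
        raise ValueError("project_number must be a positive integer")
    if not network or "/" in network:
        raise ValueError("network must be a bare network name such as 'myVPC'")
    return f"projects/{project_number}/global/networks/{network}"
```

Calling `vpc_network_name(12345, "myVPC")` reproduces the example value from the description above.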

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

VertexStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Vertex Step Operator flavor.

Attributes
config_class: Type[VertexStepOperatorConfig] property

Returns VertexStepOperatorConfig config class.

Returns:

Type Description
Type[VertexStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[VertexStepOperator] property

Implementation class for this flavor.

Returns:

Type Description
Type[VertexStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.
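To show how requirements of this kind can narrow down the connector types offered for a flavor, here is a minimal sketch. It assumes the flavor asks for the `gcp-generic` resource type (the value of `GCP_RESOURCE_TYPE` in this module). `Requirements`, `ConnectorType` and `compatible_connector_types` are hypothetical stand-ins, not ZenML's `ServiceConnectorRequirements` API.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Requirements:
    """Hypothetical stand-in for ZenML's ServiceConnectorRequirements."""

    resource_type: str
    connector_type: Optional[str] = None  # None means "any connector type"


@dataclass
class ConnectorType:
    """Hypothetical description of a service connector type."""

    name: str
    resource_types: List[str] = field(default_factory=list)


def compatible_connector_types(
    available: List[ConnectorType], requirements: Optional[Requirements]
) -> List[str]:
    """Return the names of connector types that satisfy the requirements."""
    if requirements is None:
        # The flavor does not need a connector, so none are offered.
        return []
    return [
        connector.name
        for connector in available
        if requirements.resource_type in connector.resource_types
        and requirements.connector_type in (None, connector.name)
    ]


available = [
    ConnectorType("gcp", ["gcp-generic", "gcs-bucket"]),
    ConnectorType("aws", ["aws-generic", "s3-bucket"]),
]
print(compatible_connector_types(available, Requirements("gcp-generic")))
```

Returning an empty list when no requirements are given reflects the docstring's "if a service connector is required" wording; a real implementation may treat that case differently.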

VertexStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: VertexCustomJobParameters, BaseSettings

Settings for the Vertex step operator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
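The checks performed by this `__init__` can be sketched without pydantic as follows. This is a minimal sketch under stated assumptions: the field categories are passed in as plain sets instead of being read from `model_fields`, warnings are returned instead of logged, and the secret-reference regex is a hypothetical approximation of what `secret_utils.is_secret_reference` accepts.

```python
import re
from typing import Any, Dict, List, Set

# Hypothetical pattern for secret references such as "{{my_secret.api_key}}".
_SECRET_REFERENCE = re.compile(r"\{\{\s*\S+?\.\S+\s*\}\}")


def is_secret_reference(value: Any) -> bool:
    """Return True if `value` is a string shaped like a secret reference."""
    return isinstance(value, str) and bool(_SECRET_REFERENCE.fullmatch(value))


def check_secret_references(
    kwargs: Dict[str, Any],
    known_fields: Set[str],
    secret_fields: Set[str],
    clear_text_fields: Set[str],
    validated_fields: Set[str],
    warn_about_plain_text_secrets: bool = False,
) -> List[str]:
    """Mirror the checks in `__init__`; return warnings instead of logging."""
    warnings: List[str] = []
    for key, value in kwargs.items():
        if key not in known_fields or value is None:
            # Unknown keys are left for pydantic validation to reject.
            continue
        if not is_secret_reference(value):
            if key in secret_fields and warn_about_plain_text_secrets:
                warnings.append(f"plain-text value for sensitive attribute `{key}`")
            continue
        if key in clear_text_fields:
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is not allowed."
            )
        if key in validated_fields:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is not "
                "allowed as additional validation is required for this attribute."
            )
    return warnings
```

The order of the checks matters: plain values only ever produce a warning, and only values that look like secret references can trigger a `ValueError`.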

google_credentials_mixin

Implementation of the Google credentials mixin.

Classes
GoogleCredentialsConfigMixin(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: StackComponentConfig

Config mixin for Google Cloud Platform credentials.

Attributes:

Name Type Description
project Optional[str]

GCP project name. If None, the project will be inferred from the environment.

service_account_path Optional[str]

Path to the service account credentials file used for authentication. If not provided, the default credentials will be used.
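Both attributes describe fallbacks, and the decision they imply can be modeled in a few lines. A minimal sketch, assuming the project is inferred from the `GOOGLE_CLOUD_PROJECT` environment variable (one source google-auth consults; the mixin's actual lookup may differ). `resolve_gcp_auth` is a hypothetical helper that only picks a strategy; real credential loading would go through google-auth calls such as `google.auth.default()` or `google.oauth2.service_account.Credentials.from_service_account_file()`.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_gcp_auth(
    project: Optional[str],
    service_account_path: Optional[str],
    env: Mapping[str, str] = os.environ,
) -> Tuple[str, Optional[str]]:
    """Return (credential source, project) following the documented fallbacks."""
    # A configured key file wins; otherwise fall back to default credentials.
    if service_account_path:
        source = "service_account_file"
    else:
        source = "default_credentials"
    if project is None:
        # Assumption: the environment supplies the project when none is set.
        project = env.get("GOOGLE_CLOUD_PROJECT")
    return source, project


print(resolve_gcp_auth(None, "/secrets/sa.json", {"GOOGLE_CLOUD_PROJECT": "my-proj"}))
```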

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
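The docstring above explains that a secret reference such as `{{secret_name.key}}` is an ordinary string until it is resolved, so a pydantic validator on that field would run against the raw reference text. To make the reference format concrete, here is a minimal parser sketch. The regex and the `parse_secret_reference` helper are hypothetical and may not match ZenML's `secret_utils` exactly; the secret name is assumed to contain no dot.

```python
import re
from typing import NamedTuple, Optional

# Hypothetical grammar: "{{", optional whitespace, the secret name (no dots),
# a dot, the key, optional whitespace, "}}".
_REFERENCE = re.compile(r"\{\{\s*(?P<name>[^\s.]+)\.(?P<key>\S+?)\s*\}\}")


class SecretReference(NamedTuple):
    name: str
    key: str


def parse_secret_reference(value: str) -> Optional[SecretReference]:
    """Split a reference into secret name and key; return None otherwise."""
    match = _REFERENCE.fullmatch(value)
    if match is None:
        return None
    return SecretReference(match.group("name"), match.group("key"))


print(parse_secret_reference("{{gcp_secret.token}}"))
```

A validator that, for example, normalized paths would receive `"{{gcp_secret.token}}"` rather than the secret value, which is why such fields are rejected.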
GoogleCredentialsMixin(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: StackComponent

StackComponent mixin to get Google Cloud Platform credentials.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: GoogleCredentialsConfigMixin property

Returns the GoogleCredentialsConfigMixin config.

Returns:

Type Description
GoogleCredentialsConfigMixin

The configuration.

Functions

image_builders

Initialization for the GCP image builder.

Classes
GCPImageBuilder(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseImageBuilder, GoogleCredentialsMixin

Google Cloud Builder image builder implementation.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: GCPImageBuilderConfig property

The stack component configuration.

Returns:

Type Description
GCPImageBuilderConfig

The configuration.

is_building_locally: bool property

Whether the image builder builds the images on the client machine.

Returns:

Type Description
bool

True if the image builder builds locally, False otherwise.

validator: Optional[StackValidator] property

Validates the stack for the GCP Image Builder.

The GCP Image Builder requires a remote container registry to push the image to, and a GCP Artifact Store to upload the build context, so Cloud Build can access it.

Returns:

Type Description
Optional[StackValidator]

Stack validator.
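The two requirements stated for this validator can be expressed as a small check. This is a minimal sketch, assuming a hypothetical `StackInfo` summary of the stack; ZenML's real validator is a `StackValidator` working on actual stack components. The `"gcp"` flavor string comes from `GCP_ARTIFACT_STORE_FLAVOR` in this module.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class StackInfo:
    """Hypothetical summary of the stack components the validator inspects."""

    container_registry_is_local: Optional[bool]  # None: no registry in the stack
    artifact_store_flavor: str


def validate_for_cloud_build(stack: StackInfo) -> Tuple[bool, str]:
    """Return (is_valid, reason) for the documented Cloud Build requirements."""
    if stack.container_registry_is_local is None:
        return False, "The GCP Image Builder requires a container registry."
    if stack.container_registry_is_local:
        return False, "The container registry must be remote so images can be pushed."
    if stack.artifact_store_flavor != "gcp":
        return False, "A GCP Artifact Store is required to upload the build context."
    return True, ""


print(validate_for_cloud_build(StackInfo(False, "gcp")))
```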

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Dict[str, Any], container_registry: Optional[BaseContainerRegistry] = None) -> str

Builds and pushes a Docker image.

Parameters:

Name | Type | Description | Default
image_name | str | Name of the image to build and push. | required
build_context | BuildContext | The build context to use for the image. | required
docker_build_options | Dict[str, Any] | Docker build options. | required
container_registry | Optional[BaseContainerRegistry] | Optional container registry to push to. | None

Returns:

Type Description
str

The Docker image name with digest.

Raises:

Type Description
RuntimeError

If no container registry is passed.

RuntimeError

If the Cloud Build build fails.

Source code in src/zenml/integrations/gcp/image_builders/gcp_image_builder.py
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Dict[str, Any],
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Builds and pushes a Docker image.

    Args:
        image_name: Name of the image to build and push.
        build_context: The build context to use for the image.
        docker_build_options: Docker build options.
        container_registry: Optional container registry to push to.

    Returns:
        The Docker image name with digest.

    Raises:
        RuntimeError: If no container registry is passed.
        RuntimeError: If the Cloud Build build fails.
    """
    if not container_registry:
        raise RuntimeError(
            "The GCP Image Builder requires a container registry to push "
            "the image to. Please provide one and try again."
        )

    logger.info("Using Cloud Build to build image `%s`", image_name)
    cloud_build_context = self._upload_build_context(
        build_context=build_context,
        parent_path_directory_name="cloud-build-contexts",
    )
    build = self._configure_cloud_build(
        image_name=image_name,
        cloud_build_context=cloud_build_context,
        build_options=docker_build_options,
    )
    image_digest = self._run_cloud_build(build=build)
    image_name_without_tag, _ = image_name.rsplit(":", 1)
    image_name_with_digest = f"{image_name_without_tag}@{image_digest}"
    return image_name_with_digest
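The last three lines of `build()` swap the image tag for the digest returned by Cloud Build. A minimal sketch of that step follows. `image_with_digest` reproduces the `rsplit` logic shown above. `image_with_digest_defensive` is our own hypothetical variant, not ZenML code: it only strips a colon that follows the last `/`, so a registry port survives when no tag is present.

```python
def image_with_digest(image_name: str, digest: str) -> str:
    """Replace the tag with `@digest`, using the same logic as `build()`."""
    # Raises ValueError if `image_name` contains no colon at all.
    name_without_tag, _ = image_name.rsplit(":", 1)
    return f"{name_without_tag}@{digest}"


def image_with_digest_defensive(image_name: str, digest: str) -> str:
    """Hypothetical variant that keeps registry ports such as `host:5000`."""
    last_slash = image_name.rfind("/")
    last_colon = image_name.rfind(":")
    if last_colon > last_slash:
        # The colon belongs to the tag, not to a registry host:port.
        image_name = image_name[:last_colon]
    return f"{image_name}@{digest}"


print(image_with_digest("gcr.io/my-project/zenml:abc123", "sha256:deadbeef"))
```

For tagged names both functions agree. For an untagged name with a registry port, such as `localhost:5000/zenml`, the `rsplit` version would produce `localhost@...`, which only matters if untagged image names can reach this code.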
Modules
gcp_image_builder

Google Cloud Builder image builder implementation.

Classes
GCPImageBuilder(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseImageBuilder, GoogleCredentialsMixin

Google Cloud Builder image builder implementation.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: GCPImageBuilderConfig property

The stack component configuration.

Returns:

Type Description
GCPImageBuilderConfig

The configuration.

is_building_locally: bool property

Whether the image builder builds the images on the client machine.

Returns:

Type Description
bool

True if the image builder builds locally, False otherwise.

validator: Optional[StackValidator] property

Validates the stack for the GCP Image Builder.

The GCP Image Builder requires a remote container registry to push the image to, and a GCP Artifact Store to upload the build context, so Cloud Build can access it.

Returns:

Type Description
Optional[StackValidator]

Stack validator.

Functions
build(image_name: str, build_context: BuildContext, docker_build_options: Dict[str, Any], container_registry: Optional[BaseContainerRegistry] = None) -> str

Builds and pushes a Docker image.

Parameters:

Name | Type | Description | Default
image_name | str | Name of the image to build and push. | required
build_context | BuildContext | The build context to use for the image. | required
docker_build_options | Dict[str, Any] | Docker build options. | required
container_registry | Optional[BaseContainerRegistry] | Optional container registry to push to. | None

Returns:

Type Description
str

The Docker image name with digest.

Raises:

Type Description
RuntimeError

If no container registry is passed.

RuntimeError

If the Cloud Build build fails.

Source code in src/zenml/integrations/gcp/image_builders/gcp_image_builder.py
def build(
    self,
    image_name: str,
    build_context: "BuildContext",
    docker_build_options: Dict[str, Any],
    container_registry: Optional["BaseContainerRegistry"] = None,
) -> str:
    """Builds and pushes a Docker image.

    Args:
        image_name: Name of the image to build and push.
        build_context: The build context to use for the image.
        docker_build_options: Docker build options.
        container_registry: Optional container registry to push to.

    Returns:
        The Docker image name with digest.

    Raises:
        RuntimeError: If no container registry is passed.
        RuntimeError: If the Cloud Build build fails.
    """
    if not container_registry:
        raise RuntimeError(
            "The GCP Image Builder requires a container registry to push "
            "the image to. Please provide one and try again."
        )

    logger.info("Using Cloud Build to build image `%s`", image_name)
    cloud_build_context = self._upload_build_context(
        build_context=build_context,
        parent_path_directory_name="cloud-build-contexts",
    )
    build = self._configure_cloud_build(
        image_name=image_name,
        cloud_build_context=cloud_build_context,
        build_options=docker_build_options,
    )
    image_digest = self._run_cloud_build(build=build)
    image_name_without_tag, _ = image_name.rsplit(":", 1)
    image_name_with_digest = f"{image_name_without_tag}@{image_digest}"
    return image_name_with_digest
Functions

orchestrators

Initialization for the VertexAI orchestrator.

Classes
VertexOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator, GoogleCredentialsMixin

Orchestrator responsible for running pipelines on Vertex AI.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: VertexOrchestratorConfig property

Returns the VertexOrchestratorConfig config.

Returns:

Type Description
VertexOrchestratorConfig

The configuration.

pipeline_directory: str property

Returns the path to the directory where Kubeflow pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property

Returns the path to the root directory for files for this orchestrator.

Returns:

Type Description
str

The path to the root directory for all files concerning this orchestrator.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Vertex orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates that the stack contains a container registry.

Also validates that the artifact store is not local.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
compute_metadata(job: aiplatform.PipelineJob) -> Iterator[Dict[str, MetadataType]]

Generate run metadata based on the corresponding Vertex PipelineJob.

Parameters:

Name | Type | Description | Default
job | PipelineJob | The corresponding PipelineJob object. | required

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def compute_metadata(
    self, job: aiplatform.PipelineJob
) -> Iterator[Dict[str, MetadataType]]:
    """Generate run metadata based on the corresponding Vertex PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    metadata: Dict[str, MetadataType] = {}

    # Orchestrator Run ID
    if run_id := self._compute_orchestrator_run_id(job):
        metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id

    # URL to the Vertex AI pipeline view
    if orchestrator_url := self._compute_orchestrator_url(job):
        metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url)

    # URL to the corresponding Logs Explorer page
    if logs_url := self._compute_orchestrator_logs_url(job):
        metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url)

    yield metadata
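`compute_metadata` yields a single dictionary and silently omits any entry whose helper returns nothing. A minimal sketch of that pattern follows, with the private `_compute_*` helpers replaced by injected callables. The key strings are assumptions standing in for ZenML's `METADATA_*` constants, and plain strings replace the `Uri` wrapper.

```python
from typing import Callable, Dict, Iterator, Optional

# Assumed key names; ZenML defines its own METADATA_* constants.
RUN_ID_KEY = "orchestrator_run_id"
URL_KEY = "orchestrator_url"
LOGS_URL_KEY = "orchestrator_logs_url"

Helper = Callable[[object], Optional[str]]


def compute_metadata_sketch(
    job: object, run_id_of: Helper, url_of: Helper, logs_url_of: Helper
) -> Iterator[Dict[str, str]]:
    """Yield one dict holding only the values the helpers could compute."""
    metadata: Dict[str, str] = {}
    if run_id := run_id_of(job):
        metadata[RUN_ID_KEY] = run_id
    if url := url_of(job):
        metadata[URL_KEY] = url
    if logs_url := logs_url_of(job):
        metadata[LOGS_URL_KEY] = logs_url
    yield metadata


print(list(compute_metadata_sketch("job", lambda j: "run-1", lambda j: None, lambda j: "https://logs")))
```

Because the walrus checks test truthiness, an empty string is dropped just like `None`.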
fetch_status(run: PipelineRunResponse) -> ExecutionStatus

Refreshes the status of a specific pipeline run.

Parameters:

Name | Type | Description | Default
run | PipelineRunResponse | The run that was executed by this orchestrator. | required

Returns:

Type Description
ExecutionStatus

The actual status of the pipeline job.

Raises:

Type Description
AssertionError

If the run was not executed by this orchestrator.

ValueError

If the stack of the run is no longer available, if an unknown state is fetched, or if the orchestrator run ID cannot be found.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        The actual status of the pipeline job.

    Raises:
        AssertionError: If the run was not executed by this orchestrator.
        ValueError: If the stack of the run is no longer available, if an
            unknown state is fetched, or if the orchestrator run ID cannot
            be found.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator
    assert (
        self.id
        == run.stack.components[StackComponentType.ORCHESTRATOR][0].id
    )

    # Initialize the Vertex client
    credentials, project_id = self._get_authentication()
    aiplatform.init(
        project=project_id,
        location=self.config.location,
        credentials=credentials,
    )

    # Fetch the status of the PipelineJob
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Cannot find the orchestrator run ID, so the status cannot "
            "be fetched."
        )
    status = aiplatform.PipelineJob.get(run_id).state

    # Map the potential outputs to ZenML ExecutionStatus. The potential
    # values are the members of the Vertex AI `PipelineState` enum.
    if status in [PipelineState.PIPELINE_STATE_UNSPECIFIED]:
        return run.status
    elif status in [
        PipelineState.PIPELINE_STATE_QUEUED,
        PipelineState.PIPELINE_STATE_PENDING,
    ]:
        return ExecutionStatus.INITIALIZING
    elif status in [
        PipelineState.PIPELINE_STATE_RUNNING,
        PipelineState.PIPELINE_STATE_PAUSED,
    ]:
        return ExecutionStatus.RUNNING
    elif status in [PipelineState.PIPELINE_STATE_SUCCEEDED]:
        return ExecutionStatus.COMPLETED

    elif status in [
        PipelineState.PIPELINE_STATE_FAILED,
        PipelineState.PIPELINE_STATE_CANCELLING,
        PipelineState.PIPELINE_STATE_CANCELLED,
    ]:
        return ExecutionStatus.FAILED
    else:
        raise ValueError("Unknown status for the pipeline job.")
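The run ID lookup and the state mapping in `fetch_status` can be tried without a Vertex AI connection. A minimal sketch, assuming Vertex `PipelineState` members are represented by their names as strings and using a local stand-in for ZenML's `ExecutionStatus` (its values, and the `"orchestrator_run_id"` metadata key, are assumptions).

```python
from enum import Enum
from typing import Mapping, Optional

RUN_ID_KEY = "orchestrator_run_id"  # assumed metadata key


class ExecutionStatus(str, Enum):
    """Stand-in for ZenML's ExecutionStatus (values are assumptions)."""

    INITIALIZING = "initializing"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Same grouping as the if/elif chain in fetch_status.
_STATE_MAP = {
    "PIPELINE_STATE_QUEUED": ExecutionStatus.INITIALIZING,
    "PIPELINE_STATE_PENDING": ExecutionStatus.INITIALIZING,
    "PIPELINE_STATE_RUNNING": ExecutionStatus.RUNNING,
    "PIPELINE_STATE_PAUSED": ExecutionStatus.RUNNING,
    "PIPELINE_STATE_SUCCEEDED": ExecutionStatus.COMPLETED,
    "PIPELINE_STATE_FAILED": ExecutionStatus.FAILED,
    "PIPELINE_STATE_CANCELLING": ExecutionStatus.FAILED,
    "PIPELINE_STATE_CANCELLED": ExecutionStatus.FAILED,
}


def resolve_run_id(run_metadata: Mapping[str, str], orchestrator_run_id: Optional[str]) -> str:
    """Prefer the run ID stored in metadata, then the run's own field."""
    if RUN_ID_KEY in run_metadata:
        return run_metadata[RUN_ID_KEY]
    if orchestrator_run_id is not None:
        return orchestrator_run_id
    raise ValueError("Cannot find the orchestrator run ID.")


def map_state(state_name: str, current: ExecutionStatus) -> ExecutionStatus:
    """Translate a Vertex state name into an ExecutionStatus."""
    if state_name == "PIPELINE_STATE_UNSPECIFIED":
        return current  # keep the last known status
    try:
        return _STATE_MAP[state_name]
    except KeyError:
        raise ValueError(f"Unknown status for the pipeline job: {state_name}") from None


print(map_state("PIPELINE_STATE_PAUSED", ExecutionStatus.INITIALIZING))
```

A lookup table makes the grouping easy to audit; note that, as in the original, a run that is still cancelling is already reported as failed.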
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_VERTEX_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_VERTEX_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name | Type | Description | Default
run_id | UUID | The ID of the pipeline run. | required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    run_url = (
        f"https://console.cloud.google.com/vertex-ai/locations/"
        f"{self.config.location}/pipelines/runs/"
        f"{self.get_orchestrator_run_id()}"
    )
    if self.config.project:
        run_url += f"?project={self.config.project}"
    return {
        METADATA_ORCHESTRATOR_URL: Uri(run_url),
    }
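The method ignores its `run_id` argument. It builds the Cloud Console link from the configured location and the Vertex job name read from the environment, and appends `?project=` only when a project is configured. The sketch below isolates that string construction. `vertex_run_url` is a hypothetical helper, not part of ZenML.

```python
from typing import Optional


def vertex_run_url(location: str, run_id: str, project: Optional[str] = None) -> str:
    """Build the Cloud Console URL of a Vertex AI pipeline run."""
    url = (
        "https://console.cloud.google.com/vertex-ai/locations/"
        f"{location}/pipelines/runs/{run_id}"
    )
    # The project query parameter is only added when a project is configured.
    if project:
        url += f"?project={project}"
    return url
```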
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Iterator[Dict[str, MetadataType]]

Creates a KFP JSON pipeline.

This is an intermediary representation of the pipeline which is then deployed to the Vertex AI Pipelines service.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this Docker image, a callable is created which builds container_ops for each step (_create_dynamic_pipeline). The function kfp.components.load_component_from_text is used to create the ContainerOp, because using the dsl.ContainerOp class directly is deprecated when using the Kubeflow SDK v2. The step entrypoint command with the entrypoint arguments is the command that will be executed by the container created using the previously created Docker image.

This callable is then compiled into a JSON file that is used as the intermediary representation of the Kubeflow pipeline.

This file is then submitted to the Vertex AI Pipelines service for execution.
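The flow described above creates one container task per step, takes each task's dependencies from the step's upstream steps, and serializes the result to a JSON file. It can be sketched without KFP to show the data flow. This is an illustration only: `StepSpec`, `build_pipeline_spec`, and the entrypoint command are hypothetical. The file ZenML actually compiles follows the KFP v2 pipeline-spec schema, not this simplified shape.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class StepSpec:
    """Hypothetical stand-in for one entry of deployment.step_configurations."""

    image: str
    upstream_steps: List[str] = field(default_factory=list)


def build_pipeline_spec(steps: Dict[str, StepSpec], deployment_id: str) -> str:
    """Turn step configurations into a JSON document of container tasks."""
    tasks = []
    for step_name, step in steps.items():
        tasks.append(
            {
                "name": step_name,
                "image": step.image,
                # Hypothetical entrypoint; ZenML derives the real command and
                # arguments from StepEntrypointConfiguration.
                "command": ["python", "-m", "hypothetical_entrypoint"],
                "args": ["--step_name", step_name, "--deployment_id", deployment_id],
                # Each task waits for its upstream steps, mirroring `.after(...)`.
                "after": list(step.upstream_steps),
            }
        )
    return json.dumps({"tasks": tasks}, indent=2)
```

Usage: `build_pipeline_spec({"load": StepSpec("img:1"), "train": StepSpec("img:1", ["load"])}, "dep-1")` yields two tasks, with `train` listing `load` under `after`.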

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
ValueError

If the attribute pipeline_root is not set and cannot be generated from the path of the stack's artifact store because that store is not a zenml.integrations.gcp.artifact_store.GCPArtifactStore. Also raised when attempting to schedule a pipeline run without a zenml.integrations.gcp.artifact_store.GCPArtifactStore.

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Iterator[Dict[str, MetadataType]]:
    """Creates a KFP JSON pipeline.

    # noqa: DAR402

    This is an intermediary representation of the pipeline which is then
    deployed to the Vertex AI Pipelines service.

    How it works:
    -------------
    Before this method is called, the `prepare_pipeline_deployment()` method
    builds a Docker image that contains the code for the pipeline, all
    steps, and the context around these files.

    Based on this Docker image, a callable is created which builds
    container_ops for each step (`_create_dynamic_pipeline`). The function
    `kfp.components.load_component_from_text` is used to create the
    `ContainerOp`, because using the `dsl.ContainerOp` class directly is
    deprecated when using the Kubeflow SDK v2. The step entrypoint command
    with the entrypoint arguments is the command that will be executed by
    the container created using the previously created Docker image.

    This callable is then compiled into a JSON file that is used as the
    intermediary representation of the Kubeflow pipeline.

    This file is then submitted to the Vertex AI Pipelines service for
    execution.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        ValueError: If the attribute `pipeline_root` is not set and cannot
            be generated from the path of the stack's artifact store because
            that store is not a
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. Also
            raised when attempting to schedule a pipeline run without a
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    # If `pipeline_root` has not been set in the orchestrator configuration,
    # derive it from the path of the stack's artifact store.
    if not self.config.pipeline_root:
        artifact_store = stack.artifact_store
        self._pipeline_root = (
            f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/"
            f"{deployment.pipeline_configuration.name}/{orchestrator_run_name}"
        )
        logger.info(
            "The attribute `pipeline_root` has not been set in the "
            "orchestrator configuration. One has been generated "
            "automatically based on the path of the `GCPArtifactStore` "
            "artifact store in the stack used to execute the pipeline. "
            "The generated `pipeline_root` is `%s`.",
            self._pipeline_root,
        )
    else:
        self._pipeline_root = self.config.pipeline_root

    def _create_dynamic_pipeline() -> Any:
        """Create a dynamic pipeline including each step.

        Returns:
            pipeline_func
        """
        step_name_to_dynamic_component: Dict[str, BaseComponent] = {}

        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(
                deployment=deployment,
                step_name=step_name,
            )
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                )
            )
            component = self._create_container_component(
                image, command, arguments, step_name
            )
            step_settings = cast(
                VertexOrchestratorSettings, self.get_settings(step)
            )
            pod_settings = step_settings.pod_settings
            if pod_settings:
                if pod_settings.host_ipc:
                    logger.warning(
                        "Host IPC is set to `True` but not supported in "
                        "this orchestrator. Ignoring..."
                    )
                if pod_settings.affinity:
                    logger.warning(
                        "Affinity is set but not supported in Vertex with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.tolerations:
                    logger.warning(
                        "Tolerations are set but not supported in "
                        "Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volumes:
                    logger.warning(
                        "Volumes are set but not supported in Vertex with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volume_mounts:
                    logger.warning(
                        "Volume mounts are set but not supported in "
                        "Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.env or pod_settings.env_from:
                    logger.warning(
                        "Environment variables are set but not supported "
                        "in Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                for key in pod_settings.node_selectors:
                    if (
                        key
                        != GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                    ):
                        logger.warning(
                            "Vertex only allows the %s node selector, "
                            "ignoring the node selector %s.",
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            key,
                        )

            step_name_to_dynamic_component[step_name] = component

        environment[ENV_ZENML_VERTEX_RUN_ID] = (
            dsl.PIPELINE_JOB_NAME_PLACEHOLDER
        )

        @dsl.pipeline(  # type: ignore[misc]
            display_name=orchestrator_run_name,
        )
        def dynamic_pipeline() -> None:
            """Dynamic pipeline."""
            # iterate through the components one by one
            # (from step_name_to_dynamic_component)
            for (
                component_name,
                component,
            ) in step_name_to_dynamic_component.items():
                # for each component, check to see what other steps are
                # upstream of it
                step = deployment.step_configurations[component_name]
                upstream_step_components = [
                    step_name_to_dynamic_component[upstream_step_name]
                    for upstream_step_name in step.spec.upstream_steps
                ]

                step_settings = cast(
                    VertexOrchestratorSettings, self.get_settings(step)
                )

                use_custom_training_job = (
                    step_settings.custom_job_parameters is not None
                )

                if use_custom_training_job:
                    if not step.config.resource_settings.empty:
                        logger.warning(
                            "Ignoring resource settings because "
                            "the step is running as a custom training job. "
                            "Use `custom_job_parameters.machine_type` "
                            "to configure the machine type instead."
                        )
                    if step_settings.node_selector_constraint:
                        logger.warning(
                            "Ignoring node selector constraint because "
                            "the step is running as a custom training job. "
                            "Use `custom_job_parameters.accelerator_type` "
                            "to configure the accelerator type instead."
                        )
                    component = self._convert_to_custom_training_job(
                        component,
                        settings=step_settings,
                        environment=environment,
                    )
                    task = (
                        component()
                        .set_display_name(name=component_name)
                        .set_caching_options(enable_caching=False)
                        .after(*upstream_step_components)
                    )
                else:
                    task = (
                        component()
                        .set_display_name(
                            name=component_name,
                        )
                        .set_caching_options(enable_caching=False)
                        .after(*upstream_step_components)
                    )
                    for key, value in environment.items():
                        task = task.set_env_variable(name=key, value=value)

                    pod_settings = step_settings.pod_settings

                    node_selector_constraint: Optional[Tuple[str, str]] = (
                        None
                    )
                    if pod_settings and (
                        GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        in pod_settings.node_selectors.keys()
                    ):
                        node_selector_constraint = (
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            pod_settings.node_selectors[
                                GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                            ],
                        )
                    elif step_settings.node_selector_constraint:
                        node_selector_constraint = (
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            step_settings.node_selector_constraint[1],
                        )

                    self._configure_container_resources(
                        dynamic_component=task,
                        resource_settings=step.config.resource_settings,
                        node_selector_constraint=node_selector_constraint,
                    )

        return dynamic_pipeline

    # Save the generated pipeline to a file.
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory,
        f"{orchestrator_run_name}.json",
    )

    # Compile the pipeline using the Kubeflow SDK v2 compiler, which
    # generates a JSON representation of the pipeline that can later be
    # uploaded to the Vertex AI Pipelines service.
    Compiler().compile(
        pipeline_func=_create_dynamic_pipeline(),
        package_path=pipeline_file_path,
        pipeline_name=_clean_pipeline_name(
            deployment.pipeline_configuration.name
        ),
    )

    logger.info(
        "Writing Vertex workflow definition to `%s`.", pipeline_file_path
    )

    settings = cast(
        VertexOrchestratorSettings, self.get_settings(deployment)
    )

    # Using the Google Cloud AIPlatform client, upload and execute the
    # pipeline on the Vertex AI Pipelines service.
    if metadata := self._upload_and_run_pipeline(
        pipeline_name=deployment.pipeline_configuration.name,
        pipeline_file_path=pipeline_file_path,
        run_name=orchestrator_run_name,
        settings=settings,
        schedule=deployment.schedule,
    ):
        yield from metadata
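Before compiling, the method makes two small decisions that are easy to miss in the listing above.

- **Pipeline root.** It uses the configured `pipeline_root` if one is set. Otherwise it derives one under the artifact store path. The code shown does not itself check the artifact store type.
- **Accelerator constraint.** A node selector under the GKE accelerator label in `pod_settings` takes priority over `node_selector_constraint`. Steps running as custom training jobs skip this entirely.

The sketch below restates both decisions as pure functions. The helper names are hypothetical, and `cloud.google.com/gke-accelerator` is an assumed value for `GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL`.

```python
from typing import Dict, Optional, Tuple

# Assumed value of GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL.
ACCELERATOR_LABEL = "cloud.google.com/gke-accelerator"


def default_pipeline_root(
    configured_root: Optional[str],
    artifact_store_path: str,
    pipeline_name: str,
    run_name: str,
) -> str:
    """Use the configured root, else derive one under the artifact store."""
    if configured_root:
        return configured_root
    return (
        f"{artifact_store_path.rstrip('/')}/vertex_pipeline_root/"
        f"{pipeline_name}/{run_name}"
    )


def resolve_accelerator(
    pod_node_selectors: Optional[Dict[str, str]],
    settings_constraint: Optional[Tuple[str, str]],
) -> Optional[Tuple[str, str]]:
    """Pod-settings node selectors win over the settings-level constraint."""
    if pod_node_selectors and ACCELERATOR_LABEL in pod_node_selectors:
        return ACCELERATOR_LABEL, pod_node_selectors[ACCELERATOR_LABEL]
    if settings_constraint:
        # Only the value is kept; the label is always the accelerator label.
        return ACCELERATOR_LABEL, settings_constraint[1]
    return None
```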
prepare_pipeline_deployment(deployment: PipelineDeploymentResponse, stack: Stack) -> None

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required

Raises:

Type Description
ValueError

If the passed schedule does not set cron_expression.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        ValueError: If the passed schedule does not set `cron_expression`.
    """
    if deployment.schedule:
        if (
            deployment.schedule.catchup
            or deployment.schedule.interval_second
        ):
            logger.warning(
                "Vertex orchestrator only uses schedules with the "
                "`cron_expression` property, with optional `start_time` "
                "and/or `end_time`. All other properties are ignored."
            )
        if deployment.schedule.cron_expression is None:
            raise ValueError(
                "Property `cron_expression` must be set when passing "
                "schedule to a Vertex orchestrator."
            )
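The schedule check above only accepts cron-based schedules. It warns about, but tolerates, `catchup` and `interval_second`, and rejects any schedule without `cron_expression`. A minimal sketch of the same rules follows. `Schedule` here is a hypothetical dataclass with just the fields the check reads, not ZenML's own class.

```python
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class Schedule:
    """Hypothetical subset of the schedule fields read by the check."""

    cron_expression: Optional[str] = None
    interval_second: Optional[timedelta] = None
    catchup: bool = False


def validate_vertex_schedule(schedule: Optional[Schedule]) -> None:
    """Warn about ignored fields and require a cron expression."""
    if schedule is None:
        return  # Unscheduled runs need no validation.
    if schedule.catchup or schedule.interval_second:
        logger.warning(
            "Only `cron_expression`, `start_time` and `end_time` are used; "
            "other schedule properties are ignored."
        )
    if schedule.cron_expression is None:
        raise ValueError(
            "Property `cron_expression` must be set when passing a schedule "
            "to a Vertex orchestrator."
        )
```

An interval-only schedule therefore fails even though a warning is logged first. Vertex schedules in this orchestrator are driven purely by the cron expression.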
Modules
vertex_orchestrator

Implementation of the VertexAI orchestrator.

Classes
VertexOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator, GoogleCredentialsMixin

Orchestrator responsible for running pipelines on Vertex AI.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: VertexOrchestratorConfig property

Returns the VertexOrchestratorConfig config.

Returns:

Type Description
VertexOrchestratorConfig

The configuration.

pipeline_directory: str property

Returns the path to the directory where Kubeflow pipeline files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property

Returns path to the root directory for files for this orchestrator.

Returns:

Type Description
str

The path to the root directory for all files concerning this orchestrator.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Vertex orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates that the stack contains a container registry.

Also validates that the artifact store is not local.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.
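Both checks follow from how the orchestrator runs. Step images must be pushed somewhere Vertex can pull them from, and remote steps cannot read artifacts from a local path. The sketch below expresses the two checks as a plain function returning an `(ok, reason)` tuple. ZenML's `StackValidator` is not reproduced here. `StackSketch`, `validate_stack`, and the "no URI scheme means local" heuristic are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class StackSketch:
    """Hypothetical, minimal view of a stack for this check."""

    container_registry: Optional[str]
    artifact_store_path: str


def validate_stack(stack: StackSketch) -> Tuple[bool, str]:
    """Require a container registry and a non-local artifact store."""
    if stack.container_registry is None:
        return False, "A container registry is required to push step images."
    # Assumption: a path without a URI scheme (e.g. gs://) is treated as local.
    if "://" not in stack.artifact_store_path:
        return False, "The artifact store must be remote, not local."
    return True, ""
```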

Functions
compute_metadata(job: aiplatform.PipelineJob) -> Iterator[Dict[str, MetadataType]]

Generate run metadata based on the corresponding Vertex PipelineJob.

Parameters:

Name Type Description Default
job PipelineJob

The corresponding PipelineJob object.

required

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def compute_metadata(
    self, job: aiplatform.PipelineJob
) -> Iterator[Dict[str, MetadataType]]:
    """Generate run metadata based on the corresponding Vertex PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    metadata: Dict[str, MetadataType] = {}

    # Orchestrator Run ID
    if run_id := self._compute_orchestrator_run_id(job):
        metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id

    # URL to the Vertex's pipeline view
    if orchestrator_url := self._compute_orchestrator_url(job):
        metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url)

    # URL to the corresponding Logs Explorer page
    if logs_url := self._compute_orchestrator_logs_url(job):
        metadata[METADATA_ORCHESTRATOR_LOGS_URL] = Uri(logs_url)

    yield metadata
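`compute_metadata` adds each entry only when its helper returns a truthy value, so a job whose logs URL cannot be computed simply lacks that key. The sketch below reproduces that conditional-fill pattern with injected helper callables. The key names are hypothetical stand-ins for ZenML's `METADATA_*` constants, and plain strings replace the `Uri` wrapper.

```python
from typing import Callable, Dict, Iterator, Optional

# Hypothetical key names; ZenML defines its own METADATA_* constants.
RUN_ID_KEY = "orchestrator_run_id"
URL_KEY = "orchestrator_url"
LOGS_URL_KEY = "orchestrator_logs_url"


def compute_metadata(
    job: object,
    run_id_of: Callable[[object], Optional[str]],
    url_of: Callable[[object], Optional[str]],
    logs_url_of: Callable[[object], Optional[str]],
) -> Iterator[Dict[str, str]]:
    """Yield one dict with only the entries that could be computed."""
    metadata: Dict[str, str] = {}
    if run_id := run_id_of(job):
        metadata[RUN_ID_KEY] = run_id
    if url := url_of(job):
        metadata[URL_KEY] = url
    if logs_url := logs_url_of(job):
        metadata[LOGS_URL_KEY] = logs_url
    yield metadata
```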
fetch_status(run: PipelineRunResponse) -> ExecutionStatus

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The run that was executed by this orchestrator.

required

Returns:

Type Description
ExecutionStatus

The actual status of the pipeline job.

Raises:

Type Description
AssertionError

If the run was not executed by this orchestrator.

ValueError

If the stack the run was executed on is no longer available, if an unknown state is fetched, or if the orchestrator run ID cannot be found.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        The actual status of the pipeline job.

    Raises:
        AssertionError: If the run was not executed by this orchestrator.
        ValueError: If the stack of the run is no longer available, if an
            unknown state is fetched, or if the orchestrator run ID cannot
            be found.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator
    assert (
        self.id
        == run.stack.components[StackComponentType.ORCHESTRATOR][0].id
    )

    # Initialize the Vertex client
    credentials, project_id = self._get_authentication()
    aiplatform.init(
        project=project_id,
        location=self.config.location,
        credentials=credentials,
    )

    # Fetch the status of the PipelineJob
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Cannot find the orchestrator run ID, so the status cannot be "
            "fetched."
        )
    status = aiplatform.PipelineJob.get(run_id).state

    # Map the Vertex AI `PipelineState` of the job to a ZenML
    # `ExecutionStatus`. The possible values are the `PipelineState` members.
    if status in [PipelineState.PIPELINE_STATE_UNSPECIFIED]:
        return run.status
    elif status in [
        PipelineState.PIPELINE_STATE_QUEUED,
        PipelineState.PIPELINE_STATE_PENDING,
    ]:
        return ExecutionStatus.INITIALIZING
    elif status in [
        PipelineState.PIPELINE_STATE_RUNNING,
        PipelineState.PIPELINE_STATE_PAUSED,
    ]:
        return ExecutionStatus.RUNNING
    elif status in [PipelineState.PIPELINE_STATE_SUCCEEDED]:
        return ExecutionStatus.COMPLETED

    elif status in [
        PipelineState.PIPELINE_STATE_FAILED,
        PipelineState.PIPELINE_STATE_CANCELLING,
        PipelineState.PIPELINE_STATE_CANCELLED,
    ]:
        return ExecutionStatus.FAILED
    else:
        raise ValueError("Unknown status for the pipeline job.")
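The if/elif chain above groups Vertex states into four ZenML statuses:

- `UNSPECIFIED` keeps the run's current status.
- `PAUSED` counts as running.
- `CANCELLING` already counts as failed.

The same mapping can be written table-driven, as sketched below, with unmapped states still raising `ValueError`. Both enums are local stand-ins: `VertexState` mirrors only the `PipelineState` member names used above, with arbitrary values. `Status` is a simplified substitute for `ExecutionStatus`.

```python
from enum import Enum, auto


class VertexState(Enum):
    """Local mirror of the PipelineState names used above; values arbitrary."""

    PIPELINE_STATE_UNSPECIFIED = auto()
    PIPELINE_STATE_QUEUED = auto()
    PIPELINE_STATE_PENDING = auto()
    PIPELINE_STATE_RUNNING = auto()
    PIPELINE_STATE_PAUSED = auto()
    PIPELINE_STATE_SUCCEEDED = auto()
    PIPELINE_STATE_FAILED = auto()
    PIPELINE_STATE_CANCELLING = auto()
    PIPELINE_STATE_CANCELLED = auto()


class Status(Enum):
    """Simplified stand-in for ZenML's ExecutionStatus."""

    INITIALIZING = "initializing"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


# Same grouping as the if/elif chain in fetch_status.
_STATE_TO_STATUS = {
    VertexState.PIPELINE_STATE_QUEUED: Status.INITIALIZING,
    VertexState.PIPELINE_STATE_PENDING: Status.INITIALIZING,
    VertexState.PIPELINE_STATE_RUNNING: Status.RUNNING,
    VertexState.PIPELINE_STATE_PAUSED: Status.RUNNING,
    VertexState.PIPELINE_STATE_SUCCEEDED: Status.COMPLETED,
    VertexState.PIPELINE_STATE_FAILED: Status.FAILED,
    VertexState.PIPELINE_STATE_CANCELLING: Status.FAILED,
    VertexState.PIPELINE_STATE_CANCELLED: Status.FAILED,
}


def map_state(state: VertexState, current: Status) -> Status:
    """Translate a Vertex state, keeping `current` when it is unspecified."""
    if state is VertexState.PIPELINE_STATE_UNSPECIFIED:
        return current
    try:
        return _STATE_TO_STATUS[state]
    except KeyError:
        raise ValueError("Unknown status for the pipeline job.") from None
```

With a lookup table, supporting a new state is a one-line addition, and the `ValueError` fallback behaves like the original `else` branch.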
get_orchestrator_run_id() -> str

Returns the active orchestrator run id.

Raises:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_VERTEX_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_VERTEX_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    run_url = (
        f"https://console.cloud.google.com/vertex-ai/locations/"
        f"{self.config.location}/pipelines/runs/"
        f"{self.get_orchestrator_run_id()}"
    )
    if self.config.project:
        run_url += f"?project={self.config.project}"
    return {
        METADATA_ORCHESTRATOR_URL: Uri(run_url),
    }
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Iterator[Dict[str, MetadataType]]

Creates a KFP JSON pipeline.

This is an intermediary representation of the pipeline which is then deployed to the Vertex AI Pipelines service.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this Docker image, a callable is created which builds container_ops for each step (_create_dynamic_pipeline). The function kfp.components.load_component_from_text is used to create the ContainerOp, because using the dsl.ContainerOp class directly is deprecated when using the Kubeflow SDK v2. The step entrypoint command with the entrypoint arguments is the command that will be executed by the container created using the previously created Docker image.

This callable is then compiled into a JSON file that is used as the intermediary representation of the Kubeflow pipeline.

This file is then submitted to the Vertex AI Pipelines service for execution.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
ValueError

If the attribute pipeline_root is not set and cannot be generated from the path of the stack's artifact store because that store is not a zenml.integrations.gcp.artifact_store.GCPArtifactStore. Also raised when attempting to schedule a pipeline run without a zenml.integrations.gcp.artifact_store.GCPArtifactStore.

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Iterator[Dict[str, MetadataType]]:
    """Creates a KFP JSON pipeline.

    # noqa: DAR402

    This is an intermediary representation of the pipeline which is then
    deployed to the Vertex AI Pipelines service.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()` method
    builds a Docker image that contains the code for the pipeline, all steps
    the context around these files.

    Based on this Docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`). The function
    `kfp.components.load_component_from_text` is used to create the
    `ContainerOp`, because using the `dsl.ContainerOp` class directly is
    deprecated when using the Kubeflow SDK v2. The step entrypoint command
    with the entrypoint arguments is the command that will be executed by
    the container created using the previously created Docker image.

    This callable is then compiled into a JSON file that is used as the
    intermediary representation of the Kubeflow pipeline.

    This file is then submitted to the Vertex AI Pipelines service for
    execution.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        ValueError: If the attribute `pipeline_root` is not set and it
            cannot be generated from the path of the artifact store in the
            stack because that store is not a
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. Also
            raised when attempting to schedule a pipeline run without using
            the `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    orchestrator_run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    # If `pipeline_root` has not been defined in the orchestrator
    # configuration, try to create it from the artifact store if it is a
    # `GCPArtifactStore`.
    if not self.config.pipeline_root:
        artifact_store = stack.artifact_store
        self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{deployment.pipeline_configuration.name}/{orchestrator_run_name}"
        logger.info(
            "The attribute `pipeline_root` has not been set in the "
            "orchestrator configuration. One has been generated "
            "automatically based on the path of the `GCPArtifactStore` "
            "artifact store in the stack used to execute the pipeline. "
            "The generated `pipeline_root` is `%s`.",
            self._pipeline_root,
        )
    else:
        self._pipeline_root = self.config.pipeline_root

    def _create_dynamic_pipeline() -> Any:
        """Create a dynamic pipeline including each step.

        Returns:
            pipeline_func
        """
        step_name_to_dynamic_component: Dict[str, BaseComponent] = {}

        for step_name, step in deployment.step_configurations.items():
            image = self.get_image(
                deployment=deployment,
                step_name=step_name,
            )
            command = StepEntrypointConfiguration.get_entrypoint_command()
            arguments = (
                StepEntrypointConfiguration.get_entrypoint_arguments(
                    step_name=step_name,
                    deployment_id=deployment.id,
                )
            )
            component = self._create_container_component(
                image, command, arguments, step_name
            )
            step_settings = cast(
                VertexOrchestratorSettings, self.get_settings(step)
            )
            pod_settings = step_settings.pod_settings
            if pod_settings:
                if pod_settings.host_ipc:
                    logger.warning(
                        "Host IPC is set to `True` but not supported in "
                        "this orchestrator. Ignoring..."
                    )
                if pod_settings.affinity:
                    logger.warning(
                        "Affinity is set but not supported in Vertex with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.tolerations:
                    logger.warning(
                        "Tolerations are set but not supported in "
                        "Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volumes:
                    logger.warning(
                        "Volumes are set but not supported in Vertex with "
                        "Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.volume_mounts:
                    logger.warning(
                        "Volume mounts are set but not supported in "
                        "Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                if pod_settings.env or pod_settings.env_from:
                    logger.warning(
                        "Environment variables are set but not supported "
                        "in Vertex with Kubeflow Pipelines 2.x. Ignoring..."
                    )
                for key in pod_settings.node_selectors:
                    if (
                        key
                        != GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                    ):
                        logger.warning(
                            "Vertex only allows the %s node selector, "
                            "ignoring the node selector %s.",
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            key,
                        )

            step_name_to_dynamic_component[step_name] = component

        environment[ENV_ZENML_VERTEX_RUN_ID] = (
            dsl.PIPELINE_JOB_NAME_PLACEHOLDER
        )

        @dsl.pipeline(  # type: ignore[misc]
            display_name=orchestrator_run_name,
        )
        def dynamic_pipeline() -> None:
            """Dynamic pipeline."""
            # iterate through the components one by one
            # (from step_name_to_dynamic_component)
            for (
                component_name,
                component,
            ) in step_name_to_dynamic_component.items():
                # for each component, check to see what other steps are
                # upstream of it
                step = deployment.step_configurations[component_name]
                upstream_step_components = [
                    step_name_to_dynamic_component[upstream_step_name]
                    for upstream_step_name in step.spec.upstream_steps
                ]

                step_settings = cast(
                    VertexOrchestratorSettings, self.get_settings(step)
                )

                use_custom_training_job = (
                    step_settings.custom_job_parameters is not None
                )

                if use_custom_training_job:
                    if not step.config.resource_settings.empty:
                        logger.warning(
                            "Ignoring resource settings because "
                            "the step is running as a custom training job. "
                            "Use `custom_job_parameters.machine_type` "
                            "to configure the machine type instead."
                        )
                    if step_settings.node_selector_constraint:
                        logger.warning(
                            "Ignoring node selector constraint because "
                            "the step is running as a custom training job. "
                            "Use `custom_job_parameters.accelerator_type` "
                            "to configure the accelerator type instead."
                        )
                    component = self._convert_to_custom_training_job(
                        component,
                        settings=step_settings,
                        environment=environment,
                    )
                    task = (
                        component()
                        .set_display_name(name=component_name)
                        .set_caching_options(enable_caching=False)
                        .after(*upstream_step_components)
                    )
                else:
                    task = (
                        component()
                        .set_display_name(
                            name=component_name,
                        )
                        .set_caching_options(enable_caching=False)
                        .after(*upstream_step_components)
                    )
                    for key, value in environment.items():
                        task = task.set_env_variable(name=key, value=value)

                    pod_settings = step_settings.pod_settings

                    node_selector_constraint: Optional[Tuple[str, str]] = (
                        None
                    )
                    if pod_settings and (
                        GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                        in pod_settings.node_selectors.keys()
                    ):
                        node_selector_constraint = (
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            pod_settings.node_selectors[
                                GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL
                            ],
                        )
                    elif step_settings.node_selector_constraint:
                        node_selector_constraint = (
                            GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL,
                            step_settings.node_selector_constraint[1],
                        )

                    self._configure_container_resources(
                        dynamic_component=task,
                        resource_settings=step.config.resource_settings,
                        node_selector_constraint=node_selector_constraint,
                    )

        return dynamic_pipeline

    # Save the generated pipeline to a file.
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory,
        f"{orchestrator_run_name}.json",
    )

    # Compile the pipeline using the Kubeflow SDK v2 compiler, which
    # generates a JSON representation of the pipeline that can later be
    # uploaded to the Vertex AI Pipelines service.
    Compiler().compile(
        pipeline_func=_create_dynamic_pipeline(),
        package_path=pipeline_file_path,
        pipeline_name=_clean_pipeline_name(
            deployment.pipeline_configuration.name
        ),
    )

    logger.info(
        "Writing Vertex workflow definition to `%s`.", pipeline_file_path
    )

    settings = cast(
        VertexOrchestratorSettings, self.get_settings(deployment)
    )

    # Using the Google Cloud AIPlatform client, upload and execute the
    # pipeline on the Vertex AI Pipelines service.
    if metadata := self._upload_and_run_pipeline(
        pipeline_name=deployment.pipeline_configuration.name,
        pipeline_file_path=pipeline_file_path,
        run_name=orchestrator_run_name,
        settings=settings,
        schedule=deployment.schedule,
    ):
        yield from metadata
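Two details of `prepare_or_run_pipeline` are easy to misread: how `pipeline_root` is derived when it is not configured, and which accelerator node selector wins when both pod settings and `node_selector_constraint` are set. The following is a minimal pure-Python sketch of that logic, not ZenML's own code. The helper names `derive_pipeline_root` and `resolve_node_selector` are hypothetical, plain dicts and tuples stand in for ZenML's settings objects, and the value `cloud.google.com/gke-accelerator` for `GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL` is an assumption.

```python
from typing import Dict, Optional, Tuple

# Assumed value of ZenML's GKE_ACCELERATOR_NODE_SELECTOR_CONSTRAINT_LABEL.
GKE_ACCELERATOR_LABEL = "cloud.google.com/gke-accelerator"


def derive_pipeline_root(
    configured_root: Optional[str],
    artifact_store_path: str,
    pipeline_name: str,
    run_name: str,
) -> str:
    """Hypothetical helper mirroring how `pipeline_root` is chosen."""
    if configured_root:
        # An explicitly configured root always wins.
        return configured_root
    # Otherwise nest a per-pipeline, per-run folder under the artifact store.
    return (
        f"{artifact_store_path.rstrip('/')}/vertex_pipeline_root/"
        f"{pipeline_name}/{run_name}"
    )


def resolve_node_selector(
    pod_node_selectors: Optional[Dict[str, str]],
    step_constraint: Optional[Tuple[str, str]],
) -> Optional[Tuple[str, str]]:
    """Hypothetical helper mirroring the accelerator selector precedence."""
    if pod_node_selectors and GKE_ACCELERATOR_LABEL in pod_node_selectors:
        # The accelerator selector from pod settings takes precedence.
        return (GKE_ACCELERATOR_LABEL, pod_node_selectors[GKE_ACCELERATOR_LABEL])
    if step_constraint:
        # Only the value of the step-level constraint is kept; the key is
        # always replaced by the GKE accelerator label.
        return (GKE_ACCELERATOR_LABEL, step_constraint[1])
    return None
```

For example, with an artifact store at `gs://bucket/zenml/`, the generated root for run `train_run_1` of pipeline `train` would be `gs://bucket/zenml/vertex_pipeline_root/train/train_run_1`, so every run gets its own folder.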
prepare_pipeline_deployment(deployment: PipelineDeploymentResponse, stack: Stack) -> None

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required

Raises:

Type Description
ValueError

If `cron_expression` is not set in the passed schedule.

Source code in src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        ValueError: If `cron_expression` is not set in the passed schedule.
    """
    if deployment.schedule:
        if (
            deployment.schedule.catchup
            or deployment.schedule.interval_second
        ):
            logger.warning(
                "Vertex orchestrator only uses schedules with the "
                "`cron_expression` property, with optional `start_time` "
                "and/or `end_time`. All other properties are ignored."
            )
        if deployment.schedule.cron_expression is None:
            raise ValueError(
                "Property `cron_expression` must be set when passing "
                "schedule to a Vertex orchestrator."
            )
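The schedule rules enforced above can be summarized as: only `cron_expression` (plus an optional `start_time`/`end_time`) is honored, `catchup` and `interval_second` only trigger a warning, and a missing `cron_expression` is an error. The sketch below reproduces those rules in plain Python. `ScheduleSketch` is a hypothetical stand-in for ZenML's `Schedule` carrying only the fields this check reads, and Python's `warnings` module replaces ZenML's logger for illustration.

```python
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class ScheduleSketch:
    """Hypothetical stand-in for ZenML's `Schedule`."""

    cron_expression: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    interval_second: Optional[timedelta] = None
    catchup: bool = False


def check_vertex_schedule(schedule: Optional[ScheduleSketch]) -> None:
    """Mirror the schedule checks performed before a Vertex run."""
    if schedule is None:
        # Unscheduled runs need no validation.
        return
    if schedule.catchup or schedule.interval_second:
        # These fields are accepted but ignored by the Vertex orchestrator.
        warnings.warn(
            "Only `cron_expression`, `start_time` and `end_time` are used; "
            "other schedule properties are ignored."
        )
    if schedule.cron_expression is None:
        raise ValueError("`cron_expression` must be set for Vertex schedules.")
```

Note that an interval-only schedule both warns and fails, because the warning is emitted before the `cron_expression` check.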
Modules

service_connectors

ZenML GCP Service Connector.

Classes
GCPServiceConnector(**kwargs: Any)

Bases: ServiceConnector

GCP service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
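The constructor canonicalizes the resource ID and, on an authorization failure, logs the error (with a traceback only when debug logging is enabled) and clears the ID instead of failing. A minimal sketch of that fallback pattern follows. `AuthorizationError` and the `validate` callable are hypothetical stand-ins for ZenML's `AuthorizationException` and `_validate_resource_id`, whose real behavior is not shown here.

```python
import logging
from typing import Callable, Optional

logger = logging.getLogger("gcp_connector_sketch")


class AuthorizationError(Exception):
    """Hypothetical stand-in for ZenML's `AuthorizationException`."""


def canonical_resource_id(
    resource_type: Optional[str],
    resource_id: Optional[str],
    validate: Callable[[str, Optional[str]], Optional[str]],
) -> Optional[str]:
    """Canonicalize a resource ID, falling back to None on auth errors."""
    if not resource_type:
        # Nothing to canonicalize without a resource type.
        return resource_id
    try:
        return validate(resource_type, resource_id)
    except AuthorizationError as e:
        error = (
            f"Authorization error validating resource ID {resource_id} "
            f"for resource type {resource_type}: {e}"
        )
        # Full traceback only when debug logging is on; otherwise a warning.
        if logger.isEnabledFor(logging.DEBUG):
            logger.exception(error)
        else:
            logger.warning(error)
        return None
```

Degrading to `None` keeps a connector usable for listing and verification even when the caller lacks permission on the specific resource.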
Functions
get_session(auth_method: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None) -> Tuple[gcp_credentials.Credentials, Optional[datetime.datetime]]

Get a GCP session object with credentials for the specified resource.

Parameters:

Name Type Description Default
auth_method str

The authentication method to use.

required
resource_type Optional[str]

The resource type to get credentials for.

None
resource_id Optional[str]

The resource ID to get credentials for.

None

Returns:

Type Description
Credentials

GCP session with credentials for the specified resource.

Optional[datetime]

The expiration timestamp of the credentials, if applicable.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
def get_session(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[gcp_credentials.Credentials, Optional[datetime.datetime]]:
    """Get a GCP session object with credentials for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get credentials for.
        resource_id: The resource ID to get credentials for.

    Returns:
        GCP session with credentials for the specified resource and its
        expiration timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    key = (auth_method, resource_type, resource_id)
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Reuse the cached session only if it remains valid beyond the
        # expiration buffer; otherwise fall through and re-authenticate.
        if expires_at > utc_now(tz_aware=expires_at) + datetime.timedelta(
            minutes=GCP_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating GCP authentication session for auth method "
        f"'{auth_method}', resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
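The caching rule in `get_session` is: reuse non-expiring credentials forever, reuse expiring ones only while they remain valid beyond a safety buffer, and otherwise re-authenticate. The sketch below isolates that rule. `SessionCacheSketch` and its `authenticate` callback are hypothetical, the 15-minute buffer is an assumed placeholder for `GCP_SESSION_EXPIRATION_BUFFER`, and unlike ZenML's `utc_now(tz_aware=...)` it assumes expiration timestamps are timezone-aware.

```python
import datetime
from typing import Callable, Dict, Optional, Tuple

# Assumed buffer in minutes; ZenML's GCP_SESSION_EXPIRATION_BUFFER may differ.
EXPIRATION_BUFFER_MINUTES = 15

Key = Tuple[str, Optional[str], Optional[str]]
Entry = Tuple[object, Optional[datetime.datetime]]


class SessionCacheSketch:
    """Cache credentials per (auth method, resource type, resource ID)."""

    def __init__(self, authenticate: Callable[[Key], Entry]) -> None:
        self._authenticate = authenticate
        self._cache: Dict[Key, Entry] = {}

    def get(self, key: Key) -> Entry:
        if key in self._cache:
            session, expires_at = self._cache[key]
            if expires_at is None:
                # Non-expiring credentials can be reused indefinitely.
                return session, None
            now = datetime.datetime.now(datetime.timezone.utc)
            buffer = datetime.timedelta(minutes=EXPIRATION_BUFFER_MINUTES)
            if expires_at > now + buffer:
                # Still valid beyond the safety buffer: reuse it.
                return session, expires_at
        # Missing, expired, or about to expire: authenticate again.
        entry = self._authenticate(key)
        self._cache[key] = entry
        return entry
```

The buffer prevents handing out a token that would expire while a long-running operation is still using it.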
Modules
gcp_service_connector

GCP Service Connector.

The GCP Service Connector implements various authentication methods for GCP services:

  • Explicit GCP service account key
Classes
GCPAuthenticationMethods

Bases: StrEnum

GCP Authentication methods.

GCPBaseConfig

Bases: AuthenticationConfig

GCP base configuration.

Attributes
gcp_project_id: str property

Get the GCP project ID.

This method must be implemented by subclasses to ensure that the GCP project ID is always available.

Raises:

Type Description
NotImplementedError

If the method is not implemented.

GCPBaseProjectIDConfig

Bases: GCPBaseConfig

GCP base configuration with included project ID.

Attributes
gcp_project_id: str property

Get the GCP project ID.

Returns:

Type Description
str

The GCP project ID.

GCPExternalAccountConfig

Bases: GCPBaseProjectIDConfig, GCPExternalAccountCredentials

GCP external account configuration.

GCPExternalAccountCredentials

Bases: AuthenticationConfig

GCP external account credentials.

Functions
validate_external_account_json(value: PlainSerializedSecretStr) -> PlainSerializedSecretStr classmethod

Validate the external account credentials JSON.

Parameters:

Name Type Description Default
value PlainSerializedSecretStr

The external account credentials JSON.

required

Returns:

Type Description
PlainSerializedSecretStr

The validated external account credentials JSON.

Raises:

Type Description
ValueError

If the external account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@field_validator("external_account_json")
@classmethod
def validate_external_account_json(
    cls, value: PlainSerializedSecretStr
) -> PlainSerializedSecretStr:
    """Validate the external account credentials JSON.

    Args:
        value: The external account credentials JSON.

    Returns:
        The validated external account credentials JSON.

    Raises:
        ValueError: If the external account credentials JSON is invalid.
    """
    try:
        external_account_info = json.loads(value.get_secret_value())
    except json.JSONDecodeError as e:
        raise ValueError(
            f"GCP external account credentials are not valid JSON: {e}"
        )

    # Check that all fields are present
    required_fields = [
        "type",
        "subject_token_type",
        "token_url",
    ]
    # Compute missing fields
    missing_fields = set(required_fields) - set(
        external_account_info.keys()
    )
    if missing_fields:
        raise ValueError(
            f"GCP external account credentials JSON is missing required "
            f"fields: {', '.join(list(missing_fields))}"
        )

    if external_account_info["type"] != "external_account":
        raise ValueError(
            "The JSON does not contain GCP external account credentials. "
            f'The "type" field is set to {external_account_info["type"]} '
            "instead of 'external_account'."
        )

    return value
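The external account check boils down to three steps: parse the JSON, confirm that `type`, `subject_token_type` and `token_url` are present, and confirm that `type` equals `external_account`. Here is a stand-alone sketch of those steps using only the standard library. `check_external_account_json` is a hypothetical name, and it takes a plain `str` rather than a `PlainSerializedSecretStr`. One deliberate difference: it sorts the missing fields so the error message is deterministic, whereas the original joins an unordered set.

```python
import json
from typing import Any, Dict

REQUIRED_EXTERNAL_ACCOUNT_FIELDS = ("type", "subject_token_type", "token_url")


def check_external_account_json(raw: str) -> Dict[str, Any]:
    """Parse and sanity-check external account credentials JSON."""
    try:
        info = json.loads(raw)
    except json.JSONDecodeError as e:
        raise ValueError(f"Credentials are not valid JSON: {e}") from e
    # Report missing fields in a stable, sorted order.
    missing = sorted(set(REQUIRED_EXTERNAL_ACCOUNT_FIELDS) - set(info))
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    if info["type"] != "external_account":
        raise ValueError(f"Expected type 'external_account', got {info['type']!r}")
    return info
```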
validate_service_account_dict(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Convert the external account credentials to JSON if given in dict format.

Parameters:

Name Type Description Default
data Dict[str, Any]

The configuration values.

required

Returns:

Type Description
Dict[str, Any]

The validated configuration values.

Raises:

Type Description
ValueError

If the external account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@model_validator(mode="before")
@classmethod
@before_validator_handler
def validate_service_account_dict(
    cls, data: Dict[str, Any]
) -> Dict[str, Any]:
    """Convert the external account credentials to JSON if given in dict format.

    Args:
        data: The configuration values.

    Returns:
        The validated configuration values.

    Raises:
        ValueError: If the external account credentials JSON is invalid.
    """
    external_account_json = data.get("external_account_json")
    if isinstance(external_account_json, dict):
        data["external_account_json"] = json.dumps(
            data["external_account_json"]
        )
    elif isinstance(external_account_json, str):
        # Check if the external account JSON is base64 encoded and decode it
        if re.match(r"^[A-Za-z0-9+/=]+$", external_account_json):
            try:
                data["external_account_json"] = base64.b64decode(
                    external_account_json
                ).decode("utf-8")
            except Exception as e:
                raise ValueError(
                    f"Failed to decode base64 encoded external account JSON: {e}"
                )

    return data
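The service, external and user account validators all use the same normalization step before field validation: a dict is serialized to a JSON string, and a string made only of base64 alphabet characters is decoded as base64. Raw JSON passes through untouched because characters such as `{` and `"` fail the regex. The sketch below factors that step into a single hypothetical helper, `normalize_credentials_field`, that works for any field name. It is an illustration, not a ZenML function.

```python
import base64
import json
import re
from typing import Any, Dict

# Same character class the validators use to detect base64 input.
_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")


def normalize_credentials_field(data: Dict[str, Any], field: str) -> Dict[str, Any]:
    """Turn a dict or base64 string credential field into a JSON string."""
    value = data.get(field)
    if isinstance(value, dict):
        # Dicts are serialized so later validation always sees a string.
        data[field] = json.dumps(value)
    elif isinstance(value, str) and _BASE64_RE.match(value):
        try:
            data[field] = base64.b64decode(value).decode("utf-8")
        except Exception as e:
            # Covers invalid padding and non-UTF-8 payloads alike.
            raise ValueError(f"Failed to decode base64 encoded {field}: {e}") from e
    return data
```

One consequence of this heuristic is that any string consisting solely of base64 characters is treated as base64, so such a value either decodes or is rejected.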
GCPOAuth2Token

Bases: AuthenticationConfig

GCP OAuth 2.0 token credentials.

GCPOAuth2TokenConfig

Bases: GCPBaseProjectIDConfig, GCPOAuth2Token

GCP OAuth 2.0 configuration.

GCPServiceAccountConfig

Bases: GCPBaseConfig, GCPServiceAccountCredentials

GCP service account configuration.

Attributes
gcp_project_id: str property

Get the GCP project ID.

When a service account JSON is provided, the project ID can be extracted from it instead of being provided explicitly.

Returns:

Type Description
str

The GCP project ID.
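Because a service account key file already embeds a `project_id` field, the project ID does not have to be supplied separately. The following sketch shows one way that lookup could work. The helper name is hypothetical, and the assumption that an explicitly supplied project ID takes precedence over the one in the JSON is mine; the text above only says that the ID can be extracted from the JSON.

```python
import json
from typing import Optional


def project_id_from_service_account(
    service_account_json: str, explicit_project_id: Optional[str] = None
) -> str:
    """Hypothetical lookup: explicit ID first, else the key file's project_id."""
    if explicit_project_id:
        # Assumed precedence: an explicit value overrides the key file.
        return explicit_project_id
    # Service account key files carry their project under `project_id`.
    return json.loads(service_account_json)["project_id"]
```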

GCPServiceAccountCredentials

Bases: AuthenticationConfig

GCP service account credentials.

Functions
validate_service_account_dict(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Convert the service account credentials to JSON if given in dict format.

Parameters:

Name Type Description Default
data Dict[str, Any]

The configuration values.

required

Returns:

Type Description
Dict[str, Any]

The validated configuration values.

Raises:

Type Description
ValueError

If the service account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@model_validator(mode="before")
@classmethod
@before_validator_handler
def validate_service_account_dict(
    cls, data: Dict[str, Any]
) -> Dict[str, Any]:
    """Convert the service account credentials to JSON if given in dict format.

    Args:
        data: The configuration values.

    Returns:
        The validated configuration values.

    Raises:
        ValueError: If the service account credentials JSON is invalid.
    """
    service_account_json = data.get("service_account_json")
    if isinstance(service_account_json, dict):
        data["service_account_json"] = json.dumps(
            data["service_account_json"]
        )
    elif isinstance(service_account_json, str):
        # Check if the service account JSON is base64 encoded and decode it
        if re.match(r"^[A-Za-z0-9+/=]+$", service_account_json):
            try:
                data["service_account_json"] = base64.b64decode(
                    service_account_json
                ).decode("utf-8")
            except Exception as e:
                raise ValueError(
                    f"Failed to decode base64 encoded service account JSON: {e}"
                )

    return data
validate_service_account_json(value: PlainSerializedSecretStr) -> PlainSerializedSecretStr classmethod

Validate the service account credentials JSON.

Parameters:

Name Type Description Default
value PlainSerializedSecretStr

The service account credentials JSON.

required

Returns:

Type Description
PlainSerializedSecretStr

The validated service account credentials JSON.

Raises:

Type Description
ValueError

If the service account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@field_validator("service_account_json")
@classmethod
def validate_service_account_json(
    cls, value: PlainSerializedSecretStr
) -> PlainSerializedSecretStr:
    """Validate the service account credentials JSON.

    Args:
        value: The service account credentials JSON.

    Returns:
        The validated service account credentials JSON.

    Raises:
        ValueError: If the service account credentials JSON is invalid.
    """
    try:
        service_account_info = json.loads(value.get_secret_value())
    except json.JSONDecodeError as e:
        raise ValueError(
            f"GCP service account credentials are not valid JSON: {e}"
        )

    # Check that all fields are present
    required_fields = [
        "type",
        "project_id",
        "private_key_id",
        "private_key",
        "client_email",
        "client_id",
        "auth_uri",
        "token_uri",
        "auth_provider_x509_cert_url",
        "client_x509_cert_url",
    ]
    # Compute missing fields
    missing_fields = set(required_fields) - set(
        service_account_info.keys()
    )
    if missing_fields:
        raise ValueError(
            f"GCP service account credentials JSON is missing required "
            f"fields: {', '.join(list(missing_fields))}"
        )

    if service_account_info["type"] != "service_account":
        raise ValueError(
            "The JSON does not contain GCP service account credentials. "
            f'The "type" field is set to {service_account_info["type"]} '
            "instead of 'service_account'."
        )

    return value
GCPServiceAccountImpersonationConfig

Bases: GCPServiceAccountConfig

GCP service account impersonation configuration.

GCPServiceConnector(**kwargs: Any)

Bases: ServiceConnector

GCP service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Functions
get_session(auth_method: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None) -> Tuple[gcp_credentials.Credentials, Optional[datetime.datetime]]

Get a GCP session object with credentials for the specified resource.

Parameters:

Name Type Description Default
auth_method str

The authentication method to use.

required
resource_type Optional[str]

The resource type to get credentials for.

None
resource_id Optional[str]

The resource ID to get credentials for.

None

Returns:

Type Description
Credentials

GCP session with credentials for the specified resource.

Optional[datetime]

The expiration timestamp of the credentials, if applicable.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
def get_session(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[gcp_credentials.Credentials, Optional[datetime.datetime]]:
    """Get a GCP session object with credentials for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get credentials for.
        resource_id: The resource ID to get credentials for.

    Returns:
        GCP session with credentials for the specified resource and its
        expiration timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    key = (auth_method, resource_type, resource_id)
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Reuse the cached session only if it remains valid beyond the
        # expiration buffer; otherwise fall through and re-authenticate.
        if expires_at > utc_now(tz_aware=expires_at) + datetime.timedelta(
            minutes=GCP_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating GCP authentication session for auth method "
        f"'{auth_method}', resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
GCPUserAccountConfig

Bases: GCPBaseProjectIDConfig, GCPUserAccountCredentials

GCP user account configuration.

GCPUserAccountCredentials

Bases: AuthenticationConfig

GCP user account credentials.

Functions
validate_user_account_dict(data: Dict[str, Any]) -> Dict[str, Any] classmethod

Convert the user account credentials to JSON if given in dict format.

Parameters:

Name Type Description Default
data Dict[str, Any]

The configuration values.

required

Returns:

Type Description
Dict[str, Any]

The validated configuration values.

Raises:

Type Description
ValueError

If the user account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@model_validator(mode="before")
@classmethod
@before_validator_handler
def validate_user_account_dict(
    cls, data: Dict[str, Any]
) -> Dict[str, Any]:
    """Convert the user account credentials to JSON if given in dict format.

    Args:
        data: The configuration values.

    Returns:
        The validated configuration values.

    Raises:
        ValueError: If the user account credentials JSON is invalid.
    """
    user_account_json = data.get("user_account_json")
    if isinstance(user_account_json, dict):
        data["user_account_json"] = json.dumps(data["user_account_json"])
    elif isinstance(user_account_json, str):
        # Check if the user account JSON is base64 encoded and decode it
        if re.match(r"^[A-Za-z0-9+/=]+$", user_account_json):
            try:
                data["user_account_json"] = base64.b64decode(
                    user_account_json
                ).decode("utf-8")
            except Exception as e:
                raise ValueError(
                    f"Failed to decode base64 encoded user account JSON: {e}"
                )
    return data
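The "before" validator above accepts the credentials in three shapes: a dict, a base64-encoded string, or a plain JSON string. A minimal standalone sketch of the same normalization follows; `normalize_user_account_json` is a hypothetical name, and it operates on a plain dict rather than going through pydantic.

```python
import base64
import json
import re
from typing import Any, Dict

# Same character-class test as the validator: JSON text always contains
# "{", so it can never be mistaken for base64 input.
_BASE64_RE = re.compile(r"^[A-Za-z0-9+/=]+$")


def normalize_user_account_json(data: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical standalone version of the pre-validation step."""
    value = data.get("user_account_json")
    if isinstance(value, dict):
        # A dict is serialized to a JSON string.
        data["user_account_json"] = json.dumps(value)
    elif isinstance(value, str) and _BASE64_RE.match(value):
        # A base64 string is decoded back to the JSON text it encodes.
        try:
            data["user_account_json"] = base64.b64decode(value).decode("utf-8")
        except Exception as e:
            raise ValueError(
                f"Failed to decode base64 encoded user account JSON: {e}"
            ) from e
    return data
```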
validate_user_account_json(value: PlainSerializedSecretStr) -> PlainSerializedSecretStr classmethod

Validate the user account credentials JSON.

Parameters:

Name Type Description Default
value PlainSerializedSecretStr

The user account credentials JSON.

required

Returns:

Type Description
PlainSerializedSecretStr

The validated user account credentials JSON.

Raises:

Type Description
ValueError

If the user account credentials JSON is invalid.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
@field_validator("user_account_json")
@classmethod
def validate_user_account_json(
    cls, value: PlainSerializedSecretStr
) -> PlainSerializedSecretStr:
    """Validate the user account credentials JSON.

    Args:
        value: The user account credentials JSON.

    Returns:
        The validated user account credentials JSON.

    Raises:
        ValueError: If the user account credentials JSON is invalid.
    """
    try:
        user_account_info = json.loads(value.get_secret_value())
    except json.JSONDecodeError as e:
        raise ValueError(
            f"GCP user account credentials are not valid JSON: {e}"
        )

    # Check that all fields are present
    required_fields = [
        "type",
        "refresh_token",
        "client_secret",
        "client_id",
    ]
    # Compute missing fields
    missing_fields = set(required_fields) - set(user_account_info.keys())
    if missing_fields:
        raise ValueError(
            f"GCP user account credentials JSON is missing required "
            f"fields: {', '.join(list(missing_fields))}"
        )

    if user_account_info["type"] != "authorized_user":
        raise ValueError(
            "The JSON does not contain GCP user account credentials. The "
            f'"type" field is set to {user_account_info["type"]} '
            "instead of 'authorized_user'."
        )

    return value
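The field validator above parses the JSON, checks for four required keys, and requires `"type": "authorized_user"`. A minimal sketch of the same checks on a plain string follows (the real validator receives a `PlainSerializedSecretStr`). `check_user_account_json` is a hypothetical name, and the `isinstance(info, dict)` guard is an addition not present in the original, so that a JSON array is rejected with a `ValueError`.

```python
import json
from typing import Any, Dict

REQUIRED_FIELDS = {"type", "refresh_token", "client_secret", "client_id"}


def check_user_account_json(raw: str) -> Dict[str, Any]:
    """Hypothetical plain-string version of validate_user_account_json."""
    try:
        info = json.loads(raw)
    except json.JSONDecodeError as e:
        raise ValueError(
            f"GCP user account credentials are not valid JSON: {e}"
        ) from e
    if not isinstance(info, dict):
        raise ValueError("GCP user account credentials must be a JSON object")
    # Set difference yields the required keys that are absent.
    missing = REQUIRED_FIELDS - set(info)
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(sorted(missing))}")
    if info["type"] != "authorized_user":
        raise ValueError(f"Expected type 'authorized_user', got {info['type']!r}")
    return info
```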
ZenMLAwsSecurityCredentialsSupplier

Bases: _DefaultAwsSecurityCredentialsSupplier

An improved version of the GCP external account credential supplier for AWS.

The original GCP external account credential supplier only provides rudimentary support for extracting AWS credentials from environment variables or the AWS metadata service. This version improves on that by using the boto3 library itself (if available), which uses the entire range of implicit authentication features packed into it.

Without this improvement, sts.AssumeRoleWithWebIdentity authentication is not supported for EKS pods and the EC2 attached role credentials are used instead (see: https://medium.com/@derek10cloud/gcp-workload-identity-federation-doesnt-yet-support-eks-irsa-in-aws-a3c71877671a).

Functions
get_aws_region(context: Any, request: Any) -> str

Get the AWS region from the local environment.

This method is a copy of the original method from the google.auth.aws._DefaultAwsSecurityCredentialsSupplier class. It has been modified to use the boto3 library to extract the AWS region from the local environment.

Parameters:

Name Type Description Default
context Any

The context to use to get the security credentials.

required
request Any

The request to use to get the security credentials.

required

Returns:

Type Description
str

The AWS region.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
def get_aws_region(self, context: Any, request: Any) -> str:
    """Get the AWS region from the local environment.

    This method is a copy of the original method from the
    `google.auth.aws._DefaultAwsSecurityCredentialsSupplier` class. It has
    been modified to use the boto3 library to extract the AWS
    region from the local environment.

    Args:
        context: The context to use to get the security credentials.
        request: The request to use to get the security credentials.

    Returns:
        The AWS region.
    """
    try:
        import boto3

        session = boto3.Session()
        if session.region_name:
            return session.region_name  # type: ignore[no-any-return]
    except ImportError:
        pass

    logger.debug(
        "Failed to extract AWS region from the local environment "
        "using the boto3 library. Falling back to the original "
        "implementation."
    )

    return super().get_aws_region(  # type: ignore[no-any-return]
        context, request
    )
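`get_aws_region` tries boto3 first and defers to the parent implementation only when boto3 is missing or resolves no region. The sketch below isolates that ordering; `resolve_aws_region` and `boto3_region` are hypothetical names, and the fallback is injected as a callable where the real method calls `super().get_aws_region(context, request)`.

```python
from typing import Callable, Optional


def boto3_region() -> Optional[str]:
    """Region resolved by a default boto3 session, or None."""
    try:
        import boto3
    except ImportError:
        return None
    return boto3.Session().region_name


def resolve_aws_region(
    fallback: Callable[[], str],
    primary: Callable[[], Optional[str]] = boto3_region,
) -> str:
    # Prefer boto3's resolution chain; use the fallback only if it finds nothing.
    region = primary()
    if region:
        return region
    return fallback()
```

Injecting `primary` keeps the ordering testable without boto3 or AWS configuration.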
get_aws_security_credentials(context: Any, request: Any) -> gcp_aws.AwsSecurityCredentials

Get the security credentials from the local environment.

This method is a copy of the original method from the google.auth.aws._DefaultAwsSecurityCredentialsSupplier class. It has been modified to use the boto3 library to extract the AWS credentials from the local environment.

Parameters:

Name Type Description Default
context Any

The context to use to get the security credentials.

required
request Any

The request to use to get the security credentials.

required

Returns:

Type Description
AwsSecurityCredentials

The AWS temporary security credentials.

Source code in src/zenml/integrations/gcp/service_connectors/gcp_service_connector.py
def get_aws_security_credentials(
    self, context: Any, request: Any
) -> gcp_aws.AwsSecurityCredentials:
    """Get the security credentials from the local environment.

    This method is a copy of the original method from the
    `google.auth.aws._DefaultAwsSecurityCredentialsSupplier` class. It has
    been modified to use the boto3 library to extract the AWS credentials
    from the local environment.

    Args:
        context: The context to use to get the security credentials.
        request: The request to use to get the security credentials.

    Returns:
        The AWS temporary security credentials.
    """
    try:
        import boto3

        session = boto3.Session()
        credentials = session.get_credentials()
        if credentials is not None:
            creds = credentials.get_frozen_credentials()
            return gcp_aws.AwsSecurityCredentials(
                creds.access_key,
                creds.secret_key,
                creds.token,
            )
    except ImportError:
        pass

    logger.debug(
        "Failed to extract AWS credentials from the local environment "
        "using the boto3 library. Falling back to the original "
        "implementation."
    )

    return super().get_aws_security_credentials(context, request)
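`get_aws_security_credentials` converts boto3's frozen credentials (access key, secret key, session token) into the google-auth credentials object, and falls back to the parent implementation when boto3 finds no credentials. The following sketch shows that mapping with local stand-ins: `FrozenCredentials` mimics the read-only tuple returned by `get_frozen_credentials()`, and `AwsSecurityCredentials` is assumed to mirror the field order of `google.auth.aws.AwsSecurityCredentials`. Both stand-ins and the helper names are hypothetical.

```python
from collections import namedtuple
from dataclasses import dataclass
from typing import Callable, Optional

# Stand-in for the read-only credentials tuple boto3 returns.
FrozenCredentials = namedtuple(
    "FrozenCredentials", ["access_key", "secret_key", "token"]
)


@dataclass
class AwsSecurityCredentials:
    """Local stand-in assumed to match google-auth's field order."""

    access_key_id: str
    secret_access_key: str
    session_token: Optional[str] = None


def resolve_credentials(
    get_frozen: Callable[[], Optional[FrozenCredentials]],
    fallback: Callable[[], AwsSecurityCredentials],
) -> AwsSecurityCredentials:
    frozen = get_frozen()
    if frozen is not None:
        # Positional order matches the call in get_aws_security_credentials.
        return AwsSecurityCredentials(frozen.access_key, frozen.secret_key, frozen.token)
    return fallback()
```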
ZenMLGCPAWSExternalAccountCredentials

Bases: Credentials

An improved version of the GCP external account credential for AWS.

The original GCP external account credential only provides rudimentary support for extracting AWS credentials from environment variables or the AWS metadata service. This version improves on that by using the boto3 library itself (if available), which uses the entire range of implicit authentication features packed into it.

Without this improvement, sts.AssumeRoleWithWebIdentity authentication is not supported for EKS pods and the EC2 attached role credentials are used instead (see: https://medium.com/@derek10cloud/gcp-workload-identity-federation-doesnt-yet-support-eks-irsa-in-aws-a3c71877671a).

IMPORTANT: subclassing this class only works with the google-auth library version lower than 2.29.0. Starting from version 2.29.0, the AWS logic has been moved to a separate google.auth.aws._DefaultAwsSecurityCredentialsSupplier class that can be subclassed instead and supplied as the aws_security_credentials_supplier parameter to the google.auth.aws.Credentials class.
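Because the right extension point depends on the installed google-auth version, code that supports both APIs needs a version check. The following is a minimal sketch of one; `use_credentials_supplier` and `parse_version` are hypothetical helpers, and pre-release suffixes such as `rc1` are ignored, so `2.29.0rc1` is treated as `2.29.0`.

```python
import re
from typing import Tuple

# google-auth release from which the supplier-based AWS API is used.
SUPPLIER_MIN_VERSION = (2, 29, 0)


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse the leading MAJOR.MINOR.PATCH of a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch


def use_credentials_supplier(google_auth_version: str) -> bool:
    """True: pass a supplier to Credentials. False: subclass Credentials."""
    return parse_version(google_auth_version) >= SUPPLIER_MIN_VERSION
```

In practice, the version string could come from `importlib.metadata.version("google-auth")`.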

step_operators

Initialization for the VertexAI Step Operator.

Classes
VertexStepOperator(*args: Any, **kwargs: Any)

Bases: BaseStepOperator, GoogleCredentialsMixin

Step operator to run a step on Vertex AI.

This class defines code that can set up a Vertex AI environment and run the ZenML entrypoint command in it.

Initializes the step operator. The accelerator type is not validated here; launch() validates it with validate_accelerator_type.

Parameters:

Name Type Description Default
*args Any

Variable length argument list.

()
**kwargs Any

Arbitrary keyword arguments.

{}
Source code in src/zenml/integrations/gcp/step_operators/vertex_step_operator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initializes the step operator.

    Args:
        *args: Variable length argument list.
        **kwargs: Arbitrary keyword arguments.
    """
    super().__init__(*args, **kwargs)
Attributes
config: VertexStepOperatorConfig property

Returns the VertexStepOperatorConfig config.

Returns:

Type Description
VertexStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the Vertex step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/gcp/step_operators/vertex_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=VERTEX_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
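`get_docker_builds` requests one image build for each step whose configured step operator is this component, and ignores all other steps. The sketch below reproduces that filter with simple stand-in dataclasses; `StepConfig`, `BuildRequest`, `builds_for_operator`, and the `"image-key"` value are hypothetical and stand in for ZenML's step configuration, `BuildConfiguration`, and `VERTEX_DOCKER_IMAGE_KEY`.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class StepConfig:
    step_operator: Optional[str]
    docker_settings: Dict[str, str] = field(default_factory=dict)


@dataclass
class BuildRequest:
    key: str
    settings: Dict[str, str]
    step_name: str


def builds_for_operator(
    steps: Dict[str, StepConfig], operator_name: str, image_key: str
) -> List[BuildRequest]:
    # Only steps routed to this step operator need a dedicated image.
    return [
        BuildRequest(key=image_key, settings=cfg.docker_settings, step_name=name)
        for name, cfg in steps.items()
        if cfg.step_operator == operator_name
    ]
```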
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on VertexAI.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required

Raises:

Type Description
RuntimeError

If the run fails.
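launch() raises RuntimeError in two situations: the job reaches a failed state, or polling fails more than CONNECTION_ERROR_RETRY_LIMIT times in a row. A minimal sketch of that polling loop follows. The state names, retry limit, and interval are hypothetical placeholders, `sleep` is injectable for testing, and only the built-in `ConnectionError` is retried here, whereas the real code also retries `ServerError` and recreates the API client before each retry.

```python
import time
from typing import Callable, Set


def poll_job(
    initial_state: str,
    get_state: Callable[[], str],
    done_states: Set[str],
    failed_states: Set[str],
    retry_limit: int = 3,
    interval_s: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until a final state; raise on failure or persistent errors."""
    state = initial_state
    retries = 0
    while state not in done_states:
        sleep(interval_s)
        try:
            state = get_state()
            retries = 0  # only consecutive failures count toward the limit
        except ConnectionError as err:
            if retries >= retry_limit:
                raise RuntimeError(
                    f"Request failed after {retry_limit} retries: {err}"
                ) from err
            retries += 1
            continue
        if state in failed_states:
            raise RuntimeError(f"Job did not succeed, final state: {state}")
    return state
```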

Source code in src/zenml/integrations/gcp/step_operators/vertex_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on VertexAI.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.

    Raises:
        RuntimeError: If the run fails.
    """
    resource_settings = info.config.resource_settings
    if resource_settings.cpu_count or resource_settings.memory:
        logger.warning(
            "Specifying CPUs or memory is not supported for "
            "the Vertex step operator. If you want to run this step "
            "operator on specific resources, you can do so by configuring "
            "a different machine type like this: "
            "`zenml step-operator update %s "
            "--machine_type=<MACHINE_TYPE>`",
            self.name,
        )
    settings = cast(VertexStepOperatorSettings, self.get_settings(info))
    validate_accelerator_type(settings.accelerator_type)

    job_labels = {"source": f"zenml-{__version__.replace('.', '_')}"}

    # Step 1: Authenticate with Google
    credentials, project_id = self._get_authentication()

    # Step 2: Get the Docker image built for this step
    image_name = info.get_image(key=VERTEX_DOCKER_IMAGE_KEY)

    # Step 3: Launch the job
    # The AI Platform services require regional API endpoints.
    client_options = {
        "api_endpoint": self.config.region + VERTEX_ENDPOINT_SUFFIX
    }
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for
    # multiple requests.
    client = aiplatform.gapic.JobServiceClient(
        credentials=credentials, client_options=client_options
    )
    accelerator_count = (
        resource_settings.gpu_count or settings.accelerator_count
    )
    custom_job = {
        "display_name": info.run_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": settings.machine_type,
                        "accelerator_type": settings.accelerator_type,
                        "accelerator_count": accelerator_count
                        if settings.accelerator_type
                        else 0,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": image_name,
                        "command": entrypoint_command,
                        "args": [],
                        "env": [
                            {"name": key, "value": value}
                            for key, value in environment.items()
                        ],
                    },
                    "disk_spec": {
                        "boot_disk_type": settings.boot_disk_type,
                        "boot_disk_size_gb": settings.boot_disk_size_gb,
                    },
                }
            ],
            "service_account": self.config.service_account,
            "network": self.config.network,
            "reserved_ip_ranges": (
                self.config.reserved_ip_ranges.split(",")
                if self.config.reserved_ip_ranges
                else []
            ),
            "persistent_resource_id": settings.persistent_resource_id,
        },
        "labels": job_labels,
        "encryption_spec": {
            "kmsKeyName": self.config.encryption_spec_key_name
        }
        if self.config.encryption_spec_key_name
        else {},
    }
    logger.debug("Vertex AI Job=%s", custom_job)

    parent = f"projects/{project_id}/locations/{self.config.region}"
    logger.info(
        "Submitting custom job='%s', path='%s' to Vertex AI Training.",
        custom_job["display_name"],
        parent,
    )
    info.force_write_logs()
    response = client.create_custom_job(
        parent=parent, custom_job=custom_job
    )
    logger.debug("Vertex AI response: %s", response)

    # Step 4: Monitor the job

    # Monitors the long-running operation by polling the job state
    # periodically, and retries the polling when a transient connectivity
    # issue is encountered.
    #
    # Long-running operation monitoring:
    #   The possible states of "get job" response can be found at
    #   https://cloud.google.com/ai-platform/training/docs/reference/rest/v1/projects.jobs#State
    #   where SUCCEEDED/FAILED/CANCELED are considered to be final states.
    #   The following logic will keep polling the state of the job until
    #   the job enters a final state.
    #
    # During the polling, if a connection error was encountered, the GET
    # request will be retried by recreating the Python API client to
    # refresh the lifecycle of the connection being used. See
    # https://github.com/googleapis/google-api-python-client/issues/218
    # for a detailed description of the problem. If the error persists for
    # CONNECTION_ERROR_RETRY_LIMIT consecutive attempts, the function
    # raises a RuntimeError.
    retry_count = 0
    job_id = response.name

    while response.state not in VERTEX_JOB_STATES_COMPLETED:
        time.sleep(POLLING_INTERVAL_IN_SECONDS)
        try:
            response = client.get_custom_job(name=job_id)
            retry_count = 0
        # Handle transient connection errors and credential expiration by
        # recreating the Python API client.
        except (ConnectionError, ServerError) as err:
            if retry_count < CONNECTION_ERROR_RETRY_LIMIT:
                retry_count += 1
                logger.warning(
                    f"Error encountered when polling job "
                    f"{job_id}: {err}\nRetrying...",
                )
                # This call will refresh the credentials if they expired.
                credentials, project_id = self._get_authentication()
                # Recreate the Python API client.
                client = aiplatform.gapic.JobServiceClient(
                    credentials=credentials, client_options=client_options
                )
            else:
                logger.exception(
                    "Request failed after %s retries.",
                    CONNECTION_ERROR_RETRY_LIMIT,
                )
                raise RuntimeError(
                    f"Request failed after {CONNECTION_ERROR_RETRY_LIMIT} "
                    f"retries: {err}"
                )
        if response.state in VERTEX_JOB_STATES_FAILED:
            err_msg = (
                "Job '{}' did not succeed. Detailed response: {}.".format(
                    job_id, response
                )
            )
            logger.error(err_msg)
            raise RuntimeError(err_msg)

    # Cloud training complete
    logger.info("Job '%s' successful.", job_id)
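The `custom_job` dictionary that launch() submits follows a few rules worth isolating: accelerators are requested only when an accelerator type is set, the comma-separated `reserved_ip_ranges` string becomes a list, the ZenML version is turned into a label value (GCP label values allow only lowercase letters, digits, underscores, and dashes, hence dots become underscores), and `encryption_spec` stays empty without a KMS key. The sketch below is a trimmed, hypothetical builder covering only those fields. It omits `disk_spec`, `service_account`, `network`, and `persistent_resource_id`, and it expects `accelerator_count` to be precomputed the way launch() does it (`resource_settings.gpu_count or settings.accelerator_count`).

```python
from typing import Any, Dict, List, Optional


def build_custom_job(
    display_name: str,
    image_uri: str,
    command: List[str],
    env: Dict[str, str],
    machine_type: str,
    zenml_version: str,
    accelerator_type: Optional[str] = None,
    accelerator_count: int = 0,
    reserved_ip_ranges: Optional[str] = None,
    kms_key_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical, trimmed version of the job spec built in launch()."""
    return {
        "display_name": display_name,
        "job_spec": {
            "worker_pool_specs": [
                {
                    "machine_spec": {
                        "machine_type": machine_type,
                        "accelerator_type": accelerator_type,
                        # No accelerator type means no accelerators.
                        "accelerator_count": accelerator_count if accelerator_type else 0,
                    },
                    "replica_count": 1,
                    "container_spec": {
                        "image_uri": image_uri,
                        "command": command,
                        "args": [],
                        "env": [{"name": k, "value": v} for k, v in env.items()],
                    },
                }
            ],
            "reserved_ip_ranges": reserved_ip_ranges.split(",") if reserved_ip_ranges else [],
        },
        "labels": {"source": f"zenml-{zenml_version.replace('.', '_')}"},
        "encryption_spec": {"kmsKeyName": kms_key_name} if kms_key_name else {},
    }
```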
Modules
vertex_step_operator

Implementation of a VertexAI step operator.

Code heavily inspired by the TFX implementation: https://github.com/tensorflow/tfx/blob/master/tfx/extensions/google_cloud_ai_platform/training_clients.py

Classes
VertexStepOperator(*args: Any, **kwargs: Any)

Bases: BaseStepOperator, GoogleCredentialsMixin

Step operator to run a step on Vertex AI.

This class defines code that can set up a Vertex AI environment and run the ZenML entrypoint command in it.

Functions
validate_accelerator_type(accelerator_type: Optional[str] = None) -> None

Validates that the accelerator type is valid.

Parameters:

Name Type Description Default
accelerator_type Optional[str]

The accelerator type to validate.

None

Raises:

Type Description
ValueError

If the accelerator type is not valid.

Source code in src/zenml/integrations/gcp/step_operators/vertex_step_operator.py
def validate_accelerator_type(accelerator_type: Optional[str] = None) -> None:
    """Validates that the accelerator type is valid.

    Args:
        accelerator_type: The accelerator type to validate.

    Raises:
        ValueError: If the accelerator type is not valid.
    """
    accepted_vals = list(aiplatform.gapic.AcceleratorType.__members__.keys())
    if accelerator_type and accelerator_type.upper() not in accepted_vals:
        raise ValueError(
            f"Accelerator must be one of the following: {accepted_vals}"
        )
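The check is case-insensitive: the input is upper-cased and compared against the enum's member names, and an empty or missing value is accepted. The sketch below runs the same logic against a stand-in enum. Its members are a small illustrative subset with placeholder values; the real list comes from `aiplatform.gapic.AcceleratorType`.

```python
import enum
from typing import Optional


class AcceleratorType(enum.Enum):
    """Stand-in subset; values are placeholders, not the real enum's."""

    ACCELERATOR_TYPE_UNSPECIFIED = 0
    NVIDIA_TESLA_T4 = 1
    NVIDIA_TESLA_V100 = 2
    TPU_V3 = 3


def validate_accelerator_type(accelerator_type: Optional[str] = None) -> None:
    accepted_vals = list(AcceleratorType.__members__.keys())
    # None or "" skips validation; otherwise compare upper-cased names.
    if accelerator_type and accelerator_type.upper() not in accepted_vals:
        raise ValueError(f"Accelerator must be one of the following: {accepted_vals}")
```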

vertex_custom_job_parameters

Vertex custom job parameter model.

Classes
VertexCustomJobParameters

Bases: BaseModel

Settings for the Vertex custom job parameters.

Attributes:

Name Type Description
accelerator_type Optional[str]

Defines which accelerator (GPU, TPU) is used for the job. Check out this table to see which accelerator type and count are compatible with your chosen machine type: https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.

accelerator_count int

Defines the number of accelerators to be used for the job. Check out this table to see which accelerator type and count are compatible with your chosen machine type: https://cloud.google.com/vertex-ai/docs/training/configure-compute#gpu-compatibility-table.

machine_type str

Machine type to run the job on. See https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types for the available types.

boot_disk_size_gb int

Size of the boot disk in GB. (Default: 100) https://cloud.google.com/vertex-ai/docs/training/configure-compute#boot_disk_options

boot_disk_type str

Type of the boot disk. (Default: pd-ssd) https://cloud.google.com/vertex-ai/docs/training/configure-compute#boot_disk_options

persistent_resource_id Optional[str]

The ID of the persistent resource to use for the job. https://cloud.google.com/vertex-ai/docs/training/persistent-resource-overview

service_account Optional[str]

Specifies the service account to be used.