Azure

zenml.integrations.azure

Initialization of the ZenML Azure integration.

The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores and an io module to handle file operations on Azure Blob Storage. The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.
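A minimal usage sketch, assuming the Azure requirements have already been installed (for example via `zenml integration install azure`); the two calls used here are the ones documented on this page:

from zenml.integrations.azure import AzureIntegration

# Verify that all Azure requirements are importable before activating.
if AzureIntegration.check_installation():
    # Activation registers the Azure service connectors with ZenML.
    AzureIntegration.activate()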

Attributes

AZURE = 'azure' module-attribute

AZUREML_ORCHESTRATOR_FLAVOR = 'azureml' module-attribute

AZUREML_STEP_OPERATOR_FLAVOR = 'azureml' module-attribute

AZURE_ARTIFACT_STORE_FLAVOR = 'azure' module-attribute

AZURE_CONNECTOR_TYPE = 'azure' module-attribute

AZURE_RESOURCE_TYPE = 'azure-generic' module-attribute

BLOB_RESOURCE_TYPE = 'blob-container' module-attribute

Classes

AzureIntegration

Bases: Integration

Definition of Azure integration for ZenML.

Functions
activate() -> None classmethod

Activate the Azure integration.

Source code in src/zenml/integrations/azure/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Azure integration."""
    from zenml.integrations.azure import service_connectors  # noqa
flavors() -> List[Type[Flavor]] classmethod

Declares the flavors for the integration.

Returns:

Type Description
List[Type[Flavor]]

List of stack component flavors for this integration.

Source code in src/zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.azure.flavors import (
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
        AzureMLOrchestratorFlavor,
    )

    return [
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
        AzureMLOrchestratorFlavor,
    ]
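As a short illustration, each returned flavor class can be instantiated to inspect its name and stack component type (both documented on the Flavor class below):

from zenml.integrations.azure import AzureIntegration

for flavor_class in AzureIntegration.flavors():
    flavor = flavor_class()
    print(flavor.name, flavor.type)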

Flavor

Class for ZenML Flavors.

Attributes
config_class: Type[StackComponentConfig] abstractmethod property

Returns StackComponentConfig config class.

Returns:

Type Description
Type[StackComponentConfig]

The config class.

config_schema: Dict[str, Any] property

The config schema for a flavor.

Returns:

Type Description
Dict[str, Any]

The config schema.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[StackComponent] abstractmethod property

Implementation class for this flavor.

Returns:

Type Description
Type[StackComponent]

The implementation class for this flavor.

logo_url: Optional[str] property

A url to represent the flavor in the dashboard.

Returns:

Type Description
Optional[str]

The flavor logo.

name: str abstractmethod property

The flavor name.

Returns:

Type Description
str

The flavor name.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

type: StackComponentType abstractmethod property

The stack component type.

Returns:

Type Description
StackComponentType

The stack component type.

Functions
from_model(flavor_model: FlavorResponse) -> Flavor classmethod

Loads a flavor from a model.

Parameters:

Name Type Description Default
flavor_model FlavorResponse

The model to load from.

required

Raises:

Type Description
CustomFlavorImportError

If the custom flavor can't be imported.

ImportError

If the flavor can't be imported.

Returns:

Type Description
Flavor

The loaded flavor.

Source code in src/zenml/stack/flavor.py
@classmethod
def from_model(cls, flavor_model: FlavorResponse) -> "Flavor":
    """Loads a flavor from a model.

    Args:
        flavor_model: The model to load from.

    Raises:
        CustomFlavorImportError: If the custom flavor can't be imported.
        ImportError: If the flavor can't be imported.

    Returns:
        The loaded flavor.
    """
    try:
        flavor = source_utils.load(flavor_model.source)()
    except (ModuleNotFoundError, ImportError, NotImplementedError) as err:
        if flavor_model.is_custom:
            flavor_module, _ = flavor_model.source.rsplit(".", maxsplit=1)
            expected_file_path = os.path.join(
                source_utils.get_source_root(),
                flavor_module.replace(".", os.path.sep),
            )
            raise CustomFlavorImportError(
                f"Couldn't import custom flavor {flavor_model.name}: "
                f"{err}. Make sure the custom flavor class "
                f"`{flavor_model.source}` is importable. If it is part of "
                "a library, make sure it is installed. If "
                "it is a local code file, make sure it exists at "
                f"`{expected_file_path}.py`."
            )
        else:
            raise ImportError(
                f"Couldn't import flavor {flavor_model.name}: {err}"
            )
    return cast(Flavor, flavor)
generate_default_docs_url() -> str

Generate the doc urls for all inbuilt and integration flavors.

Note that this method is not going to be useful for custom flavors, which do not have any docs in the main zenml docs.

Returns:

Type Description
str

The complete url to the zenml documentation

Source code in src/zenml/stack/flavor.py
def generate_default_docs_url(self) -> str:
    """Generate the doc urls for all inbuilt and integration flavors.

    Note that this method is not going to be useful for custom flavors,
    which do not have any docs in the main zenml docs.

    Returns:
        The complete url to the zenml documentation
    """
    from zenml import __version__

    component_type = self.type.plural.replace("_", "-")
    name = self.name.replace("_", "-")

    try:
        is_latest = is_latest_zenml_version()
    except RuntimeError:
        # We assume in error cases that we are on the latest version
        is_latest = True

    if is_latest:
        base = "https://docs.zenml.io"
    else:
        base = f"https://zenml-io.gitbook.io/zenml-legacy-documentation/v/{__version__}"
    return f"{base}/stack-components/{component_type}/{name}"
generate_default_sdk_docs_url() -> str

Generate SDK docs url for a flavor.

Returns:

Type Description
str

The complete url to the zenml SDK docs

Source code in src/zenml/stack/flavor.py
def generate_default_sdk_docs_url(self) -> str:
    """Generate SDK docs url for a flavor.

    Returns:
        The complete url to the zenml SDK docs
    """
    from zenml import __version__

    base = f"https://sdkdocs.zenml.io/{__version__}"

    component_type = self.type.plural

    if "zenml.integrations" in self.__module__:
        # Get integration name out of module path which will look something
        #  like this "zenml.integrations.<integration>....
        integration = self.__module__.split(
            "zenml.integrations.", maxsplit=1
        )[1].split(".")[0]

        return (
            f"{base}/integration_code_docs"
            f"/integrations-{integration}/#{self.__module__}"
        )

    else:
        return (
            f"{base}/core_code_docs/core-{component_type}/"
            f"#{self.__module__}"
        )
to_model(integration: Optional[str] = None, is_custom: bool = True) -> FlavorRequest

Converts a flavor to a model.

Parameters:

Name Type Description Default
integration Optional[str]

The integration to use for the model.

None
is_custom bool

Whether the flavor is a custom flavor.

True

Returns:

Type Description
FlavorRequest

The model.

Source code in src/zenml/stack/flavor.py
def to_model(
    self,
    integration: Optional[str] = None,
    is_custom: bool = True,
) -> FlavorRequest:
    """Converts a flavor to a model.

    Args:
        integration: The integration to use for the model.
        is_custom: Whether the flavor is a custom flavor.

    Returns:
        The model.
    """
    connector_requirements = self.service_connector_requirements
    connector_type = (
        connector_requirements.connector_type
        if connector_requirements
        else None
    )
    resource_type = (
        connector_requirements.resource_type
        if connector_requirements
        else None
    )
    resource_id_attr = (
        connector_requirements.resource_id_attr
        if connector_requirements
        else None
    )

    model = FlavorRequest(
        name=self.name,
        type=self.type,
        source=source_utils.resolve(self.__class__).import_path,
        config_schema=self.config_schema,
        connector_type=connector_type,
        connector_resource_type=resource_type,
        connector_resource_id_attr=resource_id_attr,
        integration=integration,
        logo_url=self.logo_url,
        docs_url=self.docs_url,
        sdk_docs_url=self.sdk_docs_url,
        is_custom=is_custom,
    )
    return model
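A hedged sketch of converting a built-in flavor to a request model, using the AzureArtifactStoreFlavor import shown in flavors() above; built-in integration flavors are not custom, hence is_custom=False:

from zenml.integrations.azure.flavors import AzureArtifactStoreFlavor

flavor = AzureArtifactStoreFlavor()
request = flavor.to_model(integration="azure", is_custom=False)
# The request carries the fields populated in to_model above.
print(request.name, request.source)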

Integration

Base class for integration in ZenML.

Functions
activate() -> None classmethod

Abstract method to activate the integration.

Source code in src/zenml/integrations/integration.py
@classmethod
def activate(cls) -> None:
    """Abstract method to activate the integration."""
check_installation() -> bool classmethod

Method to check whether the required packages are installed.

Returns:

Type Description
bool

True if all required packages are installed, False otherwise.

Source code in src/zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
    """Method to check whether the required packages are installed.

    Returns:
        True if all required packages are installed, False otherwise.
    """
    for r in cls.get_requirements():
        try:
            # First check if the base package is installed
            dist = pkg_resources.get_distribution(r)

            # Next, check if the dependencies (including extras) are
            # installed
            deps: List[Requirement] = []

            _, extras = parse_requirement(r)
            if extras:
                extra_list = extras[1:-1].split(",")
                for extra in extra_list:
                    try:
                        requirements = dist.requires(extras=[extra])  # type: ignore[arg-type]
                    except pkg_resources.UnknownExtra as e:
                        logger.debug(f"Unknown extra: {str(e)}")
                        return False
                    deps.extend(requirements)
            else:
                deps = dist.requires()

            for ri in deps:
                try:
                    # Remove the "extra == ..." part from the requirement string
                    cleaned_req = re.sub(
                        r"; extra == \"\w+\"", "", str(ri)
                    )
                    pkg_resources.get_distribution(cleaned_req)
                except pkg_resources.DistributionNotFound as e:
                    logger.debug(
                        f"Unable to find required dependency "
                        f"'{e.req}' for requirement '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False
                except pkg_resources.VersionConflict as e:
                    logger.debug(
                        f"Package version '{e.dist}' does not match "
                        f"version '{e.req}' required by '{r}' "
                        f"necessary for integration '{cls.NAME}'."
                    )
                    return False

        except pkg_resources.DistributionNotFound as e:
            logger.debug(
                f"Unable to find required package '{e.req}' for "
                f"integration {cls.NAME}."
            )
            return False
        except pkg_resources.VersionConflict as e:
            logger.debug(
                f"Package version '{e.dist}' does not match version "
                f"'{e.req}' necessary for integration {cls.NAME}."
            )
            return False

    logger.debug(
        f"Integration {cls.NAME} is installed correctly with "
        f"requirements {cls.get_requirements()}."
    )
    return True
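A minimal sketch of gating optional Azure functionality on this check; the CLI command in the error message is the standard ZenML integration installer:

from zenml.integrations.azure import AzureIntegration

if not AzureIntegration.check_installation():
    raise RuntimeError(
        "Missing Azure requirements; run `zenml integration install azure`."
    )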
flavors() -> List[Type[Flavor]] classmethod

Abstract method to declare new stack component flavors.

Returns:

Type Description
List[Type[Flavor]]

A list of new stack component flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Abstract method to declare new stack component flavors.

    Returns:
        A list of new stack component flavors.
    """
    return []
get_requirements(target_os: Optional[str] = None, python_version: Optional[str] = None) -> List[str] classmethod

Method to get the requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None
python_version Optional[str]

The Python version to use for the requirements.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_requirements(
    cls,
    target_os: Optional[str] = None,
    python_version: Optional[str] = None,
) -> List[str]:
    """Method to get the requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.
        python_version: The Python version to use for the requirements.

    Returns:
        A list of requirements.
    """
    return cls.REQUIREMENTS
get_uninstall_requirements(target_os: Optional[str] = None) -> List[str] classmethod

Method to get the uninstall requirements for the integration.

Parameters:

Name Type Description Default
target_os Optional[str]

The target operating system to get the requirements for.

None

Returns:

Type Description
List[str]

A list of requirements.

Source code in src/zenml/integrations/integration.py
@classmethod
def get_uninstall_requirements(
    cls, target_os: Optional[str] = None
) -> List[str]:
    """Method to get the uninstall requirements for the integration.

    Args:
        target_os: The target operating system to get the requirements for.

    Returns:
        A list of requirements.
    """
    ret = []
    for each in cls.get_requirements(target_os=target_os):
        is_ignored = False
        for ignored in cls.REQUIREMENTS_IGNORED_ON_UNINSTALL:
            if each.startswith(ignored):
                is_ignored = True
                break
        if not is_ignored:
            ret.append(each)
    return ret
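For illustration, the difference between the install and uninstall requirement lists comes down to the REQUIREMENTS_IGNORED_ON_UNINSTALL filter shown above; a minimal sketch:

from zenml.integrations.azure import AzureIntegration

install_reqs = set(AzureIntegration.get_requirements())
uninstall_reqs = set(AzureIntegration.get_uninstall_requirements())
# Packages matching REQUIREMENTS_IGNORED_ON_UNINSTALL stay installed.
print(install_reqs - uninstall_reqs)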
plugin_flavors() -> List[Type[BasePluginFlavor]] classmethod

Abstract method to declare new plugin flavors.

Returns:

Type Description
List[Type[BasePluginFlavor]]

A list of new plugin flavors.

Source code in src/zenml/integrations/integration.py
@classmethod
def plugin_flavors(cls) -> List[Type["BasePluginFlavor"]]:
    """Abstract method to declare new plugin flavors.

    Returns:
        A list of new plugin flavors.
    """
    return []

Modules

artifact_stores

Initialization of the Azure Artifact Store integration.

Classes
AzureArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for Microsoft Azure based artifacts.

Source code in src/zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initiate the Pydantic object and register the corresponding filesystem.

    Args:
        *args: The positional arguments to pass to the Pydantic object.
        **kwargs: The keyword arguments to pass to the Pydantic object.
    """
    super(BaseArtifactStore, self).__init__(*args, **kwargs)

    # If running in a ZenML server environment, we don't register
    # the filesystems. We always use the artifact stores directly.
    if ENV_ZENML_SERVER not in os.environ:
        self._register()
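Because __init__ registers the adlfs filesystem (outside a ZenML server context), generic zenml.io.fileio calls can be routed to Azure Blob Storage. A minimal sketch, with an illustrative az:// container and path:

from zenml.io import fileio

# The container name and path below are placeholders.
if fileio.exists("az://my-container/artifacts/data.csv"):
    with fileio.open("az://my-container/artifacts/data.csv", "rb") as f:
        data = f.read()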
Attributes
config: AzureArtifactStoreConfig property

Returns the AzureArtifactStoreConfig config.

Returns:

Type Description
AzureArtifactStoreConfig

The configuration.

filesystem: adlfs.AzureBlobFileSystem property

The adlfs filesystem to access this artifact store.

Returns:

Type Description
AzureBlobFileSystem

The adlfs filesystem to access this artifact store.

Functions
copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None

Copy a file.

Parameters:

Name Type Description Default
src PathType

The path to copy from.

required
dst PathType

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(path: PathType) -> bool

Check whether a path exists.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
get_credentials() -> Optional[AzureSecretSchema]

Returns the credentials for the Azure Artifact Store if configured.

Returns:

Type Description
Optional[AzureSecretSchema]

The credentials.

Raises:

Type Description
RuntimeError

If the connector is not configured with Azure service principal credentials.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def get_credentials(self) -> Optional[AzureSecretSchema]:
    """Returns the credentials for the Azure Artifact Store if configured.

    Returns:
        The credentials.

    Raises:
        RuntimeError: If the connector is not configured with Azure service
            principal credentials.
    """
    connector = self.get_connector()
    if connector:
        from azure.identity import (
            ClientSecretCredential,
            DefaultAzureCredential,
        )
        from azure.storage.blob import BlobServiceClient

        client = connector.connect()
        if not isinstance(client, BlobServiceClient):
            raise RuntimeError(
                f"Expected a {BlobServiceClient.__module__}."
                f"{BlobServiceClient.__name__} object while "
                f"trying to use the linked connector, but got "
                f"{type(client)}."
            )
        # Get the credentials from the client
        credentials = client.credential

        if isinstance(credentials, ClientSecretCredential):
            return AzureSecretSchema(
                client_id=credentials._client_id,
                client_secret=credentials._client_credential,
                tenant_id=credentials._tenant_id,
                account_name=client.account_name,
            )

        elif isinstance(credentials, DefaultAzureCredential):
            return AzureSecretSchema(account_name=client.account_name)

        else:
            raise RuntimeError(
                "The Azure Artifact Store connector can only be used "
                "with a service connector that is configured with "
                "Azure service principal credentials or implicit authentication"
            )

    secret = self.get_typed_authentication_secret(
        expected_schema_type=AzureSecretSchema
    )
    return secret
glob(pattern: PathType) -> List[PathType]

Return all paths that match the given glob pattern.

The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern PathType

The glob pattern to match, see details above.

required

Returns:

Type Description
List[PathType]

A list of paths that match the given glob pattern.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
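A hedged usage sketch of glob on an artifact store fetched from the active stack; the container name and pattern are illustrative:

from zenml.client import Client

store = Client().active_stack.artifact_store
csv_files = store.glob("az://my-container/data/**/*.csv")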
isdir(path: PathType) -> bool

Check whether a path is a directory.

Parameters:

Name Type Description Default
path PathType

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(path: PathType) -> List[PathType]

Return a list of files in a directory.

Parameters:

Name Type Description Default
path PathType

The path to list.

required

Returns:

Type Description
List[PathType]

A list of files in the given directory.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(path: PathType) -> None

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path PathType

The path to create.

required
Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(path: PathType) -> None

Create a directory at the given path.

Parameters:

Name Type Description Default
path PathType

The path to create.

required
Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path, exist_ok=True)
open(path: PathType, mode: str = 'r') -> Any

Open a file at the given path.

Parameters:

Name Type Description Default
path PathType

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
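A short sketch of writing and reading back a binary file through open; per the docstring above only binary modes are supported, and the path is a placeholder:

from zenml.client import Client

store = Client().active_stack.artifact_store

with store.open("az://my-container/tmp/example.bin", "wb") as f:
    f.write(b"hello azure")

with store.open("az://my-container/tmp/example.bin", "rb") as f:
    assert f.read() == b"hello azure"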
remove(path: PathType) -> None

Remove the file at the given path.

Parameters:

Name Type Description Default
path PathType

The path to remove.

required
Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    self.filesystem.rm_file(path=path)
rename(src: PathType, dst: PathType, overwrite: bool = False) -> None

Rename source file to destination file.

Parameters:

Name Type Description Default
src PathType

The path of the file to rename.

required
dst PathType

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Raises:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(path: PathType) -> None

Remove the given directory.

Parameters:

Name Type Description Default
path PathType

The path of the directory to remove.

required
Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
size(path: PathType) -> int

Get the size of a file in bytes.

Parameters:

Name Type Description Default
path PathType

The path to the file.

required

Returns:

Type Description
int

The size of the file in bytes.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def size(self, path: PathType) -> int:
    """Get the size of a file in bytes.

    Args:
        path: The path to the file.

    Returns:
        The size of the file in bytes.
    """
    return self.filesystem.size(path=path)  # type: ignore[no-any-return]
stat(path: PathType) -> Dict[str, Any]

Return stat info for the given path.

Parameters:

Name Type Description Default
path PathType

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

Stat info.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(top: PathType, topdown: bool = True, onerror: Optional[Callable[..., None]] = None) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top PathType

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., None]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[PathType, List[PathType], List[PathType]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory.

Source code in src/zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contains the path of the
        current directory, a list of directories inside the current
        directory and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files
Modules
azure_artifact_store

Implementation of the Azure Artifact Store integration.

Classes
AzureArtifactStore(*args: Any, **kwargs: Any)

Bases: BaseArtifactStore, AuthenticationMixin

Artifact Store for Microsoft Azure based artifacts. The attributes and methods of this class are identical to the AzureArtifactStore entry documented in full under artifact_stores above.

Functions

azureml_utils

AzureML definitions.

Classes
Functions
check_settings_and_compute_configuration(parameter: str, settings: AzureMLComputeSettings, compute: Compute) -> None

Utility function comparing a parameter between settings and compute.

Parameters:

Name Type Description Default
parameter str

the name of the parameter.

required
settings AzureMLComputeSettings

The AzureML orchestrator settings.

required
compute Compute

The compute instance or cluster from AzureML.

required
Source code in src/zenml/integrations/azure/azureml_utils.py
def check_settings_and_compute_configuration(
    parameter: str,
    settings: AzureMLComputeSettings,
    compute: Compute,
) -> None:
    """Utility function comparing a parameter between settings and compute.

    Args:
        parameter: the name of the parameter.
        settings: The AzureML orchestrator settings.
        compute: The compute instance or cluster from AzureML.
    """
    # Compare the parameter between the settings and the compute target
    compute_value = getattr(compute, parameter)
    settings_value = getattr(settings, parameter)

    if settings_value is not None and settings_value != compute_value:
        logger.warning(
            f"The '{parameter}' defined in the settings '{settings_value}' "
            "does not match the actual parameter of the instance: "
            f"'{compute_value}'. Will ignore this setting for now."
        )
create_or_get_compute(client: MLClient, settings: AzureMLComputeSettings, default_compute_name: str) -> Optional[str]

Creates or fetches the compute target if defined in the settings.

Parameters:

Name Type Description Default
client MLClient

the AzureML client.

required
settings AzureMLComputeSettings

the settings for the orchestrator.

required
default_compute_name str

the default name for the compute target, if one is not provided in the settings.

required

Returns:

Type Description
Optional[str]

None, if the orchestrator is using serverless compute, or str, the name of the compute target (instance or cluster).

Raises:

Type Description
RuntimeError

if the fetched compute target is unsupported or the mode defined in the setting does not match the type of the compute target.

Source code in src/zenml/integrations/azure/azureml_utils.py
def create_or_get_compute(
    client: MLClient,
    settings: AzureMLComputeSettings,
    default_compute_name: str,
) -> Optional[str]:
    """Creates or fetches the compute target if defined in the settings.

    Args:
        client: the AzureML client.
        settings: the settings for the orchestrator.
        default_compute_name: the default name for the compute target, if one
            is not provided in the settings.

    Returns:
        None, if the orchestrator is using serverless compute or
        str, the name of the compute target (instance or cluster).

    Raises:
        RuntimeError: if the fetched compute target is unsupported or the
            mode defined in the setting does not match the type of the
            compute target.
    """
    # If the mode is serverless, we can not fetch anything anyhow
    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        return None

    # If a name is not provided, generate one based on the orchestrator id
    compute_name = settings.compute_name or default_compute_name

    # Try to fetch the compute target
    try:
        compute = client.compute.get(compute_name)

        logger.info(f"Using existing compute target: '{compute_name}'.")

        # Check if compute size matches with the settings
        check_settings_and_compute_configuration(
            parameter="size", settings=settings, compute=compute
        )

        compute_type = compute.type

        # Check the type and matches the settings
        if compute_type == "computeinstance":  # Compute Instance
            if settings.mode != AzureMLComputeTypes.COMPUTE_INSTANCE:
                raise RuntimeError(
                    "The mode of operation for the compute target defined"
                    f"in the settings '{settings.mode}' does not match "
                    f"the type of the compute target: `{compute_name}` "
                    "which is a 'compute-instance'. Please make sure that "
                    "the settings are adjusted properly."
                )

            if compute.state != "Running":
                raise RuntimeError(
                    f"The compute instance `{compute_name}` is not in a "
                    "running state at the moment. Please make sure that "
                    "the compute target is running, before executing the "
                    "pipeline."
                )

            # Idle time before shutdown
            check_settings_and_compute_configuration(
                parameter="idle_time_before_shutdown_minutes",
                settings=settings,
                compute=compute,
            )

        elif compute_type == "amIcompute":  # Compute Cluster
            if settings.mode != AzureMLComputeTypes.COMPUTE_CLUSTER:
                raise RuntimeError(
                    "The mode of operation for the compute target defined "
                    f"in the settings '{settings.mode}' does not match "
                    f"the type of the compute target: `{compute_name}` "
                    "which is a 'compute-cluster'. Please make sure that "
                    "the settings are adjusted properly."
                )

            if compute.provisioning_state != "Succeeded":
                raise RuntimeError(
                    f"The provisioning state '{compute.provisioning_state}'"
                    f"of the compute cluster `{compute_name}` is not "
                    "successful. Please make sure that the compute cluster "
                    "is provisioned properly, before executing the "
                    "pipeline."
                )

            for parameter in [
                "idle_time_before_scale_down",
                "max_instances",
                "min_instances",
                "tier",
                "location",
            ]:
                # Check all possible configurations
                check_settings_and_compute_configuration(
                    parameter=parameter, settings=settings, compute=compute
                )
        else:
            raise RuntimeError(f"Unsupported compute type: {compute_type}")
        return compute_name

    # If the compute target does not exist, create it
    except ResourceNotFoundError:
        logger.info(
            f"Can not find the compute target with name: '{compute_name}':"
        )

        if settings.mode == AzureMLComputeTypes.COMPUTE_INSTANCE:
            logger.info(
                "Creating a new compute instance. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import ComputeInstance

            compute_instance = ComputeInstance(
                name=compute_name,
                size=settings.size,
                idle_time_before_shutdown_minutes=settings.idle_time_before_shutdown_minutes,
            )
            client.begin_create_or_update(compute_instance).result()
            return compute_name

        elif settings.mode == AzureMLComputeTypes.COMPUTE_CLUSTER:
            logger.info(
                "Creating a new compute cluster. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import AmlCompute

            compute_cluster = AmlCompute(
                name=compute_name,
                size=settings.size,
                location=settings.location,
                min_instances=settings.min_instances,
                max_instances=settings.max_instances,
                idle_time_before_scale_down=settings.idle_time_before_scaledown_down,
                tier=settings.tier,
            )
            client.begin_create_or_update(compute_cluster).result()
            return compute_name

    return None
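
For orientation, here is a hedged sketch of how this helper might be invoked. The client construction mirrors the one used by fetch_status further below; the settings classes come from src/zenml/integrations/azure/flavors/azureml.py, and all concrete names and IDs are placeholders.

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

from zenml.integrations.azure.flavors.azureml import (
    AzureMLComputeSettings,
    AzureMLComputeTypes,
)

# Build an AzureML client (placeholder identifiers).
ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# Fetch or create a compute instance; "zenml-instance" is a hypothetical name.
settings = AzureMLComputeSettings(
    mode=AzureMLComputeTypes.COMPUTE_INSTANCE,
    compute_name="zenml-instance",
)

# Returns the compute name, or None when the mode is serverless.
compute = create_or_get_compute(
    client=ml_client,
    settings=settings,
    default_compute_name="zenml-default",
)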

flavors

Azure integration flavors.

Classes
AzureArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration class for Azure Artifact Store.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using
            plain-text secrets.
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if pydantic_utils.has_validators(
            pydantic_class=self.__class__, field_name=key
        ):
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
AzureArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Azure Artifact Store flavor.

Attributes
config_class: Type[AzureArtifactStoreConfig] property

Returns AzureArtifactStoreConfig config class.

Returns:

Type Description
Type[AzureArtifactStoreConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureArtifactStore] property

Implementation class.

Returns:

Type Description
Type[AzureArtifactStore]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, AzureMLOrchestratorSettings

Configuration for the AzureML orchestrator.

Source code in src/zenml/stack/stack_component.py (identical to the listing shown above for AzureArtifactStoreConfig).
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

AzureMLOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the AzureML orchestrator.

Attributes
config_class: Type[AzureMLOrchestratorConfig] property

Returns AzureMLOrchestratorConfig config class.

Returns:

Type Description
Type[AzureMLOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLOrchestrator] property

Implementation class.

Returns:

Type Description
Type[AzureMLOrchestrator]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: AzureMLComputeSettings

Settings for the AzureML orchestrator.

Source code in src/zenml/config/secret_reference_mixin.py
def __init__(
    self, warn_about_plain_text_secrets: bool = False, **kwargs: Any
) -> None:
    """Ensures that secret references are only passed for valid fields.

    This method ensures that secret references are not passed for fields
    that explicitly prevent them or require pydantic validation.

    Args:
        warn_about_plain_text_secrets: If true, then warns about using plain-text secrets.
        **kwargs: Arguments to initialize this object.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            or an attribute which explicitly disallows secret references
            is passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.model_fields[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if (
                secret_utils.is_secret_field(field)
                and warn_about_plain_text_secrets
            ):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}`. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in sensitive information as secrets. Check out the "
                    "documentation on how to configure values with secrets "
                    "here: https://docs.zenml.io/getting-started/deploying-zenml/secret-management"
                )
            continue

        if secret_utils.is_clear_text_field(field):
            raise ValueError(
                f"Passing the `{key}` attribute as a secret reference is "
                "not allowed."
            )

        requires_validation = has_validators(
            pydantic_class=self.__class__, field_name=key
        )
        if requires_validation:
            raise ValueError(
                f"Passing the attribute `{key}` as a secret reference is "
                "not allowed as additional validation is required for "
                "this attribute."
            )

    super().__init__(**kwargs)
AzureMLStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, AzureMLStepOperatorSettings

Config for the AzureML step operator.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.

Source code in src/zenml/stack/stack_component.py (identical to the listing shown above for AzureArtifactStoreConfig).
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

AzureMLStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Flavor for the AzureML step operator.

Attributes
config_class: Type[AzureMLStepOperatorConfig] property

Returns AzureMLStepOperatorConfig config class.

Returns:

Type Description
Type[AzureMLStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLStepOperator] property

Implementation class.

Returns:

Type Description
Type[AzureMLStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: AzureMLComputeSettings

Settings for the AzureML step operator.

Attributes:

Name Type Description
compute_target_name Optional[str]

The name of the configured ComputeTarget. Deprecated in favor of compute_name.

Source code in src/zenml/config/secret_reference_mixin.py (identical to the listing shown above for AzureMLOrchestratorSettings).
Modules
azure_artifact_store_flavor

Azure artifact store flavor.

Classes
AzureArtifactStoreConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseArtifactStoreConfig, AuthenticationConfigMixin

Configuration class for Azure Artifact Store.

Source code in src/zenml/stack/stack_component.py (identical to the listing shown above for AzureArtifactStoreConfig).
AzureArtifactStoreFlavor

Bases: BaseArtifactStoreFlavor

Azure Artifact Store flavor.

Attributes
config_class: Type[AzureArtifactStoreConfig] property

Returns AzureArtifactStoreConfig config class.

Returns:

Type Description
Type[AzureArtifactStoreConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureArtifactStore] property

Implementation class.

Returns:

Type Description
Type[AzureArtifactStore]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

azureml

AzureML definitions.

Classes
AzureMLComputeSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseSettings

Settings for the AzureML compute.

These settings adjust the compute resources that will be used by the pipeline execution.

There are three possible use cases for this implementation:

1. Serverless compute (default behavior):
    - The `mode` is set to `serverless`.
    - All the other parameters become irrelevant and will throw a
    warning if set.

2. Compute instance:
    - The `mode` is set to `compute-instance`.
    - In this case, users have to provide a `compute_name`.
        - If a compute instance exists with this name, this instance
        will be used and all the other parameters become irrelevant
        and will throw a warning if set.
        - If a compute instance does not already exist, ZenML will
        create it. You can use the parameters `size` and
        `idle_time_before_shutdown_minutes` for this operation.

3. Compute cluster:
    - The `mode` is set to `compute-cluster`.
    - In this case, users have to provide a `compute_name`.
        - If a compute cluster exists with this name, this cluster
        will be used and all the other parameters become irrelevant
        and will throw a warning if set.
        - If a compute cluster does not already exist, ZenML will
        create it. You can set the additional parameters for this
        operation.
Source code in src/zenml/config/secret_reference_mixin.py (identical to the listing shown above for AzureMLOrchestratorSettings).
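
To make the three modes concrete, here is a minimal sketch of passing such settings to a pipeline. The "orchestrator.azureml" settings key follows ZenML's usual <component-type>.<flavor> convention, and the field values are hypothetical; only mode, compute_name, size, min_instances, and max_instances are taken from the viable fields listed in the validator below.

from zenml import pipeline
from zenml.integrations.azure.flavors.azureml_orchestrator_flavor import (
    AzureMLOrchestratorSettings,
)

azureml_settings = AzureMLOrchestratorSettings(
    mode="compute-cluster",
    compute_name="zenml-cluster",  # hypothetical cluster name
    size="Standard_DS3_v2",        # hypothetical VM size
    min_instances=0,
    max_instances=2,
)

@pipeline(settings={"orchestrator.azureml": azureml_settings})
def my_pipeline() -> None:
    ...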
Functions
azureml_settings_validator() -> AzureMLComputeSettings

Checks whether the right configuration is set based on mode.

Returns:

Type Description
AzureMLComputeSettings

the instance itself.

Raises:

Type Description
AssertionError

if a name is not provided when working with instances and clusters.

Source code in src/zenml/integrations/azure/flavors/azureml.py
@model_validator(mode="after")
def azureml_settings_validator(self) -> "AzureMLComputeSettings":
    """Checks whether the right configuration is set based on mode.

    Returns:
        the instance itself.

    Raises:
        AssertionError: if a name is not provided when working with
            instances and clusters.
    """
    viable_configuration_fields = {
        AzureMLComputeTypes.SERVERLESS: {"mode"},
        AzureMLComputeTypes.COMPUTE_INSTANCE: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_shutdown_minutes",
        },
        AzureMLComputeTypes.COMPUTE_CLUSTER: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_scaledown_down",
            "location",
            "min_instances",
            "max_instances",
            "tier",
        },
    }
    viable_fields = viable_configuration_fields[self.mode]

    for field in self.model_fields_set:
        if (
            field not in viable_fields
            and field in AzureMLComputeSettings.model_fields
        ):
            logger.warning(
                f"In the {self.__class__.__name__} settings, the mode of "
                f"operation is set to {self.mode}. In this mode, you can "
                f"not configure the parameter '{field}'. This "
                "configuration will be ignored."
            )

    if (
        self.mode == AzureMLComputeTypes.COMPUTE_INSTANCE
        or self.mode == AzureMLComputeTypes.COMPUTE_CLUSTER
    ):
        assert self.compute_name is not None, (
            "When you are working with compute instances and clusters, "
            "please define a name for the compute target."
        )

    return self
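
A short sketch of the validator's observable behavior, under the same import assumptions as above: a field that is not viable for the chosen mode only triggers a warning, while a missing compute name fails the assertion (which pydantic surfaces as a validation error).

from zenml.integrations.azure.flavors.azureml import (
    AzureMLComputeSettings,
    AzureMLComputeTypes,
)

# Serverless mode: `size` is not a viable field, so it is warned
# about and ignored.
AzureMLComputeSettings(
    mode=AzureMLComputeTypes.SERVERLESS,
    size="Standard_DS3_v2",  # hypothetical VM size
)

# Instance and cluster modes require a compute name; omitting it raises.
try:
    AzureMLComputeSettings(mode=AzureMLComputeTypes.COMPUTE_INSTANCE)
except Exception as err:  # pydantic wraps the AssertionError
    print(err)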
AzureMLComputeTypes

Bases: StrEnum

Enum for different types of compute on AzureML.
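
For reference, a reconstruction of the enum from the member names used in the code above and the string values quoted in the settings docstring (the StrEnum import path is an assumption):

from zenml.utils.enum_utils import StrEnum

class AzureMLComputeTypes(StrEnum):
    """Enum for different types of compute on AzureML."""

    SERVERLESS = "serverless"
    COMPUTE_INSTANCE = "compute-instance"
    COMPUTE_CLUSTER = "compute-cluster"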

Functions
azureml_orchestrator_flavor

Implementation of the AzureML Orchestrator flavor.

Classes
AzureMLOrchestratorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseOrchestratorConfig, AzureMLOrchestratorSettings

Configuration for the AzureML orchestrator.

Source code in src/zenml/stack/stack_component.py (identical to the listing shown above for AzureArtifactStoreConfig).
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

is_schedulable: bool property

Whether the orchestrator is schedulable or not.

Returns:

Type Description
bool

Whether the orchestrator is schedulable or not.

is_synchronous: bool property

Whether the orchestrator runs synchronously or not.

Returns:

Type Description
bool

Whether the orchestrator runs synchronously or not.

AzureMLOrchestratorFlavor

Bases: BaseOrchestratorFlavor

Flavor for the AzureML orchestrator.

Attributes
config_class: Type[AzureMLOrchestratorConfig] property

Returns AzureMLOrchestratorConfig config class.

Returns:

Type Description
Type[AzureMLOrchestratorConfig]

The config class.

docs_url: Optional[str] property

A URL to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLOrchestrator] property

Implementation class.

Returns:

Type Description
Type[AzureMLOrchestrator]

The implementation class.

logo_url: str property

A URL to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A URL to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLOrchestratorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: AzureMLComputeSettings

Settings for the AzureML orchestrator.

Source code in src/zenml/config/secret_reference_mixin.py (identical to the listing shown above for AzureMLOrchestratorSettings).
Functions
azureml_step_operator_flavor

AzureML step operator flavor.

Classes
AzureMLStepOperatorConfig(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: BaseStepOperatorConfig, AzureMLStepOperatorSettings

Config for the AzureML step operator.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.

Source code in src/zenml/stack/stack_component.py (identical to the listing shown above for AzureArtifactStoreConfig).
Attributes
is_remote: bool property

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

AzureMLStepOperatorFlavor

Bases: BaseStepOperatorFlavor

Flavor for the AzureML step operator.

Attributes
config_class: Type[AzureMLStepOperatorConfig] property

Returns AzureMLStepOperatorConfig config class.

Returns:

Type Description
Type[AzureMLStepOperatorConfig]

The config class.

docs_url: Optional[str] property

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[AzureMLStepOperator] property

Implementation class.

Returns:

Type Description
Type[AzureMLStepOperator]

The implementation class.

logo_url: str property

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

service_connector_requirements: Optional[ServiceConnectorRequirements] property

Service connector resource requirements for service connectors.

Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.

Returns:

Type Description
Optional[ServiceConnectorRequirements]

Requirements for compatible service connectors, if a service connector is required for this flavor.

AzureMLStepOperatorSettings(warn_about_plain_text_secrets: bool = False, **kwargs: Any)

Bases: AzureMLComputeSettings

Settings for the AzureML step operator.

Attributes:

Name Type Description
compute_target_name Optional[str]

The name of the configured ComputeTarget. Deprecated in favor of compute_name.

Source code in src/zenml/config/secret_reference_mixin.py (identical to the listing shown above for AzureMLOrchestratorSettings).
Functions

orchestrators

AzureML orchestrator.

Classes
AzureMLOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines on AzureML.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: AzureMLOrchestratorConfig property

Returns the AzureMLOrchestratorConfig config.

Returns:

Type Description
AzureMLOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the AzureML orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

In the remote case, checks that the stack contains a container registry, an image builder, and only remote components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

Functions
compute_metadata(job: Any) -> Iterator[Dict[str, MetadataType]]

Generate run metadata based on the generated AzureML PipelineJob.

Parameters:

Name Type Description Default
job Any

The corresponding PipelineJob object.

required

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def compute_metadata(self, job: Any) -> Iterator[Dict[str, MetadataType]]:
    """Generate run metadata based on the generated AzureML PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    # Metadata
    metadata: Dict[str, MetadataType] = {}

    # Orchestrator Run ID
    if run_id := self._compute_orchestrator_run_id(job):
        metadata[METADATA_ORCHESTRATOR_RUN_ID] = run_id

    # URL to the AzureML's pipeline view
    if orchestrator_url := self._compute_orchestrator_url(job):
        metadata[METADATA_ORCHESTRATOR_URL] = Uri(orchestrator_url)

    yield metadata
fetch_status(run: PipelineRunResponse) -> ExecutionStatus

Refreshes the status of a specific pipeline run.

Parameters:

Name Type Description Default
run PipelineRunResponse

The run that was executed by this orchestrator.

required

Returns:

Type Description
ExecutionStatus

the actual status of the pipeline execution.

Raises:

Type Description
AssertionError

If the run was not executed by this orchestrator.

ValueError

If an unknown state is fetched or if the orchestrator run ID cannot be fetched.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        The actual status of the pipeline execution.

    Raises:
        AssertionError: If the run was not executed by this orchestrator.
        ValueError: If an unknown state is fetched or the orchestrator
            run ID cannot be determined.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator
    assert (
        self.id
        == run.stack.components[StackComponentType.ORCHESTRATOR][0].id
    )

    # Initialize the AzureML client
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Fetch the status of the PipelineJob
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Can not find the orchestrator run ID, thus can not fetch "
            "the status."
        )
    status = ml_client.jobs.get(run_id).status

    # Map the potential outputs to ZenML ExecutionStatus. Potential values:
    # https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.pipelinejob?view=azure-python#azure-ai-ml-entities-pipelinejob-status
    if status in [
        "NotStarted",
        "Starting",
        "Provisioning",
        "Preparing",
        "Queued",
    ]:
        return ExecutionStatus.INITIALIZING
    elif status in ["Running", "Finalizing"]:
        return ExecutionStatus.RUNNING
    elif status in [
        "CancelRequested",
        "Failed",
        "Canceled",
        "NotResponding",
    ]:
        return ExecutionStatus.FAILED
    elif status in ["Completed"]:
        return ExecutionStatus.COMPLETED
    else:
        raise ValueError("Unknown status for the pipeline job.")
get_orchestrator_run_id() -> str

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:

Type Description
str

The orchestrator run id.

Raises:

Type Description
RuntimeError

If the run id cannot be read from the environment.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
    """
    try:
        return os.environ[ENV_ZENML_AZUREML_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AZUREML_RUN_ID}."
        )
get_pipeline_run_metadata(run_id: UUID) -> Dict[str, MetadataType]

Get general component-specific metadata for a pipeline run.

Parameters:

Name Type Description Default
run_id UUID

The ID of the pipeline run.

required

Returns:

Type Description
Dict[str, MetadataType]

A dictionary of metadata.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    try:
        if connector := self.get_connector():
            credentials = connector.connect()
        else:
            credentials = DefaultAzureCredential()

        ml_client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )

        azureml_root_run_id = os.environ[ENV_ZENML_AZUREML_RUN_ID]
        azureml_job = ml_client.jobs.get(azureml_root_run_id)

        return {
            METADATA_ORCHESTRATOR_URL: Uri(azureml_job.studio_url),
        }
    except Exception as e:
        logger.warning(
            f"Failed to fetch the Studio URL of the AzureML pipeline "
            f"job: {e}"
        )
        return {}
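
Once recorded, the metadata can be read back from the run response, e.g. in a sketch like the following ("my_run" is a hypothetical run name, and METADATA_ORCHESTRATOR_URL is assumed to resolve to the "orchestrator_url" key):

from zenml.client import Client

run = Client().get_pipeline_run("my_run")  # hypothetical run name

# Assumed key; in the source it comes from METADATA_ORCHESTRATOR_URL.
print(run.run_metadata.get("orchestrator_url"))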
prepare_or_run_pipeline(deployment: PipelineDeploymentResponse, stack: Stack, environment: Dict[str, str], placeholder_run: Optional[PipelineRunResponse] = None) -> Iterator[Dict[str, MetadataType]]

Prepares or runs a pipeline on AzureML.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponse

The deployment to prepare or run.

required
stack Stack

The stack to run on.

required
environment Dict[str, str]

Environment variables to set in the orchestration environment.

required
placeholder_run Optional[PipelineRunResponse]

An optional placeholder run for the deployment.

None

Raises:

Type Description
RuntimeError

If the creation of the schedule fails.

Yields:

Type Description
Dict[str, MetadataType]

A dictionary of metadata related to the pipeline run.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
    placeholder_run: Optional["PipelineRunResponse"] = None,
) -> Iterator[Dict[str, MetadataType]]:
    """Prepares or runs a pipeline on AzureML.

    Args:
        deployment: The deployment to prepare or run.
        stack: The stack to run on.
        environment: Environment variables to set in the orchestration
            environment.
        placeholder_run: An optional placeholder run for the deployment.

    Raises:
        RuntimeError: If the creation of the schedule fails.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    # Authentication
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    # Settings
    settings = cast(
        AzureMLOrchestratorSettings,
        self.get_settings(deployment),
    )

    # Client creation
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Create components
    components = {}
    for step_name, step in deployment.step_configurations.items():
        # Get the image for each step
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get the command and arguments
        command = AzureMLEntrypointConfiguration.get_entrypoint_command()
        arguments = (
            AzureMLEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
                deployment_id=deployment.id,
                zenml_env_variables=b64_encode(json.dumps(environment)),
            )
        )

        # Generate an AzureML CommandComponent
        components[step_name] = self._create_command_component(
            step=step,
            step_name=step_name,
            env_name=deployment.pipeline_configuration.name,
            image=image,
            command=command,
            arguments=arguments,
        )

    # Pipeline definition
    pipeline_args = dict()
    run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    pipeline_args["name"] = run_name

    if compute_target := create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    ):
        pipeline_args["compute"] = compute_target

    @pipeline(force_rerun=True, **pipeline_args)  # type: ignore[call-overload, misc]
    def azureml_pipeline() -> None:
        """Create an AzureML pipeline."""
        # Here we have to track the inputs and outputs so that we can bind
        # the components to each other to execute them in a specific order.
        component_outputs: Dict[str, Any] = {}
        for component_name, component in components.items():
            # Inputs
            component_inputs = {}
            if component.inputs:
                component_inputs.update(
                    {i: component_outputs[i] for i in component.inputs}
                )

            # Job
            component_job = component(**component_inputs)

            # Outputs
            if component_job.outputs:
                component_outputs[component_name] = (
                    component_job.outputs.completed
                )

    # Create and execute the pipeline job
    pipeline_job = azureml_pipeline()

    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        pipeline_job.settings.default_compute = "serverless"

    # Scheduling
    if schedule := deployment.schedule:
        try:
            schedule_trigger: Optional[
                Union[CronTrigger, RecurrenceTrigger]
            ] = None

            start_time = None
            if schedule.start_time is not None:
                start_time = schedule.start_time.isoformat()

            end_time = None
            if schedule.end_time is not None:
                end_time = schedule.end_time.isoformat()

            if schedule.cron_expression:
                # If we are working with a cron expression
                schedule_trigger = CronTrigger(
                    expression=schedule.cron_expression,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            elif schedule.interval_second:
                # If we are working with intervals
                interval = schedule.interval_second.total_seconds()

                if interval % 60 != 0:
                    logger.warning(
                        "The ZenML AzureML orchestrator only works with "
                        "time intervals defined over minutes. Will "
                        f"use a schedule over {int(interval // 60)}."
                    )

                if interval < 60:
                    raise RuntimeError(
                        "Can not create a schedule with an interval less "
                        "than 60 secs."
                    )

                frequency = "minute"
                interval = int(interval // 60)

                schedule_trigger = RecurrenceTrigger(
                    frequency=frequency,
                    interval=interval,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            if schedule_trigger:
                # Create and execute the job schedule
                job_schedule = JobSchedule(
                    name=run_name,
                    trigger=schedule_trigger,
                    create_job=pipeline_job,
                )
                ml_client.schedules.begin_create_or_update(
                    job_schedule
                ).result()
                logger.info(
                    f"Scheduled pipeline '{run_name}' with recurrence "
                    "or cron expression."
                )
            else:
                raise RuntimeError(
                    "No valid scheduling configuration found for "
                    f"pipeline '{run_name}'."
                )

        except (HttpResponseError, ResourceExistsError) as e:
            raise RuntimeError(
                "Failed to create schedule for the pipeline "
                f"'{run_name}': {str(e)}"
            )

    else:
        job = ml_client.jobs.create_or_update(pipeline_job)
        logger.info(f"Pipeline {run_name} has been started.")

        # Yield metadata based on the generated job object
        yield from self.compute_metadata(job)

        assert job.services is not None
        assert job.name is not None

        logger.info(
            f"Pipeline {run_name} is running. "
            "You can view the pipeline in the AzureML portal at "
            f"{job.services['Studio'].endpoint}"
        )

        if settings.synchronous:
            logger.info("Waiting for pipeline to finish...")
            ml_client.jobs.stream(job.name)
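
For orientation, a minimal configuration sketch: the settings object consumed by this method via get_settings() can be attached to a pipeline under the "orchestrator" settings key. The "serverless" mode mirrors the AzureMLComputeTypes.SERVERLESS check above; all other settings fields are omitted here.

from zenml import pipeline
from zenml.integrations.azure.flavors import AzureMLOrchestratorSettings

azureml_settings = AzureMLOrchestratorSettings(mode="serverless")

@pipeline(settings={"orchestrator": azureml_settings})
def my_pipeline() -> None:  # hypothetical pipeline
    ...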
Modules
azureml_orchestrator

Implementation of the AzureML Orchestrator.

Classes
AzureMLOrchestrator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: ContainerizedOrchestrator

Orchestrator responsible for running pipelines on AzureML.

Functions
azureml_orchestrator_entrypoint_config

Entrypoint configuration for ZenML AzureML pipeline steps.

Classes
AzureMLEntrypointConfiguration(arguments: List[str])

Bases: StepEntrypointConfiguration

Entrypoint configuration for ZenML AzureML pipeline steps.

Source code in src/zenml/entrypoints/base_entrypoint_configuration.py
def __init__(self, arguments: List[str]):
    """Initializes the entrypoint configuration.

    Args:
        arguments: Command line arguments to configure this object.
    """
    self.entrypoint_args = self._parse_arguments(arguments)
Functions
get_entrypoint_arguments(**kwargs: Any) -> List[str] classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
**kwargs Any

Keyword arguments; may include the environment variables.

{}

Returns:

Type Description
List[str]

The superclass arguments as well as arguments for environment variables.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        **kwargs: Keyword arguments; may include the environment variables.

    Returns:
        The superclass arguments as well as arguments for environment
        variables.
    """
    return super().get_entrypoint_arguments(**kwargs) + [
        f"--{ZENML_ENV_VARIABLES}",
        kwargs[ZENML_ENV_VARIABLES],
    ]
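
The environment variables passed this way are JSON-serialized and base64-encoded by the orchestrator (see the b64_encode(json.dumps(environment)) call in prepare_or_run_pipeline above). A round-trip sketch of that encoding:

import base64
import json

environment = {"ZENML_LOGGING_VERBOSITY": "INFO"}

# Encode the way the orchestrator does...
encoded = base64.b64encode(json.dumps(environment).encode()).decode()

# ...and decode the way the entrypoint has to undo it.
decoded = json.loads(base64.b64decode(encoded).decode())
assert decoded == environment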
get_entrypoint_options() -> Set[str] classmethod

Gets all options required for running with this configuration.

Returns:

Type Description
Set[str]

The superclass options as well as an option for the environment variables.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all options required for running with this configuration.

    Returns:
        The superclass options as well as an option for the
        environment variables.
    """
    return super().get_entrypoint_options() | {ZENML_ENV_VARIABLES}
run() -> None

Runs the step.

Source code in src/zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
def run(self) -> None:
    """Runs the step."""
    # Set the environment variables first
    self._set_env_variables()

    # Run the step
    super().run()

    # Unfortunately, AzureML's Python SDK v2 offers no native way to
    # execute steps/components in a specific sequence, so we establish the
    # correct order through dummy inputs and outputs. However, these
    # dependencies only take effect if the inputs and outputs actually
    # exist. This is why we create a dummy file, write to it, and use it
    # as the output of each step.
    if completed := os.environ.get(AZURE_ML_OUTPUT_COMPLETED):
        os.makedirs(os.path.dirname(completed), exist_ok=True)
        with open(completed, "w") as f:
            f.write("Component completed!")
Functions

service_connectors

Azure Service Connector.

Classes
AzureServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Azure service connector.

Source code in src/zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Initialize a new service connector instance.

    Args:
        kwargs: Additional keyword arguments to pass to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Convert the resource ID to its canonical form. For resource types
    # that don't support multiple instances:
    # - if a resource ID is not provided, we use the default resource ID for
    # the resource type
    # - if a resource ID is provided, we verify that it matches the default
    # resource ID for the resource type
    if self.resource_type:
        try:
            self.resource_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            self.resource_id = None
Attributes
subscription: Tuple[str, str] property

Get the Azure subscription ID and name.

Returns:

Type Description
Tuple[str, str]

The Azure subscription ID and name.

Raises:

Type Description
AuthorizationException

If the Azure subscription cannot be determined or doesn't match the configured subscription ID.

tenant_id: str property

Get the Azure tenant ID.

Returns:

Type Description
str

The Azure tenant ID.

Raises:

Type Description
AuthorizationException

If the Azure tenant ID cannot be determined or doesn't match the configured tenant ID.

Functions
get_azure_credential(auth_method: str, resource_type: Optional[str] = None, resource_id: Optional[str] = None) -> Tuple[TokenCredential, Optional[datetime.datetime]]

Get an Azure credential for the specified resource.

Parameters:

Name Type Description Default
auth_method str

The authentication method to use.

required
resource_type Optional[str]

The resource type to get a credential for.

None
resource_id Optional[str]

The resource ID to get a credential for.

None

Returns:

Type Description
Tuple[TokenCredential, Optional[datetime.datetime]]

An Azure credential for the specified resource and its expiration timestamp, if applicable.

Source code in src/zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times for the same resource
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Refresh expired sessions

        # check if the token expires in the near future
        if expires_at > utc_now(tz_aware=expires_at) + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
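
The cache-reuse condition above, distilled into a standalone sketch; AZURE_SESSION_EXPIRATION_BUFFER is a minute count defined elsewhere in the integration:

import datetime

def credential_still_usable(
    expires_at: datetime.datetime,
    now: datetime.datetime,
    buffer_minutes: int,
) -> bool:
    """A cached credential is reused only if it outlives the buffer window."""
    return expires_at > now + datetime.timedelta(minutes=buffer_minutes)

# A credential expiring in 3 minutes is refreshed when the buffer is 5 minutes.
now = datetime.datetime.now(datetime.timezone.utc)
assert not credential_still_usable(now + datetime.timedelta(minutes=3), now, 5)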
Modules
azure_service_connector

Azure Service Connector.

Classes
AzureAccessToken

Bases: AuthenticationConfig

Azure access token credentials.

AzureAccessTokenConfig

Bases: AzureBaseConfig, AzureAccessToken

Azure token configuration.

AzureAuthenticationMethods

Bases: StrEnum

Azure Authentication methods.

AzureBaseConfig

Bases: AuthenticationConfig

Azure base configuration.

AzureClientConfig

Bases: AzureBaseConfig

Azure client configuration.

AzureClientSecret

Bases: AuthenticationConfig

Azure client secret credentials.

AzureServiceConnector(**kwargs: Any)

Bases: ServiceConnector

Azure service connector.

AzureServicePrincipalConfig

Bases: AzureClientConfig, AzureClientSecret

Azure service principal configuration.

ZenMLAzureTokenCredential(token: str, expires_at: datetime.datetime)

Bases: TokenCredential

ZenML Azure token credential.

This class is used to provide a pre-configured token credential to Azure clients. Tokens are generated from other Azure credentials and are served to Azure clients to authenticate requests.

Initialize ZenML Azure token credential.

Parameters:

Name Type Description Default
token str

The token to use for authentication

required
expires_at datetime

The expiration time of the token

required
Source code in src/zenml/integrations/azure/service_connectors/azure_service_connector.py
def __init__(self, token: str, expires_at: datetime.datetime):
    """Initialize ZenML Azure token credential.

    Args:
        token: The token to use for authentication
        expires_at: The expiration time of the token
    """
    self.token = token

    # Convert the expiration time from UTC to local time
    self.expires_on = int(to_local_tz(expires_at).timestamp())
Functions
get_token(*scopes: str, **kwargs: Any) -> Any

Get token.

Parameters:

Name Type Description Default
*scopes str

Scopes

()
**kwargs Any

Keyword arguments

{}

Returns:

Type Description
Any

Token

Source code in src/zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_token(self, *scopes: str, **kwargs: Any) -> Any:
    """Get token.

    Args:
        *scopes: Scopes
        **kwargs: Keyword arguments

    Returns:
        Token
    """
    return AccessToken(token=self.token, expires_on=self.expires_on)
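
Usage sketch, assuming a bearer token obtained elsewhere together with its UTC expiration time; the credential can then be handed to any Azure SDK client that accepts a TokenCredential:

import datetime

from zenml.integrations.azure.service_connectors.azure_service_connector import (
    ZenMLAzureTokenCredential,
)

expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
credential = ZenMLAzureTokenCredential(token="<token>", expires_at=expires_at)

# Scopes are accepted for API compatibility; the pre-configured token is
# returned as-is.
access_token = credential.get_token("https://management.azure.com/.default")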
Functions

step_operators

Initialization of AzureML Step Operator integration.

Classes
AzureMLStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: AzureMLStepOperatorConfig property

Returns the AzureMLStepOperatorConfig config.

Returns:

Type Description
AzureMLStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the AzureML step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/azure/step_operators/azureml_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
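
A step opts into this step operator by referencing it in its decorator, which is what makes the build collection above emit a build for it ("azureml" is a hypothetical component name):

from zenml import step

@step(step_operator="azureml")  # hypothetical step operator name
def train() -> None:
    ...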
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on AzureML.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in src/zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    # Client creation
    ml_client = MLClient(
        credential=self._get_credentials(),
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace_name,
    )

    env = Environment(name=f"zenml-{info.run_name}", image=image_name)

    compute_target = create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    )

    command_job = command(
        name=info.run_name,
        command=" ".join(entrypoint_command),
        environment=env,
        environment_variables=environment,
        compute=compute_target,
        experiment_name=info.pipeline.name,
    )

    job = ml_client.jobs.create_or_update(command_job)

    logger.info(f"AzureML job created with id: {job.id}")
    ml_client.jobs.stream(info.run_name)
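
Distilled, launch() boils down to the following azure-ai-ml calls; this is a sketch with placeholder names, and the real entrypoint command is assembled by ZenML:

from azure.ai.ml import MLClient, command
from azure.ai.ml.entities import Environment
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

command_job = command(
    name="zenml-step-run",  # placeholder run name
    command="python -m ...",  # placeholder; ZenML assembles the real command
    environment=Environment(name="zenml-env", image="<docker-image>"),
    environment_variables={"ZENML_LOGGING_VERBOSITY": "INFO"},
    compute="<compute-target>",
    experiment_name="my-pipeline",
)

job = ml_client.jobs.create_or_update(command_job)
ml_client.jobs.stream("zenml-step-run")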
Modules
azureml_step_operator

Implementation of the ZenML AzureML Step Operator.

Classes
AzureMLStepOperator(name: str, id: UUID, config: StackComponentConfig, flavor: str, type: StackComponentType, user: Optional[UUID], created: datetime, updated: datetime, labels: Optional[Dict[str, Any]] = None, connector_requirements: Optional[ServiceConnectorRequirements] = None, connector: Optional[UUID] = None, connector_resource_id: Optional[str] = None, *args: Any, **kwargs: Any)

Bases: BaseStepOperator

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Source code in src/zenml/stack/stack_component.py
def __init__(
    self,
    name: str,
    id: UUID,
    config: StackComponentConfig,
    flavor: str,
    type: StackComponentType,
    user: Optional[UUID],
    created: datetime,
    updated: datetime,
    labels: Optional[Dict[str, Any]] = None,
    connector_requirements: Optional[ServiceConnectorRequirements] = None,
    connector: Optional[UUID] = None,
    connector_resource_id: Optional[str] = None,
    *args: Any,
    **kwargs: Any,
):
    """Initializes a StackComponent.

    Args:
        name: The name of the component.
        id: The unique ID of the component.
        config: The config of the component.
        flavor: The flavor of the component.
        type: The type of the component.
        user: The ID of the user who created the component.
        created: The creation time of the component.
        updated: The last update time of the component.
        labels: The labels of the component.
        connector_requirements: The requirements for the connector.
        connector: The ID of a connector linked to the component.
        connector_resource_id: The custom resource ID to access through
            the connector.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Raises:
        ValueError: If a secret reference is passed as name.
    """
    if secret_utils.is_secret_reference(name):
        raise ValueError(
            "Passing the `name` attribute of a stack component as a "
            "secret reference is not allowed."
        )

    self.id = id
    self.name = name
    self._config = config
    self.flavor = flavor
    self.type = type
    self.user = user
    self.created = created
    self.updated = updated
    self.labels = labels
    self.connector_requirements = connector_requirements
    self.connector = connector
    self.connector_resource_id = connector_resource_id
    self._connector_instance: Optional[ServiceConnector] = None
Attributes
config: AzureMLStepOperatorConfig property

Returns the AzureMLStepOperatorConfig config.

Returns:

Type Description
AzureMLStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property

Settings class for the AzureML step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property

Validates the stack.

Returns:

Type Description
Optional[StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

Functions
get_docker_builds(deployment: PipelineDeploymentBase) -> List[BuildConfiguration]

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBase

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in src/zenml/integrations/azure/step_operators/azureml_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Gets the Docker builds required for the component.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds.
    """
    builds = []
    for step_name, step in deployment.step_configurations.items():
        if step.config.step_operator == self.name:
            build = BuildConfiguration(
                key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            builds.append(build)

    return builds
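A hedged illustration of where the per-step docker_settings consumed above come from: ZenML's DockerSettings can be attached to a step, and get_docker_builds turns it into the BuildConfiguration for this operator's image. The requirements list below is illustrative only.

from zenml import step
from zenml.config import DockerSettings

# Illustrative only: extra pip requirements baked into the image that
# `get_docker_builds` registers for this step operator.
docker_settings = DockerSettings(requirements=["scikit-learn"])

@step(step_operator="azureml", settings={"docker": docker_settings})
def train() -> None:
    ...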
launch(info: StepRunInfo, entrypoint_command: List[str], environment: Dict[str, str]) -> None

Launches a step on AzureML.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
environment Dict[str, str]

Environment variables to set in the step operator environment.

required
Source code in src/zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
    image_name = info.get_image(key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    # Client creation
    ml_client = MLClient(
        credential=self._get_credentials(),
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace_name,
    )

    env = Environment(name=f"zenml-{info.run_name}", image=image_name)

    compute_target = create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    )

    command_job = command(
        name=info.run_name,
        command=" ".join(entrypoint_command),
        environment=env,
        environment_variables=environment,
        compute=compute_target,
        experiment_name=info.pipeline.name,
    )

    job = ml_client.jobs.create_or_update(command_job)

    logger.info(f"AzureML job created with id: {job.id}")
    ml_client.jobs.stream(info.run_name)
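The compute target resolved by create_or_get_compute above can be steered through AzureMLStepOperatorSettings. A minimal sketch, assuming the settings class exposes a compute_name field for selecting an existing AzureML compute target (the field name is an assumption; the settings class is not reproduced on this page):

from zenml import step
from zenml.integrations.azure.flavors import AzureMLStepOperatorSettings

# Assumption: `compute_name` selects an existing AzureML compute target;
# if unset, `launch` falls back to the default name `zenml_<component id>`.
azureml_settings = AzureMLStepOperatorSettings(compute_name="my-cluster")

@step(step_operator="azureml", settings={"step_operator": azureml_settings})
def train() -> None:
    ...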
Functions