Azure
zenml.integrations.azure
special
Initialization of the ZenML Azure integration.
The Azure integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores and an io
module to handle file operations on Azure Blob Storage.
The Azure Step Operator integration submodule provides a way to run ZenML steps
in AzureML.
AzureIntegration (Integration)
Definition of Azure integration for ZenML.
Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """ZenML integration bundling the Azure stack component flavors.

    Provides the Azure artifact store, the AzureML step operator and the
    AzureML orchestrator flavors, together with the Python packages they
    depend on.
    """

    NAME = AZURE
    REQUIREMENTS = [
        "adlfs>=2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity",
        "azureml-core==1.56.0",
        "azure-mgmt-containerservice>=20.0.0",
        "azure-storage-blob==12.17.0",  # temporary fix for https://github.com/Azure/azure-sdk-for-python/issues/32056
        "kubernetes",
        "azure-ai-ml==1.18.0",
        # azureml-core 1.56.0 (azureml/core/_metrics.py:212) relies on an
        # attribute that was removed in Numpy 2.0, while AzureML itself does
        # not constrain numpy -- so the upper bound is pinned here.
        "numpy<2.0",
    ]
    # Packages that are left installed when this integration is uninstalled
    # (presumably because other integrations rely on them as well).
    REQUIREMENTS_IGNORED_ON_UNINSTALL = ["kubernetes", "numpy"]

    @classmethod
    def activate(cls) -> None:
        """Activate the Azure integration.

        The service connectors module is imported only for the side effects
        of importing it; nothing from it is used here.
        """
        import zenml.integrations.azure.service_connectors  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Collect the stack component flavors shipped with this integration.

        Returns:
            The Azure artifact store, AzureML step operator and AzureML
            orchestrator flavor classes, in that order.
        """
        # Imported inside the method, presumably so the flavor modules are
        # only loaded when the flavors are actually requested.
        from zenml.integrations.azure import flavors as azure_flavors

        return [
            azure_flavors.AzureArtifactStoreFlavor,
            azure_flavors.AzureMLStepOperatorFlavor,
            azure_flavors.AzureMLOrchestratorFlavor,
        ]
activate()
classmethod
Activate the Azure integration.
Source code in zenml/integrations/azure/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Azure integration.

    Importing the service connectors module is all this does; the import is
    kept only for its side effects.
    """
    import zenml.integrations.azure.service_connectors  # noqa
flavors()
classmethod
Declares the flavors for the integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Collect the stack component flavors shipped with this integration.

    Returns:
        The Azure artifact store, AzureML step operator and AzureML
        orchestrator flavor classes, in that order.
    """
    from zenml.integrations.azure import flavors as azure_flavors

    return [
        azure_flavors.AzureArtifactStoreFlavor,
        azure_flavors.AzureMLStepOperatorFlavor,
        azure_flavors.AzureMLOrchestratorFlavor,
    ]
artifact_stores
special
Initialization of the Azure Artifact Store integration.
azure_artifact_store
Implementation of the Azure Artifact Store integration.
AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)
Artifact Store for Microsoft Azure based artifacts.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact store backed by Azure Blob Storage through `adlfs`."""

    # Created on first access of the `filesystem` property and reused after.
    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    @property
    def config(self) -> AzureArtifactStoreConfig:
        """The typed configuration of this artifact store.

        Returns:
            The `AzureArtifactStoreConfig` of this component.
        """
        return cast(AzureArtifactStoreConfig, self._config)

    def get_credentials(self) -> Optional[AzureSecretSchema]:
        """Resolve the credentials used to access Azure Blob Storage.

        If a service connector is linked, the credentials are extracted from
        the client it returns; otherwise the typed authentication secret is
        returned.

        Returns:
            The credentials, if any.

        Raises:
            RuntimeError: If the linked connector does not return a
                `BlobServiceClient`, or if that client is not authenticated
                with Azure service principal credentials.
        """
        connector = self.get_connector()
        if not connector:
            return self.get_typed_authentication_secret(
                expected_schema_type=AzureSecretSchema
            )

        from azure.identity import ClientSecretCredential
        from azure.storage.blob import BlobServiceClient

        blob_client = connector.connect()
        if not isinstance(blob_client, BlobServiceClient):
            raise RuntimeError(
                f"Expected a {BlobServiceClient.__module__}."
                f"{BlobServiceClient.__name__} object while "
                f"trying to use the linked connector, but got "
                f"{type(blob_client)}."
            )

        # Only service principal credentials can be converted into the
        # secret schema; their values are read from private attributes.
        credential = blob_client.credential
        if not isinstance(credential, ClientSecretCredential):
            raise RuntimeError(
                "The Azure Artifact Store connector can only be used "
                "with a service connector that is configured with "
                "Azure service principal credentials."
            )

        return AzureSecretSchema(
            client_id=credential._client_id,
            client_secret=credential._client_credential,
            tenant_id=credential._tenant_id,
            account_name=blob_client.account_name,
        )

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The `adlfs` filesystem used to access this artifact store.

        The filesystem is built on first access from the resolved
        credentials (or none) and cached on the instance.

        Returns:
            The cached adlfs filesystem.
        """
        if self._filesystem:
            return self._filesystem

        secret = self.get_credentials()
        fs_kwargs = secret.get_values() if secret else {}
        self._filesystem = adlfs.AzureBlobFileSystem(
            **fs_kwargs,
            anon=False,
            use_listings_cache=False,
        )
        return self._filesystem

    def _split_path(self, path: PathType) -> Tuple[str, str]:
        """Separate the scheme prefix of a path from the rest of it.

        Example:
        ```python
        prefix, remainder = artifact_store._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A `(prefix, remainder)` tuple. The prefix is an empty string when
            the path starts with none of the supported schemes.
        """
        path_str = convert_to_str(path)
        for scheme in self.config.SUPPORTED_SCHEMES:
            if path_str.startswith(scheme):
                return scheme, path_str[len(scheme) :]
        return "", path_str

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open the file at `path`.

        Args:
            path: Location of the file.
            mode: File mode forwarded to the filesystem. Currently, only
                'rb' and 'wb' (binary read/write) are supported.

        Returns:
            A file-like object.
        """
        fs = self.filesystem
        return fs.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy the file at `src` to `dst`.

        Args:
            src: Source path.
            dst: Destination path.
            overwrite: Whether an existing file at `dst` may be replaced.
                When `False` and `dst` exists, a FileExistsError is raised.

        Raises:
            FileExistsError: If `dst` already exists and `overwrite` is not
                set to `True`.
        """
        if overwrite or not self.filesystem.exists(dst):
            # TODO [ENG-151]: Check if it works with overwrite=True or if we
            # need to manually remove it first
            self.filesystem.copy(path1=src, path2=dst)
            return
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    def exists(self, path: PathType) -> bool:
        """Report whether something exists at `path`.

        Args:
            path: Path to look up.

        Returns:
            `True` if the path exists, `False` otherwise.
        """
        fs = self.filesystem
        return fs.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """List all paths matching a glob pattern.

        Supported wildcards:
            - '*' matches any number of characters
            - '?' matches a single character
            - '[...]' matches one of the characters inside the brackets
            - '**' as a full path component matches subdirectories of any
              depth (e.g. '/some_dir/**/some_file')

        Args:
            pattern: The glob pattern to match.

        Returns:
            The matching paths, each prefixed with the scheme of `pattern`.
        """
        scheme = self._split_path(pattern)[0]
        matches = self.filesystem.glob(path=pattern)
        return [f"{scheme}{match}" for match in matches]

    def isdir(self, path: PathType) -> bool:
        """Report whether `path` is a directory.

        Args:
            path: Path to inspect.

        Returns:
            `True` if the path is a directory, `False` otherwise.
        """
        fs = self.filesystem
        return fs.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """List the entries directly inside a directory.

        Args:
            path: Directory to list.

        Returns:
            The entry names relative to `path`, without leading slashes.
        """
        _, stripped = self._split_path(path)
        prefix_len = len(stripped)
        # The filesystem returns dicts; the "name" key holds the path that
        # starts with the scheme-less directory path.
        return [
            cast(str, entry["name"])[prefix_len:].lstrip("/")
            for entry in self.filesystem.listdir(path=stripped)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory, including any missing parents.

        Existing directories are not treated as an error.

        Args:
            path: Directory to create.
        """
        fs = self.filesystem
        fs.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a single directory.

        Existing directories are not treated as an error.

        Args:
            path: Directory to create.
        """
        fs = self.filesystem
        fs.makedir(path=path, exist_ok=True)

    def remove(self, path: PathType) -> None:
        """Delete the file at `path`.

        Args:
            path: File to delete.
        """
        fs = self.filesystem
        fs.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Move the file at `src` to `dst`.

        Args:
            src: Current path of the file.
            dst: New path of the file.
            overwrite: Whether an existing file at `dst` may be replaced.
                When `False` and `dst` exists, a FileExistsError is raised.

        Raises:
            FileExistsError: If `dst` already exists and `overwrite` is not
                set to `True`.
        """
        if overwrite or not self.filesystem.exists(dst):
            # TODO [ENG-152]: Check if it works with overwrite=True or if we
            # need to manually remove it first
            self.filesystem.rename(path1=src, path2=dst)
            return
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    def rmtree(self, path: PathType) -> None:
        """Recursively delete a directory and its contents.

        Args:
            path: Directory to delete.
        """
        fs = self.filesystem
        fs.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Fetch stat information for `path`.

        Args:
            path: Path to inspect.

        Returns:
            The stat info reported by the filesystem.
        """
        fs = self.filesystem
        return fs.stat(path=path)  # type: ignore[no-any-return]

    def size(self, path: PathType) -> int:
        """Fetch the size of a file in bytes.

        Args:
            path: File to inspect.

        Returns:
            The file size in bytes.
        """
        fs = self.filesystem
        return fs.size(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Walk the directory tree below `top`.

        Args:
            top: Root directory of the walk.
            topdown: Ignored; accepted for interface compatibility.
            onerror: Ignored; accepted for interface compatibility.

        Yields:
            `(directory, subdirectories, files)` tuples, where `directory` is
            prefixed with the scheme of `top`.
        """
        # TODO [ENG-153]: Additional params
        scheme, _ = self._split_path(top)
        yield from (
            (f"{scheme}{directory}", subdirs, filenames)
            for directory, subdirs, filenames in self.filesystem.walk(path=top)
        )
config: AzureArtifactStoreConfig
property
readonly
Returns the AzureArtifactStoreConfig
config.
Returns:
Type | Description |
---|---|
AzureArtifactStoreConfig |
The configuration. |
filesystem: adlfs.AzureBlobFileSystem
property
readonly
The adlfs filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
adlfs.AzureBlobFileSystem |
The adlfs filesystem to access this artifact store. |
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path to copy from. |
required |
dst |
Union[bytes, str] |
The path to copy to. |
required |
overwrite |
bool |
If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy the file at `src` to `dst`.

    Args:
        src: Source path.
        dst: Destination path.
        overwrite: Whether an existing file at `dst` may be replaced. When
            `False` and `dst` exists, a FileExistsError is raised instead.

    Raises:
        FileExistsError: If `dst` already exists and `overwrite` is not set
            to `True`.
    """
    if overwrite or not self.filesystem.exists(dst):
        # TODO [ENG-151]: Check if it works with overwrite=True or if we need
        # to manually remove it first
        self.filesystem.copy(path1=src, path2=dst)
        return
    raise FileExistsError(
        f"Unable to copy to destination '{convert_to_str(dst)}', "
        f"file already exists. Set `overwrite=True` to copy anyway."
    )
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path exists, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Report whether something exists at `path`.

    Args:
        path: Path to look up.

    Returns:
        `True` if the path exists, `False` otherwise.
    """
    fs = self.filesystem
    return fs.exists(path=path)  # type: ignore[no-any-return]
get_credentials(self)
Returns the credentials for the Azure Artifact Store if configured.
Returns:
Type | Description |
---|---|
Optional[zenml.secret.schemas.azure_secret_schema.AzureSecretSchema] |
The credentials. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the connector is not configured with Azure service principal credentials. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def get_credentials(self) -> Optional[AzureSecretSchema]:
    """Resolve the credentials used to access Azure Blob Storage.

    If a service connector is linked, the credentials are extracted from the
    client it returns; otherwise the typed authentication secret is returned.

    Returns:
        The credentials, if any.

    Raises:
        RuntimeError: If the linked connector does not return a
            `BlobServiceClient`, or if that client is not authenticated with
            Azure service principal credentials.
    """
    connector = self.get_connector()
    if not connector:
        return self.get_typed_authentication_secret(
            expected_schema_type=AzureSecretSchema
        )

    from azure.identity import ClientSecretCredential
    from azure.storage.blob import BlobServiceClient

    blob_client = connector.connect()
    if not isinstance(blob_client, BlobServiceClient):
        raise RuntimeError(
            f"Expected a {BlobServiceClient.__module__}."
            f"{BlobServiceClient.__name__} object while "
            f"trying to use the linked connector, but got "
            f"{type(blob_client)}."
        )

    # Only service principal credentials can be converted into the secret
    # schema; their values are read from private attributes.
    credential = blob_client.credential
    if not isinstance(credential, ClientSecretCredential):
        raise RuntimeError(
            "The Azure Artifact Store connector can only be used "
            "with a service connector that is configured with "
            "Azure service principal credentials."
        )

    return AzureSecretSchema(
        client_id=credential._client_id,
        client_secret=credential._client_credential,
        tenant_id=credential._tenant_id,
        account_name=blob_client.account_name,
    )
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include: - '*' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern to match, see details above. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that match the given glob pattern. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """List all paths matching a glob pattern.

    Supported wildcards:
        - '*' matches any number of characters
        - '?' matches a single character
        - '[...]' matches one of the characters inside the brackets
        - '**' as a full path component matches subdirectories of any depth
          (e.g. '/some_dir/**/some_file')

    Args:
        pattern: The glob pattern to match.

    Returns:
        The matching paths, each prefixed with the scheme of `pattern`.
    """
    scheme = self._split_path(pattern)[0]
    matches = self.filesystem.glob(path=pattern)
    return [f"{scheme}{match}" for match in matches]
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path is a directory, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Report whether `path` is a directory.

    Args:
        path: Path to inspect.

    Returns:
        `True` if the path is a directory, `False` otherwise.
    """
    fs = self.filesystem
    return fs.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of files in the given directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """List the entries directly inside a directory.

    Args:
        path: Directory to list.

    Returns:
        The entry names relative to `path`, without leading slashes.
    """
    _, stripped = self._split_path(path)
    prefix_len = len(stripped)
    # The filesystem returns dicts whose "name" value starts with the
    # scheme-less directory path; keep only the part after it.
    return [
        cast(str, entry["name"])[prefix_len:].lstrip("/")
        for entry in self.filesystem.listdir(path=stripped)
    ]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory, including any missing parents.

    An already existing directory is not treated as an error.

    Args:
        path: Directory to create.
    """
    fs = self.filesystem
    fs.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a single directory.

    An already existing directory is not treated as an error.

    Args:
        path: Directory to create.
    """
    fs = self.filesystem
    fs.makedir(path=path, exist_ok=True)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path of the file to open. |
required |
mode |
str |
Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. |
'r' |
Returns:
Type | Description |
---|---|
Any |
A file-like object. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open the file at `path`.

    Args:
        path: Location of the file.
        mode: File mode forwarded to the filesystem. Currently, only 'rb'
            and 'wb' (binary read/write) are supported.

    Returns:
        A file-like object.
    """
    fs = self.filesystem
    return fs.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to remove. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Delete the file at `path`.

    Args:
        path: File to delete.
    """
    fs = self.filesystem
    fs.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Move the file at `src` to `dst`.

    Args:
        src: Current path of the file.
        dst: New path of the file.
        overwrite: Whether an existing file at `dst` may be replaced. When
            `False` and `dst` exists, a FileExistsError is raised instead.

    Raises:
        FileExistsError: If `dst` already exists and `overwrite` is not set
            to `True`.
    """
    if overwrite or not self.filesystem.exists(dst):
        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        # to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)
        return
    raise FileExistsError(
        f"Unable to rename file to '{convert_to_str(dst)}', "
        f"file already exists. Set `overwrite=True` to rename anyway."
    )
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to remove. |
required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Recursively delete a directory and its contents.

    Args:
        path: Directory to delete.
    """
    fs = self.filesystem
    fs.delete(path=path, recursive=True)
size(self, path)
Get the size of a file in bytes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to the file. |
required |
Returns:
Type | Description |
---|---|
int |
The size of the file in bytes. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def size(self, path: PathType) -> int:
    """Fetch the size of a file in bytes.

    Args:
        path: File to inspect.

    Returns:
        The file size in bytes.
    """
    fs = self.filesystem
    return fs.size(path=path)  # type: ignore[no-any-return]
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to get stat info for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Stat info. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Fetch stat information for `path`.

    Args:
        path: Path to inspect.

    Returns:
        The stat info reported by the filesystem.
    """
    fs = self.filesystem
    return fs.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Unused argument to conform to interface. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Unused argument to conform to interface. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Walk the directory tree below `top`.

    Args:
        top: Root directory of the walk.
        topdown: Ignored; accepted for interface compatibility.
        onerror: Ignored; accepted for interface compatibility.

    Yields:
        `(directory, subdirectories, files)` tuples, where `directory` is
        prefixed with the scheme of `top`.
    """
    # TODO [ENG-153]: Additional params
    scheme, _ = self._split_path(top)
    yield from (
        (f"{scheme}{directory}", subdirs, filenames)
        for directory, subdirs, filenames in self.filesystem.walk(path=top)
    )
azureml_utils
AzureML definitions.
check_settings_and_compute_configuration(parameter, settings, compute)
Utility function comparing a parameter between settings and compute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parameter |
str |
the name of the parameter. |
required |
settings |
AzureMLComputeSettings |
The AzureML orchestrator settings. |
required |
compute |
azure.ai.ml.entities.Compute |
The compute instance or cluster from AzureML. |
required |
Source code in zenml/integrations/azure/azureml_utils.py
def check_settings_and_compute_configuration(
    parameter: str,
    settings: AzureMLComputeSettings,
    compute: Compute,
) -> None:
    """Warn when a setting disagrees with an existing AzureML compute target.

    A setting whose value is `None` counts as "not specified" and never
    triggers a warning. Mismatches are only logged; nothing is changed.

    Args:
        parameter: Name of the attribute to compare on both objects.
        settings: The AzureML compute settings.
        compute: The existing AzureML compute instance or cluster.
    """
    compute_value = getattr(compute, parameter)
    settings_value = getattr(settings, parameter)
    if settings_value is None or settings_value == compute_value:
        return

    logger.warning(
        f"The '{parameter}' defined in the settings '{settings_value}' "
        "does not match the actual parameter of the instance: "
        f"'{compute_value}'. Will ignore this setting for now."
    )
create_or_get_compute(client, settings, default_compute_name)
Creates or fetches the compute target if defined in the settings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
client |
azure.ai.ml.MLClient |
the AzureML client. |
required |
settings |
AzureMLComputeSettings |
the settings for the orchestrator. |
required |
default_compute_name |
str |
the default name for the compute target, if one is not provided in the settings. |
required |
Returns:
Type | Description |
---|---|
Optional[str] |
None, if the orchestrator is using serverless compute or str, the name of the compute target (instance or cluster). |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the fetched compute target is unsupported or the mode defined in the setting does not match the type of the compute target. |
Source code in zenml/integrations/azure/azureml_utils.py
def create_or_get_compute(
    client: MLClient,
    settings: AzureMLComputeSettings,
    default_compute_name: str,
) -> Optional[str]:
    """Creates or fetches the compute target if defined in the settings.

    An existing compute target with the resolved name is validated against
    the settings (type, state and configuration). If none exists, a new
    compute instance or cluster is created according to `settings.mode`.

    Args:
        client: the AzureML client.
        settings: the settings for the orchestrator.
        default_compute_name: the default name for the compute target, if one
            is not provided in the settings.

    Returns:
        None, if the orchestrator is using serverless compute or
        str, the name of the compute target (instance or cluster).

    Raises:
        RuntimeError: if the fetched compute target is unsupported or the
            mode defined in the setting does not match the type of the
            compute target.
    """
    # If the mode is serverless, there is no named target to fetch.
    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        return None

    # If a name is not provided, fall back to the caller-provided default.
    compute_name = settings.compute_name or default_compute_name

    # Only the lookup itself is guarded: a missing target gets created, while
    # validation errors for an existing target propagate to the caller.
    try:
        compute = client.compute.get(compute_name)
    except ResourceNotFoundError:
        logger.info(
            "Can not find the compute target with name: " f"'{compute_name}':"
        )

        if settings.mode == AzureMLComputeTypes.COMPUTE_INSTANCE:
            logger.info(
                "Creating a new compute instance. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import ComputeInstance

            compute_instance = ComputeInstance(
                name=compute_name,
                size=settings.size,
                idle_time_before_shutdown_minutes=settings.idle_time_before_shutdown_minutes,
            )
            client.begin_create_or_update(compute_instance).result()
            return compute_name

        if settings.mode == AzureMLComputeTypes.COMPUTE_CLUSTER:
            logger.info(
                "Creating a new compute cluster. This might take a "
                "few minutes."
            )

            from azure.ai.ml.entities import AmlCompute

            compute_cluster = AmlCompute(
                name=compute_name,
                size=settings.size,
                location=settings.location,
                min_instances=settings.min_instances,
                max_instances=settings.max_instances,
                idle_time_before_scale_down=settings.idle_time_before_scaledown_down,
                tier=settings.tier,
            )
            client.begin_create_or_update(compute_cluster).result()
            return compute_name

        return None

    logger.info(f"Using existing compute target: '{compute_name}'.")

    # Check if compute size matches with the settings
    check_settings_and_compute_configuration(
        parameter="size", settings=settings, compute=compute
    )

    compute_type = compute.type

    # Check the type and that it matches the settings. The type strings are
    # the azure-ai-ml compute type identifiers ("computeinstance",
    # "amlcompute").
    if compute_type == "computeinstance":  # Compute Instance
        if settings.mode != AzureMLComputeTypes.COMPUTE_INSTANCE:
            raise RuntimeError(
                "The mode of operation for the compute target defined "
                f"in the settings '{settings.mode}' does not match "
                f"the type of the compute target: `{compute_name}` "
                "which is a 'compute-instance'. Please make sure that "
                "the settings are adjusted properly."
            )

        if compute.state != "Running":
            raise RuntimeError(
                f"The compute instance `{compute_name}` is not in a "
                "running state at the moment. Please make sure that "
                "the compute target is running, before executing the "
                "pipeline."
            )

        # Idle time before shutdown
        check_settings_and_compute_configuration(
            parameter="idle_time_before_shutdown_minutes",
            settings=settings,
            compute=compute,
        )

    elif compute_type == "amlcompute":  # Compute Cluster
        if settings.mode != AzureMLComputeTypes.COMPUTE_CLUSTER:
            raise RuntimeError(
                "The mode of operation for the compute target defined "
                f"in the settings '{settings.mode}' does not match "
                f"the type of the compute target: `{compute_name}` "
                "which is a 'compute-cluster'. Please make sure that "
                "the settings are adjusted properly."
            )

        if compute.provisioning_state != "Succeeded":
            raise RuntimeError(
                f"The provisioning state '{compute.provisioning_state}' "
                f"of the compute cluster `{compute_name}` is not "
                "successful. Please make sure that the compute cluster "
                "is provisioned properly, before executing the "
                "pipeline."
            )

        # NOTE(review): "idle_time_before_scale_down" is looked up on the
        # settings via getattr here, while the creation branch above reads
        # `settings.idle_time_before_scaledown_down`. One of the two names is
        # presumably wrong. Confirm against AzureMLComputeSettings.
        for parameter in [
            "idle_time_before_scale_down",
            "max_instances",
            "min_instances",
            "tier",
            "location",
        ]:
            # Check all possible configurations
            check_settings_and_compute_configuration(
                parameter=parameter, settings=settings, compute=compute
            )
    else:
        raise RuntimeError(f"Unsupported compute type: {compute_type}")

    return compute_name
flavors
special
Azure integration flavors.
azure_artifact_store_flavor
Azure artifact store flavor.
AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)
Configuration class for Azure Artifact Store.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration of the Azure Artifact Store."""

    # URI scheme prefixes recognized for Azure Blob Storage paths.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"az://", "abfs://"}
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)
Azure Artifact Store flavor.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Flavor describing the Azure Blob Storage artifact store."""

    @property
    def name(self) -> str:
        """The flavor name.

        Returns:
            The `AZURE_ARTIFACT_STORE_FLAVOR` constant.
        """
        return AZURE_ARTIFACT_STORE_FLAVOR

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements a service connector must satisfy for this flavor.

        These are used to filter the available service connector types down
        to those compatible with this flavor.

        Returns:
            Requirements for an Azure connector serving blob resources, with
            the resource ID taken from the component's `path` attribute.
        """
        return ServiceConnectorRequirements(
            resource_id_attr="path",
            resource_type=BLOB_RESOURCE_TYPE,
            connector_type=AZURE_CONNECTOR_TYPE,
        )

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the documentation explaining this flavor.

        Returns:
            The default flavor docs url.
        """
        url = self.generate_default_docs_url()
        return url

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default flavor SDK docs url.
        """
        url = self.generate_default_sdk_docs_url()
        return url

    @property
    def logo_url(self) -> str:
        """Logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo url.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/artifact_store/azure.png"

    @property
    def config_class(self) -> Type[AzureArtifactStoreConfig]:
        """The configuration class of this flavor.

        Returns:
            `AzureArtifactStoreConfig`.
        """
        return AzureArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["AzureArtifactStore"]:
        """The stack component class implementing this flavor.

        Returns:
            `AzureArtifactStore`, imported inside the method rather than at
            module level.
        """
        from zenml.integrations.azure import artifact_stores

        return artifact_stores.AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]
property
readonly
Returns AzureArtifactStoreConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[AzureArtifactStore]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AzureArtifactStore] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
azureml
AzureML definitions.
AzureMLComputeSettings (BaseSettings)
Settings for the AzureML compute.
These settings adjust the compute resources that will be used by the pipeline execution.
There are three possible use cases for this implementation:
1. Serverless compute (default behavior):
- The `mode` is set to `serverless` (default behavior).
- All the other parameters become irrelevant and will log a
warning if set.
2. Compute instance:
- The `mode` is set to `compute-instance`.
- In this case, users have to provide a `compute_name`.
- If a compute instance exists with this name, this instance
will be used and all the other parameters become irrelevant
and will throw a warning if set.
- If a compute instance does not already exist, ZenML will
create it. You can use the parameters `size` and
`idle_time_before_shutdown_minutes` for this operation.
3. Compute cluster:
- The `mode` is set to `compute-cluster`.
- In this case, users have to provide a `compute_name`.
- If a compute cluster exists with this name, this cluster
will be used and all the other parameters become irrelevant
and will throw a warning if set.
- If a compute cluster does not already exist, ZenML will
create it. You can set the additional parameters for this
operation.
Source code in zenml/integrations/azure/flavors/azureml.py
class AzureMLComputeSettings(BaseSettings):
    """Settings for the AzureML compute.

    These settings adjust the compute resources that will be used by the
    pipeline execution.

    There are three possible use cases for this implementation:

        1. Serverless compute (default behavior):
            - The `mode` is set to `serverless` (default behavior).
            - All the other parameters become irrelevant and will log a
            warning if set.

        2. Compute instance:
            - The `mode` is set to `compute-instance`.
            - In this case, users have to provide a `compute_name`.
            - If a compute instance exists with this name, this instance
            will be used and all the other parameters become irrelevant
            and will throw a warning if set.
            - If a compute instance does not already exist, ZenML will
            create it. You can use the parameters `size` and
            `idle_time_before_shutdown_minutes` for this operation.

        3. Compute cluster:
            - The `mode` is set to `compute-cluster`.
            - In this case, users have to provide a `compute_name`.
            - If a compute cluster exists with this name, this cluster
            will be used and all the other parameters become irrelevant
            and will throw a warning if set.
            - If a compute cluster does not already exist, ZenML will
            create it. You can set the additional parameters for this
            operation.
    """

    # Mode for compute
    mode: AzureMLComputeTypes = AzureMLComputeTypes.SERVERLESS

    # Common Configuration for Compute Instances and Clusters
    compute_name: Optional[str] = None
    size: Optional[str] = None

    # Additional configuration for a Compute Instance
    idle_time_before_shutdown_minutes: Optional[int] = None

    # Additional configuration for a Compute Cluster
    idle_time_before_scaledown_down: Optional[int] = None
    location: Optional[str] = None
    min_instances: Optional[int] = None
    max_instances: Optional[int] = None
    tier: Optional[str] = None

    @model_validator(mode="after")
    def azureml_settings_validator(self) -> "AzureMLComputeSettings":
        """Checks whether the right configuration is set based on mode.

        Explicitly-set fields that do not apply to the selected `mode` only
        produce a warning. A missing `compute_name` in the instance or
        cluster modes is an error.

        Returns:
            the instance itself.

        Raises:
            ValueError: if a name is not provided when working with
                instances and clusters.
        """
        viable_configuration_fields = {
            AzureMLComputeTypes.SERVERLESS: {"mode"},
            AzureMLComputeTypes.COMPUTE_INSTANCE: {
                "mode",
                "compute_name",
                "size",
                "idle_time_before_shutdown_minutes",
            },
            AzureMLComputeTypes.COMPUTE_CLUSTER: {
                "mode",
                "compute_name",
                "size",
                "idle_time_before_scaledown_down",
                "location",
                "min_instances",
                "max_instances",
                "tier",
            },
        }
        viable_fields = viable_configuration_fields[self.mode]

        for field in self.model_fields_set:
            # Only fields declared on this base class are checked; fields
            # added by subclasses are never warned about.
            if (
                field not in viable_fields
                and field in AzureMLComputeSettings.model_fields
            ):
                logger.warning(
                    f"In the {self.__class__.__name__} settings, the mode of "
                    f"operation is set to {self.mode}. In this mode, you can "
                    f"not configure the parameter '{field}'. This "
                    "configuration will be ignored."
                )

        if (
            self.mode
            in (
                AzureMLComputeTypes.COMPUTE_INSTANCE,
                AzureMLComputeTypes.COMPUTE_CLUSTER,
            )
            and self.compute_name is None
        ):
            # An explicit raise (instead of `assert`) keeps this check active
            # when Python runs with -O; pydantic converts it into a
            # ValidationError just like it did for the AssertionError.
            raise ValueError(
                "When you are working with compute instances and clusters, "
                "please define a name for the compute target."
            )

        return self
azureml_settings_validator(self)
Checks whether the right configuration is set based on mode.
Returns:
Type | Description |
---|---|
AzureMLComputeSettings |
the instance itself. |
Exceptions:
Type | Description |
---|---|
AssertionError |
if a name is not provided when working with instances and clusters. |
Source code in zenml/integrations/azure/flavors/azureml.py
@model_validator(mode="after")
def azureml_settings_validator(self) -> "AzureMLComputeSettings":
    """Checks whether the right configuration is set based on mode.

    Explicitly-set fields that do not apply to the selected `mode` only
    produce a warning. A missing `compute_name` in the instance or cluster
    modes is an error.

    Returns:
        the instance itself.

    Raises:
        ValueError: if a name is not provided when working with
            instances and clusters.
    """
    viable_configuration_fields = {
        AzureMLComputeTypes.SERVERLESS: {"mode"},
        AzureMLComputeTypes.COMPUTE_INSTANCE: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_shutdown_minutes",
        },
        AzureMLComputeTypes.COMPUTE_CLUSTER: {
            "mode",
            "compute_name",
            "size",
            "idle_time_before_scaledown_down",
            "location",
            "min_instances",
            "max_instances",
            "tier",
        },
    }
    viable_fields = viable_configuration_fields[self.mode]

    for field in self.model_fields_set:
        # Only fields declared on the base settings class are checked;
        # fields added by subclasses are never warned about.
        if (
            field not in viable_fields
            and field in AzureMLComputeSettings.model_fields
        ):
            logger.warning(
                f"In the {self.__class__.__name__} settings, the mode of "
                f"operation is set to {self.mode}. In this mode, you can "
                f"not configure the parameter '{field}'. This "
                "configuration will be ignored."
            )

    if (
        self.mode
        in (
            AzureMLComputeTypes.COMPUTE_INSTANCE,
            AzureMLComputeTypes.COMPUTE_CLUSTER,
        )
        and self.compute_name is None
    ):
        # An explicit raise (instead of `assert`) keeps this check active
        # when Python runs with -O; pydantic converts it into a
        # ValidationError just like it did for the AssertionError.
        raise ValueError(
            "When you are working with compute instances and clusters, "
            "please define a name for the compute target."
        )

    return self
AzureMLComputeTypes (StrEnum)
Enum for different types of compute on AzureML.
Source code in zenml/integrations/azure/flavors/azureml.py
class AzureMLComputeTypes(StrEnum):
    """Kinds of compute target an AzureML job can be executed on."""

    # Use AzureML serverless compute.
    SERVERLESS = "serverless"
    # Use a single named compute instance (created if it does not exist).
    COMPUTE_INSTANCE = "compute-instance"
    # Use a named compute cluster (created if it does not exist).
    COMPUTE_CLUSTER = "compute-cluster"
azureml_orchestrator_flavor
Implementation of the AzureML Orchestrator flavor.
AzureMLOrchestratorConfig (BaseOrchestratorConfig, AzureMLOrchestratorSettings)
Configuration for the AzureML orchestrator.
Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorConfig(
    BaseOrchestratorConfig, AzureMLOrchestratorSettings
):
    """Stack component config of the AzureML orchestrator.

    Combines the AzureML workspace coordinates with the orchestrator
    settings (compute mode, synchronous execution, ...).
    """

    subscription_id: str = Field(
        description="Subscription ID that AzureML is running on."
    )
    resource_group: str = Field(
        description="Name of the resource group that AzureML is running on.",
    )
    workspace: str = Field(
        description="Name of the workspace that AzureML is running on."
    )

    @property
    def is_schedulable(self) -> bool:
        """Tells whether pipelines on this orchestrator can be scheduled.

        Returns:
            Always True.
        """
        return True

    @property
    def is_synchronous(self) -> bool:
        """Tells whether a run blocks until the pipeline has finished.

        Returns:
            The value of the `synchronous` setting.
        """
        return self.synchronous

    @property
    def is_remote(self) -> bool:
        """Tells whether the component runs outside the client machine.

        Remote components cannot be combined with a local ZenML database
        and need a deployed ZenML server instead.

        Returns:
            Always True, since AzureML runs in the cloud.
        """
        return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
is_schedulable: bool
property
readonly
Whether the orchestrator is schedulable or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator is schedulable or not. |
is_synchronous: bool
property
readonly
Whether the orchestrator runs synchronously or not.
Returns:
Type | Description |
---|---|
bool |
Whether the orchestrator runs synchronously or not. |
AzureMLOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the AzureML orchestrator.
Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor that registers the AzureML orchestrator."""

    @property
    def name(self) -> str:
        """Identifier under which this flavor is registered.

        Returns:
            The AzureML orchestrator flavor name.
        """
        return AZUREML_ORCHESTRATOR_FLAVOR

    @property
    def config_class(self) -> Type[AzureMLOrchestratorConfig]:
        """Configuration model used by this flavor.

        Returns:
            The `AzureMLOrchestratorConfig` class.
        """
        return AzureMLOrchestratorConfig

    @property
    def implementation_class(self) -> Type["AzureMLOrchestrator"]:
        """Stack component class implementing this flavor.

        The implementation is imported inside the property rather than at
        module level.

        Returns:
            The `AzureMLOrchestrator` class.
        """
        from zenml.integrations.azure.orchestrators import (
            AzureMLOrchestrator as implementation,
        )

        return implementation

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements a service connector must meet to serve this flavor.

        Used to narrow down the connector types that can be linked to the
        orchestrator: any connector exposing the generic Azure resource type.

        Returns:
            The connector requirements for this flavor.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=AZURE_RESOURCE_TYPE
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """Logo shown for this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/azureml.png"
config_class: Type[zenml.integrations.azure.flavors.azureml_orchestrator_flavor.AzureMLOrchestratorConfig]
property
readonly
Returns AzureMLOrchestratorConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.azure.flavors.azureml_orchestrator_flavor.AzureMLOrchestratorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A URL to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[AzureMLOrchestrator]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AzureMLOrchestrator] |
The implementation class. |
logo_url: str
property
readonly
A URL to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A URL to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
AzureMLOrchestratorSettings (AzureMLComputeSettings)
Settings for the AzureML orchestrator.
Source code in zenml/integrations/azure/flavors/azureml_orchestrator_flavor.py
class AzureMLOrchestratorSettings(AzureMLComputeSettings):
    """AzureML orchestrator settings: the compute settings plus a sync flag."""

    # When True, the client streams the submitted pipeline job and waits
    # for it to finish before returning.
    synchronous: bool = Field(
        default=True,
        description="Whether the orchestrator runs synchronously or not.",
    )
azureml_step_operator_flavor
AzureML step operator flavor.
AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings)
Config for the AzureML step operator.
Attributes:
Name | Type | Description |
---|---|---|
subscription_id |
str |
The Azure account's subscription ID |
resource_group |
str |
The resource group to which the AzureML workspace is deployed. |
workspace_name |
str |
The name of the AzureML Workspace. |
tenant_id |
Optional[str] |
The Azure Tenant ID. |
service_principal_id |
Optional[str] |
The ID for the service principal that is created to allow apps to access secure resources. |
service_principal_password |
Optional[str] |
Password for the service principal. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig(
    BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
    """Stack component config of the AzureML step operator.

    Attributes:
        subscription_id: ID of the Azure subscription hosting the workspace.
        resource_group: Resource group that contains the AzureML workspace.
        workspace_name: Name of the AzureML workspace.
        tenant_id: Azure tenant ID used for service principal auth.
        service_principal_id: Client ID of the service principal that is
            allowed to access the workspace resources.
        service_principal_password: Secret of that service principal.
    """

    subscription_id: str = Field(
        description="Subscription ID that AzureML is running on."
    )
    resource_group: str = Field(
        description="Name of the resource group that AzureML is running on.",
    )
    workspace_name: str = Field(
        description="Name of the workspace that AzureML is running on."
    )

    # Optional service principal credentials, see
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    tenant_id: Optional[str] = SecretField(default=None)
    service_principal_id: Optional[str] = SecretField(default=None)
    service_principal_password: Optional[str] = SecretField(default=None)

    @property
    def is_remote(self) -> bool:
        """Tells whether the component runs outside the client machine.

        Remote components cannot be combined with a local ZenML database
        and need a deployed ZenML server instead.

        Returns:
            Always True, since AzureML runs in the cloud.
        """
        return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)
Flavor for the AzureML step operator.
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor that registers the AzureML step operator."""

    @property
    def name(self) -> str:
        """Identifier under which this flavor is registered.

        Returns:
            The AzureML step operator flavor name.
        """
        return AZUREML_STEP_OPERATOR_FLAVOR

    @property
    def config_class(self) -> Type[AzureMLStepOperatorConfig]:
        """Configuration model used by this flavor.

        Returns:
            The `AzureMLStepOperatorConfig` class.
        """
        return AzureMLStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AzureMLStepOperator"]:
        """Stack component class implementing this flavor.

        The implementation is imported inside the property rather than at
        module level.

        Returns:
            The `AzureMLStepOperator` class.
        """
        from zenml.integrations.azure.step_operators import (
            AzureMLStepOperator as implementation,
        )

        return implementation

    @property
    def service_connector_requirements(
        self,
    ) -> Optional[ServiceConnectorRequirements]:
        """Requirements a service connector must meet to serve this flavor.

        Used to narrow down the connector types that can be linked to the
        step operator: any connector exposing the generic Azure resource
        type.

        Returns:
            The connector requirements for this flavor.
        """
        requirements = ServiceConnectorRequirements(
            resource_type=AZURE_RESOURCE_TYPE
        )
        return requirements

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The default docs URL generated for this flavor.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The default SDK docs URL generated for this flavor.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """Logo shown for this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/step_operator/azureml.png"
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]
property
readonly
Returns AzureMLStepOperatorConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig] |
The config class. |
docs_url: Optional[str]
property
readonly
A url to point at docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor docs url. |
implementation_class: Type[AzureMLStepOperator]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AzureMLStepOperator] |
The implementation class. |
logo_url: str
property
readonly
A url to represent the flavor in the dashboard.
Returns:
Type | Description |
---|---|
str |
The flavor logo. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
sdk_docs_url: Optional[str]
property
readonly
A url to point at SDK docs explaining this flavor.
Returns:
Type | Description |
---|---|
Optional[str] |
A flavor SDK docs url. |
service_connector_requirements: Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements]
property
readonly
Service connector resource requirements for service connectors.
Specifies resource requirements that are used to filter the available service connector types that are compatible with this flavor.
Returns:
Type | Description |
---|---|
Optional[zenml.models.v2.misc.service_connector_type.ServiceConnectorRequirements] |
Requirements for compatible service connectors, if a service connector is required for this flavor. |
AzureMLStepOperatorSettings (AzureMLComputeSettings)
Settings for the AzureML step operator.
Attributes:
Name | Type | Description |
---|---|---|
compute_target_name |
Optional[str] |
The name of the configured ComputeTarget.
Deprecated in favor of `compute_name`. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(AzureMLComputeSettings):
    """Settings for the AzureML step operator.

    Attributes:
        compute_target_name: The name of the configured ComputeTarget.
            Deprecated in favor of `compute_name`.
    """

    compute_target_name: Optional[str] = Field(
        default=None,
        description="Name of the configured ComputeTarget. Deprecated in favor "
        "of `compute_name`.",
    )

    @model_validator(mode="before")
    @classmethod
    @before_validator_handler
    def _migrate_compute_name(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Backward compatibility for compute_target_name.

        A legacy `compute_target_name` is moved to `compute_name` and the
        mode is switched to `compute-instance`. This only happens when
        neither `compute_name` nor `mode` was given explicitly and the
        legacy value is not None.

        An explicit `compute_target_name: None` is left untouched. Migrating
        it would force instance mode without a compute name and make
        validation fail.

        Args:
            data: The model data.

        Returns:
            The migrated data.
        """
        legacy_name = data.get("compute_target_name")
        if (
            legacy_name is not None
            and "compute_name" not in data
            and "mode" not in data
        ):
            data.pop("compute_target_name")
            data["compute_name"] = legacy_name
            data["mode"] = AzureMLComputeTypes.COMPUTE_INSTANCE
        return data
orchestrators
special
AzureML orchestrator.
azureml_orchestrator
Implementation of the AzureML Orchestrator.
AzureMLOrchestrator (ContainerizedOrchestrator)
Orchestrator responsible for running pipelines on AzureML.
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
class AzureMLOrchestrator(ContainerizedOrchestrator):
"""Orchestrator responsible for running pipelines on AzureML."""
@property
def config(self) -> AzureMLOrchestratorConfig:
    """Typed access to this orchestrator's configuration.

    Returns:
        The stored config, narrowed to `AzureMLOrchestratorConfig`.
    """
    typed_config = cast(AzureMLOrchestratorConfig, self._config)
    return typed_config
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
    """Settings model accepted by the AzureML orchestrator.

    Returns:
        The `AzureMLOrchestratorSettings` class.
    """
    settings_model = AzureMLOrchestratorSettings
    return settings_model
@property
def validator(self) -> Optional[StackValidator]:
    """Stack validator for the AzureML orchestrator.

    The stack must contain a container registry and an image builder, and
    none of its components may be local, because every step runs remotely
    on AzureML.

    Returns:
        A `StackValidator` instance.
    """

    def _validate_remote_components(
        stack: "Stack",
    ) -> Tuple[bool, str]:
        # Report only the first local component found, as before.
        local_component = next(
            (
                component
                for component in stack.components.values()
                if component.config.is_local
            ),
            None,
        )
        if local_component is None:
            return True, ""

        return False, (
            f"The AzureML orchestrator runs pipelines remotely, "
            f"but the '{local_component.name}' {local_component.type.value} is "
            "a local stack component and will not be available in "
            "the AzureML step.\nPlease ensure that you always "
            "use non-local stack components with the AzureML "
            "orchestrator."
        )

    required = {
        StackComponentType.CONTAINER_REGISTRY,
        StackComponentType.IMAGE_BUILDER,
    }
    return StackValidator(
        required_components=required,
        custom_validation_function=_validate_remote_components,
    )
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    The ID is read from the environment, so it must be unique per pipeline
    run and identical for every step of that run.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
    """
    run_id = os.environ.get(ENV_ZENML_AZUREML_RUN_ID)
    if run_id is None:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AZUREML_RUN_ID}."
        )
    return run_id
@staticmethod
def _create_command_component(
    step: Step,
    step_name: str,
    env_name: str,
    image: str,
    command: List[str],
    arguments: List[str],
) -> CommandComponent:
    """Builds the AzureML `CommandComponent` that executes one ZenML step.

    Each upstream step is declared as a `uri_file` input, and the component
    exposes a single `completed` output. These placeholders are what the
    pipeline definition uses to order the components.

    Args:
        step: The step definition in ZenML.
        step_name: The name of the step.
        env_name: The name of the environment.
        image: The image to use in the environment.
        command: The command to execute the entrypoint with.
        arguments: The arguments to pass into the command.

    Returns:
        the generated AzureML CommandComponent.
    """
    upstream_steps = step.spec.upstream_steps
    component_inputs = (
        {
            f"{upstream_name}": Input(type="uri_file")
            for upstream_name in upstream_steps
        }
        if upstream_steps
        else {}
    )

    return CommandComponent(
        name=step_name,
        display_name=step_name,
        description=f"AzureML CommandComponent for {step_name}.",
        inputs=component_inputs,
        outputs={"completed": Output(type="uri_file")},
        environment=Environment(name=env_name, image=image),
        command=" ".join([*command, *arguments]),
    )
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Iterator[Dict[str, MetadataType]]:
    """Prepares or runs a pipeline on AzureML.

    Every ZenML step becomes an AzureML `CommandComponent`. The components
    are chained into an AzureML pipeline through their `completed` outputs
    so that upstream steps run first. If the deployment carries a schedule,
    an AzureML `JobSchedule` is created. Otherwise the pipeline job is
    submitted immediately and, if `synchronous` is set, streamed until it
    finishes.

    Args:
        deployment: The deployment to prepare or run.
        stack: The stack to run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If the creation of the schedule fails.

    Yields:
        A dictionary of metadata related to the pipeline run.
    """
    # Authentication
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    # Settings
    settings = cast(
        AzureMLOrchestratorSettings,
        self.get_settings(deployment),
    )

    # Client creation
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # The encoded environment is identical for every step: compute it once.
    encoded_environment = b64_encode(json.dumps(environment))

    # Create components
    components = {}
    for step_name, step in deployment.step_configurations.items():
        # Get the image for each step
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get the command and arguments
        command = AzureMLEntrypointConfiguration.get_entrypoint_command()
        arguments = AzureMLEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name,
            deployment_id=deployment.id,
            zenml_env_variables=encoded_environment,
        )

        # Generate an AzureML CommandComponent
        components[step_name] = self._create_command_component(
            step=step,
            step_name=step_name,
            env_name=deployment.pipeline_configuration.name,
            image=image,
            command=command,
            arguments=arguments,
        )

    # Pipeline definition
    pipeline_args: Dict[str, Any] = {}
    run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    pipeline_args["name"] = run_name

    if compute_target := create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    ):
        pipeline_args["compute"] = compute_target

    @pipeline(force_rerun=True, **pipeline_args)  # type: ignore[call-overload, misc]
    def azureml_pipeline() -> None:
        """Create an AzureML pipeline."""
        # Here we have to track the inputs and outputs so that we can bind
        # the components to each other to execute them in a specific order.
        component_outputs: Dict[str, Any] = {}
        for component_name, component in components.items():
            # Inputs
            component_inputs = {}
            if component.inputs:
                component_inputs.update(
                    {i: component_outputs[i] for i in component.inputs}
                )

            # Job
            component_job = component(**component_inputs)

            # Outputs
            if component_job.outputs:
                component_outputs[component_name] = (
                    component_job.outputs.completed
                )

    # Create and execute the pipeline job
    pipeline_job = azureml_pipeline()

    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        pipeline_job.settings.default_compute = "serverless"

    # Scheduling
    if schedule := deployment.schedule:
        try:
            schedule_trigger = self._build_schedule_trigger(schedule)

            if schedule_trigger:
                # Create and execute the job schedule
                job_schedule = JobSchedule(
                    name=run_name,
                    trigger=schedule_trigger,
                    create_job=pipeline_job,
                )
                ml_client.schedules.begin_create_or_update(
                    job_schedule
                ).result()
                logger.info(
                    f"Scheduled pipeline '{run_name}' with recurrence "
                    "or cron expression."
                )
            else:
                raise RuntimeError(
                    "No valid scheduling configuration found for "
                    f"pipeline '{run_name}'."
                )

        except (HttpResponseError, ResourceExistsError) as e:
            raise RuntimeError(
                "Failed to create schedule for the pipeline "
                f"'{run_name}': {str(e)}"
            ) from e

    else:
        job = ml_client.jobs.create_or_update(pipeline_job)
        logger.info(f"Pipeline {run_name} has been started.")

        # Yield metadata based on the generated job object
        yield from self.compute_metadata(job)

        assert job.services is not None
        assert job.name is not None

        logger.info(
            f"Pipeline {run_name} is running. "
            "You can view the pipeline in the AzureML portal at "
            f"{job.services['Studio'].endpoint}"
        )

        if settings.synchronous:
            logger.info("Waiting for pipeline to finish...")
            ml_client.jobs.stream(job.name)

@staticmethod
def _build_schedule_trigger(
    schedule: Any,
) -> Optional[Union[CronTrigger, RecurrenceTrigger]]:
    """Translates a ZenML schedule into an AzureML trigger (UTC).

    Args:
        schedule: The ZenML schedule attached to the deployment.

    Returns:
        A `CronTrigger` if a cron expression is set, otherwise a
        minute-based `RecurrenceTrigger` if an interval is set, otherwise
        None.

    Raises:
        RuntimeError: If the interval is shorter than 60 seconds.
    """
    start_time = None
    if schedule.start_time is not None:
        start_time = schedule.start_time.isoformat()

    end_time = None
    if schedule.end_time is not None:
        end_time = schedule.end_time.isoformat()

    if schedule.cron_expression:
        # If we are working with a cron expression
        return CronTrigger(
            expression=schedule.cron_expression,
            start_time=start_time,
            end_time=end_time,
            time_zone=TimeZone.UTC,
        )

    if schedule.interval_second:
        # If we are working with intervals
        interval = schedule.interval_second.total_seconds()

        # Reject too-short intervals first, so the rounding warning below
        # never announces a zero-minute schedule that is about to fail.
        if interval < 60:
            raise RuntimeError(
                "Can not create a schedule with an interval less "
                "than 60 secs."
            )

        if interval % 60 != 0:
            logger.warning(
                "The ZenML AzureML orchestrator only works with "
                "time intervals defined over minutes. Will "
                f"use a schedule over {int(interval // 60)} minutes."
            )

        # AzureML recurrences are expressed in whole minutes here.
        return RecurrenceTrigger(
            frequency="minute",
            interval=int(interval // 60),
            start_time=start_time,
            end_time=end_time,
            time_zone=TimeZone.UTC,
        )

    return None
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Collects orchestrator-specific metadata for a pipeline run.

    The AzureML job is looked up through the run ID stored in the
    environment, not through `run_id`. Any failure is logged as a warning
    and results in empty metadata.

    Args:
        run_id: The ID of the pipeline run.

    Returns:
        A dictionary of metadata.
    """
    try:
        connector = self.get_connector()
        credentials = (
            connector.connect() if connector else DefaultAzureCredential()
        )
        client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )
        root_job_id = os.environ[ENV_ZENML_AZUREML_RUN_ID]
        root_job = client.jobs.get(root_job_id)
        studio_link = Uri(root_job.studio_url)
    except Exception as e:
        # Best effort: metadata is optional, so never fail the run here.
        logger.warning(
            f"Failed to fetch the Studio URL of the AzureML pipeline "
            f"job: {e}"
        )
        return {}
    else:
        return {METADATA_ORCHESTRATOR_URL: studio_link}
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Looks up the AzureML pipeline job that backs `run` and maps its AzureML
    status onto a ZenML `ExecutionStatus`.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        the actual status of the pipeline execution.

    Raises:
        AssertionError: If the run was not executed by this orchestrator.
        ValueError: If the stack of the run is not available anymore, if
            it fetches an unknown state or if we can not fetch the
            orchestrator run ID.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator. An explicit
    # check (instead of `assert`) keeps the guard active under `python -O`.
    run_orchestrator = run.stack.components[StackComponentType.ORCHESTRATOR][
        0
    ]
    if self.id != run_orchestrator.id:
        raise AssertionError(
            f"The run was executed by orchestrator {run_orchestrator.id}, "
            f"not by this orchestrator ({self.id})."
        )

    # Resolve the AzureML job name before authenticating, so a missing ID
    # fails fast.
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Can not find the orchestrator run ID, thus can not fetch "
            "the status."
        )

    # Initialize the AzureML client
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Fetch the status of the PipelineJob
    status = ml_client.jobs.get(run_id).status

    # Map the potential outputs to ZenML ExecutionStatus. Potential values:
    # https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.pipelinejob?view=azure-python#azure-ai-ml-entities-pipelinejob-status
    if status in (
        "NotStarted",
        "Starting",
        "Provisioning",
        "Preparing",
        "Queued",
    ):
        return ExecutionStatus.INITIALIZING
    if status in ("Running", "Finalizing"):
        return ExecutionStatus.RUNNING
    if status in (
        "CancelRequested",
        "Failed",
        "Canceled",
        "NotResponding",
    ):
        return ExecutionStatus.FAILED
    if status == "Completed":
        return ExecutionStatus.COMPLETED
    raise ValueError(f"Unknown status for the pipeline job: {status}")
def compute_metadata(self, job: Any) -> Iterator[Dict[str, MetadataType]]:
    """Yield the run metadata derived from an AzureML PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A single dictionary that holds the orchestrator run ID and the
        Studio URL of the job, each only if it could be determined.
    """
    run_metadata: Dict[str, MetadataType] = {}

    orchestrator_run_id = self._compute_orchestrator_run_id(job)
    if orchestrator_run_id:
        run_metadata[METADATA_ORCHESTRATOR_RUN_ID] = orchestrator_run_id

    dashboard_url = self._compute_orchestrator_url(job)
    if dashboard_url:
        # Link to the pipeline view in AzureML Studio.
        run_metadata[METADATA_ORCHESTRATOR_URL] = Uri(dashboard_url)

    yield run_metadata
@staticmethod
def _compute_orchestrator_url(job: Any) -> Optional[str]:
    """Extract the AzureML Studio URL of a submitted pipeline job.

    Args:
        job: The corresponding PipelineJob object.

    Returns:
        The job's `studio_url` as a string, or None if it is empty or could
        not be read (the latter is logged as a warning).
    """
    try:
        studio_url = job.studio_url
        return str(studio_url) if studio_url else None
    except Exception as e:
        logger.warning(
            f"There was an issue while extracting the pipeline url: {e}"
        )
        return None
@staticmethod
def _compute_orchestrator_run_id(job: Any) -> Optional[str]:
    """Extract the orchestrator run ID from a submitted AzureML job.

    The name of the AzureML job is used as the orchestrator run ID.

    Args:
        job: The corresponding PipelineJob object.

    Returns:
        The job name as a string, or None if the job has no name or the
        name could not be read (the latter is logged as a warning).
    """
    try:
        if job.name:
            return str(job.name)
        return None
    except Exception as e:
        logger.warning(
            f"There was an issue while extracting the pipeline run ID: {e}"
        )
        return None
config: AzureMLOrchestratorConfig
property
readonly
Returns the `AzureMLOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
AzureMLOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the AzureML orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry, image builder and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A `StackValidator` instance. |
compute_metadata(self, job)
Generate run metadata based on the generated AzureML PipelineJob.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job |
Any |
The corresponding PipelineJob object. |
required |
Yields:
Type | Description |
---|---|
Iterator[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]] |
A dictionary of metadata related to the pipeline run. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def compute_metadata(self, job: Any) -> Iterator[Dict[str, MetadataType]]:
    """Yield the run metadata derived from an AzureML PipelineJob.

    Args:
        job: The corresponding PipelineJob object.

    Yields:
        A single dictionary that holds the orchestrator run ID and the
        Studio URL of the job, each only if it could be determined.
    """
    run_metadata: Dict[str, MetadataType] = {}

    orchestrator_run_id = self._compute_orchestrator_run_id(job)
    if orchestrator_run_id:
        run_metadata[METADATA_ORCHESTRATOR_RUN_ID] = orchestrator_run_id

    dashboard_url = self._compute_orchestrator_url(job)
    if dashboard_url:
        # Link to the pipeline view in AzureML Studio.
        run_metadata[METADATA_ORCHESTRATOR_URL] = Uri(dashboard_url)

    yield run_metadata
fetch_status(self, run)
Refreshes the status of a specific pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run |
PipelineRunResponse |
The run that was executed by this orchestrator. |
required |
Returns:
Type | Description |
---|---|
ExecutionStatus |
the actual status of the pipeline execution. |
Exceptions:
Type | Description |
---|---|
AssertionError |
If the run was not executed by this orchestrator. |
ValueError |
If it fetches an unknown state or if we can not fetch the orchestrator run ID. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def fetch_status(self, run: "PipelineRunResponse") -> ExecutionStatus:
    """Refreshes the status of a specific pipeline run.

    Looks up the AzureML pipeline job that backs `run` and maps its AzureML
    status onto a ZenML `ExecutionStatus`.

    Args:
        run: The run that was executed by this orchestrator.

    Returns:
        the actual status of the pipeline execution.

    Raises:
        AssertionError: If the run was not executed by this orchestrator.
        ValueError: If the stack of the run is not available anymore, if
            it fetches an unknown state or if we can not fetch the
            orchestrator run ID.
    """
    # Make sure that the stack exists and is accessible
    if run.stack is None:
        raise ValueError(
            "The stack that the run was executed on is not available "
            "anymore."
        )

    # Make sure that the run belongs to this orchestrator. An explicit
    # check (instead of `assert`) keeps the guard active under `python -O`.
    run_orchestrator = run.stack.components[StackComponentType.ORCHESTRATOR][
        0
    ]
    if self.id != run_orchestrator.id:
        raise AssertionError(
            f"The run was executed by orchestrator {run_orchestrator.id}, "
            f"not by this orchestrator ({self.id})."
        )

    # Resolve the AzureML job name before authenticating, so a missing ID
    # fails fast.
    if METADATA_ORCHESTRATOR_RUN_ID in run.run_metadata:
        run_id = run.run_metadata[METADATA_ORCHESTRATOR_RUN_ID]
    elif run.orchestrator_run_id is not None:
        run_id = run.orchestrator_run_id
    else:
        raise ValueError(
            "Can not find the orchestrator run ID, thus can not fetch "
            "the status."
        )

    # Initialize the AzureML client
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # Fetch the status of the PipelineJob
    status = ml_client.jobs.get(run_id).status

    # Map the potential outputs to ZenML ExecutionStatus. Potential values:
    # https://learn.microsoft.com/en-us/python/api/azure-ai-ml/azure.ai.ml.entities.pipelinejob?view=azure-python#azure-ai-ml-entities-pipelinejob-status
    if status in (
        "NotStarted",
        "Starting",
        "Provisioning",
        "Preparing",
        "Queued",
    ):
        return ExecutionStatus.INITIALIZING
    if status in ("Running", "Finalizing"):
        return ExecutionStatus.RUNNING
    if status in (
        "CancelRequested",
        "Failed",
        "Canceled",
        "NotResponding",
    ):
        return ExecutionStatus.FAILED
    if status == "Completed":
        return ExecutionStatus.COMPLETED
    raise ValueError(f"Unknown status for the pipeline job: {status}")
get_orchestrator_run_id(self)
Returns the run id of the active orchestrator run.
Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the run id cannot be read from the environment. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Important: This needs to be a unique ID and return the same value for
    all steps of a pipeline run. The value is read from the environment
    variable named by `ENV_ZENML_AZUREML_RUN_ID`.

    Returns:
        The orchestrator run id.

    Raises:
        RuntimeError: If the run id cannot be read from the environment.
    """
    try:
        return os.environ[ENV_ZENML_AZUREML_RUN_ID]
    except KeyError as e:
        # Chain the KeyError so the missing variable name stays visible in
        # the traceback.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AZUREML_RUN_ID}."
        ) from e
get_pipeline_run_metadata(self, run_id)
Get general component-specific metadata for a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run. |
required |
Returns:
Type | Description |
---|---|
Dict[str, MetadataType] |
A dictionary of metadata. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def get_pipeline_run_metadata(
    self, run_id: UUID
) -> Dict[str, "MetadataType"]:
    """Get general component-specific metadata for a pipeline run.

    The AzureML root job is identified through the environment variable
    named by `ENV_ZENML_AZUREML_RUN_ID`, so the metadata can only be
    fetched from inside an AzureML job. This is best-effort: every failure
    is logged as a warning and results in an empty dictionary.

    Args:
        run_id: The ID of the pipeline run. Not used by this
            implementation; the AzureML job is resolved from the
            environment instead.

    Returns:
        A dictionary of metadata containing the Studio URL of the AzureML
        pipeline job, or an empty dictionary if it could not be fetched.
    """
    azureml_root_run_id = os.environ.get(ENV_ZENML_AZUREML_RUN_ID)
    if not azureml_root_run_id:
        # Nothing to look up, so skip authenticating against Azure.
        logger.warning(
            "Failed to fetch the Studio URL of the AzureML pipeline job: "
            f"the environment variable {ENV_ZENML_AZUREML_RUN_ID} is not "
            "set."
        )
        return {}

    try:
        if connector := self.get_connector():
            credentials = connector.connect()
        else:
            credentials = DefaultAzureCredential()

        ml_client = MLClient(
            credential=credentials,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace,
        )
        azureml_job = ml_client.jobs.get(azureml_root_run_id)
        studio_url = azureml_job.studio_url
        if not studio_url:
            # Do not record a placeholder such as `Uri(None)` as the
            # orchestrator URL when AzureML did not report one.
            logger.warning(
                "The AzureML pipeline job "
                f"'{azureml_root_run_id}' has no Studio URL."
            )
            return {}
        return {
            METADATA_ORCHESTRATOR_URL: Uri(studio_url),
        }
    except Exception as e:
        # Deliberately best-effort: metadata must never break the run.
        logger.warning(
            f"Failed to fetch the Studio URL of the AzureML pipeline "
            f"job: {e}"
        )
        return {}
prepare_or_run_pipeline(self, deployment, stack, environment)
Prepares or runs a pipeline on AzureML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentResponse |
The deployment to prepare or run. |
required |
stack |
Stack |
The stack to run on. |
required |
environment |
Dict[str, str] |
Environment variables to set in the orchestration environment. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the creation of the schedule fails. |
Yields:
Type | Description |
---|---|
Iterator[Dict[str, Union[str, int, float, bool, Dict[Any, Any], List[Any], Set[Any], Tuple[Any, ...], zenml.metadata.metadata_types.Uri, zenml.metadata.metadata_types.Path, zenml.metadata.metadata_types.DType, zenml.metadata.metadata_types.StorageSize]]] |
A dictionary of metadata related to the pipeline run. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponse",
    stack: "Stack",
    environment: Dict[str, str],
) -> Iterator[Dict[str, MetadataType]]:
    """Prepares or runs a pipeline on AzureML.

    One AzureML command component is created per ZenML step, and the
    components are wired together through their `completed` outputs so
    that AzureML runs them in order. If the deployment has a schedule, an
    AzureML job schedule is created instead of submitting the job directly.

    Args:
        deployment: The deployment to prepare or run.
        stack: The stack to run on.
        environment: Environment variables to set in the orchestration
            environment.

    Raises:
        RuntimeError: If the creation of the schedule fails.

    Yields:
        A dictionary of metadata related to the pipeline run. Nothing is
        yielded when a schedule is created instead of a job.
    """
    # Authentication
    if connector := self.get_connector():
        credentials = connector.connect()
    else:
        credentials = DefaultAzureCredential()

    # Settings
    settings = cast(
        AzureMLOrchestratorSettings,
        self.get_settings(deployment),
    )

    # Client creation
    ml_client = MLClient(
        credential=credentials,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace,
    )

    # The encoded environment is identical for every step; encode it once.
    encoded_environment = b64_encode(json.dumps(environment))

    # Create components
    components = {}
    for step_name, step in deployment.step_configurations.items():
        # Get the image for each step
        image = self.get_image(deployment=deployment, step_name=step_name)

        # Get the command and arguments
        command = AzureMLEntrypointConfiguration.get_entrypoint_command()
        arguments = AzureMLEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name,
            deployment_id=deployment.id,
            zenml_env_variables=encoded_environment,
        )

        # Generate an AzureML CommandComponent
        components[step_name] = self._create_command_component(
            step=step,
            step_name=step_name,
            env_name=deployment.pipeline_configuration.name,
            image=image,
            command=command,
            arguments=arguments,
        )

    # Pipeline definition
    run_name = get_orchestrator_run_name(
        pipeline_name=deployment.pipeline_configuration.name
    )
    pipeline_args: Dict[str, Any] = {"name": run_name}
    if compute_target := create_or_get_compute(
        ml_client, settings, default_compute_name=f"zenml_{self.id}"
    ):
        pipeline_args["compute"] = compute_target

    @pipeline(force_rerun=True, **pipeline_args)  # type: ignore[call-overload, misc]
    def azureml_pipeline() -> None:
        """Create an AzureML pipeline."""
        # Here we have to track the inputs and outputs so that we can bind
        # the components to each other to execute them in a specific order.
        component_outputs: Dict[str, Any] = {}
        for component_name, component in components.items():
            # Inputs
            component_inputs = {}
            if component.inputs:
                component_inputs.update(
                    {i: component_outputs[i] for i in component.inputs}
                )

            # Job
            component_job = component(**component_inputs)

            # Outputs
            if component_job.outputs:
                component_outputs[component_name] = (
                    component_job.outputs.completed
                )

    # Create and execute the pipeline job
    pipeline_job = azureml_pipeline()

    if settings.mode == AzureMLComputeTypes.SERVERLESS:
        pipeline_job.settings.default_compute = "serverless"

    # Scheduling
    if schedule := deployment.schedule:
        try:
            schedule_trigger: Optional[
                Union[CronTrigger, RecurrenceTrigger]
            ] = None

            start_time = (
                schedule.start_time.isoformat()
                if schedule.start_time is not None
                else None
            )
            end_time = (
                schedule.end_time.isoformat()
                if schedule.end_time is not None
                else None
            )

            if schedule.cron_expression:
                # If we are working with a cron expression
                schedule_trigger = CronTrigger(
                    expression=schedule.cron_expression,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            elif schedule.interval_second:
                # If we are working with intervals. This orchestrator only
                # schedules in whole minutes, so reject sub-minute
                # intervals before warning about any rounding.
                interval_seconds = schedule.interval_second.total_seconds()
                if interval_seconds < 60:
                    raise RuntimeError(
                        "Can not create a schedule with an interval less "
                        "than 60 secs."
                    )
                interval_minutes = int(interval_seconds // 60)
                if interval_seconds % 60 != 0:
                    logger.warning(
                        "The ZenML AzureML orchestrator only works with "
                        "time intervals defined over minutes. Will "
                        f"use a schedule over {interval_minutes} minutes."
                    )

                schedule_trigger = RecurrenceTrigger(
                    frequency="minute",
                    interval=interval_minutes,
                    start_time=start_time,
                    end_time=end_time,
                    time_zone=TimeZone.UTC,
                )

            if schedule_trigger:
                # Create and execute the job schedule
                job_schedule = JobSchedule(
                    name=run_name,
                    trigger=schedule_trigger,
                    create_job=pipeline_job,
                )
                ml_client.schedules.begin_create_or_update(
                    job_schedule
                ).result()
                logger.info(
                    f"Scheduled pipeline '{run_name}' with recurrence "
                    "or cron expression."
                )
            else:
                raise RuntimeError(
                    "No valid scheduling configuration found for "
                    f"pipeline '{run_name}'."
                )

        except (HttpResponseError, ResourceExistsError) as e:
            raise RuntimeError(
                "Failed to create schedule for the pipeline "
                f"'{run_name}': {str(e)}"
            ) from e

    else:
        job = ml_client.jobs.create_or_update(pipeline_job)
        logger.info(f"Pipeline {run_name} has been started.")

        # Yield metadata based on the generated job object
        yield from self.compute_metadata(job)

        assert job.services is not None
        assert job.name is not None

        logger.info(
            f"Pipeline {run_name} is running. "
            "You can view the pipeline in the AzureML portal at "
            f"{job.services['Studio'].endpoint}"
        )

        if settings.synchronous:
            logger.info("Waiting for pipeline to finish...")
            ml_client.jobs.stream(job.name)
azureml_orchestrator_entrypoint_config
Entrypoint configuration for ZenML AzureML pipeline steps.
AzureMLEntrypointConfiguration (StepEntrypointConfiguration)
Entrypoint configuration for ZenML AzureML pipeline steps.
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
class AzureMLEntrypointConfiguration(StepEntrypointConfiguration):
    """Entrypoint configuration for ZenML AzureML pipeline steps.

    On top of the regular step entrypoint options, this configuration
    accepts an option that carries the orchestration environment variables
    as base64-encoded JSON. They are unpacked into `os.environ` before the
    step runs.
    """

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all options required for running with this configuration.

        Returns:
            The superclass options as well as an option for the
            environmental variables.
        """
        return super().get_entrypoint_options() | {ZENML_ENV_VARIABLES}

    @classmethod
    def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            **kwargs: Kwargs forwarded to the superclass. Must also contain
                the encoded environmental variables under the
                `ZENML_ENV_VARIABLES` key.

        Returns:
            The superclass arguments as well as arguments for environmental
            variables.
        """
        return super().get_entrypoint_arguments(**kwargs) + [
            f"--{ZENML_ENV_VARIABLES}",
            kwargs[ZENML_ENV_VARIABLES],
        ]

    def _set_env_variables(self) -> None:
        """Sets the environmental variables before executing the step.

        The `ZENML_ENV_VARIABLES` entrypoint argument is base64-decoded and
        parsed as JSON. The resulting mapping is merged into `os.environ`,
        overwriting variables of the same name.
        """
        env_variables = json.loads(
            b64_decode(self.entrypoint_args[ZENML_ENV_VARIABLES])
        )
        os.environ.update(env_variables)

    def run(self) -> None:
        """Runs the step.

        When the step is done, a marker file is written to the path in the
        `AZURE_ML_OUTPUT_COMPLETED` environment variable (if set). That
        file is the step's `completed` output, which downstream components
        consume as an input.
        """
        # Set the environmental variables first
        self._set_env_variables()

        # Azure automatically changes the working directory, we have to set
        # it back to /app before running the step.
        os.makedirs("/app", exist_ok=True)
        os.chdir("/app")

        # Run the step
        super().run()

        # Unfortunately, in AzureML's Python SDK v2, there is no native way
        # to execute steps/components in a specific sequence. In order to
        # establish the correct order, we are using dummy inputs and
        # outputs. However, these steps only execute if the inputs and
        # outputs actually exist. This is why we create a dummy file and
        # write to it and use it as the output of the steps.
        if completed := os.environ.get(AZURE_ML_OUTPUT_COMPLETED):
            output_dir = os.path.dirname(completed)
            # `os.makedirs("")` raises, so only create a directory when
            # the path actually contains one.
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            with open(completed, "w", encoding="utf-8") as f:
                f.write("Component completed!")
get_entrypoint_arguments(**kwargs)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwargs |
Any |
Kwargs, can include the environmental variables. |
{} |
Returns:
Type | Description |
---|---|
List[str] |
The superclass arguments as well as arguments for environmental variables. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_arguments(cls, **kwargs: Any) -> List[str]:
    """Build the command line arguments for the step entrypoint.

    Args:
        **kwargs: Kwargs forwarded to the superclass. Must also contain the
            encoded environmental variables under the
            `ZENML_ENV_VARIABLES` key.

    Returns:
        The superclass arguments followed by the environmental variables
        option and its value.
    """
    base_arguments = super().get_entrypoint_arguments(**kwargs)
    env_option = f"--{ZENML_ENV_VARIABLES}"
    env_value = kwargs[ZENML_ENV_VARIABLES]
    return base_arguments + [env_option, env_value]
get_entrypoint_options()
classmethod
Gets all options required for running with this configuration.
Returns:
Type | Description |
---|---|
Set[str] |
The superclass options as well as an option for the environmental variables. |
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Collect the option names this entrypoint configuration accepts.

    Returns:
        The superclass options extended with the option that carries the
        environmental variables.
    """
    inherited_options = super().get_entrypoint_options()
    return inherited_options | {ZENML_ENV_VARIABLES}
run(self)
Runs the step.
Source code in zenml/integrations/azure/orchestrators/azureml_orchestrator_entrypoint_config.py
def run(self) -> None:
    """Runs the step.

    When the step is done, a marker file is written to the path in the
    `AZURE_ML_OUTPUT_COMPLETED` environment variable (if set). That file is
    the step's `completed` output, which downstream components consume as
    an input.
    """
    # Set the environmental variables first
    self._set_env_variables()

    # Azure automatically changes the working directory, we have to set it
    # back to /app before running the step.
    os.makedirs("/app", exist_ok=True)
    os.chdir("/app")

    # Run the step
    super().run()

    # Unfortunately, in AzureML's Python SDK v2, there is no native way
    # to execute steps/components in a specific sequence. In order to
    # establish the correct order, we are using dummy inputs and
    # outputs. However, these steps only execute if the inputs and outputs
    # actually exist. This is why we create a dummy file and write to it and
    # use it as the output of the steps.
    if completed := os.environ.get(AZURE_ML_OUTPUT_COMPLETED):
        output_dir = os.path.dirname(completed)
        # `os.makedirs("")` raises, so only create a directory when the
        # path actually contains one.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(completed, "w", encoding="utf-8") as f:
            f.write("Component completed!")
service_connectors
special
Azure Service Connector.
azure_service_connector
Azure Service Connector.
AzureAccessToken (AuthenticationConfig)
Azure access token credentials.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessToken(AuthenticationConfig):
    """Azure access token credentials.

    Holds a pre-issued access token used by the access-token
    authentication method.
    """

    # Secret value; exposed via `get_secret_value()` when authenticating.
    token: PlainSerializedSecretStr = Field(
        title="Azure Access Token",
        description="The Azure access token to use for authentication",
    )
AzureAccessTokenConfig (AzureBaseConfig, AzureAccessToken)
Azure token configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAccessTokenConfig(AzureBaseConfig, AzureAccessToken):
    """Azure token configuration.

    Combines the common Azure scoping fields (subscription, tenant,
    resource group, storage account) with the access token credential.
    """
AzureAuthenticationMethods (StrEnum)
Azure Authentication methods.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureAuthenticationMethods(StrEnum):
    """Azure Authentication methods.

    The string values are the identifiers of the authentication methods
    supported by the Azure service connector.
    """

    # Credentials discovered by `DefaultAzureCredential` in the local
    # environment (e.g. an Azure CLI login or a managed identity).
    IMPLICIT = "implicit"
    # Service principal: tenant ID, client ID and client secret.
    SERVICE_PRINCIPAL = "service-principal"
    # A pre-issued access token with an expiration time.
    ACCESS_TOKEN = "access-token"
AzureBaseConfig (AuthenticationConfig)
Azure base configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureBaseConfig(AuthenticationConfig):
    """Azure base configuration.

    Common, optional scoping settings shared by all Azure authentication
    configurations. Fields left unset are discovered from Azure or leave
    the scope unrestricted.
    """

    # When unset, the subscription is looked up from Azure; exactly one
    # subscription must be visible to the credentials in that case.
    subscription_id: Optional[UUID] = Field(
        default=None,
        title="Azure Subscription ID",
        description="The subscription ID of the Azure account. If not "
        "specified, ZenML will attempt to retrieve the subscription ID from "
        "Azure using the configured credentials.",
    )
    # When unset, the tenant is looked up from Azure; exactly one tenant
    # must be visible to the credentials in that case.
    tenant_id: Optional[UUID] = Field(
        default=None,
        title="Azure Tenant ID",
        description="The tenant ID of the Azure account. If not specified, "
        "ZenML will attempt to retrieve the tenant from Azure using the "
        "configured credentials.",
    )
    # Optional scope restriction for AKS clusters and ACR repositories.
    resource_group: Optional[str] = Field(
        default=None,
        title="Azure Resource Group",
        description="A resource group may be used to restrict the scope of "
        "Azure resources like AKS clusters and ACR repositories. If not "
        "specified, ZenML will retrieve resources from all resource groups "
        "accessible with the configured credentials.",
    )
    # Optional scope restriction for Azure Blob storage containers.
    storage_account: Optional[str] = Field(
        default=None,
        title="Azure Storage Account",
        description="The name of an Azure storage account may be used to "
        "restrict the scope of Azure Blob storage containers. If not "
        "specified, ZenML will retrieve blob containers from all storage "
        "accounts accessible with the configured credentials.",
    )
AzureClientConfig (AzureBaseConfig)
Azure client configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientConfig(AzureBaseConfig):
    """Azure client configuration.

    Identifies a service principal (client) within a tenant.
    """

    client_id: UUID = Field(
        title="Azure Client ID",
        description="The service principal's client ID",
    )
    # Redeclared without a default: unlike in `AzureBaseConfig`, the tenant
    # ID is required for service principal authentication.
    tenant_id: UUID = Field(
        title="Azure Tenant ID",
        description="ID of the service principal's tenant. Also called its "
        "'directory' ID.",
    )
AzureClientSecret (AuthenticationConfig)
Azure client secret credentials.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureClientSecret(AuthenticationConfig):
    """Azure client secret credentials.

    The secret half of service principal authentication; it is combined
    with the client and tenant IDs when authenticating.
    """

    # Secret value; exposed via `get_secret_value()` when authenticating.
    client_secret: PlainSerializedSecretStr = Field(
        title="Service principal client secret",
        description="The client secret of the service principal",
    )
AzureServiceConnector (ServiceConnector)
Azure service connector.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServiceConnector(ServiceConnector):
"""Azure service connector."""
config: AzureBaseConfig
_subscription_id: Optional[str] = None
_subscription_name: Optional[str] = None
_tenant_id: Optional[str] = None
_session_cache: Dict[
str,
Tuple[TokenCredential, Optional[datetime.datetime]],
] = {}
@classmethod
def _get_connector_type(cls) -> ServiceConnectorTypeModel:
    """Return the static type specification of the Azure connector.

    Returns:
        The module-level `AZURE_SERVICE_CONNECTOR_TYPE_SPEC` object.
    """
    connector_type_spec = AZURE_SERVICE_CONNECTOR_TYPE_SPEC
    return connector_type_spec
@property
def subscription(self) -> Tuple[str, str]:
    """Resolve (and cache) the Azure subscription used by this connector.

    If a subscription ID is configured, that subscription is fetched.
    Otherwise exactly one subscription must be visible to the credentials.

    Returns:
        The Azure subscription ID and name.

    Raises:
        AuthorizationException: If the Azure subscription could not be
            determined or doesn't match the configured subscription ID.
    """
    already_resolved = (
        self._subscription_id is not None
        and self._subscription_name is not None
    )
    if not already_resolved:
        logger.debug("Getting subscription from Azure...")
        try:
            credential, _ = self.get_azure_credential(self.auth_method)
            client = SubscriptionClient(credential)
            configured_id = self.config.subscription_id

            if configured_id is None:
                candidates = list(client.subscriptions.list())
                if not candidates:
                    raise AuthorizationException(
                        "no Azure subscriptions found for the configured "
                        "credentials."
                    )
                if len(candidates) > 1:
                    raise AuthorizationException(
                        "multiple Azure subscriptions found for the "
                        "configured credentials. Please configure the "
                        "subscription ID explicitly in the connector."
                    )
                chosen = candidates[0]
            else:
                chosen = client.subscriptions.get(str(configured_id))

            self._subscription_id = chosen.subscription_id
            self._subscription_name = chosen.display_name
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure subscription: {e}"
            ) from e

    assert self._subscription_id is not None
    assert self._subscription_name is not None
    return self._subscription_id, self._subscription_name
@property
def tenant_id(self) -> str:
    """Get the Azure tenant ID.

    The tenant is fetched from Azure once and cached on the connector. If a
    tenant ID is configured, it must be visible to the credentials;
    otherwise exactly one tenant must be visible.

    Returns:
        The Azure tenant ID, as a string.

    Raises:
        AuthorizationException: If the Azure tenant ID could not be
            determined or doesn't match the configured tenant ID.
    """
    if self._tenant_id is None:
        logger.debug("Getting tenant ID from Azure...")
        try:
            credential, _ = self.get_azure_credential(self.auth_method)
            subscription_client = SubscriptionClient(credential)
            tenants = subscription_client.tenants.list()
            if self.config.tenant_id is not None:
                configured_tenant_id = str(self.config.tenant_id)
                for tenant in tenants:
                    if str(tenant.tenant_id) == configured_tenant_id:
                        self._tenant_id = str(tenant.tenant_id)
                        break
                else:
                    raise AuthorizationException(
                        "the configured tenant ID is not associated with "
                        "the configured credentials."
                    )
            else:
                tenant_list = list(tenants)
                if len(tenant_list) == 0:
                    raise AuthorizationException(
                        "no Azure tenants found for the configured "
                        "credentials."
                    )
                if len(tenant_list) > 1:
                    raise AuthorizationException(
                        "multiple Azure tenants found for the "
                        "configured credentials. Please configure the "
                        "tenant ID explicitly in the connector."
                    )
                discovered_id = tenant_list[0].tenant_id
                # Store a string in both branches, matching the `-> str`
                # return type.
                self._tenant_id = (
                    str(discovered_id) if discovered_id is not None else None
                )
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure tenant: {e}"
            ) from e
    assert self._tenant_id is not None
    return self._tenant_id
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Get an Azure credential for the specified resource.

    Credentials are cached per authentication method. A cached credential
    without an expiration time is always reused. One with an expiration
    time is reused only while it stays valid for more than
    `AZURE_SESSION_EXPIRATION_BUFFER` minutes; otherwise a new one is
    created.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    # We maintain a cache of all sessions to avoid re-authenticating
    # multiple times. The key is only the auth method: `resource_type` and
    # `resource_id` do not influence which credential is returned.
    key = auth_method
    if key in self._session_cache:
        session, expires_at = self._session_cache[key]
        if expires_at is None:
            return session, None

        # Refresh expired sessions
        now = datetime.datetime.now(datetime.timezone.utc)
        if expires_at.tzinfo is None:
            # Naive expiration timestamps are treated as UTC.
            expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
        else:
            # Convert aware timestamps instead of overwriting their zone,
            # which would shift the expiration time.
            expires_at = expires_at.astimezone(datetime.timezone.utc)
        # check if the token expires in the near future
        if expires_at > now + datetime.timedelta(
            minutes=AZURE_SESSION_EXPIRATION_BUFFER
        ):
            return session, expires_at

    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    session, expires_at = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[key] = (session, expires_at)
    return session, expires_at
def _authenticate(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Authenticate to Azure and return a token credential.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to authenticate for. Currently
            not used to build the credential.
        resource_id: The resource ID to authenticate for. Currently not
            used to build the credential.

    Returns:
        An Azure token credential and the expiration time of the
        temporary credentials if applicable.

    Raises:
        NotImplementedError: If the authentication method is not supported.
        AuthorizationException: If the authentication failed.
    """
    cfg = self.config
    credential: TokenCredential
    if auth_method == AzureAuthenticationMethods.IMPLICIT:
        self._check_implicit_auth_method_allowed()

        try:
            credential = DefaultAzureCredential()
        except AzureError as e:
            raise AuthorizationException(
                "Failed to authenticate to Azure using implicit "
                "authentication. Please check your local Azure CLI "
                f"configuration or managed identity setup: {e}"
            ) from e

        return credential, None

    if auth_method == AzureAuthenticationMethods.SERVICE_PRINCIPAL:
        assert isinstance(cfg, AzureServicePrincipalConfig)
        try:
            credential = ClientSecretCredential(
                tenant_id=str(cfg.tenant_id),
                client_id=str(cfg.client_id),
                client_secret=cfg.client_secret.get_secret_value(),
            )
        except AzureError as e:
            raise AuthorizationException(
                "Failed to authenticate to Azure using the provided "
                "service principal credentials. Please check your Azure "
                f"configuration: {e}"
            ) from e

        return credential, None

    if auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
        assert isinstance(cfg, AzureAccessTokenConfig)

        expires_at = self.expires_at
        assert expires_at is not None

        credential = ZenMLAzureTokenCredential(
            token=cfg.token.get_secret_value(),
            expires_at=expires_at,
        )

        return credential, expires_at

    raise NotImplementedError(
        f"Authentication method '{auth_method}' is not supported by "
        "the Azure connector."
    )
def _parse_blob_container_resource_id(self, resource_id: str) -> str:
    """Validate and convert an Azure blob resource ID to a container name.

    The resource ID could mean different things:

    - Azure blob container URI: `{az|abfs}://{container-name}`
    - Azure blob container name: `{container-name}`

    This method extracts the container name from the provided resource ID.

    Args:
        resource_id: The resource ID to convert.

    Returns:
        The container name.

    Raises:
        ValueError: If the provided resource ID is not a valid Azure blob
            resource ID.
    """
    container_name: Optional[str] = None
    # The optional path suffix is `(/.*)?` rather than `(/.*)*`: both
    # accept the same strings, but the nested quantifier backtracks
    # exponentially on long paths that fail to match (e.g. containing a
    # newline).
    if re.match(
        r"^(az|abfs)://[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9](/.*)?$",
        resource_id,
    ):
        # The resource ID is an Azure blob container URI
        container_name = resource_id.split("/")[2]
    elif re.match(
        r"^[a-z0-9](?!.*--)[a-z0-9-]{1,61}[a-z0-9]$",
        resource_id,
    ):
        # The resource ID is the Azure blob container name
        container_name = resource_id
    else:
        raise ValueError(
            f"Invalid resource ID for an Azure blob storage container: "
            f"{resource_id}. "
            "Supported formats are:\n"
            "Azure blob container URI: {az|abfs}://<container-name>\n"
            "Azure blob container name: <container-name>"
        )

    return container_name
def _parse_acr_resource_id(
    self,
    resource_id: str,
) -> str:
    """Validate and convert an ACR resource ID to an ACR registry name.

    The resource ID could mean different things:

    - ACR registry URI: `[https://]{registry-name}.azurecr.io`, optionally
      followed by a `/`-separated path
    - ACR registry name: `{registry-name}` (alphanumeric characters only)

    This method extracts the registry name from the provided resource ID.

    Args:
        resource_id: The resource ID to convert.

    Returns:
        The ACR registry name.

    Raises:
        ValueError: If the provided resource ID is not a valid ACR
            resource ID.
    """
    # `(?:/.+)?` matches exactly the same suffixes as the former `(/.+)*`
    # but cannot backtrack exponentially on malformed input (e.g. a long
    # path followed by an embedded newline). `re.fullmatch` also rejects a
    # trailing newline that `$` would tolerate.
    uri_match = re.fullmatch(
        r"(?:https?://)?([a-zA-Z0-9]+)\.azurecr\.io(?:/.+)?",
        resource_id,
    )
    if uri_match is not None:
        # The resource ID is an ACR registry URI
        return uri_match.group(1)

    if re.fullmatch(r"[a-zA-Z0-9]+", resource_id):
        # The resource ID is an ACR registry name
        return resource_id

    raise ValueError(
        f"Invalid resource ID for a ACR registry: {resource_id}. "
        f"Supported formats are:\n"
        "ACR registry URI: [https://]{registry-name}.azurecr.io\n"
        "ACR registry name: {registry-name}"
    )
def _parse_aks_resource_id(
    self, resource_id: str
) -> Tuple[Optional[str], str]:
    """Validate and convert an AKS resource ID to an AKS cluster name.

    The resource ID could mean different things:

    - resource group scoped AKS cluster name (canonical):
      `{resource-group}/{cluster-name}`
    - AKS cluster name: `{cluster-name}`

    This method extracts the resource group name and cluster name from the
    provided resource ID. When the resource ID does not include a resource
    group, the resource group configured in the connector (if any) is
    returned instead.

    Args:
        resource_id: The resource ID to convert.

    Returns:
        The Azure resource group (or None if neither the resource ID nor
        the connector configuration specify one) and AKS cluster name.

    Raises:
        ValueError: If the provided resource ID is not a valid AKS cluster
            name, or if its resource group does not match the one
            configured in the connector.
    """
    # `A B* A` accepts the same names as the former `A+ B* A+` (at least two
    # characters, alphanumeric at both ends) without overlapping
    # quantifiers, so rejecting a long invalid name stays linear.
    cluster_pattern = r"[a-zA-Z0-9][a-zA-Z0-9_-]*[a-zA-Z0-9]"

    resource_group: Optional[str] = self.config.resource_group
    # `re.fullmatch` rejects a trailing newline that `$` would tolerate.
    scoped_match = re.fullmatch(
        r"([a-zA-Z0-9_.()-]+)/(" + cluster_pattern + r")",
        resource_id,
    )
    if scoped_match is not None:
        # The resource ID is an AKS cluster name including the resource
        # group
        resource_group, cluster_name = scoped_match.group(1, 2)
    elif re.fullmatch(cluster_pattern, resource_id):
        # The resource ID is an AKS cluster name without the resource group
        cluster_name = resource_id
    else:
        raise ValueError(
            f"Invalid resource ID for a AKS cluster: {resource_id}. "
            f"Supported formats are:\n"
            "resource group scoped AKS cluster name (canonical): {resource-group}/{cluster-name}\n"
            "AKS cluster name: {cluster-name}"
        )

    # A connector scoped to a resource group must not be used to reach
    # clusters in other resource groups.
    if (
        self.config.resource_group
        and self.config.resource_group != resource_group
    ):
        raise ValueError(
            f"Invalid resource ID for an AKS cluster: {resource_id}. "
            f"The resource group '{resource_group}' does not match the "
            f"resource group configured in the connector: "
            f"'{self.config.resource_group}'."
        )

    return resource_group, cluster_name
def _canonical_resource_id(
    self, resource_type: str, resource_id: str
) -> str:
    """Normalize a resource ID into the canonical form for its type.

    - blob containers become `az://{container-name}`
    - AKS clusters become `{resource-group}/{cluster-name}`, or just
      `{cluster-name}` when no resource group is known
    - ACR registries become `{registry-name}.azurecr.io`
    - resource IDs of any other type are returned unchanged

    Args:
        resource_type: The resource type to canonicalize.
        resource_id: The resource ID to canonicalize.

    Returns:
        The canonical resource ID.

    Raises:
        ValueError: If the resource ID is not valid for its resource type
            (raised by the type-specific parsers).
    """
    if resource_type == BLOB_RESOURCE_TYPE:
        return "az://" + self._parse_blob_container_resource_id(resource_id)

    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        group, cluster = self._parse_aks_resource_id(resource_id)
        return f"{group}/{cluster}" if group else cluster

    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        return self._parse_acr_resource_id(resource_id) + ".azurecr.io"

    return resource_id
def _get_default_resource_id(self, resource_type: str) -> str:
    """Get the default resource ID for a resource type.

    Only the generic Azure resource type has a default resource ID: the
    Azure subscription name reported by the `subscription` property.

    Args:
        resource_type: The type of the resource to get a default resource ID
            for. Only called with resource types that do not support
            multiple instances.

    Returns:
        The default resource ID for the resource type.

    Raises:
        RuntimeError: If the resource type has no default resource ID
            (i.e. it is not the generic Azure resource type).
    """
    if resource_type == AZURE_RESOURCE_TYPE:
        # `subscription` yields an (ID, name) pair; the human-readable name
        # is used as the resource ID. Reading it may raise an
        # AuthorizationException if the subscription cannot be determined.
        _, subscription_name = self.subscription
        return subscription_name

    raise RuntimeError(
        f"Default resource ID for '{resource_type}' not available."
    )
def _connect_to_resource(
    self,
    **kwargs: Any,
) -> Any:
    """Authenticate to Azure and return a client for the configured resource.

    Depending on the connector's resource type, the returned object is:

    - the Azure TokenCredential itself, for the generic Azure resource type
    - a BlobServiceClient bound to the storage account that hosts the
      configured container, for the blob storage resource type

    Docker and Kubernetes resources cannot be connected to directly; use
    `get_connector_client` to obtain a connector for those instead.

    Args:
        kwargs: Additional implementation specific keyword arguments to pass
            to the session or client constructor.

    Returns:
        A TokenCredential for Azure generic resources and a
        BlobServiceClient client for an Azure blob storage container.

    Raises:
        NotImplementedError: If the connector instance does not support
            directly connecting to the indicated resource type.
    """
    resource_type = self.resource_type
    resource_id = self.resource_id
    assert resource_type is not None
    assert resource_id is not None

    # Every resource type needs an Azure credential before anything else.
    credential, _ = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
    )

    if resource_type == AZURE_RESOURCE_TYPE:
        return credential

    if resource_type != BLOB_RESOURCE_TYPE:
        raise NotImplementedError(
            f"Connecting to {resource_type} resources is not directly "
            "supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    # Blob containers live inside storage accounts: look up which account
    # hosts the requested container and build a client for that account.
    requested_container = self._parse_blob_container_resource_id(resource_id)
    _, storage_account = self._list_blob_containers(
        credential,
        container_name=requested_container,
    ).popitem()
    return BlobServiceClient(
        account_url=f"https://{storage_account}.blob.core.windows.net/",
        credential=credential,
    )
def _configure_local_client(
    self,
    **kwargs: Any,
) -> None:
    """Configure a local client to authenticate and connect to a resource.

    This method uses the connector's configuration to configure a local
    client or SDK installed on the localhost for the indicated resource.
    It is only supported for the generic Azure and blob storage resource
    types with the service principal authentication method, in which case
    the local Azure CLI is logged in via `az login --service-principal`.

    Args:
        kwargs: Additional implementation specific keyword arguments to use
            to configure the client.

    Raises:
        NotImplementedError: If the connector instance does not support
            local configuration for the configured resource type or
            authentication method.
        AuthorizationException: If the `az login` command fails.
    """
    resource_type = self.resource_type
    if resource_type not in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
        raise NotImplementedError(
            f"Configuring the local client for {resource_type} resources is "
            "not directly supported by the Azure connector. Please call the "
            f"`get_connector_client` method to get a {resource_type} connector "
            "instance for the resource."
        )

    if self.auth_method != AzureAuthenticationMethods.SERVICE_PRINCIPAL:
        raise NotImplementedError(
            f"Local Azure client configuration for resource type "
            f"{resource_type} is only supported if the "
            f"'{AzureAuthenticationMethods.SERVICE_PRINCIPAL}' "
            f"authentication method is used."
        )

    # Use the service principal credentials to configure the local
    # Azure CLI
    assert isinstance(self.config, AzureServicePrincipalConfig)
    # NOTE(review): the client secret is passed as a command-line argument
    # and may be visible to other local users via the process list.
    command = [
        "az",
        "login",
        "--service-principal",
        "-u",
        str(self.config.client_id),
        "-p",
        self.config.client_secret.get_secret_value(),
        "--tenant",
        str(self.config.tenant_id),
    ]
    # The exit status is checked manually instead of using `check=True`:
    # the resulting CalledProcessError renders the full argv, including the
    # client secret, into its message and into any chained traceback.
    result = subprocess.run(command, check=False)
    if result.returncode != 0:
        raise AuthorizationException(
            "Failed to update the local Azure CLI with the connector "
            "service principal credentials: the `az login` command "
            f"exited with status code {result.returncode}."
        )

    logger.info(
        "Updated the local Azure CLI configuration with the "
        "connector's service principal credentials."
    )
@classmethod
def _auto_configure(
    cls,
    auth_method: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    resource_group: Optional[str] = None,
    storage_account: Optional[str] = None,
    **kwargs: Any,
) -> "AzureServiceConnector":
    """Auto-configure the connector.

    Instantiate an Azure connector with a configuration extracted from the
    authentication configuration available in the environment (e.g.
    environment variables or local Azure client/SDK configuration files).

    With the implicit authentication method, no credentials are stored in
    the connector. Otherwise, the environment credentials are resolved
    through `DefaultAzureCredential` and converted into a short-lived
    access token for the Azure management scope.

    Args:
        auth_method: The particular authentication method to use. If not
            specified, the connector implementation must decide which
            authentication method to use or raise an exception.
        resource_type: The type of resource to configure.
        resource_id: The ID of the resource to configure. The implementation
            may choose to either require or ignore this parameter if it does
            not support or detect an resource type that supports multiple
            instances.
        resource_group: A resource group may be used to restrict the scope
            of Azure resources like AKS clusters and ACR repositories. If
            not specified, ZenML will retrieve resources from all resource
            groups accessible with the discovered credentials.
        storage_account: The name of an Azure storage account may be used to
            restrict the scope of Azure Blob storage containers. If not
            specified, ZenML will retrieve blob containers from all storage
            accounts accessible with the discovered credentials.
        kwargs: Additional implementation specific keyword arguments to use.

    Returns:
        An Azure connector instance configured with authentication
        credentials automatically extracted from the environment.

    Raises:
        NotImplementedError: If the connector implementation does not
            support auto-configuration for the specified authentication
            method.
        AuthorizationException: If no Azure credentials can be loaded from
            the environment, or if auto-configuration is requested for
            blob storage resources with an access token.
    """
    auth_config: AzureBaseConfig
    expiration_seconds: Optional[int] = None
    expires_at: Optional[datetime.datetime] = None
    if auth_method == AzureAuthenticationMethods.IMPLICIT:
        auth_config = AzureBaseConfig(
            resource_group=resource_group,
            storage_account=storage_account,
        )
    else:
        # Initialize an Azure credential with the default configuration
        # loaded from the environment.
        try:
            credential = DefaultAzureCredential()
        except AzureError as e:
            raise AuthorizationException(
                f"Failed to authenticate to Azure using implicit "
                f"authentication. Please check your local Azure CLI "
                f"configuration: {e}"
            ) from e

        if (
            auth_method
            and auth_method != AzureAuthenticationMethods.ACCESS_TOKEN
        ):
            raise NotImplementedError(
                f"The specified authentication method '{auth_method}' "
                "could not be used to auto-configure the connector. "
            )
        auth_method = AzureAuthenticationMethods.ACCESS_TOKEN

        if resource_type == BLOB_RESOURCE_TYPE:
            raise AuthorizationException(
                f"Auto-configuration for {resource_type} resources is not "
                "supported by the Azure connector."
            )

        # The environment credentials are only actually exercised when a
        # token is requested, so that is where missing or invalid
        # credentials surface (as an AzureError subclass).
        try:
            token = credential.get_token(
                AZURE_MANAGEMENT_TOKEN_SCOPE,
            )
        except AzureError as e:
            raise AuthorizationException(
                f"Failed to authenticate to Azure using implicit "
                f"authentication. Please check your local Azure CLI "
                f"configuration: {e}"
            ) from e

        # `expires_on` is a Unix timestamp; convert it straight into a
        # timezone-aware UTC datetime.
        expires_at = datetime.datetime.fromtimestamp(
            token.expires_on, tz=datetime.timezone.utc
        )
        auth_config = AzureAccessTokenConfig(
            token=token.token,
            resource_group=resource_group,
            storage_account=storage_account,
        )

    return cls(
        auth_method=auth_method,
        resource_type=resource_type,
        resource_id=resource_id
        if resource_type not in [AZURE_RESOURCE_TYPE, None]
        else None,
        expiration_seconds=expiration_seconds,
        expires_at=expires_at,
        config=auth_config,
    )
@classmethod
def _get_resource_group(cls, resource_id: str) -> str:
    """Extract the resource group name from a fully qualified Azure resource ID.

    Assumes the standard Azure Resource Manager layout
    `/subscriptions/{subscription-id}/resourceGroups/{resource-group}/...`.

    Args:
        resource_id: The ID of the Azure resource.

    Returns:
        The resource group of the Azure resource.
    """
    # Splitting on "/" yields ["", "subscriptions", "{subscription-id}",
    # "resourceGroups", "{resource-group}", ...]. Nothing past the fifth
    # piece is needed, so splitting stops there.
    segments = resource_id.split("/", 5)
    return segments[4]
def _list_blob_containers(
    self, credential: TokenCredential, container_name: Optional[str] = None
) -> Dict[str, str]:
    """Get the list of blob storage containers that the connector can access.

    Args:
        credential: The Azure credential to use to access the blob storage
            containers.
        container_name: The name of the blob container to get. If omitted,
            all accessible blob containers are returned.

    Returns:
        The list of blob storage containers that the connector can access
        as a dictionary mapping container names to their corresponding
        storage account names. If the `container_name` argument was
        specified, the dictionary will contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            access the target blob storage containers.
    """
    subscription_id, _ = self.subscription

    # Azure blob storage containers are scoped to a storage account. We
    # need to figure out which storage accounts the connector can access
    # and then list the containers in each of them. If a container name
    # is provided, we only need to find the storage account that contains
    # it.
    storage_accounts: List[str] = []
    if self.config.storage_account:
        storage_accounts = [self.config.storage_account]
    else:
        try:
            # The management client is closed as soon as the (fully
            # materialized) account list has been retrieved.
            with StorageManagementClient(
                credential, subscription_id
            ) as storage_client:
                accounts = list(storage_client.storage_accounts.list())
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list available Azure storage accounts. Please "
                f"check that the credentials have permissions to list "
                f"storage accounts or consider configuring a storage "
                f"account in the connector: {e}"
            ) from e

        if not accounts:
            raise AuthorizationException(
                "no storage accounts were found. Please check that the "
                "credentials have permissions to access one or more "
                "storage accounts."
            )

        if self.config.resource_group:
            # Keep only the storage accounts in the specified resource
            # group.
            accounts = [
                account
                for account in accounts
                if self._get_resource_group(account.id)
                == self.config.resource_group
            ]
            if not accounts:
                raise AuthorizationException(
                    f"no storage accounts were found in the "
                    f"'{self.config.resource_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"storage accounts and that the connector credentials "
                    f"have permissions to access them."
                )

        storage_accounts = [
            account.name for account in accounts if account.name
        ]

    containers: Dict[str, str] = {}
    for storage_account in storage_accounts:
        account_url = f"https://{storage_account}.blob.core.windows.net/"

        try:
            # One client per storage account; each is closed (releasing its
            # HTTP transport) once its containers have been enumerated.
            with BlobServiceClient(
                account_url, credential=credential
            ) as blob_client:
                account_containers = [
                    container.name
                    for container in blob_client.list_containers()
                    if container.name
                ]
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch Azure blob storage containers in the "
                f"'{storage_account}' storage account. Please check that "
                f"the credentials have permissions to list containers in "
                f"that storage account: {e}"
            ) from e

        if container_name:
            if container_name not in account_containers:
                continue
            # Stop at the first storage account hosting the container.
            containers = {container_name: storage_account}
            break

        containers.update(dict.fromkeys(account_containers, storage_account))

    if not containers:
        if container_name:
            if self.config.storage_account:
                raise AuthorizationException(
                    f"the '{container_name}' container was not found in "
                    f"the '{self.config.storage_account}' storage "
                    f"account. Please check that container exists and the "
                    f"credentials have permissions to access that "
                    f"container."
                )

            raise AuthorizationException(
                f"the '{container_name}' container was not found in any "
                f"of the storage accounts accessible to the connector. "
                f"Please check that container exists and the credentials "
                f"have permissions to access the storage account it "
                f"belongs to."
            )

        raise AuthorizationException(
            "no Azure blob storage containers were found in any of the "
            "storage accounts accessible to the connector. Please check "
            "that the credentials have permissions to access one or more "
            "storage accounts."
        )

    return containers
def _list_acr_registries(
    self, credential: TokenCredential, registry_name: Optional[str] = None
) -> Dict[str, str]:
    """Get the list of ACR registries that the connector can access.

    When both a registry name and a connector-level resource group are
    known, the registry is fetched directly; otherwise all registries in
    the subscription are listed and filtered by the configured resource
    group (if any) and by `registry_name` (if given).

    Args:
        credential: The Azure credential to use.
        registry_name: The name of the registry to get. If omitted,
            all accessible registries are returned.

    Returns:
        The list of ACR registries that the connector can access as a
        dictionary mapping registry names to their corresponding
        resource group names. If the `registry_name` argument was
        specified, the dictionary will contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            access the target ACR registries.
    """
    subscription_id, _ = self.subscription

    container_registries: Dict[str, str] = {}
    if registry_name and self.config.resource_group:
        try:
            # The management client is closed as soon as the call returns.
            with ContainerRegistryManagementClient(
                credential, subscription_id
            ) as registry_client:
                registry = registry_client.registries.get(
                    resource_group_name=self.config.resource_group,
                    registry_name=registry_name,
                )
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure container registry "
                f"'{registry_name}' in the '{self.config.resource_group}' "
                f"resource group specified in the connector configuration. "
                f"Please check that the registry exists in that resource "
                f"group and that the connector credentials have "
                f"permissions to access that registry: {e}"
            ) from e
        if registry.name:
            container_registries = {
                registry.name: self.config.resource_group
            }
    else:
        try:
            with ContainerRegistryManagementClient(
                credential, subscription_id
            ) as registry_client:
                # Materialize the pager before the client is closed.
                registries = list(registry_client.registries.list())
        except AzureError as e:
            raise AuthorizationException(
                "failed to list available Azure container registries. "
                "Please check that the credentials have permissions to "
                f"list container registries: {e}"
            ) from e

        if not registries:
            raise AuthorizationException(
                "no container registries were found. Please check that the "
                "credentials have permissions to access one or more "
                "container registries."
            )

        if self.config.resource_group:
            # Keep only the registries in the specified resource
            # group.
            registries = [
                registry
                for registry in registries
                if self._get_resource_group(registry.id)
                == self.config.resource_group
            ]

            if not registries:
                raise AuthorizationException(
                    f"no container registries were found in the "
                    f"'{self.config.resource_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"container registries and that the connector "
                    f"credentials have permissions to access them."
                )

        container_registries = {
            registry.name: self._get_resource_group(registry.id)
            for registry in registries
            if registry.name
        }

    if registry_name:
        if registry_name not in container_registries:
            if self.config.resource_group:
                raise AuthorizationException(
                    f"the '{registry_name}' registry was not found in "
                    f"the '{self.config.resource_group}' resource "
                    f"group specified in the connector configuration. "
                    f"Please check that registry exists in that "
                    f"resource group and that the connector "
                    f"credentials have permissions to access that "
                    f"registry."
                )

            raise AuthorizationException(
                f"the '{registry_name}' registry was not found or is "
                "not accessible. Please check that registry exists and "
                "the credentials have permissions to access it."
            )

        container_registries = {
            registry_name: container_registries[registry_name]
        }

    return container_registries
def _list_aks_clusters(
    self,
    credential: TokenCredential,
    cluster_name: Optional[str] = None,
    resource_group: Optional[str] = None,
) -> List[Tuple[str, str]]:
    """Get the list of AKS clusters that the connector can access.

    When both a cluster name and a resource group are given, the cluster
    is fetched directly; otherwise all clusters in the subscription are
    listed and filtered by resource group (the `resource_group` argument,
    falling back to the one configured in the connector) and by
    `cluster_name` (if given).

    Args:
        credential: The Azure credential to use.
        cluster_name: The name of the cluster to get. If omitted,
            all accessible clusters are returned.
        resource_group: The name of the resource group to which the
            search should be limited. If omitted, the resource group
            configured in the connector (if any) is used.

    Returns:
        The list of AKS clusters that the connector can access as a
        list of cluster names and their corresponding resource group names.
        If the `cluster_name` argument was specified, the list will
        contain a single entry.

    Raises:
        AuthorizationException: If the connector does not have access to
            access the target AKS clusters.
    """
    subscription_id, _ = self.subscription

    clusters: List[Tuple[str, str]] = []
    if cluster_name and resource_group:
        try:
            # The management client is closed as soon as the call returns.
            with ContainerServiceClient(
                credential, subscription_id
            ) as container_client:
                cluster = container_client.managed_clusters.get(
                    resource_group_name=resource_group,
                    resource_name=cluster_name,
                )
        except AzureError as e:
            raise AuthorizationException(
                f"failed to fetch the Azure AKS cluster '{cluster_name}' "
                f"in the '{resource_group}' resource group. Please check "
                f"that the cluster exists in that resource group and that "
                f"the connector credentials have permissions to access "
                f"that cluster: {e}"
            ) from e
        if cluster.name:
            clusters = [(cluster.name, resource_group)]
    else:
        try:
            with ContainerServiceClient(
                credential, subscription_id
            ) as container_client:
                # Materialize the pager before the client is closed.
                aks_clusters = list(container_client.managed_clusters.list())
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list available Azure AKS clusters. Please "
                f"check that the credentials have permissions to list "
                f"AKS clusters: {e}"
            ) from e

        if not aks_clusters:
            raise AuthorizationException(
                "no AKS clusters were found. Please check that the "
                "credentials have permissions to access one or more "
                "AKS clusters."
            )

        # Honor an explicitly requested resource group; otherwise fall back
        # to the one configured in the connector.
        scope_group = resource_group or self.config.resource_group
        if scope_group:
            # Keep only the clusters in the specified resource
            # group.
            aks_clusters = [
                cluster
                for cluster in aks_clusters
                if self._get_resource_group(cluster.id) == scope_group
            ]

            if not aks_clusters:
                raise AuthorizationException(
                    f"no AKS clusters were found in the "
                    f"'{scope_group}' resource group "
                    f"specified in the connector configuration. Please "
                    f"check that resource group contains one or more "
                    f"AKS clusters and that the connector credentials "
                    f"have permissions to access them."
                )

        clusters = [
            (cluster.name, self._get_resource_group(cluster.id))
            for cluster in aks_clusters
            if cluster.name
            and (not cluster_name or cluster.name == cluster_name)
        ]

    if cluster_name:
        if not clusters:
            if self.config.resource_group:
                raise AuthorizationException(
                    f"the '{cluster_name}' AKS cluster was not found "
                    f"in the '{self.config.resource_group}' resource "
                    f"group specified in the connector configuration. "
                    f"Please check that cluster exists in that "
                    f"resource group and that the connector "
                    f"credentials have permissions to access that "
                    f"cluster."
                )

            raise AuthorizationException(
                f"the '{cluster_name}' AKS cluster was not found or "
                "is not accessible. Please check that cluster exists "
                "and the credentials have permissions to access it."
            )

        # The same cluster name may exist in several resource groups of the
        # subscription; refuse to guess which one was meant.
        if len(clusters) > 1:
            resource_groups = [cluster[1] for cluster in clusters]
            raise AuthorizationException(
                f"the '{cluster_name}' AKS cluster was found in "
                f"multiple resource groups: "
                f"{', '.join(resource_groups)}. "
                f"Please specify a resource group in the connector "
                f"configuration or include the resource group in the "
                "resource name to disambiguate."
            )

    return clusters
def _verify(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> List[str]:
    """Check the connector credentials and list the reachable resources.

    Without a resource type, only the credentials are checked (by
    resolving the Azure subscription) and an empty list is returned. For
    the generic Azure resource type, the given resource ID is returned
    as-is. For blob containers, ACR registries and AKS clusters, the
    accessible resources (optionally narrowed down to `resource_id`) are
    listed and returned in canonical form.

    Args:
        resource_type: The type of the resource to verify. If omitted and
            if the connector supports multiple resource types, the
            implementation must verify that it can authenticate and connect
            to any and all of the supported resource types.
        resource_id: The ID of the resource to connect to. Omitted if a
            resource type is not specified. It has the same value as the
            default resource ID if the supplied resource type doesn't
            support multiple instances. If the supplied resource type does
            allows multiple instances, this parameter may still be omitted
            to fetch a list of resource IDs identifying all the resources
            of the indicated type that the connector can access.

    Returns:
        The list of resources IDs in canonical format identifying the
        resources that the connector can access. This list is empty only
        if the resource type is not specified (i.e. for multi-type
        connectors).

    Raises:
        AuthorizationException: If the connector cannot authenticate or
            access the specified resource.
    """
    if not resource_type:
        # Multi-type connector: resolving the subscription is what checks
        # the credentials globally.
        _, _ = self.subscription
        return []

    if resource_type == AZURE_RESOURCE_TYPE:
        assert resource_id is not None
        return [resource_id]

    credential, _ = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
    )

    if resource_type == BLOB_RESOURCE_TYPE:
        if self.auth_method == AzureAuthenticationMethods.ACCESS_TOKEN:
            raise AuthorizationException(
                f"the '{self.auth_method}' authentication method is not "
                "supported for blob storage resources"
            )
        wanted_container = (
            self._parse_blob_container_resource_id(resource_id)
            if resource_id
            else None
        )
        found_containers = self._list_blob_containers(
            credential,
            container_name=wanted_container,
        )
        return [f"az://{name}" for name in found_containers]

    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        wanted_registry = (
            self._parse_acr_resource_id(resource_id) if resource_id else None
        )
        found_registries = self._list_acr_registries(
            credential,
            registry_name=wanted_registry,
        )
        return [f"{name}.azurecr.io" for name in found_registries]

    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        group = self.config.resource_group
        wanted_cluster: Optional[str] = None
        if resource_id:
            group, wanted_cluster = self._parse_aks_resource_id(resource_id)
        found_clusters = self._list_aks_clusters(
            credential,
            cluster_name=wanted_cluster,
            resource_group=group,
        )
        return [f"{rg}/{name}" for name, rg in found_clusters]

    return []
def _get_connector_client(
    self,
    resource_type: str,
    resource_id: str,
) -> "ServiceConnector":
    """Get a connector instance that can be used to connect to a resource.

    This method generates a client-side connector instance that can be used
    to connect to a resource of the given type. The client-side connector
    is configured with credentials derived from the current connector and,
    depending on resource type, it may also be of a different connector
    type:

    - an Azure connector for generic Azure and blob storage resources
    - a Docker connector (username/password) for ACR registries
    - a Kubernetes connector (token) for AKS clusters

    Args:
        resource_type: The type of the resources to connect to.
        resource_id: The ID of a particular resource to connect to.

    Returns:
        An Azure, Kubernetes or Docker connector instance that can be used to
        connect to the specified resource.

    Raises:
        AuthorizationException: If authentication failed or the registry
            or cluster credentials could not be retrieved.
        ValueError: If the resource type is not supported.
        RuntimeError: If the Kubernetes connector is not installed and the
            resource type is Kubernetes.
    """
    connector_name = ""
    if self.name:
        connector_name = self.name
    if resource_id:
        connector_name += f" ({resource_type} | {resource_id} client)"
    else:
        connector_name += f" ({resource_type} client)"

    logger.debug(f"Getting connector client for {connector_name}")

    if resource_type in [AZURE_RESOURCE_TYPE, BLOB_RESOURCE_TYPE]:
        if (
            self.resource_type == resource_type
            and self.resource_id == resource_id
        ):
            # If the requested type and resource ID are the same as
            # those configured, we can return the current connector
            # instance because it's fully formed and ready to use
            # to connect to the specified resource
            return self

        # Create a client-side Azure connector instance that is fully formed
        # and ready to use to connect to the specified resource (i.e. has
        # all the necessary configuration and credentials, a resource type
        # and a resource ID where applicable)
        return AzureServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=self.auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
            config=self.config,
            expires_at=self.expires_at,
        )

    subscription_id, _ = self.subscription
    credential, expires_at = self.get_azure_credential(
        self.auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
    )
    resource_group: Optional[str] = None

    if resource_type == DOCKER_REGISTRY_RESOURCE_TYPE:
        registry_name = self._parse_acr_resource_id(resource_id)

        # If a service principal is used for authentication, the client ID
        # and client secret can be used to authenticate to the registry, if
        # configured.
        # https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-service-principal#authenticate-with-the-service-principal
        if (
            self.auth_method
            == AzureAuthenticationMethods.SERVICE_PRINCIPAL
        ):
            assert isinstance(self.config, AzureServicePrincipalConfig)
            username = str(self.config.client_id)
            password = self.config.client_secret

        # Without a service principal, this only works if the admin account
        # is enabled for the registry, but this is not recommended and
        # disabled by default.
        # https://docs.microsoft.com/en-us/azure/container-registry/container-registry-authentication#admin-account
        else:
            registries = self._list_acr_registries(
                credential,
                registry_name=registry_name,
            )
            registry_name, resource_group = registries.popitem()

            try:
                # The management client is closed as soon as the admin
                # credentials have been fetched.
                with ContainerRegistryManagementClient(
                    credential, subscription_id
                ) as registry_client:
                    registry_credentials = (
                        registry_client.registries.list_credentials(
                            resource_group, registry_name
                        )
                    )
                username = registry_credentials.username
                password = registry_credentials.passwords[0].value
            except AzureError as e:
                raise AuthorizationException(
                    f"failed to list admin credentials for Azure Container "
                    f"Registry '{registry_name}' in resource group "
                    f"'{resource_group}'. Make sure the registry is "
                    f"configured with an admin account: {e}"
                ) from e

        # Create a client-side Docker connector instance with the temporary
        # Docker credentials
        return DockerServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=DockerAuthenticationMethods.PASSWORD,
            resource_type=resource_type,
            config=DockerConfiguration(
                username=username,
                password=password,
                registry=f"{registry_name}.azurecr.io",
            ),
            expires_at=expires_at,
        )

    if resource_type == KUBERNETES_CLUSTER_RESOURCE_TYPE:
        resource_group, cluster_name = self._parse_aks_resource_id(
            resource_id
        )
        # Resolve the cluster (and its resource group, when it was not part
        # of the resource ID) among the clusters the credentials can see.
        # A cluster name is always set here, so the list is never empty.
        clusters = self._list_aks_clusters(
            credential,
            cluster_name=cluster_name,
            resource_group=resource_group,
        )
        cluster_name, resource_group = clusters[0]

        try:
            # The management client is closed as soon as the admin
            # kubeconfig has been fetched.
            with ContainerServiceClient(
                credential, subscription_id
            ) as aks_client:
                creds = (
                    aks_client.managed_clusters.list_cluster_admin_credentials(
                        resource_group_name=resource_group,
                        resource_name=cluster_name,
                    )
                )
        except AzureError as e:
            raise AuthorizationException(
                f"failed to list credentials for Azure Kubernetes "
                f"Service cluster '{cluster_name}' in resource group "
                f"'{resource_group}': {e}"
            ) from e

        # Guard against an empty credentials response so the failure
        # surfaces as an AuthorizationException rather than an
        # IndexError/TypeError.
        kubeconfigs = creds.kubeconfigs
        if not kubeconfigs or not kubeconfigs[0].value:
            raise AuthorizationException(
                f"no kubeconfig was returned for Azure Kubernetes "
                f"Service cluster '{cluster_name}' in resource group "
                f"'{resource_group}'."
            )
        kubeconfig_yaml = kubeconfigs[0].value.decode(encoding="UTF-8")
        kubeconfig = yaml.safe_load(kubeconfig_yaml)

        # Create a client-side Kubernetes connector instance with the
        # Kubernetes credentials
        try:
            # Import libraries only when needed
            from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (
                KubernetesAuthenticationMethods,
                KubernetesServiceConnector,
                KubernetesTokenConfig,
            )
        except ImportError as e:
            raise RuntimeError(
                f"The Kubernetes Service Connector functionality could not "
                f"be used due to missing dependencies: {e}"
            ) from e

        # The connector is built from the first cluster/user entry of the
        # admin kubeconfig.
        cluster_name = kubeconfig["clusters"][0]["name"]
        cluster = kubeconfig["clusters"][0]["cluster"]
        user = kubeconfig["users"][0]["user"]
        return KubernetesServiceConnector(
            id=self.id,
            name=connector_name,
            auth_method=KubernetesAuthenticationMethods.TOKEN,
            resource_type=resource_type,
            config=KubernetesTokenConfig(
                cluster_name=cluster_name,
                certificate_authority=cluster.get(
                    "certificate-authority-data"
                ),
                server=cluster["server"],
                token=user["token"],
                client_certificate=user.get("client-certificate-data"),
                client_key=user.get("client-key-data"),
            ),
            expires_at=expires_at,
        )

    raise ValueError(f"Unsupported resource type: {resource_type}")
subscription: Tuple[str, str]
property
readonly
Get the Azure subscription ID and name.
Returns:
Type | Description |
---|---|
Tuple[str, str] |
The Azure subscription ID and name. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the Azure subscription could not be determined or doesn't match the configured subscription ID. |
tenant_id: str
property
readonly
Get the Azure tenant ID.
Returns:
Type | Description |
---|---|
str |
The Azure tenant ID. |
Exceptions:
Type | Description |
---|---|
AuthorizationException |
If the Azure tenant ID could not be determined or doesn't match the configured tenant ID. |
get_azure_credential(self, auth_method, resource_type=None, resource_id=None)
Get an Azure credential for the specified resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
auth_method |
str |
The authentication method to use. |
required |
resource_type |
Optional[str] |
The resource type to get a credential for. |
None |
resource_id |
Optional[str] |
The resource ID to get a credential for. |
None |
Returns:
Type | Description |
---|---|
Tuple[azure.core.credentials.TokenCredential, Optional[datetime.datetime]] |
An Azure credential for the specified resource and its expiration timestamp, if applicable. |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_azure_credential(
    self,
    auth_method: str,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> Tuple[TokenCredential, Optional[datetime.datetime]]:
    """Return an Azure credential, reusing a cached one while it is fresh.

    Credentials are cached in `self._session_cache` keyed by the
    authentication method only; the resource type and resource ID are not
    part of the cache key. A cached credential is reused when it has no
    expiration, or when it stays valid for more than
    `AZURE_SESSION_EXPIRATION_BUFFER` minutes from now. Otherwise a new
    credential is obtained through `self._authenticate` and cached.

    Args:
        auth_method: The authentication method to use.
        resource_type: The resource type to get a credential for.
        resource_id: The resource ID to get a credential for.

    Returns:
        An Azure credential for the specified resource and its expiration
        timestamp, if applicable.
    """
    cache_key = auth_method
    if cache_key in self._session_cache:
        cached_credential, cached_expiry = self._session_cache[cache_key]
        if cached_expiry is None:
            # Non-expiring credentials can always be reused.
            return cached_credential, None
        # Cached expirations are treated as UTC for the comparison below.
        cached_expiry = cached_expiry.replace(tzinfo=datetime.timezone.utc)
        refresh_deadline = datetime.datetime.now(
            datetime.timezone.utc
        ) + datetime.timedelta(minutes=AZURE_SESSION_EXPIRATION_BUFFER)
        if cached_expiry > refresh_deadline:
            return cached_credential, cached_expiry
        # Otherwise the credential expires too soon: fall through and
        # re-authenticate.

    logger.debug(
        f"Creating Azure credential for auth method '{auth_method}', "
        f"resource type '{resource_type}' and resource ID "
        f"'{resource_id}'..."
    )
    fresh_credential, fresh_expiry = self._authenticate(
        auth_method, resource_type, resource_id
    )
    self._session_cache[cache_key] = (fresh_credential, fresh_expiry)
    return fresh_credential, fresh_expiry
model_post_init(/, self, context)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
self |
BaseModel |
The BaseModel instance. |
required |
context |
Any |
The context. |
required |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate `__pydantic_private__` with the private-attribute defaults.

    Intended to behave like a BaseModel method. It accepts `context` only
    because pydantic-core passes it when calling this function. If the
    instance already has a non-None `__pydantic_private__`, nothing
    happens. Otherwise a dict mapping each private attribute name to its
    default is stored there. Attributes whose default is
    `PydanticUndefined` are omitted.

    Args:
        self: The BaseModel instance.
        context: The context (not used by this function).
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return
    defaults = {}
    for attr_name, attr_info in self.__private_attributes__.items():
        default_value = attr_info.get_default()
        if default_value is PydanticUndefined:
            continue
        defaults[attr_name] = default_value
    object_setattr(self, '__pydantic_private__', defaults)
AzureServicePrincipalConfig (AzureClientConfig, AzureClientSecret)
Azure service principal configuration.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class AzureServicePrincipalConfig(AzureClientConfig, AzureClientSecret):
    """Azure service principal configuration.

    Combines the fields of `AzureClientConfig` and `AzureClientSecret`
    through multiple inheritance and declares no fields of its own.
    """
ZenMLAzureTokenCredential (TokenCredential)
ZenML Azure token credential.
This class is used to provide a pre-configured token credential to Azure clients. Tokens are generated from other Azure credentials and are served to Azure clients to authenticate requests.
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
class ZenMLAzureTokenCredential(TokenCredential):
    """ZenML Azure token credential.

    This class is used to provide a pre-configured token credential to Azure
    clients. Tokens are generated from other Azure credentials and are served
    to Azure clients to authenticate requests.
    """

    def __init__(self, token: str, expires_at: datetime.datetime):
        """Initialize ZenML Azure token credential.

        Args:
            token: The token to use for authentication
            expires_at: The expiration time of the token. A naive datetime
                is interpreted as UTC; a timezone-aware datetime is used
                as-is.
        """
        self.token = token
        # `datetime.replace` returns a new object, so its result must be
        # assigned. Otherwise `timestamp()` would interpret a naive UTC
        # expiration as local time and shift `expires_on` by the local UTC
        # offset.
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
        # The POSIX timestamp of an aware datetime does not depend on its
        # time zone, so no conversion to local time is needed.
        self.expires_on = int(expires_at.timestamp())

    def get_token(self, *scopes: str, **kwargs: Any) -> Any:
        """Get token.

        The requested scopes and keyword arguments are ignored: the same
        pre-configured token is returned for every call.

        Args:
            *scopes: Scopes (ignored).
            **kwargs: Keyword arguments (ignored).

        Returns:
            An `AccessToken` wrapping the configured token and its expiration
            as integer POSIX seconds.
        """
        return AccessToken(token=self.token, expires_on=self.expires_on)
__init__(self, token, expires_at)
special
Initialize ZenML Azure token credential.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
token |
str |
The token to use for authentication |
required |
expires_at |
datetime |
The expiration time of the token |
required |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def __init__(self, token: str, expires_at: datetime.datetime):
    """Initialize ZenML Azure token credential.

    Args:
        token: The token to use for authentication
        expires_at: The expiration time of the token. A naive datetime is
            interpreted as UTC; a timezone-aware datetime is used as-is.
    """
    self.token = token
    # `datetime.replace` returns a new object, so its result must be
    # assigned. Otherwise `timestamp()` would interpret a naive UTC
    # expiration as local time and shift `expires_on` by the local UTC
    # offset.
    if expires_at.tzinfo is None:
        expires_at = expires_at.replace(tzinfo=datetime.timezone.utc)
    # The POSIX timestamp of an aware datetime does not depend on its time
    # zone, so no conversion to local time is needed.
    self.expires_on = int(expires_at.timestamp())
get_token(self, *scopes, **kwargs)
Get token.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*scopes |
str |
Scopes |
() |
**kwargs |
Any |
Keyword arguments |
{} |
Returns:
Type | Description |
---|---|
Any |
Token |
Source code in zenml/integrations/azure/service_connectors/azure_service_connector.py
def get_token(self, *scopes: str, **kwargs: Any) -> Any:
    """Return the pre-configured access token.

    Neither the requested scopes nor any extra keyword arguments influence
    the result; the same token is served for every call.

    Args:
        *scopes: Scopes (ignored).
        **kwargs: Keyword arguments (ignored).

    Returns:
        An `AccessToken` built from `self.token` and `self.expires_on`.
    """
    token_value = self.token
    expiry_seconds = self.expires_on
    return AccessToken(token=token_value, expires_on=expiry_seconds)
step_operators
special
Initialization of AzureML Step Operator integration.
azureml_step_operator
Implementation of the ZenML AzureML Step Operator.
AzureMLStepOperator (BaseStepOperator)
Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator that executes ZenML steps as AzureML command jobs.

    Each step assigned to this operator is submitted to the configured
    AzureML workspace as a command job. The job runs the ZenML entrypoint
    command inside the step's Docker image.
    """

    @property
    def config(self) -> AzureMLStepOperatorConfig:
        """The typed configuration of this step operator.

        Returns:
            The `AzureMLStepOperatorConfig` of this component.
        """
        return cast(AzureMLStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """The settings class used by the AzureML step operator.

        Returns:
            The `AzureMLStepOperatorSettings` class.
        """
        return AzureMLStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Stack validator for the AzureML step operator.

        Returns:
            A validator that requires a container registry and an image
            builder. It also checks that the artifact store and the
            container registry are not local.
        """

        def _validate_remote_components(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            artifact_store = stack.artifact_store
            if artifact_store.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the AzureML "
                    "step operator."
                )

            registry = stack.container_registry
            # Guaranteed by `required_components` below.
            assert registry is not None
            if not registry.config.is_local:
                return True, ""
            return False, (
                "The AzureML step operator runs code remotely and "
                "needs to push/pull Docker images, but the "
                f"container registry `{registry.name}` of the "
                "active stack is local. Please ensure that your stack "
                "contains a remote container registry when using the "
                "AzureML step operator."
            )

        required = {
            StackComponentType.CONTAINER_REGISTRY,
            StackComponentType.IMAGE_BUILDER,
        }
        return StackValidator(
            required_components=required,
            custom_validation_function=_validate_remote_components,
        )

    def _get_credentials(self) -> TokenCredential:
        """Resolve the credential used to talk to AzureML.

        The lookup order is:
        1. The linked service connector, if any.
        2. An explicit service principal, if tenant ID, client ID and
           password are all configured.
        3. `DefaultAzureCredential` otherwise.

        Returns:
            The authentication object for the AzureML environment.
        """
        connector = self.get_connector()
        if connector:
            token_credential = connector.connect()
            assert isinstance(token_credential, TokenCredential)
            return token_credential

        has_service_principal = (
            self.config.tenant_id
            and self.config.service_principal_id
            and self.config.service_principal_password
        )
        if not has_service_principal:
            return DefaultAzureCredential()
        return ClientSecretCredential(
            tenant_id=self.config.tenant_id,
            client_id=self.config.service_principal_id,
            client_secret=self.config.service_principal_password,
        )

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBase"
    ) -> List["BuildConfiguration"]:
        """Collect one Docker build per step that uses this step operator.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds, in step-configuration order.
        """
        return [
            BuildConfiguration(
                key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            for step_name, step in deployment.step_configurations.items()
            if step.config.step_operator == self.name
        ]

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
        environment: Dict[str, str],
    ) -> None:
        """Submit a step to AzureML as a command job and stream it.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
            environment: Environment variables to set in the step operator
                environment.
        """
        step_settings = cast(
            AzureMLStepOperatorSettings, self.get_settings(info)
        )
        docker_image = info.get_image(
            key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY
        )

        # Connect to the configured AzureML workspace.
        credential = self._get_credentials()
        ml_client = MLClient(
            credential=credential,
            subscription_id=self.config.subscription_id,
            resource_group_name=self.config.resource_group,
            workspace_name=self.config.workspace_name,
        )

        job_environment = Environment(
            name=f"zenml-{info.run_name}", image=docker_image
        )
        compute_target = create_or_get_compute(
            ml_client, step_settings, default_compute_name=f"zenml_{self.id}"
        )
        job_spec = command(
            name=info.run_name,
            command=" ".join(entrypoint_command),
            environment=job_environment,
            environment_variables=environment,
            compute=compute_target,
            experiment_name=info.pipeline.name,
        )

        submitted_job = ml_client.jobs.create_or_update(job_spec)
        logger.info(f"AzureML job created with id: {submitted_job.id}")
        # The job is looked up by its name, which was set to the run name.
        ml_client.jobs.stream(info.run_name)
config: AzureMLStepOperatorConfig
property
readonly
Returns the `AzureMLStepOperatorConfig` config.
Returns:
Type | Description |
---|---|
AzureMLStepOperatorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the AzureML step operator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A validator that checks that the stack contains an image builder, a remote container registry and a remote artifact store. |
get_docker_builds(self, deployment)
Gets the Docker builds required for the component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeploymentBase |
The pipeline deployment for which to get the builds. |
required |
Returns:
Type | Description |
---|---|
List[BuildConfiguration] |
The required Docker builds. |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBase"
) -> List["BuildConfiguration"]:
    """Collect one Docker build per step that uses this step operator.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds, in step-configuration order. The list
        is empty when no step is assigned to this operator.
    """
    return [
        BuildConfiguration(
            key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY,
            settings=step.config.docker_settings,
            step_name=step_name,
        )
        for step_name, step in deployment.step_configurations.items()
        if step.config.step_operator == self.name
    ]
launch(self, info, entrypoint_command, environment)
Launches a step on AzureML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Information about the step run. |
required |
entrypoint_command |
List[str] |
Command that executes the step. |
required |
environment |
Dict[str, str] |
Environment variables to set in the step operator environment. |
required |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
    environment: Dict[str, str],
) -> None:
    """Submit a step to AzureML as a command job and stream it.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
        environment: Environment variables to set in the step operator
            environment.
    """
    step_settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
    docker_image = info.get_image(key=AZUREML_STEP_OPERATOR_DOCKER_IMAGE_KEY)

    # Connect to the configured AzureML workspace.
    credential = self._get_credentials()
    ml_client = MLClient(
        credential=credential,
        subscription_id=self.config.subscription_id,
        resource_group_name=self.config.resource_group,
        workspace_name=self.config.workspace_name,
    )

    job_environment = Environment(
        name=f"zenml-{info.run_name}", image=docker_image
    )
    compute_target = create_or_get_compute(
        ml_client, step_settings, default_compute_name=f"zenml_{self.id}"
    )
    job_spec = command(
        name=info.run_name,
        command=" ".join(entrypoint_command),
        environment=job_environment,
        environment_variables=environment,
        compute=compute_target,
        experiment_name=info.pipeline.name,
    )

    submitted_job = ml_client.jobs.create_or_update(job_spec)
    logger.info(f"AzureML job created with id: {submitted_job.id}")
    # The job is looked up by its name, which was set to the run name.
    ml_client.jobs.stream(info.run_name)